
Commit 7a46336 (initial commit)

map-reduce using scala and hadoop

File tree: 9 files changed, +215 −0 lines

.github/workflows/blank.yml

Lines changed: 30 additions & 0 deletions

```yaml
# This is a basic workflow to help you get started with Actions

name: Practice-Scala-CI

# Controls when the action will run.
on:
  # Triggers the workflow on push or pull request events but only for the master branch
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
  # This workflow contains a single job called "build"
  build:
    # The type of runner that the job will run on
    runs-on: ubuntu-latest

    # Steps represent a sequence of tasks that will be executed as part of the job
    steps:
      # Checks out your repository under $GITHUB_WORKSPACE, so your job can access it
      - uses: actions/checkout@v2

      # Runs a single command using the runner's shell
      - name: Run assembly
        run: sbt assembly
```
.gitignore

Lines changed: 48 additions & 0 deletions

```
*~
*#
src_managed
activemq-data
project/plugins/project
project/boot/*
*/project/build/target
*/project/boot
lib_managed
etags
tags
TAGS
reports
dist
bin
target
deploy/*.jar
data
out
logs
.#*
.codefellow
storage
.ensime
_dump
.manager
manifest.mf
semantic.cache
tm*.log
tm*.lck
tm.out
*.tm.epoch
.DS_Store
*.iws
*.ipr
*.iml
run-codefellow
.project
.settings
.classpath
.idea
.scala_dependencies
.target
.cache
multiverse.log
.eprj
/lib
.bsp
```
README.md

Lines changed: 27 additions & 0 deletions

## MapReduce using Scala

Run this command to generate the fat jar `WordCountUsingHadoop-1.0.jar`:

```sbt assembly```

Demo run:

```
Demo-WordCountOnHadoop:]$ echo "Hello world hello hello" > input.txt
Demo-WordCountOnHadoop:]$ cat input.txt
Hello world hello hello
Demo-WordCountOnHadoop:]$ docker exec -it hadoop-namenode bash
(docker) [root@hadoop-namenode /]# hadoop fs -ls /user/root/
(docker) [root@hadoop-namenode /]# hadoop fs -copyFromLocal input.txt /user/root/input
(docker) [root@hadoop-namenode /]# hadoop jar WordCountUsingHadoop-1.0.jar /user/root/input output
(docker) [root@hadoop-namenode /]# hadoop fs -ls /user/root/
Found 2 items
-rw-r--r--   2 root supergroup         24 2021-05-24 05:53 /user/root/input
drwxr-xr-x   - root supergroup          0 2021-05-24 05:57 /user/root/output
(docker) [root@hadoop-namenode /]# hadoop fs -ls /user/root/output
Found 2 items
-rw-r--r--   2 root supergroup          0 2021-05-24 05:57 /user/root/output/_SUCCESS
-rw-r--r--   2 root supergroup         24 2021-05-24 05:57 /user/root/output/part-r-00000
(docker) [root@hadoop-namenode /]# hadoop fs -cat /user/root/output/part-r-00000
Hello	1
hello	2
world	1
```
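To make the demo concrete: the job splits each line on spaces, emits `(word, 1)` pairs, groups equal words, and sums the ones. Note the counting is case-sensitive, which is why `Hello` and `hello` appear separately above. A minimal illustrative sketch (the `WordCountLocal` object is hypothetical, not part of this commit) of the same computation in plain Scala collections:

```scala
// Sketch only: the word count the Hadoop job computes, expressed locally.
object WordCountLocal extends App {
  val lines = Seq("Hello world hello hello")
  val counts = lines
    .flatMap(_.split(" "))                          // "map" phase: one token per word
    .groupBy(identity)                              // shuffle: group equal words together
    .map { case (word, occs) => word -> occs.size } // "reduce" phase: count occurrences
  counts.foreach { case (word, n) => println(s"$word\t$n") }
  // Prints (order may vary): Hello 1, world 1, hello 2, matching the demo above.
}
```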

build.sbt

Lines changed: 22 additions & 0 deletions

```scala
ThisBuild / scalaVersion := "2.13.3"

lazy val root = (project in file(".")).
  settings(
    name := "MapReduce Using Scala and Hadoop",
    version := "1.0",
    mainClass in Compile := Some("com.mapreduce.hadoop.WordCountUsingHadoop"),
    mainClass in assembly := Some("com.mapreduce.hadoop.WordCountUsingHadoop")
  )

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-core" % "1.2.1",
  "org.scalatest" %% "scalatest" % "3.0.8" % "test"
)

assemblyJarName in assembly := "WordCountUsingHadoop-1.0.jar"

// Discard duplicate META-INF entries when merging dependency jars;
// keep the first copy of everything else.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x                             => MergeStrategy.first
}
```
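One possible refinement, not part of this commit: since the README launches the jar with `hadoop jar`, the cluster already supplies the Hadoop classes at runtime, so the dependency could be scoped `provided`, which sbt-assembly excludes from the fat jar by default. A hedged sketch:

```scala
// Sketch only (an assumption, not in the commit): "provided" keeps
// hadoop-core out of the fat jar, because `hadoop jar` puts the cluster's
// own Hadoop classes on the classpath at runtime.
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-core" % "1.2.1" % "provided",
  "org.scalatest" %% "scalatest" % "3.0.8" % "test"
)
```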

project/assembly.sbt

Whitespace-only changes.

project/build.properties

Whitespace-only changes.

project/plugins.sbt

Lines changed: 1 addition & 0 deletions

```scala
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
```
src/main/scala/com/mapreduce/hadoop/WordCountUsingHadoop.scala

Lines changed: 61 additions & 0 deletions

```scala
package com.mapreduce.hadoop

import java.lang

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

/**
 * Created by Saurav Sahu : 23 May 2021
 */
object WordCountUsingHadoop {

  val wordDelimiter = " "

  def getWordsInLine(value: Text) = value.toString.split(wordDelimiter)

  type Mapper_T = Mapper[Object /* InputData */, Text /* Line */, Text /* Word */, IntWritable /* OccurrenceCount */]

  // Emits (word, 1) for every token in the input line.
  class LineTokenizerMapper extends Mapper_T {
    val word = new Text
    val singleOccurrence = new IntWritable(1)
    override def map(key: Object, value: Text, context: Mapper_T#Context): Unit = {
      getWordsInLine(value).foreach(token => {
        word.set(token)
        context.write(word, singleOccurrence)
      })
    }
  }

  // Sums the counts that Hadoop groups together under one word.
  def calculateSum(values: lang.Iterable[IntWritable]) = {
    val iterator = values.iterator
    var total = 0
    while (iterator.hasNext) total += iterator.next.get()
    total
  }

  type Reducer_T = Reducer[Text /* Word */, IntWritable /* Occurrence */, Text /* Word */, IntWritable /* Occurrences */]

  // Emits (word, total occurrences) for each distinct word.
  class OccurrencesReducer extends Reducer_T {
    val totalOccurrences = new IntWritable(1)
    override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer_T#Context): Unit = {
      totalOccurrences.set(calculateSum(values))
      context.write(key, totalOccurrences)
    }
  }

  def main(args: Array[String]): Unit = {
    val mapReduceHadoopJob = Job.getInstance(new Configuration, "Word Count Using Hadoop")
    mapReduceHadoopJob.setJarByClass(this.getClass)
    mapReduceHadoopJob.setMapperClass(classOf[LineTokenizerMapper])
    mapReduceHadoopJob.setReducerClass(classOf[OccurrencesReducer])
    mapReduceHadoopJob.setOutputKeyClass(classOf[Text])
    mapReduceHadoopJob.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(mapReduceHadoopJob, new Path(args(0)))
    FileOutputFormat.setOutputPath(mapReduceHadoopJob, new Path(args(1)))
    // Exit 0 on success, 1 on failure (clearer than comparing Booleans).
    System.exit(if (mapReduceHadoopJob.waitForCompletion(true)) 0 else 1)
  }
}
```
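Because `OccurrencesReducer` just sums counts, an associative and commutative operation, the same class could also be registered as a combiner to pre-aggregate on the map side before the shuffle. A one-line sketch of the optional addition to `main` (not part of this commit):

```scala
// Optional, not in the commit: reuse the reducer as a combiner so each
// mapper pre-sums its (word, 1) pairs before they cross the network.
mapReduceHadoopJob.setCombinerClass(classOf[OccurrencesReducer])
```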
src/test/scala/com/mapreduce/hadoop/WordCountUsingHadoopTest.scala

Lines changed: 26 additions & 0 deletions

```scala
package com.mapreduce.hadoop

import java.util

import org.apache.hadoop.io.{IntWritable, Text}
import org.scalatest.FunSuite

/**
 * Created by Saurav 24 May 2021
 */

class WordCountUsingHadoopTest extends FunSuite {
  test("calculate sum of iterables") {
    val iterable = new util.ArrayList[IntWritable](util.Arrays.asList(
      new IntWritable(1),
      new IntWritable(2),
      new IntWritable(1)))

    assert(WordCountUsingHadoop.calculateSum(iterable) == 4)
  }

  test("test getWordsInLine") {
    val line = "Hello World, how are you"
    assert(WordCountUsingHadoop.getWordsInLine(new Text(line)).sameElements(Array("Hello", "World,", "how", "are", "you")))
  }
}
```
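The suite covers the two pure helpers. One edge case it could also pin down (a suggested addition, not in the commit) is that `calculateSum` returns 0 for an empty iterable, since its loop body never runs:

```scala
// Suggested extra test, not in the commit: the sum of no values is 0.
test("calculate sum of an empty iterable") {
  assert(WordCountUsingHadoop.calculateSum(new util.ArrayList[IntWritable]()) == 0)
}
```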
