@@ -5,99 +5,108 @@ import com.eed3si9n.expecty.Expecty.expect
 class HadoopTests extends munit.FunSuite {
   protected lazy val extraOptions: Seq[String] = TestUtil.extraOptions
 
-  test("simple map-reduce") {
-    TestUtil.retryOnCi() {
-      val inputs = TestInputs(
-        os.rel / "WordCount.java" ->
-          """//> using dep org.apache.hadoop:hadoop-client-api:3.3.3
-            |
-            |// from https://hadoop.apache.org/docs/r3.3.3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
-            |
-            |package foo;
-            |
-            |import java.io.IOException;
-            |import java.util.StringTokenizer;
-            |
-            |import org.apache.hadoop.conf.Configuration;
-            |import org.apache.hadoop.fs.Path;
-            |import org.apache.hadoop.io.IntWritable;
-            |import org.apache.hadoop.io.Text;
-            |import org.apache.hadoop.mapreduce.Job;
-            |import org.apache.hadoop.mapreduce.Mapper;
-            |import org.apache.hadoop.mapreduce.Reducer;
-            |import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-            |import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-            |
-            |public class WordCount {
-            |
-            |  public static class TokenizerMapper
-            |       extends Mapper<Object, Text, Text, IntWritable>{
-            |
-            |    private final static IntWritable one = new IntWritable(1);
-            |    private Text word = new Text();
-            |
-            |    public void map(Object key, Text value, Context context
-            |                    ) throws IOException, InterruptedException {
-            |      StringTokenizer itr = new StringTokenizer(value.toString());
-            |      while (itr.hasMoreTokens()) {
-            |        word.set(itr.nextToken());
-            |        context.write(word, one);
-            |      }
-            |    }
-            |  }
-            |
-            |  public static class IntSumReducer
-            |       extends Reducer<Text,IntWritable,Text,IntWritable> {
-            |    private IntWritable result = new IntWritable();
-            |
-            |    public void reduce(Text key, Iterable<IntWritable> values,
-            |                       Context context
-            |                       ) throws IOException, InterruptedException {
-            |      int sum = 0;
-            |      for (IntWritable val : values) {
-            |        sum += val.get();
-            |      }
-            |      result.set(sum);
-            |      context.write(key, result);
-            |    }
-            |  }
-            |
-            |  public static void main(String[] args) throws Exception {
-            |    Configuration conf = new Configuration();
-            |    Job job = Job.getInstance(conf, "word count");
-            |    job.setJarByClass(WordCount.class);
-            |    job.setMapperClass(TokenizerMapper.class);
-            |    job.setCombinerClass(IntSumReducer.class);
-            |    job.setReducerClass(IntSumReducer.class);
-            |    job.setOutputKeyClass(Text.class);
-            |    job.setOutputValueClass(IntWritable.class);
-            |    FileInputFormat.addInputPath(job, new Path(args[0]));
-            |    FileOutputFormat.setOutputPath(job, new Path(args[1]));
-            |    System.exit(job.waitForCompletion(true) ? 0 : 1);
-            |  }
-            |}
-            |""".stripMargin
-      )
-      inputs.fromRoot { root =>
-        val res = os.proc(
-          TestUtil.cli,
-          "--power",
-          "run",
-          TestUtil.extraOptions,
-          ".",
-          "--hadoop",
-          "--command",
-          "--scratch-dir",
-          "tmp",
-          "--",
-          "foo"
+  for {
+    withTestScope <- Seq(true, false)
+    scopeDescription = if (withTestScope) "test scope" else "main scope"
+    inputPath =
+      if (withTestScope) os.rel / "test" / "WordCount.java" else os.rel / "main" / "WordCount.java"
+    directiveKey = if (withTestScope) "test.dep" else "dep"
+    scopeOptions = if (withTestScope) Seq("--test") else Nil
+  }
+    test(s"simple map-reduce ($scopeDescription)") {
+      TestUtil.retryOnCi() {
+        val inputs = TestInputs(
+          inputPath ->
+            s"""//> using $directiveKey org.apache.hadoop:hadoop-client-api:3.3.3
+               |
+               |// from https://hadoop.apache.org/docs/r3.3.3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
+               |
+               |package foo;
+               |
+               |import java.io.IOException;
+               |import java.util.StringTokenizer;
+               |
+               |import org.apache.hadoop.conf.Configuration;
+               |import org.apache.hadoop.fs.Path;
+               |import org.apache.hadoop.io.IntWritable;
+               |import org.apache.hadoop.io.Text;
+               |import org.apache.hadoop.mapreduce.Job;
+               |import org.apache.hadoop.mapreduce.Mapper;
+               |import org.apache.hadoop.mapreduce.Reducer;
+               |import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+               |import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+               |
+               |public class WordCount {
+               |
+               |  public static class TokenizerMapper
+               |       extends Mapper<Object, Text, Text, IntWritable>{
+               |
+               |    private final static IntWritable one = new IntWritable(1);
+               |    private Text word = new Text();
+               |
+               |    public void map(Object key, Text value, Context context
+               |                    ) throws IOException, InterruptedException {
+               |      StringTokenizer itr = new StringTokenizer(value.toString());
+               |      while (itr.hasMoreTokens()) {
+               |        word.set(itr.nextToken());
+               |        context.write(word, one);
+               |      }
+               |    }
+               |  }
+               |
+               |  public static class IntSumReducer
+               |       extends Reducer<Text,IntWritable,Text,IntWritable> {
+               |    private IntWritable result = new IntWritable();
+               |
+               |    public void reduce(Text key, Iterable<IntWritable> values,
+               |                       Context context
+               |                       ) throws IOException, InterruptedException {
+               |      int sum = 0;
+               |      for (IntWritable val : values) {
+               |        sum += val.get();
+               |      }
+               |      result.set(sum);
+               |      context.write(key, result);
+               |    }
+               |  }
+               |
+               |  public static void main(String[] args) throws Exception {
+               |    Configuration conf = new Configuration();
+               |    Job job = Job.getInstance(conf, "word count");
+               |    job.setJarByClass(WordCount.class);
+               |    job.setMapperClass(TokenizerMapper.class);
+               |    job.setCombinerClass(IntSumReducer.class);
+               |    job.setReducerClass(IntSumReducer.class);
+               |    job.setOutputKeyClass(Text.class);
+               |    job.setOutputValueClass(IntWritable.class);
+               |    FileInputFormat.addInputPath(job, new Path(args[0]));
+               |    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+               |    System.exit(job.waitForCompletion(true) ? 0 : 1);
+               |  }
+               |}
+               |""".stripMargin
         )
-          .call(cwd = root)
-        val command = res.out.lines()
-        pprint.err.log(command)
-        expect(command.take(2) == Seq("hadoop", "jar"))
-        expect(command.takeRight(2) == Seq("foo.WordCount", "foo"))
+        inputs.fromRoot { root =>
+          val res = os.proc(
+            TestUtil.cli,
+            "--power",
+            "run",
+            TestUtil.extraOptions,
+            ".",
+            "--hadoop",
+            "--command",
+            "--scratch-dir",
+            "tmp",
+            scopeOptions,
+            "--",
+            "foo"
+          )
+            .call(cwd = root)
+          val command = res.out.lines()
+          pprint.err.log(command)
+          expect(command.take(2) == Seq("hadoop", "jar"))
+          expect(command.takeRight(2) == Seq("foo.WordCount", "foo"))
+        }
       }
     }
-  }
 }
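
For reference, the one non-mechanical piece of this change is registering munit tests from a for-comprehension, which runs at suite-construction time. Below is a minimal, self-contained sketch of that pattern; the suite name and assertion are illustrative only and not part of this commit:

```scala
// Sketch only: loop over configurations at class-construction time and
// register one named munit test per configuration, as the change above does.
class ScopePatternSketch extends munit.FunSuite {
  for {
    withTestScope <- Seq(true, false)
    scopeDescription = if (withTestScope) "test scope" else "main scope"
  } test(s"word count ($scopeDescription)") {
    // Each generated test captures its own copy of the loop variables.
    val expectedPrefix = if (withTestScope) "test" else "main"
    assert(scopeDescription.startsWith(expectedPrefix))
  }
}
```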