Skip to content

Commit 25da1ca

Browse files
committed
Add generator for nulls, bools, bytes and maps
1 parent 65360d5 commit 25da1ca

File tree

1 file changed

+46
-6
lines changed

1 file changed

+46
-6
lines changed

core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@ import org.apache.hadoop.io._
55
import scala.Array
66
import java.io.{DataOutput, DataInput}
77

8+
/**
9+
* A class to test MsgPack serialization on the Scala side, that will be deserialized
10+
* in Python
11+
* @param str
12+
* @param int
13+
* @param double
14+
*/
815
case class TestWritable(var str: String, var int: Int, var double: Double) extends Writable {
916
def this() = this("", 0, 0.0)
1017

@@ -21,23 +28,38 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
2128
}
2229
}
2330

31+
/**
32+
* Main method to generate SequenceFile test data and write to the python 'test_support' directory.
33+
* Be sure to set the SPARK_HOME environment variable appropriately
34+
*/
2435
object WriteInputFormatTestDataGenerator extends App {
2536
import SparkContext._
2637

2738
val sc = new SparkContext("local[2]", "test")
2839

2940
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
30-
val textPath = s"$sparkHome/python/test_support/data/sftext/"
31-
val intPath = s"$sparkHome/python/test_support/data/sfint/"
32-
val doublePath = s"$sparkHome/python/test_support/data/sfdouble/"
33-
val arrPath = s"$sparkHome/python/test_support/data/sfarray/"
34-
val classPath = s"$sparkHome/python/test_support/data/sfclass/"
41+
val basePath = s"$sparkHome/python/test_support/data/"
42+
val textPath = s"$basePath/sftext/"
43+
val intPath = s"$basePath/sfint/"
44+
val doublePath = s"$basePath/sfdouble/"
45+
val arrPath = s"$basePath/sfarray/"
46+
val mapPath = s"$basePath/sfmap/"
47+
val classPath = s"$basePath/sfclass/"
48+
val bytesPath = s"$basePath/sfbytes/"
49+
val boolPath = s"$basePath/sfbool"
50+
val nullPath = s"$basePath/sfnull"
3551

36-
val intKeys = Seq((1.0, "aa"), (2.0, "bb"), (2.0, "aa"), (3.0, "cc"), (2.0, "bb"), (1.0, "aa"))
52+
// Create test data for IntWritable, DoubleWritable, Text, BytesWritable, BooleanWritable and NullWritable
53+
val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
3754
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
3855
sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
3956
sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
57+
sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
58+
val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
59+
sc.parallelize(bools).saveAsSequenceFile(boolPath)
60+
sc.parallelize(intKeys).map{ case (k, v) => (new IntWritable(k), NullWritable.get()) }.saveAsSequenceFile(nullPath)
4061

62+
// Create test data for ArrayWritable
4163
val data = Seq(
4264
(1, Array(1.0, 2.0, 3.0)),
4365
(2, Array(3.0, 4.0, 5.0)),
@@ -49,6 +71,24 @@ object WriteInputFormatTestDataGenerator extends App {
4971
}
5072
.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
5173

74+
// Create test data for MapWritable, with keys DoubleWritable and values Text
75+
val mapData = Seq(
76+
(1, Map(2.0 -> "aa")),
77+
(2, Map(3.0 -> "bb")),
78+
(2, Map(1.0 -> "cc")),
79+
(3, Map(2.0 -> "dd")),
80+
(2, Map(1.0 -> "aa")),
81+
(1, Map(3.0 -> "bb"))
82+
)
83+
sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
84+
val mw = new MapWritable()
85+
val k = m.keys.head
86+
val v = m.values.head
87+
mw.put(new DoubleWritable(k), new Text(v))
88+
(new IntWritable(i), mw)
89+
}.saveAsSequenceFile(mapPath)
90+
91+
// Create test data for arbitrary custom writable TestWritable
5292
val testClass = Seq(
5393
("1", TestWritable("test1", 123, 54.0)),
5494
("2", TestWritable("test2", 456, 8762.3)),

0 commit comments

Comments
 (0)