
Commit f971d6c

MLnick authored and mateiz committed
SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats
So I finally resurrected this PR. It seems the old one against the incubator mirror is no longer available, so I cannot reference it. This adds initial support for reading Hadoop ```SequenceFile```s, as well as arbitrary Hadoop ```InputFormat```s, in PySpark.

# Overview

The basics are as follows:
1. The ```PythonRDD``` object contains the relevant methods, which are in turn invoked by ```SparkContext``` in PySpark
2. The SequenceFile or InputFormat is read on the Scala side and converted from ```Writable``` instances to the relevant Scala classes (in the case of primitives)
3. Pyrolite is used to serialize Java objects. If this fails, the fallback is ```toString```
4. ```PickleSerializer``` on the Python side deserializes

This works out of the box for simple ```Writable```s:
* ```Text```
* ```IntWritable```, ```DoubleWritable```, ```FloatWritable```
* ```NullWritable```
* ```BooleanWritable```
* ```BytesWritable```
* ```MapWritable```

It also works for simple, "struct-like" classes. Due to the way Pyrolite works, this requires that the classes satisfy the JavaBeans conventions (i.e. classes with fields, a no-arg constructor, and getters/setters). (Perhaps in future some sugar for case classes and reflection could be added.)

I've tested it out with ```ESInputFormat``` as an example and it works very nicely:

```python
conf = {"es.resource": "index/type"}
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                         "org.apache.hadoop.io.NullWritable",
                         "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                         conf=conf)
rdd.first()
```

I suspect that for things like HBase/Cassandra it will be a bit trickier to get working out of the box.

# Some things still outstanding:

1. ~~Requires ```msgpack-python``` and will fail without it. As originally discussed with Josh, add an ```as_strings``` argument that defaults to ```False```, which can be used if ```msgpack-python``` is not available~~
2. ~~I see from apache#363 that Pyrolite is being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is the plan behind this - is Pyrolite preferred? It seems from a cursory glance that adapting the ```msgpack```-based SerDe here to use Pyrolite wouldn't be too hard~~
3. ~~Support the key and value "wrapper" that would allow a Scala/Java function to be plugged in that would transform whatever the key/value Writable class is into something that can be serialized (e.g. convert some custom Writable to a JavaBean or ```java.util.Map``` that can be easily serialized)~~
4. Support ```saveAsSequenceFile```, ```saveAsHadoopFile```, etc. This would require SerDe in the reverse direction, which can be handled by Pyrolite. Will work on this as a separate PR

Author: Nick Pentreath <[email protected]>

Closes apache#455 from MLnick/pyspark-inputformats and squashes the following commits:

268df7e [Nick Pentreath] Documentation changes per @pwendell comments
761269b [Nick Pentreath] Address @pwendell comments, simplify default writable conversions and remove registry.
4c972d8 [Nick Pentreath] Add license headers
d150431 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
cde6af9 [Nick Pentreath] Parameterize converter trait
5ebacfa [Nick Pentreath] Update docs for PySpark input formats
a985492 [Nick Pentreath] Move Converter examples to own package
365d0be [Nick Pentreath] Make classes private[python]. Add docs and @experimental annotation to Converter interface.
eeb8205 [Nick Pentreath] Fix path relative to SPARK_HOME in tests
1eaa08b [Nick Pentreath] HBase -> Cassandra app name oversight
3f90c3e [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
2c18513 [Nick Pentreath] Add examples for reading HBase and Cassandra InputFormats from Python
b65606f [Nick Pentreath] Add converter interface
5757f6e [Nick Pentreath] Default key/value classes for sequenceFile are None
085b55f [Nick Pentreath] Move input format tests to tests.py and clean up docs
43eb728 [Nick Pentreath] PySpark InputFormats docs into programming guide
94beedc [Nick Pentreath] Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods
1a4a1d6 [Nick Pentreath] Address @mateiz style comments
01e0813 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
15a7d07 [Nick Pentreath] Remove default args for key/value classes. Arg names to camelCase
9fe6bd5 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
84fe8e3 [Nick Pentreath] Python programming guide space formatting
d0f52b6 [Nick Pentreath] Python programming guide
7caa73a [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
93ef995 [Nick Pentreath] Add back context.py changes
9ef1896 [Nick Pentreath] Recover earlier changes lost in previous merge for serializers.py
077ecb2 [Nick Pentreath] Recover earlier changes lost in previous merge for context.py
5af4770 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
35b8e3a [Nick Pentreath] Another fix for test ordering
bef3afb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e001b94 [Nick Pentreath] Fix test failures due to ordering
78978d9 [Nick Pentreath] Add doc for SequenceFile and InputFormat support to Python programming guide
64eb051 [Nick Pentreath] Scalastyle fix
e7552fa [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
44f2857 [Nick Pentreath] Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring
c0ebfb6 [Nick Pentreath] Change sequencefile test data generator to easily be called from PySpark tests
1d7c17c [Nick Pentreath] Amend tests to auto-generate sequencefile data in temp dir
17a656b [Nick Pentreath] remove binary sequencefile for tests
f60959e [Nick Pentreath] Remove msgpack dependency and serializer from PySpark
450e0a2 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
31a2fff [Nick Pentreath] Scalastyle fixes
fc5099e [Nick Pentreath] Add Apache license headers
4e08983 [Nick Pentreath] Clean up docs for PySpark context methods
b20ec7e [Nick Pentreath] Clean up merge duplicate dependencies
951c117 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
f6aac55 [Nick Pentreath] Bring back msgpack
9d2256e [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
1bbbfb0 [Nick Pentreath] Clean up SparkBuild from merge
a67dfad [Nick Pentreath] Clean up Msgpack serialization and registering
7237263 [Nick Pentreath] Add back msgpack serializer and hadoop file code lost during merging
25da1ca [Nick Pentreath] Add generator for nulls, bools, bytes and maps
65360d5 [Nick Pentreath] Adding test SequenceFiles
0c612e5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
d72bf18 [Nick Pentreath] msgpack
dd57922 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e67212a [Nick Pentreath] Add back msgpack dependency
f2d76a0 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
41856a5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
97ef708 [Nick Pentreath] Remove old writeToStream
2beeedb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
795a763 [Nick Pentreath] Change name to WriteInputFormatTestDataGenerator. Cleanup some var names. Use SPARK_HOME in path for writing test sequencefile data.
174f520 [Nick Pentreath] Add back graphx settings
703ee65 [Nick Pentreath] Add back msgpack
619c0fa [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
1c8efbc [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
eb40036 [Nick Pentreath] Remove unused comment lines
4d7ef2e [Nick Pentreath] Fix indentation
f1d73e3 [Nick Pentreath] mergeConfs returns a copy rather than mutating one of the input arguments
0f5cd84 [Nick Pentreath] Remove unused pair UTF8 class. Add comments to msgpack deserializer
4294cbb [Nick Pentreath] Add old Hadoop api methods. Clean up and expand comments. Clean up argument names
818a1e6 [Nick Pentreath] Add sequencefile and Hadoop InputFormat support to PythonRDD
4e7c9e3 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
c304cc8 [Nick Pentreath] Adding supporting sequencefiles for tests. Cleaning up
4b0a43f [Nick Pentreath] Refactoring utils into own objects. Cleaning up old commented-out code
d86325f [Nick Pentreath] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat
1 parent 6f2db8c commit f971d6c
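To make the "struct-like" case in the commit message concrete, here is a minimal sketch (not part of this commit) of the JavaBeans shape that Pyrolite can handle. The `SensorReading` class and its fields are hypothetical; `@BeanProperty` generates the getters/setters, and the class keeps a no-arg constructor.

```scala
import scala.beans.BeanProperty

// Hypothetical example class, not part of this commit: a "struct-like" value type
// following the JavaBeans conventions (no-arg constructor plus getters/setters),
// which is what Pyrolite needs in order to serialize it for the Python side.
class SensorReading {
  @BeanProperty var sensorId: String = _   // generates getSensorId/setSensorId
  @BeanProperty var value: Double = 0.0    // generates getValue/setValue
}
```

Values shaped like this, whether emitted directly by an `InputFormat` or produced by a converter (see the `Converter` trait in the diff below), can then flow through Pyrolite and be deserialized by `PickleSerializer` on the Python side, as described above.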

File tree

13 files changed (+1140 −6 lines)
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 129 additions & 0 deletions
@@ -0,0 +1,129 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.python

import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
import org.apache.spark.annotation.Experimental


/**
 * :: Experimental ::
 * A trait for use with reading custom classes in PySpark. Implement this trait and add custom
 * transformation code by overriding the convert method.
 */
@Experimental
trait Converter[T, U] extends Serializable {
  def convert(obj: T): U
}

private[python] object Converter extends Logging {

  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
    converterClass.map { cc =>
      Try {
        val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
        logInfo(s"Loaded converter: $cc")
        c
      } match {
        case Success(c) => c
        case Failure(err) =>
          logError(s"Failed to load converter: $cc")
          throw err
      }
    }.getOrElse { new DefaultConverter }
  }
}

/**
 * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
 * Other objects are passed through without conversion.
 */
private[python] class DefaultConverter extends Converter[Any, Any] {

  /**
   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
   * object representation
   */
  private def convertWritable(writable: Writable): Any = {
    import collection.JavaConversions._
    writable match {
      case iw: IntWritable => iw.get()
      case dw: DoubleWritable => dw.get()
      case lw: LongWritable => lw.get()
      case fw: FloatWritable => fw.get()
      case t: Text => t.toString
      case bw: BooleanWritable => bw.get()
      case byw: BytesWritable => byw.getBytes
      case n: NullWritable => null
      case aw: ArrayWritable => aw.get().map(convertWritable(_))
      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
        (convertWritable(k), convertWritable(v))
      }.toMap)
      case other => other
    }
  }

  def convert(obj: Any): Any = {
    obj match {
      case writable: Writable =>
        convertWritable(writable)
      case _ =>
        obj
    }
  }
}

/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {

  /**
   * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
   */
  def mapToConf(map: java.util.Map[String, String]): Configuration = {
    import collection.JavaConversions._
    val conf = new Configuration()
    map.foreach { case (k, v) => conf.set(k, v) }
    conf
  }

  /**
   * Merges two configurations, returns a copy of left with keys from right overwriting
   * any matching keys in left
   */
  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
    import collection.JavaConversions._
    val copy = new Configuration(left)
    right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
    copy
  }

  /**
   * Converts an RDD of key-value pairs, where key and/or value could be instances of
   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
   */
  def convertRDD[K, V](rdd: RDD[(K, V)],
                       keyConverter: Converter[Any, Any],
                       valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
    rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
  }

}
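To make the `Converter` contract above concrete, here is a minimal sketch of a custom converter. It is hypothetical, not part of this commit, and the package and class names are placeholders; it flattens a `MapWritable` into a `java.util.HashMap` of strings, which Pyrolite can pickle and PySpark will see as a dict.

```scala
package org.apache.spark.examples.pythonconverters  // hypothetical package

import java.util

import org.apache.hadoop.io.MapWritable
import org.apache.spark.api.python.Converter

/** Hypothetical converter: turns MapWritable values into a HashMap of strings. */
class MapWritableToStringMapConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case mw: MapWritable =>
      val out = new util.HashMap[String, String]()
      val it = mw.entrySet().iterator()
      while (it.hasNext) {
        val entry = it.next()
        out.put(entry.getKey.toString, entry.getValue.toString)
      }
      out
    case other => other  // anything else is passed through unchanged
  }
}
```

The fully qualified class name is what PySpark would pass as the key or value converter argument; `Converter.getInstance` loads it via `Class.forName(...).newInstance()`, so a converter needs a public no-arg constructor.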

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 178 additions & 1 deletion
@@ -28,6 +28,9 @@ import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
@@ -266,7 +269,7 @@ private object SpecialLengths
   val TIMING_DATA = -3
 }
 
-private[spark] object PythonRDD {
+private[spark] object PythonRDD extends Logging {
   val UTF8 = Charset.forName("UTF-8")
 
   /**
@@ -346,6 +349,180 @@ private[spark] object PythonRDD {
     }
   }
 
+  /**
+   * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def sequenceFile[K, V](
+      sc: JavaSparkContext,
+      path: String,
+      keyClassMaybeNull: String,
+      valueClassMaybeNull: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      minSplits: Int) = {
+    val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
+    val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+
+    val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+    val rdd =
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is
+   * passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClass, keyClass, valueClass, conf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
+    val rdd = if (path.isDefined) {
+      sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
+    } else {
+      sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
+    }
+    rdd
+  }
+
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map
+   * that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClass, keyClass, valueClass, conf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
+    val rdd = if (path.isDefined) {
+      sc.sc.hadoopFile(path.get, fc, kc, vc)
+    } else {
+      sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
+    }
+    rdd
+  }
+
   def writeUTF(str: String, dataOut: DataOutputStream) {
     val bytes = str.getBytes(UTF8)
     dataOut.writeInt(bytes.length)
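To show how these new entry points fit together, below is a minimal JVM-side sketch (not part of this commit) that calls `PythonRDD.sequenceFile` directly, mirroring what PySpark's `context.py` does over Py4J. The path and the enclosing package are placeholders; the object sits under `org.apache.spark` only because `PythonRDD` is `private[spark]`. Passing null converter classes exercises the `DefaultConverter` fallback.

```scala
package org.apache.spark.examples  // placeholder; must live under org.apache.spark for access

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.python.PythonRDD

object SequenceFileSmokeTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("seqfile-smoke").setMaster("local"))
    val jsc = new JavaSparkContext(sc)

    // Null converter classes mean Converter.getInstance falls back to DefaultConverter,
    // which unwraps the Text/IntWritable pairs before Pyrolite pickles them.
    val pickled = PythonRDD.sequenceFile[Any, Any](
      jsc,
      "/tmp/pairs.seq",                    // placeholder path
      "org.apache.hadoop.io.Text",         // key class
      "org.apache.hadoop.io.IntWritable",  // value class
      null,                                // keyConverterClass
      null,                                // valueConverterClass
      2)                                   // minSplits

    println(s"pickled records: ${pickled.count()}")
    sc.stop()
  }
}
```

In normal use this is never called by hand: PySpark's `context.py` invokes it through the Py4J gateway and deserializes the returned pickled records with `PickleSerializer`, as described in the commit message.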
