Commit bd18cd4

Author: Davies Liu (committed)
Merge branch 'master' of github.com:apache/spark into date_add
2 parents e47ff2c + b715933, commit bd18cd4

44 files changed (+1024, -295 lines)


core/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -34,6 +34,11 @@
   <name>Spark Project Core</name>
   <url>http://spark.apache.org/</url>
   <dependencies>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>${avro.mapred.classifier}</classifier>
+    </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 22 additions & 1 deletion
@@ -18,11 +18,12 @@
 package org.apache.spark
 
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
+import org.apache.avro.{SchemaNormalization, Schema}
+
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -161,6 +162,26 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     this
   }
 
+  private final val avroNamespace = "avro.schema."
+
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+    for (schema <- schemas) {
+      set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
+    }
+    this
+  }
+
+  /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
+  def getAvroSchema: Map[Long, String] = {
+    getAll.filter { case (k, v) => k.startsWith(avroNamespace) }
+      .map { case (k, v) => (k.substring(avroNamespace.length).toLong, v) }
+      .toMap
+  }
+
   /** Remove a parameter from the configuration */
   def remove(key: String): SparkConf = {
     settings.remove(key)
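
For context, a minimal usage sketch of the new SparkConf API (not part of this commit; the schema and application name below are made up for illustration):

import org.apache.avro.SchemaBuilder
import org.apache.spark.SparkConf

// Hypothetical Avro schema, for illustration only.
val userSchema = SchemaBuilder
  .record("User").fields()
  .requiredString("name")
  .requiredInt("age")
  .endRecord()

val conf = new SparkConf()
  .setAppName("avro-kryo-example")  // made-up app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerAvroSchemas(userSchema)  // new API added above

// getAvroSchema returns the registered schemas keyed by their 64-bit parsing fingerprint.
conf.getAvroSchema.foreach { case (fingerprint, schemaJson) =>
  println(s"$fingerprint -> $schemaJson")
}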

core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala (new file)

Lines changed: 150 additions & 0 deletions

@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.{SparkException, SparkEnv}
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the
+ * actual schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to do.
+ * @param schemas a map where the keys are unique IDs for Avro schemas and the values are the
+ *                string representation of the Avro schema, used to decrease the amount of data
+ *                that needs to be serialized.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  // GenericAvroSerializer can't take a SparkConf in the constructor b/c then it would become
+  // a member of KryoSerializer, which would make KryoSerializer not Serializable. We make
+  // the codec lazy here just b/c in some unit tests, we use a KryoSerializer w/out having
+  // the SparkEnv set (note those tests would fail if they tried to serialize avro data).
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
+    val bos = new ByteArrayOutputStream()
+    val out = codec.compressedOutputStream(bos)
+    out.write(schema.toString.getBytes("UTF-8"))
+    out.close()
+    bos.toByteArray
+  })
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
+   * seen values so to limit the number of times that decompression has to be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
+    val bis = new ByteArrayInputStream(schemaBytes.array())
+    val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
+    new Schema.Parser().parse(new String(bytes, "UTF-8"))
+  })
+
+  /**
+   * Serializes a record to the given output stream. It caches a lot of the internal data as
+   * to not redo work
+   */
+  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
+    val encoder = EncoderFactory.get.binaryEncoder(output, null)
+    val schema = datum.getSchema
+    val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
+      SchemaNormalization.parsingFingerprint64(schema)
+    })
+    schemas.get(fingerprint) match {
+      case Some(_) =>
+        output.writeBoolean(true)
+        output.writeLong(fingerprint)
+      case None =>
+        output.writeBoolean(false)
+        val compressedSchema = compress(schema)
+        output.writeInt(compressedSchema.length)
+        output.writeBytes(compressedSchema)
+    }
+
+    writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
+      .asInstanceOf[DatumWriter[R]]
+      .write(datum, encoder)
+    encoder.flush()
+  }
+
+  /**
+   * Deserializes generic records into their in-memory form. There is internal
+   * state to keep a cache of already seen schemas and datum readers.
+   */
+  def deserializeDatum(input: KryoInput): GenericRecord = {
+    val schema = {
+      if (input.readBoolean()) {
+        val fingerprint = input.readLong()
+        schemaCache.getOrElseUpdate(fingerprint, {
+          schemas.get(fingerprint) match {
+            case Some(s) => new Schema.Parser().parse(s)
+            case None =>
+              throw new SparkException(
+                "Error reading attempting to read avro data -- encountered an unknown " +
+                  s"fingerprint: $fingerprint, not sure what schema to use. This could happen " +
+                  "if you registered additional schemas after starting your spark context.")
+          }
+        })
+      } else {
+        val length = input.readInt()
+        decompress(ByteBuffer.wrap(input.readBytes(length)))
+      }
+    }
+    val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
+    readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
+      .asInstanceOf[DatumReader[GenericRecord]]
+      .read(null, decoder)
+  }
+
+  override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
+    serializeDatum(datum, output)
+
+  override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
+    deserializeDatum(input)
+}
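
To illustrate the saving this serializer is after, here is a hedged sketch (not part of the commit; the schema is made up) comparing the 8-byte fingerprint sent for registered schemas with the full schema JSON that would otherwise travel with every record:

import org.apache.avro.{SchemaBuilder, SchemaNormalization}

// Made-up schema, only to show the size difference the fingerprint path exploits.
val schema = SchemaBuilder
  .record("Event").fields()
  .requiredString("id")
  .requiredLong("timestamp")
  .endRecord()

// Registered schemas are written as a boolean flag plus this 64-bit fingerprint...
val fingerprint = SchemaNormalization.parsingFingerprint64(schema)
// ...while unregistered schemas are compressed and written in full.
val fullSchemaBytes = schema.toString.getBytes("UTF-8")

println(s"fingerprint: $fingerprint (8 bytes on the wire)")
println(s"full schema: ${fullSchemaBytes.length} bytes before compression")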

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 6 additions & 0 deletions
@@ -27,6 +27,7 @@ import com.esotericsoftware.kryo.{Kryo, KryoException}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
+import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
 
 import org.apache.spark._
@@ -73,6 +74,8 @@ class KryoSerializer(conf: SparkConf)
     .split(',')
     .filter(!_.isEmpty)
 
+  private val avroSchemas = conf.getAvroSchema
+
   def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 
   def newKryo(): Kryo = {
@@ -101,6 +104,9 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
+    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
+    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+
     try {
       // scalastyle:off classforname
       // Use the default classloader when calling the user registrator.
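
Taken together with the SparkConf change above, a hedged end-to-end sketch (not part of this commit; the schema, app name, and master are made up) of how a job would exercise this registration when shuffling generic records:

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData
import org.apache.spark.{SparkConf, SparkContext}

// Made-up schema and local job, for illustration only.
val userSchema = SchemaBuilder
  .record("User").fields()
  .requiredString("name")
  .requiredInt("age")
  .endRecord()

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("generic-record-shuffle")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerAvroSchemas(userSchema)
val sc = new SparkContext(conf)

// Capture only the schema JSON in the closure and re-parse it on the executor,
// since an Avro Schema itself may not be Java-serializable.
val schemaJson = userSchema.toString
val records = sc.parallelize(1 to 100).mapPartitions { iter =>
  val schema = new Schema.Parser().parse(schemaJson)
  iter.map { i =>
    val r = new GenericData.Record(schema)
    r.put("name", s"user-$i")
    r.put("age", i % 90)
    r
  }
}

// groupByKey shuffles the records; with Kryo enabled they go through the
// GenericAvroSerializer registered above, so each serialized record carries
// only the registered schema's fingerprint rather than the full schema.
println(records.map(r => (r.get("age").asInstanceOf[Int], r)).groupByKey().count())
sc.stop()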

core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala (new file)

Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import com.esotericsoftware.kryo.io.{Output, Input}
+import org.apache.avro.{SchemaBuilder, Schema}
+import org.apache.avro.generic.GenericData.Record
+
+import org.apache.spark.{SparkFunSuite, SharedSparkContext}
+
+class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
+  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+
+  val schema : Schema = SchemaBuilder
+    .record("testRecord").fields()
+    .requiredString("data")
+    .endRecord()
+  val record = new Record(schema)
+  record.put("data", "test data")
+
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
+  }
+
+  test("record serialization and deserialization") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val outputStream = new ByteArrayOutputStream()
+    val output = new Output(outputStream)
+    genericSer.serializeDatum(record, output)
+    output.flush()
+    output.close()
+
+    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+    assert(genericSer.deserializeDatum(input) === record)
+  }
+
+  test("uses schema fingerprint to decrease message size") {
+    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val output = new Output(new ByteArrayOutputStream())
+
+    val beginningNormalPosition = output.total()
+    genericSerFull.serializeDatum(record, output)
+    output.flush()
+    val normalLength = output.total - beginningNormalPosition
+
+    conf.registerAvroSchemas(schema)
+    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    val beginningFingerprintPosition = output.total()
+    genericSerFinger.serializeDatum(record, output)
+    val fingerprintLength = output.total - beginningFingerprintPosition
+
+    assert(fingerprintLength < normalLength)
+  }
+
+  test("caches previously seen schemas") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    val compressedSchema = genericSer.compress(schema)
+    val decompressedScheam = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
+
+    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(decompressedScheam.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
+  }
+}

graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

Lines changed: 10 additions & 13 deletions
@@ -127,28 +127,25 @@ object Pregel extends Logging {
     var prevG: Graph[VD, ED] = null
     var i = 0
     while (activeMessages > 0 && i < maxIterations) {
-      // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
-      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
-      // Update the graph with the new vertices.
+      // Receive the messages and update the vertices.
       prevG = g
-      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
-      g.cache()
+      g = g.joinVertices(messages)(vprog).cache()
 
       val oldMessages = messages
-      // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
-      // get to send messages. We must cache messages so it can be materialized on the next line,
-      // allowing us to uncache the previous iteration.
-      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
-      // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
-      // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
-      // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
+      // Send new messages, skipping edges where neither side received a message. We must cache
+      // messages so it can be materialized on the next line, allowing us to uncache the previous
+      // iteration.
+      messages = g.mapReduceTriplets(
+        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+      // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
+      // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
+      // and the vertices of g).
       activeMessages = messages.count()
 
       logInfo("Pregel finished iteration " + i)
 
       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking = false)
-      newVerts.unpersist(blocking = false)
       prevG.unpersistVertices(blocking = false)
       prevG.edges.unpersist(blocking = false)
       // count the iteration
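
For reference, the standard single-source shortest-paths use of Pregel (adapted from the GraphX programming guide; the edge data here is made up), which shows where the vprog, sendMsg, and mergeMsg functions driven by the loop above come from:

import org.apache.spark.graphx.{Edge, Graph}

// Made-up edge list; `sc` is an existing SparkContext.
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
val sourceId = 1L

// Every vertex starts at infinite distance except the source.
val initialGraph = Graph.fromEdges(edges, 0.0)
  .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),    // vprog: keep the best known distance
  triplet => {                                        // sendMsg: relax each edge
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b)                            // mergeMsg: combine incoming messages
)
println(sssp.vertices.collect().mkString("\n"))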

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGetters.java

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.types.Interval;
+import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 public interface SpecializedGetters {
@@ -46,7 +46,7 @@ public interface SpecializedGetters {
 
   byte[] getBinary(int ordinal);
 
-  Interval getInterval(int ordinal);
+  CalendarInterval getInterval(int ordinal);
 
   InternalRow getStruct(int ordinal, int numFields);
 