
Commit 67acea8

removing the columnFamilyAccumulator

1 parent 37526de · commit 67acea8

3 files changed, +8 -66 lines

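In short, the commit drops the driver-registered ColumnFamilyAccumulator wrapper and instead threads the column family schemas through as a plain Map[String, ColumnFamilySchemaV1] keyed by column family name. A minimal, self-contained sketch of the new shape of that data flow, with a stub case class standing in for ColumnFamilySchemaV1 (the stub and the sample column family names are illustrative, not part of the diff):

// Stand-in for ColumnFamilySchemaV1, which in Spark also carries key/value schema details.
final case class SchemaStub(columnFamilyName: String) extends Serializable

object ColFamilyMapSketch {
  // Mirrors initializeColFamilyAccumulators() after this commit: the schemas read
  // from the state checkpoint are keyed by name, with no accumulator registration.
  def buildSchemaMap(read: Seq[SchemaStub]): Map[String, SchemaStub] =
    read.map(s => s.columnFamilyName -> s).toMap

  def main(args: Array[String]): Unit = {
    val fromCheckpoint = Seq(SchemaStub("default"), SchemaStub("countState"))
    val colFamilySchemas = buildSchemaMap(fromCheckpoint)
    // The map then travels into StatefulProcessorHandleImpl as an ordinary
    // constructor argument, replacing the earlier Map[String, ColumnFamilyAccumulator].
    assert(colFamilySchemas("countState").columnFamilyName == "countState")
  }
}
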
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala

Lines changed: 1 addition & 1 deletion

@@ -85,7 +85,7 @@ class StatefulProcessorHandleImpl(
     isStreaming: Boolean = true,
     batchTimestampMs: Option[Long] = None,
     metrics: Map[String, SQLMetric] = Map.empty,
-    existingColFamilies: Map[String, ColumnFamilyAccumulator] = Map.empty)
+    existingColFamilies: Map[String, ColumnFamilySchemaV1] = Map.empty)
   extends StatefulProcessorHandle with Logging {
   import StatefulProcessorHandleState._

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala

Lines changed: 6 additions & 6 deletions

@@ -93,19 +93,19 @@ case class TransformWithStateExec(
     "operatorPropsFromExecutor"
   )

-  private lazy val colFamilyAccumulators: Map[String, ColumnFamilyAccumulator] =
+  private lazy val colFamilySchemas: Map[String, ColumnFamilySchemaV1] =
     initializeColFamilyAccumulators()

-  private def initializeColFamilyAccumulators(): Map[String, ColumnFamilyAccumulator] = {
+  private def initializeColFamilyAccumulators(): Map[String, ColumnFamilySchemaV1] = {
     val stateCheckpointPath = new Path(stateInfo.get.checkpointLocation,
       getStateInfo.operatorId.toString)
     val hadoopConf = session.sqlContext.sessionState.newHadoopConf()

     val reader = new SchemaV3Reader(stateCheckpointPath, hadoopConf)

     reader.read.map { colFamilyMetadata =>
-      val acc = ColumnFamilyAccumulator.create(colFamilyMetadata, sparkContext)
-      colFamilyMetadata.asInstanceOf[ColumnFamilySchemaV1].columnFamilyName -> acc
+      val schemaV1 = colFamilyMetadata.asInstanceOf[ColumnFamilySchemaV1]
+      schemaV1.columnFamilyName -> schemaV1
     }.toMap
   }

@@ -430,7 +430,7 @@ case class TransformWithStateExec(

   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
-    colFamilyAccumulators
+    colFamilySchemas

     validateTimeMode()

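An aside on the doExecute() hunk above: swapping colFamilyAccumulators for colFamilySchemas keeps the existing pattern used for metrics, where merely referencing a lazy val forces it to be computed on the driver before tasks are serialized. A small self-contained illustration of that Scala behaviour (names and values here are illustrative only, not Spark code):

object LazyInitDemo {
  // A lazy val is evaluated on first access and cached afterwards.
  lazy val schemas: Map[String, String] = {
    println("reading schemas on the driver") // printed exactly once
    Map("default" -> "schema-v1")
  }

  def main(args: Array[String]): Unit = {
    schemas                 // bare reference in statement position forces initialization here
    println(schemas.size)   // reuses the cached value; the block above does not run again
  }
}
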
@@ -549,7 +549,7 @@ case class TransformWithStateExec(
       CompletionIterator[InternalRow, Iterator[InternalRow]] = {
     val processorHandle = new StatefulProcessorHandleImpl(
       store, getStateInfo.queryRunId, keyEncoder, timeMode,
-      isStreaming, batchTimestampMs, metrics, colFamilyAccumulators)
+      isStreaming, batchTimestampMs, metrics, colFamilySchemas)
     assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
     statefulProcessor.setHandle(processorHandle)
     statefulProcessor.init(outputMode, timeMode)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala

Lines changed: 1 addition & 59 deletions

@@ -26,10 +26,9 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods
 import org.json4s.jackson.JsonMethods.{compact, render}

-import org.apache.spark.SparkContext
 import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{AccumulatorV2, Utils}
+import org.apache.spark.util.Utils

 sealed trait ColumnFamilySchema extends Serializable {
   def jsonValue: JsonAST.JObject
@@ -56,63 +55,6 @@ case class ColumnFamilySchemaV1(
   }
 }

-class ColumnFamilyAccumulator(
-    columnFamilyMetadata: ColumnFamilySchema) extends
-  AccumulatorV2[ColumnFamilySchema, ColumnFamilySchema] {
-
-  private var _value: ColumnFamilySchema = columnFamilyMetadata
-  /**
-   * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero
-   * value; for a list accumulator, Nil is zero value.
-   */
-  override def isZero: Boolean = _value == null
-
-  /**
-   * Creates a new copy of this accumulator.
-   */
-  override def copy(): AccumulatorV2[ColumnFamilySchema, ColumnFamilySchema] = {
-    new ColumnFamilyAccumulator(_value)
-  }
-
-  /**
-   * Resets this accumulator, which is zero value. i.e. call `isZero` must
-   * return true.
-   */
-  override def reset(): Unit = {
-    _value = null
-  }
-
-  /**
-   * Takes the inputs and accumulates.
-   */
-  override def add(v: ColumnFamilySchema): Unit = {
-    _value = v
-  }
-
-  /**
-   * Merges another same-type accumulator into this one and update its state, i.e. this should be
-   * merge-in-place.
-   */
-  override def merge(other: AccumulatorV2[ColumnFamilySchema, ColumnFamilySchema]): Unit = {
-    _value = other.value
-  }
-
-  /**
-   * Defines the current value of this accumulator
-   */
-  override def value: ColumnFamilySchema = _value
-}
-
-object ColumnFamilyAccumulator {
-  def create(
-      columnFamilyMetadata: ColumnFamilySchema,
-      sparkContext: SparkContext): ColumnFamilyAccumulator = {
-    val acc = new ColumnFamilyAccumulator(columnFamilyMetadata)
-    acc.register(sparkContext)
-    acc
-  }
-}
-
 object ColumnFamilySchemaV1 {
   def fromJson(json: List[Map[String, Any]]): List[ColumnFamilySchema] = {
     assert(json.isInstanceOf[List[_]])

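The class removed above never aggregated anything: its add and merge simply overwrite the single held value, and the schemas it carried are fully known on the driver before execution, so nothing ever flows back from executors to be merged. Merging executor-side partials is the usual job of an AccumulatorV2; a hypothetical contrast, assuming a local SparkSession (none of this code is from the commit):

import org.apache.spark.sql.SparkSession

object AccumulatorVsPlainValue {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
    val sc = spark.sparkContext

    // An accumulator earns its keep when executors contribute partial results
    // that the driver must merge, e.g. a counter updated inside tasks.
    val rowsSeen = sc.longAccumulator("rowsSeen")
    sc.parallelize(1 to 100).foreach(_ => rowsSeen.add(1))
    println(rowsSeen.value) // 100, after executor-side partials are merged

    // The column family schemas never flow that way: they are a driver-side
    // constant, so a Serializable value in an ordinary Map is sufficient.
    val colFamilySchemas: Map[String, String] = Map("default" -> "schema-v1")
    println(colFamilySchemas("default"))

    spark.stop()
  }
}
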
0 commit comments