Commit 980a46f

[SPARK-28191][SS] New data source - state - reader part

1 parent 3139d64
12 files changed (+1171, -0)


sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

Lines changed: 1 addition & 0 deletions

@@ -5,6 +5,7 @@ org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
+org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider

Lines changed: 109 additions & 0 deletions

@@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.v2.state

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog, OffsetSeqMetadata}

/**
 * Providing features to deal with checkpoint, like creating savepoint.
 */
object CheckpointUtil {

  /**
   * Create savepoint from existing checkpoint.
   * OffsetLog and CommitLog will be purged based on newLastBatchId.
   * Use `additionalMetadataConf` to modify metadata configuration: you may want to modify it
   * when rescaling state, or migrate state format version.
   * e.g. when rescaling, pass Map(SQLConf.SHUFFLE_PARTITIONS.key -> newShufflePartitions.toString)
   *
   * @param sparkSession spark session
   * @param checkpointRoot the root path of existing checkpoint
   * @param newCheckpointRoot the root path of new savepoint - target directory should be empty
   * @param newLastBatchId the new last batch ID - it needs to be one of committed batch ID
   * @param additionalMetadataConf the configuration to add to existing metadata configuration
   * @param excludeState whether to exclude state directory
   */
  def createSavePoint(
      sparkSession: SparkSession,
      checkpointRoot: String,
      newCheckpointRoot: String,
      newLastBatchId: Long,
      additionalMetadataConf: Map[String, String],
      excludeState: Boolean = false): Unit = {
    val hadoopConf = sparkSession.sessionState.newHadoopConf()

    val src = new Path(resolve(hadoopConf, checkpointRoot))
    val srcFs = src.getFileSystem(hadoopConf)
    val dst = new Path(resolve(hadoopConf, newCheckpointRoot))
    val dstFs = dst.getFileSystem(hadoopConf)

    if (dstFs.listFiles(dst, false).hasNext) {
      throw new IllegalArgumentException("Destination directory should be empty.")
    }

    dstFs.mkdirs(dst)

    // copy content of src directory to dst directory
    srcFs.listStatus(src).foreach { fs =>
      val path = fs.getPath
      val fileName = path.getName
      if (fileName == "state" && excludeState) {
        // pass
      } else {
        FileUtil.copy(srcFs, path, dstFs, new Path(dst, fileName),
          false, false, hadoopConf)
      }
    }

    val offsetLog = new OffsetSeqLog(sparkSession, new Path(dst, "offsets").toString)
    val logForBatch = offsetLog.get(newLastBatchId) match {
      case Some(log) => log
      case None => throw new IllegalStateException("offset log for batch should be exist")
    }

    val newMetadata = logForBatch.metadata match {
      case Some(md) =>
        val newMap = md.conf ++ additionalMetadataConf
        Some(md.copy(conf = newMap))
      case None =>
        Some(OffsetSeqMetadata(conf = additionalMetadataConf))
    }

    val newLogForBatch = logForBatch.copy(metadata = newMetadata)

    // we will restart from last batch + 1: overwrite the last batch with new configuration
    offsetLog.purgeAfter(newLastBatchId - 1)
    offsetLog.add(newLastBatchId, newLogForBatch)

    val commitLog = new CommitLog(sparkSession, new Path(dst, "commits").toString)
    commitLog.purgeAfter(newLastBatchId)

    // state doesn't expose purge mechanism as its interface
    // assuming state would work with overwriting batch files when it replays previous batch
  }

  private def resolve(hadoopConf: Configuration, cpLocation: String): String = {
    val checkpointPath = new Path(cpLocation)
    val fs = checkpointPath.getFileSystem(hadoopConf)
    checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
  }
}
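
The scaladoc above suggests rescaling by passing SQLConf.SHUFFLE_PARTITIONS.key through additionalMetadataConf. Below is a minimal usage sketch, not part of this commit: the paths, the batch ID 41 and the new partition count 20 are made-up values, and batch 41 is assumed to have been committed by the original query.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.execution.datasources.v2.state.CheckpointUtil

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Copy the checkpoint into an empty target directory, keep batches up to 41,
// and rewrite the last offset log entry so a restart runs with 20 shuffle partitions.
CheckpointUtil.createSavePoint(
  spark,
  checkpointRoot = "/tmp/checkpoints/query1",             // hypothetical source checkpoint
  newCheckpointRoot = "/tmp/savepoints/query1-rescaled",  // must be an empty directory
  newLastBatchId = 41L,
  additionalMetadataConf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "20"),
  excludeState = false)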

Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.v2.state

import org.apache.spark.sql.types.{DataType, StructType}

object SchemaUtil {
  def getSchemaAsDataType(schema: StructType, fieldName: String): DataType = {
    schema(schema.getFieldIndex(fieldName).get).dataType
  }
}
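
SchemaUtil is a small helper used by the partition reader further down to pull the nested key and value structs out of the user-provided schema. A quick illustration with a hypothetical state schema (a streaming count keyed by a string column named user):

import org.apache.spark.sql.types._
import org.apache.spark.sql.execution.datasources.v2.state.SchemaUtil

val stateSchema = new StructType()
  .add("key", new StructType().add("user", StringType))
  .add("value", new StructType().add("count", LongType))

// Returns the DataType of the named top-level field; here the nested "key" struct.
val keySchema = SchemaUtil.getSchemaAsDataType(stateSchema, "key").asInstanceOf[StructType]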

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.v2.state

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.execution.streaming.state.StateStoreId
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{Table, TableProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class StateDataSourceV2 extends TableProvider with DataSourceRegister {

  import StateDataSourceV2._

  lazy val session = SparkSession.active

  override def shortName(): String = "state"

  override def getTable(options: CaseInsensitiveStringMap): Table =
    throw new UnsupportedOperationException("Schema should be explicitly specified.")

  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
    val checkpointLocation = Option(options.get(PARAM_CHECKPOINT_LOCATION)).orElse {
      throw new AnalysisException(s"'$PARAM_CHECKPOINT_LOCATION' must be specified.")
    }.get

    val version = Option(options.get(PARAM_VERSION)).map(_.toInt).orElse {
      throw new AnalysisException(s"'$PARAM_VERSION' must be specified.")
    }.get

    val operatorId = Option(options.get(PARAM_OPERATOR_ID)).map(_.toInt).orElse {
      throw new AnalysisException(s"'$PARAM_OPERATOR_ID' must be specified.")
    }.get

    val storeName = Option(options.get(PARAM_STORE_NAME))
      .orElse(Some(StateStoreId.DEFAULT_STORE_NAME)).get

    new StateTable(session, schema, checkpointLocation, version, operatorId, storeName)
  }
}

object StateDataSourceV2 {
  val PARAM_CHECKPOINT_LOCATION = "checkpointLocation"
  val PARAM_VERSION = "version"
  val PARAM_OPERATOR_ID = "operatorId"
  val PARAM_STORE_NAME = "storeName"
}
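
Putting those options together, a batch read of state might look like the sketch below. It is not part of this commit and assumes an active spark session: the checkpoint path, version and operator ID are made-up, and the key/value sub-schemas must match whatever the stateful operator actually wrote (here assumed to be the streaming count keyed by user from the SchemaUtil sketch above). The schema call is mandatory because getTable(options) without a schema throws.

import org.apache.spark.sql.types._

val stateSchema = new StructType()
  .add("key", new StructType().add("user", StringType))
  .add("value", new StructType().add("count", LongType))

val stateDf = spark.read
  .format("state")                                          // shortName() registered above
  .schema(stateSchema)                                      // required; no schema inference
  .option("checkpointLocation", "/tmp/checkpoints/query1")  // checkpoint location of the query (hypothetical path)
  .option("version", "42")                                  // state store version to read
  .option("operatorId", "0")                                // which stateful operator in the plan
  .load()                                                   // storeName falls back to StateStoreId.DEFAULT_STORE_NAME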

Lines changed: 73 additions & 0 deletions

@@ -0,0 +1,73 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.v2.state

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeRow}
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
import org.apache.spark.sql.sources.v2.reader.PartitionReader
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

class StatePartitionReader(
    storeConf: StateStoreConf,
    hadoopConf: SerializableConfiguration,
    partition: StateStoreInputPartition,
    schema: StructType) extends PartitionReader[InternalRow] {

  private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
  private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType]

  private lazy val iter = {
    val stateStoreId = StateStoreId(partition.stateCheckpointRootLocation,
      partition.operatorId, partition.partition, partition.storeName)
    val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)

    val store = StateStore.get(stateStoreProviderId, keySchema, valueSchema,
      indexOrdinal = None, version = partition.version, storeConf = storeConf,
      hadoopConf = hadoopConf.value)

    store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
  }

  private var current: InternalRow = _

  override def next(): Boolean = {
    if (iter.hasNext) {
      current = iter.next()
      true
    } else {
      current = null
      false
    }
  }

  override def get(): InternalRow = current

  override def close(): Unit = {
    current = null
  }

  private def unifyStateRowPair(pair: (UnsafeRow, UnsafeRow)): InternalRow = {
    val row = new GenericInternalRow(2)
    row.update(0, pair._1)
    row.update(1, pair._2)
    row
  }
}
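
Each row emitted by unifyStateRowPair carries the key UnsafeRow in field 0 and the value UnsafeRow in field 1, so a DataFrame read through this source surfaces one nested key struct column and one nested value struct column per state entry. Continuing the hypothetical read above, the nested structs can be flattened with ordinary column expressions:

// key.* / value.* expand the nested structs into top-level columns.
val flattened = stateDf.selectExpr("key.*", "value.*")
flattened.show(truncate = false)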

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.v2.state

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.state.StateStoreConf
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

class StatePartitionReaderFactory(
    storeConf: StateStoreConf,
    hadoopConf: SerializableConfiguration,
    schema: StructType) extends PartitionReaderFactory {

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val part = partition match {
      case p: StateStoreInputPartition => p
      case e => throw new IllegalStateException("Expected StateStorePartition but other type of " +
        s"partition passed - $e")
    }

    new StatePartitionReader(storeConf, hadoopConf, part, schema)
  }
}
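
StateStoreInputPartition itself is one of the commit's files that does not appear in this excerpt. Judging only from how StatePartitionReader and this factory use it, it presumably carries roughly the fields below; this is a hypothetical reconstruction for orientation, not the committed definition.

import java.util.UUID

import org.apache.spark.sql.sources.v2.reader.InputPartition

// Hypothetical shape inferred from usage: the reader needs the state checkpoint root,
// the query run ID, the operator/partition/store coordinates, and the version to load.
case class StateStoreInputPartition(
    stateCheckpointRootLocation: String,
    queryId: UUID,
    operatorId: Long,
    partition: Int,
    storeName: String,
    version: Long) extends InputPartition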
