diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 251cc16acdf43..14576bbcab821 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -233,8 +233,12 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = {
     val batchMetadataFile = batchIdToPath(batchId)
 
+    logDebug(s"addNewBatchByStream: batchMetadataFile = ${batchMetadataFile.toString}")
     if ((metadataCacheEnabled && batchCache.containsKey(batchId))
         || fileManager.exists(batchMetadataFile)) {
+      logDebug(s"Skipping batch $batchId at $path: cacheEnabled=$metadataCacheEnabled, " +
+        s"cached=${batchCache.containsKey(batchId)}, " +
+        s"fileExists=${fileManager.exists(batchMetadataFile)}")
       false
     } else {
       write(batchMetadataFile, fn)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index a7ea02d957830..c25aad0181982 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl, SupportsTriggerAvailableNow}
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.sources.{WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
@@ -88,6 +88,22 @@ class MicroBatchExecution(
 
   @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
 
+  lazy val operatorStateMetadatas: Map[Long, OperatorStateMetadataLog] = {
+    populateOperatorStateMetadatas(getLatestExecutionContext().executionPlan.executedPlan)
+  }
+
+  private def populateOperatorStateMetadatas(
+      plan: SparkPlan): Map[Long, OperatorStateMetadataLog] = {
+    plan.flatMap {
+      case s: StateStoreWriter => s.stateInfo.map { info =>
+        val metadataPath = s.getOperatorStateMetadataPath()
+        info.operatorId -> new OperatorStateMetadataLog(sparkSession,
+          metadataPath.toString)
+      }
+      case _ => Seq.empty
+    }.toMap
+  }
+
   protected def getTrigger(): TriggerExecutor = {
     assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
     trigger match {
@@ -907,6 +923,15 @@ class MicroBatchExecution(
       if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
         throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
       }
+      execCtx.executionPlan.executedPlan.collect {
+        case s: StateStoreWriter =>
+          val metadata = s.operatorStateMetadata()
+          val id = metadata.operatorInfo.operatorId
+          val metadataFile = operatorStateMetadatas(id)
+          if (!metadataFile.add(execCtx.batchId, metadata)) {
+            throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
+          }
+      }
     }
     committedOffsets ++= execCtx.endOffsets
   }
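Note (reviewer sketch, not part of the patch): `OperatorStateMetadataLog` inherits its batch-log API from `HDFSMetadataLog`, so `add` returns `false` when a file for that batch already exists, and the commit path above treats that the same way as a conflicting commit-log write. Reading the metadata back uses the inherited `get`/`getLatest`. A minimal sketch, assuming a local `SparkSession` and a hypothetical metadata path:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.OperatorStateMetadataLog

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Hypothetical path; in the patch it comes from getOperatorStateMetadataPath().
val metadataPath = "/tmp/ckpt/state/0/operatorStateMetadata"
val log = new OperatorStateMetadataLog(spark, metadataPath)

// getLatest() is inherited from HDFSMetadataLog: the newest batch written, if any.
log.getLatest().foreach { case (batchId, metadata) =>
  println(s"operator ${metadata.operatorInfo.operatorId} last committed at batch $batchId")
}
```

Because the log is keyed by batch id, a restarted query that replays a batch hits the `add` == `false` branch instead of silently overwriting the earlier metadata file.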
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala
new file mode 100644
index 0000000000000..9860989d4bea0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import org.apache.hadoop.fs.FSDataOutputStream
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataV2}
+
+
+class OperatorStateMetadataLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[OperatorStateMetadata](sparkSession, path) {
+
+  override protected def serialize(metadata: OperatorStateMetadata, out: OutputStream): Unit = {
+    val fsDataOutputStream = out.asInstanceOf[FSDataOutputStream]
+    OperatorStateMetadataV2.serialize(fsDataOutputStream, metadata)
+  }
+
+  override protected def deserialize(in: InputStream): OperatorStateMetadata = {
+    // The caller wraps this call in a try-finally and closes the underlying
+    // stream, so the reader does not need to be closed here.
+    val bufferedReader = new BufferedReader(new InputStreamReader(in, UTF_8))
+    OperatorStateMetadataV2.deserialize(bufferedReader)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index 05012b4724d46..221ce0d35c2ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -30,7 +30,7 @@ import org.json4s.jackson.Serialization
 
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil, OperatorStateMetadataLog}
 import org.apache.spark.util.AccumulatorV2
 
 /**
@@ -64,8 +64,9 @@ trait OperatorStateMetadata {
 }
 
 object OperatorStateMetadata {
-  def metadataFilePath(stateCheckpointPath: Path): Path =
+  def metadataFilePath(stateCheckpointPath: Path): Path = {
     new Path(new Path(stateCheckpointPath, "_metadata"), "metadata")
+  }
 }
 case class OperatorStateMetadataV1(
     operatorInfo: OperatorInfoV1,
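Layout note (illustrative, not part of the patch): `metadataFilePath` keeps the existing V1 single-file location under `_metadata`, while the new log is a directory managed by `HDFSMetadataLog` with one file per batch. A sketch of the V1 path composition, assuming a made-up checkpoint root:

```scala
import org.apache.hadoop.fs.Path

val stateCheckpointPath = new Path("/tmp/ckpt/state") // assumed checkpoint root

// V1 layout, as composed by OperatorStateMetadata.metadataFilePath:
val v1File = new Path(new Path(stateCheckpointPath, "_metadata"), "metadata")
assert(v1File.toString == "/tmp/ckpt/state/_metadata/metadata")
```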
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index d7e8428318b85..9ca8424e5194c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit._
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.AnalysisException
@@ -70,6 +72,11 @@ trait StatefulOperator extends SparkPlan {
       throw new IllegalStateException("State location not present for execution")
     }
   }
+
+  def getOperatorStateMetadataPath(): Path = {
+    new Path(new Path(getStateInfo.checkpointLocation,
+      getStateInfo.operatorId.toString), "operatorStateMetadata")
+  }
 }
 
 /**
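For reference, the path that `getOperatorStateMetadataPath()` composes, traced with assumed values (in the operator both come from `getStateInfo`):

```scala
import org.apache.hadoop.fs.Path

// Assumed values for illustration only.
val checkpointLocation = "/tmp/ckpt/state"
val operatorId = 0L

val metadataPath =
  new Path(new Path(checkpointLocation, operatorId.toString), "operatorStateMetadata")
assert(metadataPath.toString == "/tmp/ckpt/state/0/operatorStateMetadata")
```

This keeps the new per-batch metadata directory alongside, but distinct from, the V1 `_metadata/metadata` file, so each stateful operator gets its own `OperatorStateMetadataLog`.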