@@ -380,19 +380,37 @@ case class TransformWithStateExec(
380380 )
381381 }
382382
383+ private def fetchOperatorStateMetadataLog (
384+ hadoopConf : Configuration ,
385+ checkpointDir : String ,
386+ operatorId : Long ): OperatorStateMetadataLog = {
387+ val checkpointPath = new Path (checkpointDir, operatorId.toString)
388+ val operatorStateMetadataPath = OperatorStateMetadataV2 .metadataFilePath(checkpointPath)
389+ new OperatorStateMetadataLog (hadoopConf, operatorStateMetadataPath.toString)
390+ }
391+
383392 override def validateAndMaybeEvolveStateSchema (
384393 hadoopConf : Configuration ,
385394 batchId : Long ,
386395 stateSchemaVersion : Int ): Array [String ] = {
387396 assert(stateSchemaVersion >= 3 )
388- val newColumnFamilySchemas = getColFamilySchemas()
397+ val newSchemas = getColFamilySchemas()
389398 val schemaFile = new StateSchemaV3File (
390399 hadoopConf, stateSchemaDirPath(StateStoreId .DEFAULT_STORE_NAME ).toString)
391400 // TODO: Read the schema path from the OperatorStateMetadata file
392401 // and validate it with the new schema
393402
403+ val operatorStateMetadataLog = fetchOperatorStateMetadataLog(
404+ hadoopConf, getStateInfo.checkpointLocation, getStateInfo.operatorId)
405+ val mostRecentLog = operatorStateMetadataLog.getLatest()
406+ val oldSchemas = mostRecentLog.map(_._2.asInstanceOf [OperatorStateMetadataV2 ])
407+ .map(_.stateStoreInfo.map(_.stateSchemaFilePath)).getOrElse(Array .empty)
408+ .flatMap { schemaPath =>
409+ schemaFile.getWithPath(new Path (schemaPath))
410+ }.toList
411+ validateSchemas(oldSchemas, newSchemas)
394412 // Write the new schema to the schema file
395- val schemaPath = schemaFile.addWithUUID(batchId, newColumnFamilySchemas .values.toList)
413+ val schemaPath = schemaFile.addWithUUID(batchId, newSchemas .values.toList)
396414 Array (schemaPath.toString)
397415 }
398416
0 commit comments