
Commit 9f306b4

fenzhu authored and GitHub Enterprise committed

[CARMEL-6439] Add configuration to enable log column lineage (#1238)

* [CARMEL-6439] Add configuration to enable log column lineage
* remove log
* useless condition
* skip empty lineage

1 parent 5f91f87 commit 9f306b4
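A minimal usage sketch of the new flag (the object name and the src/tgt tables are illustrative, not part of the commit). Note that, per the diff below, the lineage section is only appended on the plan-text path gated by SQLConf.get.uiPlanWithMetrics in this fork, and only when the optimized plan is a DataWritingCommand or CreateTableAsSelect:

import org.apache.spark.sql.SparkSession

object ColumnLineageDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("column-lineage-demo")
      .getOrCreate()

    // The flag added by this commit; it is read via getConfString
    // and defaults to "false".
    spark.conf.set("spark.sql.logColumnLineage.enable", "true")

    spark.range(10).selectExpr("id", "id * 2 AS doubled")
      .createOrReplaceTempView("src")

    // A CTAS is one of the two plan shapes (DataWritingCommand,
    // CreateTableAsSelect) that trigger the lineage section.
    spark.sql("CREATE TABLE tgt USING parquet AS SELECT id, doubled FROM src")

    spark.stop()
  }
}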

File tree

2 files changed: +45 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 34 additions & 1 deletion

@@ -28,18 +28,20 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.QueryExecution.skipAuthTag
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, EnsureRepartitionForWriting, InsertAdaptiveSparkPlan}
 import org.apache.spark.sql.execution.bucketing.{AdjustScanPartitionSizeDynamically, DisableUnnecessaryBucketedScan}
+import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.exchange.{EliminateShuffleExec, EnsureRequirements, ExchangePushDownThroughAggregate}
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
+import org.apache.spark.sql.expressions.lineage.AttributeLineageUtils
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.util.Utils
@@ -237,6 +239,7 @@ class QueryExecution(
     if (SQLConf.get.uiPlanWithMetrics) {
       if (tracker.alreadyExecuted) {
         append(tracker.getRealExecutionInfo)
+        return
       }
       append(stringWithStats)
       if (tracker.hasView) {
@@ -245,6 +248,36 @@ class QueryExecution(
       if (tracker.hasTempView) {
         append(tracker.formattedTempViewUsage())
       }
+      if (sparkSession.sessionState.conf.getConfString(
+          "spark.sql.logColumnLineage.enable", "false").equalsIgnoreCase("true")) {
+        if (optimizedPlan.isInstanceOf[DataWritingCommand] ||
+            optimizedPlan.isInstanceOf[CreateTableAsSelect]) {
+          append("\n=== Column Lineage Start ===\n")
+          val startTime = System.currentTimeMillis()
+          var columns = 0
+          try {
+            val (outputColumnNames, query) = optimizedPlan match {
+              case dw: DataWritingCommand => (dw.outputColumnNames, dw.query)
+              case ctas: CreateTableAsSelect => (ctas.query.output.map(_.name), ctas.query)
+            }
+            columns = outputColumnNames.length
+            outputColumnNames.zipWithIndex.foreach { cidx =>
+              val lineages = AttributeLineageUtils.getAttributeOrigins(
+                query, cidx._2, keepOperations = false)
+              if (lineages.nonEmpty) {
+                append(cidx._1 + ": " + lineages.map(_.getColumn).mkString("|") + "\n")
+              }
+            }
+          } catch {
+            case _: Exception =>
+              append("Fail to get the column lineage information\n")
+          }
+          val cost = System.currentTimeMillis() - startTime
+          append(s"Total table columns: $columns columns\n")
+          append(s"Lineage time cost: $cost ms\n")
+          append("=== Column Lineage End ===\n")
+        }
+      }
       append(tracker.formattedRulesByTime())
     } else {
       val (verbose, addSuffix) = (true, false)
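
Given the append calls above, the emitted section should look roughly like the following (table and column names are hypothetical; columns whose lineage comes back empty are skipped, per the "skip empty lineage" note in the commit message):

=== Column Lineage Start ===
id: (default.src, id)
doubled: (default.src, id)
Total table columns: 2 columns
Lineage time cost: 5 ms
=== Column Lineage End ===

Each output column is followed by its |-separated (table, column) origins, which are rendered by AttributeOrigin.getColumn in the next file.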

sql/core/src/main/scala/org/apache/spark/sql/expressions/lineage/AttributeOrigin.scala

Lines changed: 11 additions & 0 deletions

@@ -50,6 +50,17 @@ case class AttributeOrigin(
     s"keepOrigin=${!isDerived}, affectedOperations=${affectedOperations.length})"
   }
 
+  def getColumn: String = {
+    val tbl = leafNode match {
+      case l: LogicalRelation if l.catalogTable.isDefined =>
+        l.catalogTable.get.identifier.unquotedString
+      case f: FileSourceScanExec if f.tableIdentifier.isDefined =>
+        f.tableIdentifier.get.unquotedString
+      case _ => leafNode.nodeName
+    }
+    s"($tbl, ${leafNode.output(index).name})"
+  }
+
   def isOriginalTableColumn: Boolean = !isDerived
 
   def affectedOperations: Seq[Expression] = {
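
To make the formatting contract of getColumn concrete, here is a minimal, self-contained Scala sketch. The Leaf hierarchy below is a stand-in invented for illustration; the real method pattern-matches on Spark's LogicalRelation and FileSourceScanExec nodes and falls back to the leaf's nodeName when no catalog table identifier is available:

object GetColumnSketch {
  // Stand-in for a plan leaf: either a scan with a known catalog table
  // identifier, or an anonymous node that only carries a node name.
  sealed trait Leaf { def nodeName: String }
  case class TableScan(identifier: String) extends Leaf {
    val nodeName = "Relation"
  }
  case class AnonScan(nodeName: String) extends Leaf

  // Mirrors getColumn: prefer the table identifier, fall back to the
  // node name, and render the origin as "(table, column)".
  def renderOrigin(leaf: Leaf, columnName: String): String = {
    val tbl = leaf match {
      case TableScan(id) => id
      case other         => other.nodeName
    }
    s"($tbl, $columnName)"
  }

  def main(args: Array[String]): Unit = {
    // Prints "(sales.orders, order_id)" then "(LocalTableScan, value)".
    println(renderOrigin(TableScan("sales.orders"), "order_id"))
    println(renderOrigin(AnonScan("LocalTableScan"), "value"))
  }
}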
