
Commit e872cd8

[GLUTEN-7028][CH][Part-9] Collecting Delta stats for parquet (#7993)
* [Feature] Implement Spark Dataset.showString
* [Bug Fix] Run pipeline write in Spark 3.5
* [Refactor] Rework stats collection:
  1. Add DeltaStats; do not collect stats for partition columns
  2. Add NativeStatCompute
* [Feature] Allow collecting parquet stats when writing Delta parquet files
* [Refactor] Parameterize SQL
* [UT] Verify Delta stats
* [Refactor] Use GlutenConfig.NATIVE_WRITER_ENABLED.key instead of spark.gluten.sql.native.writer.enabled
* [Bug Fix] Only create an output file when there are actual output chunks
* [Bug Fix] Minor fixes
1 parent d5e5544 commit e872cd8
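To make the shape of the change easier to follow, here is a minimal, self-contained sketch of the write-path dispatch that the new ClickhouseOptimisticTransaction.writeFiles override (see the diff below) performs. The names WritePathDispatch, WritePath, and choosePath are illustrative only and do not exist in Gluten; the real code calls pipelineWriteFiles, writeMergeTree, or super.writeFiles directly.

```scala
// Illustrative model only, not Gluten source. It mirrors the branching added to
// ClickhouseOptimisticTransaction.writeFiles in this commit.
object WritePathDispatch {
  sealed trait WritePath
  case object PipelineWrite extends WritePath        // new native pipeline write
  case object LegacyMergeTreeWrite extends WritePath // deprecated writeMergeTree path
  case object VanillaSparkWrite extends WritePath    // super.writeFiles fallback

  def choosePath(
      writingMergeTree: Boolean,          // ClickHouseConfig.isMergeTreeFormatEngine(metadata)
      nativeWrite: Boolean,               // GlutenConfig enableNativeWriter
      onePipelineMergeTreeWrite: Boolean, // CHConf one-pipeline MergeTree write flag
      isOptimize: Boolean): WritePath = {
    if (writingMergeTree) {
      // Optimize remains unsupported for MergeTree tables.
      require(!isOptimize, "Optimize is not supported for ClickHouse")
      if (nativeWrite && onePipelineMergeTreeWrite) PipelineWrite else LegacyMergeTreeWrite
    } else if (isOptimize || !nativeWrite) {
      VanillaSparkWrite
    } else {
      PipelineWrite
    }
  }

  def main(args: Array[String]): Unit = {
    // A parquet Delta table with the native writer enabled takes the pipeline write,
    // which is the path that now collects Delta stats.
    println(choosePath(writingMergeTree = false, nativeWrite = true,
      onePipelineMergeTreeWrite = false, isOptimize = false)) // PipelineWrite
  }
}
```

The actual dispatch appears in the ClickhouseOptimisticTransaction.scala diff below.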

File tree

37 files changed: +1294, -1845 lines


backends-clickhouse/pom.xml

+12 lines
@@ -426,6 +426,18 @@
               </sources>
             </configuration>
           </execution>
+          <execution>
+            <id>add-test-sources</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/test/delta-${delta.binary.version}</source>
+              </sources>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
       <plugin>

backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala

+146, -126 lines
@@ -29,7 +29,6 @@ import org.apache.spark.sql.delta.files._
 import org.apache.spark.sql.delta.hooks.AutoCompact
 import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, InvariantViolationException}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, GlutenWriterColumnarRules, WriteFiles, WriteJobStatsTracker}
@@ -50,6 +49,9 @@ class ClickhouseOptimisticTransaction(
     override val snapshot: Snapshot)
   extends OptimisticTransaction(deltaLog, catalogTable, snapshot) {

+  private lazy val writingMergeTree =
+    ClickHouseConfig.isMergeTreeFormatEngine(metadata.configuration)
+
   def this(
       deltaLog: DeltaLog,
       catalogTable: Option[CatalogTable],
@@ -62,119 +64,137 @@ class ClickhouseOptimisticTransaction(
   }

   override def writeFiles(
+      inputData: Dataset[_],
+      writeOptions: Option[DeltaOptions],
+      isOptimize: Boolean,
+      additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
+    val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
+    if (writingMergeTree) {
+      if (isOptimize) {
+        throw new UnsupportedOperationException("Optimize is not supported for ClickHouse")
+      }
+      // TODO: update FallbackByBackendSettings for mergetree always return true
+      val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite
+      if (onePipeline)
+        pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints)
+      else
+        writeMergeTree(inputData, writeOptions, additionalConstraints)
+    } else {
+      if (isOptimize || !nativeWrite) {
+        super.writeFiles(inputData, writeOptions, isOptimize, additionalConstraints)
+      } else {
+        pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints)
+      }
+    }
+  }
+
+  @deprecated("Use pipelineWriteFiles instead")
+  private def writeMergeTree(
       inputData: Dataset[_],
       writeOptions: Option[DeltaOptions],
       additionalConstraints: Seq[Constraint]): Seq[FileAction] = {

-    // TODO: update FallbackByBackendSettings for mergetree always return true
-    val onePipeline = GlutenConfig.getConf.enableNativeWriter.getOrElse(
-      false) && CHConf.get.enableOnePipelineMergeTreeWrite
-
-    if (!onePipeline && ClickHouseConfig.isMergeTreeFormatEngine(metadata.configuration)) {
-      hasWritten = true
-
-      val spark = inputData.sparkSession
-      val (data, partitionSchema) = performCDCPartition(inputData)
-      val outputPath = deltaLog.dataPath
-
-      val (queryExecution, output, generatedColumnConstraints, _) =
-        normalizeData(deltaLog, writeOptions, data)
-
-      val tableV2 = ClickHouseTableV2.getTable(deltaLog)
-      val committer =
-        new MergeTreeDelayedCommitProtocol(
-          outputPath.toString,
-          None,
-          None,
-          tableV2.dataBaseName,
-          tableV2.tableName)
-
-      // val (optionalStatsTracker, _) =
-      //   getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
-      val (optionalStatsTracker, _) = (None, None)
-
-      val constraints =
-        Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
-
-      SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
-        val queryPlan = queryExecution.executedPlan
-        val (newQueryPlan, newOutput) =
-          MergeTreeWriterInjects.insertFakeRowAdaptor(queryPlan, output)
-        val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, newOutput)
-        val partitioningColumns = getPartitioningColumns(partitionSchema, newOutput)
-
-        val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
-
-        if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
-          val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
-            new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
-            BasicWriteJobStatsTracker.metrics)
-          // registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
-          statsTrackers.append(basicWriteJobStatsTracker)
-        }
+    hasWritten = true

-        // Iceberg spec requires partition columns in data files
-        val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
-        // Retain only a minimal selection of Spark writer options to avoid any potential
-        // compatibility issues
-        var options = (writeOptions match {
-          case None => Map.empty[String, String]
-          case Some(writeOptions) =>
-            writeOptions.options.filterKeys {
-              key =>
-                key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
-                key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
-            }.toMap
-        }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)
-
-        spark.conf.getAll.foreach(
-          entry => {
-            if (
-              CHConf.startWithSettingsPrefix(entry._1)
-              || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
-            ) {
-              options += (entry._1 -> entry._2)
-            }
-          })
-
-        try {
-          val format = tableV2.getFileFormat(protocol, metadata)
-          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, Some(format.shortName()))
-          MergeTreeFileFormatWriter.write(
-            sparkSession = spark,
-            plan = newQueryPlan,
-            fileFormat = format,
-            // formats.
-            committer = committer,
-            outputSpec = outputSpec,
-            // scalastyle:off deltahadoopconfiguration
-            hadoopConf = spark.sessionState
-              .newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
-            // scalastyle:on deltahadoopconfiguration
-            partitionColumns = partitioningColumns,
-            bucketSpec =
-              tableV2.normalizedBucketSpec(output.map(_.name), spark.sessionState.conf.resolver),
-            statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
-            options = options,
-            constraints = constraints
-          )
-        } catch {
-          case s: SparkException =>
-            // Pull an InvariantViolationException up to the top level if it was the root cause.
-            val violationException = ExceptionUtils.getRootCause(s)
-            if (violationException.isInstanceOf[InvariantViolationException]) {
-              throw violationException
-            } else {
-              throw s
-            }
-        } finally {
-          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None)
-        }
+    val spark = inputData.sparkSession
+    val (data, partitionSchema) = performCDCPartition(inputData)
+    val outputPath = deltaLog.dataPath
+
+    val (queryExecution, output, generatedColumnConstraints, _) =
+      normalizeData(deltaLog, writeOptions, data)
+
+    val tableV2 = ClickHouseTableV2.getTable(deltaLog)
+    val committer =
+      new MergeTreeDelayedCommitProtocol(
+        outputPath.toString,
+        None,
+        None,
+        tableV2.dataBaseName,
+        tableV2.tableName)
+
+    // val (optionalStatsTracker, _) =
+    //   getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
+    val (optionalStatsTracker, _) = (None, None)
+
+    val constraints =
+      Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
+
+    SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
+      val queryPlan = queryExecution.executedPlan
+      val (newQueryPlan, newOutput) =
+        MergeTreeWriterInjects.insertFakeRowAdaptor(queryPlan, output)
+      val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, newOutput)
+      val partitioningColumns = getPartitioningColumns(partitionSchema, newOutput)
+
+      val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
+
+      if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
+        val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
+          new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
+          BasicWriteJobStatsTracker.metrics)
+        // registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
+        statsTrackers.append(basicWriteJobStatsTracker)
+      }
+
+      // Iceberg spec requires partition columns in data files
+      val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
+      // Retain only a minimal selection of Spark writer options to avoid any potential
+      // compatibility issues
+      var options = (writeOptions match {
+        case None => Map.empty[String, String]
+        case Some(writeOptions) =>
+          writeOptions.options.filterKeys {
+            key =>
+              key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+              key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+          }.toMap
+      }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)
+
+      spark.conf.getAll.foreach(
+        entry => {
+          if (
+            CHConf.startWithSettingsPrefix(entry._1)
+            || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
+          ) {
+            options += (entry._1 -> entry._2)
+          }
+        })
+
+      try {
+        val format = tableV2.getFileFormat(protocol, metadata)
+        GlutenWriterColumnarRules.injectSparkLocalProperty(spark, Some(format.shortName()))
+        MergeTreeFileFormatWriter.write(
+          sparkSession = spark,
+          plan = newQueryPlan,
+          fileFormat = format,
+          // formats.
+          committer = committer,
+          outputSpec = outputSpec,
+          // scalastyle:off deltahadoopconfiguration
+          hadoopConf = spark.sessionState
+            .newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
+          // scalastyle:on deltahadoopconfiguration
+          partitionColumns = partitioningColumns,
+          bucketSpec =
+            tableV2.normalizedBucketSpec(output.map(_.name), spark.sessionState.conf.resolver),
+          statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
+          options = options,
+          constraints = constraints
+        )
+      } catch {
+        case s: SparkException =>
+          // Pull an InvariantViolationException up to the top level if it was the root cause.
+          val violationException = ExceptionUtils.getRootCause(s)
+          if (violationException.isInstanceOf[InvariantViolationException]) {
+            throw violationException
+          } else {
+            throw s
+          }
+      } finally {
+        GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None)
       }
-      committer.addedStatuses.toSeq ++ committer.changeFiles
-    } else {
-      super.writeFiles(inputData, writeOptions, additionalConstraints)
     }
+    committer.addedStatuses.toSeq ++ committer.changeFiles
   }

   private def shouldOptimizeWrite(
@@ -188,16 +208,21 @@ class ClickhouseOptimisticTransaction(
   override protected def getCommitter(outputPath: Path): DelayedCommitProtocol =
     new FileDelayedCommitProtocol("delta", outputPath.toString, None, deltaDataSubdir)

-  override def writeFiles(
+  private def getCommitter2(outputPath: Path): DelayedCommitProtocol = {
+    val tableV2 = ClickHouseTableV2.getTable(deltaLog)
+    new MergeTreeDelayedCommitProtocol2(
+      outputPath.toString,
+      None,
+      deltaDataSubdir,
+      tableV2.dataBaseName,
+      tableV2.tableName)
+  }
+
+  private def pipelineWriteFiles(
       inputData: Dataset[_],
       writeOptions: Option[DeltaOptions],
       isOptimize: Boolean,
       additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
-
-    if (isOptimize) {
-      throw new UnsupportedOperationException("Optimize is not supported for ClickHouse")
-    }
-
     hasWritten = true

     val spark = inputData.sparkSession
@@ -229,24 +254,19 @@ class ClickhouseOptimisticTransaction(
       WriteFiles(logicalPlan, fileFormat, partitioningColumns, None, options, Map.empty)

     val queryExecution = new QueryExecution(spark, write)
-    val committer = fileFormat.toString match {
-      case "MergeTree" =>
-        val tableV2 = ClickHouseTableV2.getTable(deltaLog)
-        new MergeTreeDelayedCommitProtocol2(
-          outputPath.toString,
-          None,
-          deltaDataSubdir,
-          tableV2.dataBaseName,
-          tableV2.tableName)
-      case _ => getCommitter(outputPath)
+    val (committer, collectStats) = fileFormat.toString match {
+      case "MergeTree" => (getCommitter2(outputPath), false)
+      case _ => (getCommitter(outputPath), true)
     }

     // If Statistics Collection is enabled, then create a stats tracker that will be injected during
     // the FileFormatWriter.write call below and will collect per-file stats using
     // StatisticsCollection
-    // val (optionalStatsTracker, _) =
-    //   getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
-    val optionalStatsTracker: Option[DeltaJobStatisticsTracker] = None
+    val (optionalStatsTracker, _) = if (collectStats) {
+      getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
+    } else {
+      (None, None)
+    }

     val constraints =
       Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
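The core of the parquet stats change is the last hunk above: the committer choice now also decides whether Delta per-file statistics are collected. Below is a reduced, self-contained model of that toggle; collectStats, getCommitter, getCommitter2, and getOptionalStatsTrackerAndStatsCollection are names taken from the diff, while DeltaStatsToggle and StatsTrackerStub are placeholders invented for illustration.

```scala
// Reduced model of the collectStats toggle added to pipelineWriteFiles; not Gluten source.
// In the real code, "MergeTree" gets a MergeTreeDelayedCommitProtocol2 (via getCommitter2)
// and skips stats, while other formats (parquet) keep the FileDelayedCommitProtocol and
// call getOptionalStatsTrackerAndStatsCollection to collect per-file stats.
object DeltaStatsToggle {
  final case class StatsTrackerStub(description: String) // stand-in for the Delta stats tracker

  def committerAndStatsTracker(fileFormatName: String): (String, Option[StatsTrackerStub]) =
    fileFormatName match {
      case "MergeTree" =>
        // MergeTree keeps its dedicated commit protocol and still writes no Delta stats.
        ("MergeTreeDelayedCommitProtocol2", None)
      case _ =>
        // Parquet and other formats now collect stats; per the commit message,
        // partition columns are excluded from the collected stats.
        ("FileDelayedCommitProtocol", Some(StatsTrackerStub("per-file stats for data columns")))
    }

  def main(args: Array[String]): Unit = {
    println(committerAndStatsTracker("Parquet"))   // (FileDelayedCommitProtocol, Some(...))
    println(committerAndStatsTracker("MergeTree")) // (MergeTreeDelayedCommitProtocol2, None)
  }
}
```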
