Skip to content

Commit 0530292

Browse files
authored
[GLUTEN-7028][CH][Part-10] Collecting Delta stats for MergeTree (#8029)
* [Refactor] Add SparkRowInfoJNI for later use * [Minor] RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS and RuntimeConfig.LOGGER_LEVEL * [Minor] Using one pipeline write for spark 35, ignore some failed tests now * [Fix Bug] Only output NO_PARTITION_ID at collectStats to avoid name bug in mergeParts * [Refactor] mergeParts return MergeTreeDataPartPtr instead of vector<MergeTreeDataPartPtr> * [Minor] debug::dumpMemoryUsage and debug::printBlockHeader * [Feature] SparkMergeTreeSink add DeltaStats * [Feature] collect DeltaStats for mergetree * [Feature] one pipeline write for parquet optimize * [Feature] one pipeline write for mergetree optimize * Fix ut
1 parent dcd356c commit 0530292

File tree

48 files changed

+1949
-459
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1949
-459
lines changed

backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala

+37-28
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import org.apache.spark.util.SerializableConfiguration
3939

4040
import org.apache.commons.lang3.exception.ExceptionUtils
4141
import org.apache.hadoop.fs.Path
42+
import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec
4243

4344
import scala.collection.mutable.ListBuffer
4445

@@ -69,20 +70,22 @@ class ClickhouseOptimisticTransaction(
6970
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
7071
val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
7172
if (writingMergeTree) {
72-
if (isOptimize) {
73-
throw new UnsupportedOperationException("Optimize is not supported for ClickHouse")
74-
}
7573
// TODO: update FallbackByBackendSettings for mergetree always return true
7674
val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite
7775
if (onePipeline)
78-
pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints)
79-
else
76+
pipelineWriteFiles(inputData, writeOptions, isOptimize, additionalConstraints)
77+
else {
78+
if (isOptimize) {
79+
throw new UnsupportedOperationException(
80+
"Optimize is only supported in one pipeline native write mode")
81+
}
8082
writeMergeTree(inputData, writeOptions, additionalConstraints)
83+
}
8184
} else {
82-
if (isOptimize || !nativeWrite) {
83-
super.writeFiles(inputData, writeOptions, isOptimize, additionalConstraints)
85+
if (nativeWrite) {
86+
pipelineWriteFiles(inputData, writeOptions, isOptimize, additionalConstraints)
8487
} else {
85-
pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints)
88+
super.writeFiles(inputData, writeOptions, isOptimize, additionalConstraints)
8689
}
8790
}
8891
}
@@ -217,6 +220,19 @@ class ClickhouseOptimisticTransaction(
217220
tableV2.tableName)
218221
}
219222

223+
/**
224+
* Writes out the dataframe in pipeline mode after performing schema validation.Returns a list of
225+
* actions to append these files to the reservoir.
226+
*
227+
* @param inputData
228+
* Data to write out.
229+
* @param writeOptions
230+
* Options to decide how to write out the data.
231+
* @param isOptimize
232+
* Whether the operation writing this is Optimize or not.
233+
* @param additionalConstraints
234+
* Additional constraints on the write.
235+
*/
220236
private def pipelineWriteFiles(
221237
inputData: Dataset[_],
222238
writeOptions: Option[DeltaOptions],
@@ -232,21 +248,13 @@ class ClickhouseOptimisticTransaction(
232248
normalizeData(deltaLog, writeOptions, data)
233249
val partitioningColumns = getPartitioningColumns(partitionSchema, output)
234250

235-
val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats.
236-
237-
val (committer, collectStats) = fileFormat.toString match {
238-
case "MergeTree" => (getCommitter2(outputPath), false)
239-
case _ => (getCommitter(outputPath), true)
240-
}
251+
val committer = if (writingMergeTree) getCommitter2(outputPath) else getCommitter(outputPath)
241252

242253
// If Statistics Collection is enabled, then create a stats tracker that will be injected during
243254
// the FileFormatWriter.write call below and will collect per-file stats using
244255
// StatisticsCollection
245-
val (optionalStatsTracker, _) = if (collectStats) {
256+
val (optionalStatsTracker, _) =
246257
getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
247-
} else {
248-
(None, None)
249-
}
250258

251259
val constraints =
252260
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
@@ -259,18 +267,18 @@ class ClickhouseOptimisticTransaction(
259267
// TODO: val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
260268
val checkInvariants = empty2NullPlan
261269

270+
// TODO: DeltaOptimizedWriterExec
262271
// No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
263272
// evenly-balanced data files already.
264-
// TODO: val physicalPlan =
265-
// if (
266-
// !isOptimize &&
267-
// shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
268-
// ) {
269-
// DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
270-
// } else {
271-
// checkInvariants
272-
// }
273-
val physicalPlan = checkInvariants
273+
val physicalPlan =
274+
if (
275+
!isOptimize &&
276+
shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
277+
) {
278+
DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
279+
} else {
280+
checkInvariants
281+
}
274282

275283
val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
276284

@@ -296,6 +304,7 @@ class ClickhouseOptimisticTransaction(
296304
}.toMap
297305
}) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)
298306

307+
val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats.
299308
val executedPlan = DeltaV1Writes(
300309
spark,
301310
physicalPlan,

backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala

+9-51
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,12 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
3939
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
4040
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
4141
import org.apache.spark.sql.execution.command.RunnableCommand
42-
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
43-
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
4442
import org.apache.spark.sql.execution.metric.SQLMetric
4543
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
4644
import org.apache.spark.sql.types._
4745
import org.apache.spark.util.{SystemClock, ThreadUtils}
4846

49-
/**
50-
* Gluten overwrite Delta:
51-
*
52-
* This file is copied from Delta 3.2.1. It is modified in:
53-
* 1. getDeltaTable supports to get ClickHouseTableV2
54-
* 2. runOptimizeBinJobClickhouse
55-
* 3. groupFilesIntoBinsClickhouse
56-
*/
47+
// TODO: Remove this file once we needn't support bucket
5748

5849
/** Base class defining abstract optimize command */
5950
abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand {
@@ -152,10 +143,7 @@ case class OptimizeTableCommand(
152143
copy(child = newChild)(zOrderBy)
153144

154145
override def run(sparkSession: SparkSession): Seq[Row] = {
155-
// --- modified start
156-
val table = OptimizeTableCommandOverwrites.getDeltaTable(child, "OPTIMIZE")
157-
// --- modified end
158-
146+
val table = getDeltaTable(child, "OPTIMIZE")
159147
val txn = table.startTransaction()
160148
if (txn.readVersion == -1) {
161149
throw DeltaErrors.notADeltaTableException(table.deltaLog.dataPath.toString)
@@ -268,12 +256,6 @@ class OptimizeExecutor(
268256

269257
def optimize(): Seq[Row] = {
270258
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
271-
272-
// --- modified start
273-
val isMergeTreeFormat = ClickHouseConfig
274-
.isMergeTreeFormatEngine(txn.deltaLog.unsafeVolatileMetadata.configuration)
275-
// --- modified end
276-
277259
val minFileSize = optimizeContext.minFileSize.getOrElse(
278260
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
279261
val maxFileSize = optimizeContext.maxFileSize.getOrElse(
@@ -288,39 +270,15 @@ class OptimizeExecutor(
288270
case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles)
289271
case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
290272
}
291-
// --- modified start
292-
val maxThreads =
293-
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
294-
val (updates, jobs) = if (isMergeTreeFormat) {
295-
val partitionsToCompact = filesToProcess
296-
.groupBy(file => (file.asInstanceOf[AddMergeTreeParts].bucketNum, file.partitionValues))
297-
.toSeq
298-
val jobs = OptimizeTableCommandOverwrites
299-
.groupFilesIntoBinsClickhouse(partitionsToCompact, maxFileSize)
300-
val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) {
301-
partitionBinGroup =>
302-
// --- modified start
303-
OptimizeTableCommandOverwrites.runOptimizeBinJobClickhouse(
304-
txn,
305-
partitionBinGroup._1._2,
306-
partitionBinGroup._1._1,
307-
partitionBinGroup._2,
308-
maxFileSize)
309-
// --- modified end
310-
}.flatten
311-
// uniform the jobs type
312-
(updates, jobs.map(v => (v._1._2 ++ Map("bucketNum" -> v._1.toString()), v._2)))
313-
} else {
314-
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
273+
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
315274

316-
val jobs = groupFilesIntoBins(partitionsToCompact)
275+
val jobs = groupFilesIntoBins(partitionsToCompact)
317276

318-
val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup =>
319-
runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)
320-
}.flatten
321-
(updates, jobs)
322-
}
323-
// --- modified end
277+
val maxThreads =
278+
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
279+
val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup =>
280+
runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)
281+
}.flatten
324282

325283
val addedFiles = updates.collect { case a: AddFile => a }
326284
val removedFiles = updates.collect { case r: RemoveFile => r }

backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala

+26-15
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ import org.apache.spark.sql.catalyst.InternalRow
2626
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Projection, UnsafeProjection}
2727
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, DeclarativeAggregate}
2828
import org.apache.spark.sql.delta.files.{FileDelayedCommitProtocol, MergeTreeDelayedCommitProtocol2}
29-
import org.apache.spark.sql.delta.stats.DeltaFileStatistics
29+
import org.apache.spark.sql.delta.stats.{DeltaFileStatistics, DeltaJobStatisticsTracker}
3030
import org.apache.spark.sql.execution.datasources.{ExecutedWriteSummary, WriteJobDescription, WriteTaskResult}
31-
import org.apache.spark.sql.types.StringType
3231
import org.apache.spark.util.Utils
3332

3433
import scala.collection.mutable.ArrayBuffer
@@ -38,7 +37,7 @@ case class DeltaFileCommitInfo(committer: FileDelayedCommitProtocol)
3837
val addedFiles: ArrayBuffer[(Map[String, String], String)] =
3938
new ArrayBuffer[(Map[String, String], String)]
4039
override def apply(stat: NativeFileWriteResult): Unit = {
41-
if (stat.partition_id == "__NO_PARTITION_ID__") {
40+
if (stat.partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
4241
addedFiles.append((Map.empty[String, String], stat.filename))
4342
} else {
4443
val partitionValues = committer.parsePartitions(stat.partition_id)
@@ -61,14 +60,15 @@ case class NativeDeltaStats(projection: Projection) extends (InternalRow => Unit
6160

6261
def result: DeltaFileStatistics = DeltaFileStatistics(results.toMap)
6362
}
64-
case class FileDeltaColumnarWrite(
65-
override val jobTrackerID: String,
66-
override val description: WriteJobDescription,
67-
override val committer: FileDelayedCommitProtocol)
68-
extends CHColumnarWrite[FileDelayedCommitProtocol]
69-
with Logging {
7063

71-
private lazy val nativeDeltaStats: Option[NativeDeltaStats] = {
64+
trait SupportNativeDeltaStats[T <: FileCommitProtocol] extends CHColumnarWrite[T] {
65+
66+
private lazy val deltaWriteJobStatsTracker: Option[DeltaJobStatisticsTracker] =
67+
description.statsTrackers
68+
.find(_.isInstanceOf[DeltaJobStatisticsTracker])
69+
.map(_.asInstanceOf[DeltaJobStatisticsTracker])
70+
71+
lazy val nativeDeltaStats: Option[NativeDeltaStats] = {
7272
deltaWriteJobStatsTracker
7373
.map(
7474
delta => {
@@ -77,10 +77,7 @@ case class FileDeltaColumnarWrite(
7777
if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
7878
ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression
7979
}
80-
val z = Seq(
81-
AttributeReference("filename", StringType, nullable = false)(),
82-
AttributeReference("partition_id", StringType, nullable = false)())
83-
val s =
80+
val vanillaSchema =
8481
delta.statsColExpr
8582
.collect {
8683
case ae: AggregateExpression
@@ -92,10 +89,24 @@ case class FileDeltaColumnarWrite(
9289
NativeDeltaStats(
9390
UnsafeProjection.create(
9491
exprs = Seq(r),
95-
inputSchema = z :++ s
92+
inputSchema = nativeStatsSchema(vanillaSchema)
9693
))
9794
})
9895
}
96+
97+
def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference]
98+
}
99+
100+
case class FileDeltaColumnarWrite(
101+
override val jobTrackerID: String,
102+
override val description: WriteJobDescription,
103+
override val committer: FileDelayedCommitProtocol)
104+
extends SupportNativeDeltaStats[FileDelayedCommitProtocol]
105+
with Logging {
106+
107+
override def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference] =
108+
NativeFileWriteResult.nativeStatsSchema(vanilla)
109+
99110
override def doSetupNativeTask(): Unit = {
100111
assert(description.path == committer.outputPath)
101112
val nameSpec = CreateFileNameSpec(taskAttemptContext, description)

0 commit comments

Comments
 (0)