Commit aad64cf

[GLUTEN-8872][CH][Part-2] Support Delta Deletion Vectors read for CH backend
Part-2: Support reading the deletion vector bitmap when querying
Parent: 21abbbc
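
For context, a minimal sketch of the user-facing flow this part enables, mirroring the new test suite added below (table name, schema, and location are placeholders, not part of the patch): create a Delta table with deletion vectors enabled, delete rows while Gluten is disabled, then query with Gluten enabled so the offloaded ClickHouse scan applies the deletion-vector bitmap.

    // Placeholder table/columns; the pattern follows GlutenDeltaParquetDeletionVectorSuite.
    spark.sql("set spark.gluten.enabled=false") // the DELETE itself is not offloaded in this flow
    spark.sql("""
      CREATE TABLE IF NOT EXISTS lineitem_dv (l_orderkey BIGINT, l_linenumber BIGINT)
      USING delta
      TBLPROPERTIES (delta.enableDeletionVectors = 'true')
      LOCATION '/tmp/lineitem_dv'
    """)
    spark.sql("INSERT INTO lineitem_dv VALUES (1, 1), (2, 2), (3, 3)")
    spark.sql("DELETE FROM lineitem_dv WHERE mod(l_orderkey, 3) = 1")

    spark.sql("set spark.gluten.enabled=true") // the scan is offloaded to the CH backend
    // Deleted rows are filtered out because the scan now reads the deletion-vector bitmap.
    spark.sql("SELECT sum(l_linenumber) FROM lineitem_dv").show()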

File tree

26 files changed: +671 -85 lines changed


backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala

+45
@@ -19,14 +19,59 @@ package org.apache.gluten.sql.shims.delta32
 import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.sql.shims.DeltaShims

+import org.apache.spark.sql.delta.DeltaParquetFileFormat
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.perf.DeltaOptimizedWriterTransformer

+import org.apache.hadoop.fs.Path
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import scala.collection.JavaConverters._
+
 class Delta32Shims extends DeltaShims {
   override def supportDeltaOptimizedWriterExec(plan: SparkPlan): Boolean =
     DeltaOptimizedWriterTransformer.support(plan)

   override def offloadDeltaOptimizedWriterExec(plan: SparkPlan): GlutenPlan = {
     DeltaOptimizedWriterTransformer.from(plan)
   }
+
+  /**
+   * Decode the ZeroMQ Base85 encoded file path.
+   *
+   * TODO: the native side needs to support ZeroMQ Base85.
+   */
+  override def convertRowIndexFilterIdEncoded(
+      partitionColsCnt: Int,
+      file: PartitionedFile,
+      otherConstantMetadataColumnValues: JMap[String, Object]): JMap[String, Object] = {
+    val newOtherConstantMetadataColumnValues: JMap[String, Object] =
+      new JHashMap[String, Object]
+    for ((k, v) <- otherConstantMetadataColumnValues.asScala) {
+      if (k.equalsIgnoreCase(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED)) {
+        val decoded = JsonUtils.fromJson[DeletionVectorDescriptor](v.toString)
+        var filePath = new Path(file.filePath.toString()).getParent
+        for (_ <- 0 until partitionColsCnt) {
+          filePath = filePath.getParent
+        }
+        val decodedPath = decoded.absolutePath(filePath)
+        val newDeletionVectorDescriptor = decoded.copy(
+          decoded.storageType,
+          decodedPath.toUri.toASCIIString,
+          decoded.offset,
+          decoded.sizeInBytes,
+          decoded.cardinality,
+          decoded.maxRowIndex
+        )
+        newOtherConstantMetadataColumnValues.put(k, JsonUtils.toJson(newDeletionVectorDescriptor))
+      } else {
+        newOtherConstantMetadataColumnValues.put(k, v)
+      }
+    }
+    newOtherConstantMetadataColumnValues
+  }
 }
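
The path rewrite above is needed because the DeletionVectorDescriptor stored in the Delta log references the deletion-vector file relative to the table root, while PartitionedFile only carries the data file's own path; stripping the file name and then one directory per partition column recovers that root. A minimal sketch of the step, with a hypothetical helper name and example path:

    import org.apache.hadoop.fs.Path

    // Hypothetical helper: recover the Delta table root from a data file path by
    // dropping the file name and then one directory level per partition column.
    def tableRoot(dataFilePath: String, partitionColsCnt: Int): Path = {
      var dir = new Path(dataFilePath).getParent // directory containing the data file
      for (_ <- 0 until partitionColsCnt) {
        dir = dir.getParent // peel off one partition directory, e.g. l_returnflag=A
      }
      dir
    }

    // tableRoot("/data/tbl/l_returnflag=A/part-00000.parquet", 1) == new Path("/data/tbl")
    // The shim then calls decoded.absolutePath(tableRoot) and forwards the resulting
    // absolute URI, so the native reader receives a path it can open directly.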
GlutenDeltaParquetDeletionVectorSuite.scala (new file)

+184

@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.gluten.delta
+
+import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenDeltaParquetDeletionVectorSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val needCopyParquetToTablePath = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + "mergetree-queries-output"
+
+  // import org.apache.gluten.backendsapi.clickhouse.CHConfig._
+
+  /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.files.maxPartitionBytes", "20000000")
+      .set("spark.sql.storeAssignmentPolicy", "legacy")
+      // .setCHConfig("use_local_format", true)
+      .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+  }
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    createNotNullTPCHTablesInParquet(tablesPath)
+  }
+
+  private val q1SchemaString: String =
+    s""" l_orderkey bigint,
+       | l_partkey bigint,
+       | l_suppkey bigint,
+       | l_linenumber bigint,
+       | l_quantity double,
+       | l_extendedprice double,
+       | l_discount double,
+       | l_tax double,
+       | l_returnflag string,
+       | l_linestatus string,
+       | l_shipdate date,
+       | l_commitdate date,
+       | l_receiptdate date,
+       | l_shipinstruct string,
+       | l_shipmode string,
+       | l_comment string""".stripMargin
+
+  test("test parquet table delete with the delta DV") {
+    spark.sql(s"""
+                 |set spark.gluten.enabled=false;
+                 |""".stripMargin)
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_delta_parquet_delete_dv;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_delta_parquet_delete_dv
+                 |($q1SchemaString)
+                 |USING delta
+                 |TBLPROPERTIES (delta.enableDeletionVectors='true')
+                 |LOCATION '$basePath/lineitem_delta_parquet_delete_dv'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_delta_parquet_delete_dv
+                 | select /*+ REPARTITION(6) */ * from lineitem
+                 |""".stripMargin)
+
+    val df1 = spark.sql(s"""
+                           | delete from lineitem_delta_parquet_delete_dv
+                           | where mod(l_orderkey, 3) = 1 and l_orderkey < 100
+                           |""".stripMargin)
+    spark.sql(s"""
+                 |set spark.gluten.enabled=true;
+                 |""".stripMargin)
+
+    val df = spark.sql(s"""
+                          | select sum(l_linenumber) from lineitem_delta_parquet_delete_dv
+                          |""".stripMargin)
+    val result = df.collect()
+    assert(
+      result.apply(0).get(0) === 1802335
+    )
+    val scanExec = collect(df.queryExecution.executedPlan) {
+      case f: FileSourceScanExecTransformer => f
+    }
+    val parquetScan = scanExec.head
+    val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
+    val addFiles = fileIndex.matchingFiles(Nil, Nil)
+    assert(addFiles.size === 6)
+
+    spark.sql(s"""
+                 |set spark.gluten.enabled=false;
+                 |""".stripMargin)
+    spark.sql(s"""
+                 | delete from lineitem_delta_parquet_delete_dv where mod(l_orderkey, 3) = 2
+                 |""".stripMargin)
+    spark.sql(s"""
+                 |set spark.gluten.enabled=true;
+                 |""".stripMargin)
+
+    val df3 = spark.sql(s"""
+                           | select sum(l_linenumber) from lineitem_delta_parquet_delete_dv
+                           |""".stripMargin)
+    assert(
+      df3.collect().apply(0).get(0) === 1200560
+    )
+  }
+
+  test("test parquet partition table delete with the delta DV") {
+    withSQLConf(("spark.sql.sources.partitionOverwriteMode", "dynamic")) {
+      spark.sql(s"""
+                   |set spark.gluten.enabled=false;
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |DROP TABLE IF EXISTS lineitem_delta_partition_parquet_delete_dv;
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE IF NOT EXISTS lineitem_delta_partition_parquet_delete_dv
+                   |($q1SchemaString)
+                   |USING delta
+                   |PARTITIONED BY (l_returnflag)
+                   |TBLPROPERTIES (delta.enableDeletionVectors='true')
+                   |LOCATION '$basePath/lineitem_delta_partition_parquet_delete_dv'
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   | insert into table lineitem_delta_partition_parquet_delete_dv
+                   | select /*+ REPARTITION(6) */ * from lineitem
+                   |""".stripMargin)
+
+      val df1 = spark.sql(s"""
+                             | delete from lineitem_delta_partition_parquet_delete_dv
+                             | where mod(l_orderkey, 3) = 1
+                             |""".stripMargin)
+      spark.sql(s"""
+                   |set spark.gluten.enabled=true;
+                   |""".stripMargin)
+
+      val df =
+        spark.sql(s"""
+                     | select sum(l_linenumber) from lineitem_delta_partition_parquet_delete_dv
+                     |""".stripMargin)
+      val result = df.collect()
+      assert(
+        result.apply(0).get(0) === 1201486
+      )
+      val scanExec = collect(df.queryExecution.executedPlan) {
+        case f: FileSourceScanExecTransformer => f
+      }
+      assert(scanExec.nonEmpty)
+    }
+  }
+}
+// scalastyle:off line.size.limit
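
Not part of this patch, but if you want to confirm that the DELETE statements above actually produced deletion vectors rather than rewritten files, one option inside this suite is to inspect the AddFile actions in the Delta log (assuming the Delta 3.x AddFile action exposes a deletionVector descriptor field):

    import org.apache.spark.sql.delta.DeltaLog

    // Assumption: Delta 3.x AddFile carries an optional deletionVector descriptor.
    val deltaLog = DeltaLog.forTable(spark, s"$basePath/lineitem_delta_parquet_delete_dv")
    val filesWithDv = deltaLog.update().allFiles.collect().count(_.deletionVector != null)
    assert(filesWithDv > 0, "expected at least one file with an attached deletion vector")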

backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala

+9
@@ -19,6 +19,9 @@ package org.apache.gluten.sql.shims
 import org.apache.gluten.execution.GlutenPlan

 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+import java.util.{HashMap => JHashMap, Map => JMap}

 sealed abstract class ShimDescriptor

@@ -29,4 +32,10 @@ trait DeltaShims {
     throw new UnsupportedOperationException(
       s"Can't transform ColumnarDeltaOptimizedWriterExec from ${plan.getClass.getSimpleName}")
   }
+
+  def convertRowIndexFilterIdEncoded(
+      partitionColsCnt: Int,
+      file: PartitionedFile,
+      otherConstantMetadataColumnValues: JMap[String, Object]): JMap[String, Object] =
+    new JHashMap[String, Object]()
 }
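
The trait default above returns an empty map, so only shims for Delta versions that support deletion vectors need to override it; every other shim keeps working unchanged. A simplified sketch of that dispatch pattern (the trait and object names here are hypothetical, not the actual shim types):

    import java.util.{HashMap => JHashMap, Map => JMap}

    // Hypothetical, simplified version of the pattern used by DeltaShims.
    trait DvShim {
      def convertRowIndexFilterIdEncoded(values: JMap[String, Object]): JMap[String, Object] =
        new JHashMap[String, Object]() // default: no per-file DV metadata is attached
    }

    object LegacyDeltaShim extends DvShim // older Delta versions inherit the no-op default

    object Delta32LikeShim extends DvShim {
      override def convertRowIndexFilterIdEncoded(values: JMap[String, Object]): JMap[String, Object] =
        values // a real shim rewrites the DV descriptor path here, as in Delta32Shims above
    }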

backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala

+12 -2
@@ -22,7 +22,7 @@ import org.apache.gluten.execution._
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.metrics.IMetrics
-import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.sql.shims.{DeltaShimLoader, SparkShimLoader}
 import org.apache.gluten.substrait.plan.PlanNode
 import org.apache.gluten.substrait.rel._
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

@@ -159,6 +159,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
     val modificationTimes = new JArrayList[JLong]()
     val partitionColumns = new JArrayList[JMap[String, String]]
     val metadataColumns = new JArrayList[JMap[String, String]]
+    val otherMetadataColumns = new JArrayList[JMap[String, Object]]
     f.files.foreach {
       file =>
         paths.add(new URI(file.filePath.toString()).toASCIIString)

@@ -203,6 +204,14 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
           fileSizes.add(0)
           modificationTimes.add(0)
         }
+
+        val otherConstantMetadataColumnValues =
+          DeltaShimLoader.getDeltaShims.convertRowIndexFilterIdEncoded(
+            partitionColumn.size(),
+            file,
+            SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file)
+          )
+        otherMetadataColumns.add(otherConstantMetadataColumnValues)
       }
       val preferredLocations =
         CHAffinity.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())

@@ -217,7 +226,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
         metadataColumns,
         fileFormat,
         preferredLocations.toList.asJava,
-        mapAsJavaMap(properties)
+        mapAsJavaMap(properties),
+        otherMetadataColumns
       )
     case _ =>
       throw new UnsupportedOperationException(s"Unsupported input partition: $partition.")
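
For each file, the map built by the shim is appended to otherMetadataColumns and travels to the native reader next to the existing partition and metadata columns. A rough sketch of what one DV-carrying entry might look like (the literal key and JSON body are illustrative; the real key is DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED and the value is the JSON-serialized DeletionVectorDescriptor with its path rewritten to an absolute URI):

    import java.util.{HashMap => JHashMap, Map => JMap}

    // Illustrative only: one per-file entry roughly as the native reader might receive it.
    val dvEntry: JMap[String, Object] = new JHashMap[String, Object]()
    dvEntry.put(
      "row_index_filter_id_encoded", // stand-in for FILE_ROW_INDEX_FILTER_ID_ENCODED
      """{"storageType":"u","pathOrInlineDv":"file:///tmp/tbl/deletion_vector_abc.bin","sizeInBytes":36,"cardinality":2}"""
    )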

backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala

+4 -3
@@ -70,18 +70,19 @@ case class CHInputPartitionsUtil(
       SparkShimLoader.getSparkShims.getFileStatus(partition).flatMap {
         file =>
           // getPath() is very expensive so we only want to call it once in this block:
-          val filePath = file.getPath
+          val filePath = file._1.getPath

           if (shouldProcess(filePath)) {
             val isSplitable =
               relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)
             SparkShimLoader.getSparkShims.splitFiles(
               relation.sparkSession,
-              file,
+              file._1,
               filePath,
               isSplitable,
               maxSplitBytes,
-              partition.values
+              partition.values,
+              file._2
             )
           } else {
             Seq.empty
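
Here getFileStatus now yields (FileStatus, metadata) pairs instead of bare FileStatus values, so the caller picks the status with _1 and forwards the per-file metadata with _2 into splitFiles. A small, self-contained sketch of that shape (the element types are stand-ins, not the shim's exact signature):

    import org.apache.hadoop.fs.{FileStatus, Path}

    // Stand-in element type; the shim pairs each FileStatus with per-file metadata.
    val pairs: Seq[(FileStatus, Map[String, Any])] = Seq.empty // supplied by the shim in practice
    val paths: Seq[Path] = pairs.map { case (status, _) => status.getPath }
    val perFileMetadata: Seq[Map[String, Any]] = pairs.map(_._2)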

backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala

+2 -1
@@ -121,7 +121,8 @@ case class GlutenCacheFilesCommand(
         new JArrayList[JMap[String, String]](),
         ReadFileFormat.ParquetReadFormat, // ignore format in backend
         new JArrayList[String](),
-        new JHashMap[String, String]()
+        new JHashMap[String, String](),
+        new JArrayList[JMap[String, Object]]()
       )

       (executorId, localFile)
