
Commit b554236 (merge of two parents: ff6aec7 + eac2785)
Commit message: update

File tree: 301 files changed (+11,988 / -3,603 lines)


.github/workflows/velox_backend.yml (+177 -57)
Large diff; not rendered by default.

backends-clickhouse/pom.xml (+32 -13)

@@ -92,6 +92,28 @@
       </dependency>
     </dependencies>
   </profile>
+  <profile>
+    <id>delta</id>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.gluten</groupId>
+        <artifactId>gluten-delta</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.gluten</groupId>
+        <artifactId>gluten-delta</artifactId>
+        <version>${project.version}</version>
+        <type>test-jar</type>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>io.delta</groupId>
+        <artifactId>${delta.package.name}_${scala.binary.version}</artifactId>
+        <scope>provided</scope>
+      </dependency>
+    </dependencies>
+  </profile>
 </profiles>

 <dependencies>
@@ -464,15 +486,18 @@
   <includes>
     <include>src/main/scala/**/*.scala</include>
     <include>src/test/scala/**/*.scala</include>
-    <include>src-delta-${delta.binary.version}/main/delta/**/*.scala</include>
-    <include>src-delta-${delta.binary.version}/test/delta/**/*.scala</include>
+    <include>src-delta/main/scala/**/*.scala</include>
+    <include>src-delta/test/scala/**/*.scala</include>
+    <include>src-delta-${delta.binary.version}/main/scala/**/*.scala</include>
+    <include>src-delta-${delta.binary.version}/test/scala/**/*.scala</include>
   </includes>
   <excludes>
-    <exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/commands/*.scala</exclude>
-    <exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
-    <exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/stats/*.scala</exclude>
-    <exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
-    <exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/Snapshot.scala</exclude>
+    <exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/commands/*.scala</exclude>
+    <exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
+    <exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/stats/*.scala</exclude>
+    <exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
+    <exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/Snapshot.scala</exclude>
+    <exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala</exclude>
   </excludes>
 </scala>
 </configuration>
@@ -490,12 +515,6 @@
 <goals>
   <goal>test</goal>
 </goals>
-<configuration>
-  <systemProperties>
-    <clickhouse.lib.path>${clickhouse.lib.path}</clickhouse.lib.path>
-    <tpcds.data.path>${tpcds.data.path}</tpcds.data.path>
-  </systemProperties>
-</configuration>
 </execution>
 </executions>
 </plugin>

backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala (+3 -4)

@@ -16,7 +16,7 @@
  */
 package org.apache.spark.shuffle

-import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+import org.apache.gluten.backendsapi.clickhouse.{CHBackendSettings, CHConfig}
 import org.apache.gluten.execution.ColumnarNativeIterator
 import org.apache.gluten.memory.CHThreadGroup
 import org.apache.gluten.vectorized._
@@ -29,7 +29,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.protocol.ShuffleMode
-import org.apache.gluten.config.GlutenConfig

 import java.io.IOException
 import java.util.Locale
@@ -80,10 +79,10 @@ class CHCelebornColumnarShuffleWriter[K, V](
       nativeBufferSize,
       capitalizedCompressionCodec,
       compressionLevel,
-      GlutenConfig.get.chColumnarShuffleSpillThreshold,
+      CHConfig.get.chColumnarShuffleSpillThreshold,
       CHBackendSettings.shuffleHashAlgorithm,
       celebornPartitionPusher,
-      GlutenConfig.get.chColumnarForceMemorySortShuffle
+      CHConfig.get.chColumnarForceMemorySortShuffle
         || ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType)
     )
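The hunk above moves the ClickHouse-specific shuffle knobs from the generic GlutenConfig facade to the backend-scoped CHConfig. A minimal sketch of the resulting lookups, using only the accessors visible in this diff (the wrapper function and its use are assumptions for illustration, not code from the commit):

import org.apache.gluten.backendsapi.clickhouse.CHConfig
import org.apache.celeborn.common.protocol.ShuffleMode

// Sketch: resolve the spill threshold and the memory-sort decision the same
// way the writer now does, via CHConfig rather than GlutenConfig.
def resolveShuffleKnobs(shuffleWriterType: String) = {
  val spillThreshold = CHConfig.get.chColumnarShuffleSpillThreshold
  val forceMemorySort =
    CHConfig.get.chColumnarForceMemorySortShuffle ||
      ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType)
  (spillThreshold, forceMemorySort)
}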

New file:
@@ -0,0 +1 @@
+org.apache.gluten.sql.shims.delta20.Delta20ShimProvider
New file:
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.sql.shims.delta20
+
+import org.apache.gluten.sql.shims.{DeltaShimProvider, DeltaShims}
+
+class Delta20ShimProvider extends DeltaShimProvider {
+
+  override def createShim: DeltaShims = {
+    new Delta20Shims()
+  }
+
+}
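Each src-delta-N module contributes a provider class like the one above, and the single-line resource files in this commit register those providers for runtime discovery. A minimal sketch of how such a registration is typically resolved with java.util.ServiceLoader; the loader object and the "first provider wins" selection are illustrative assumptions, not the commit's actual wiring:

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.gluten.sql.shims.{DeltaShimProvider, DeltaShims}

// Illustrative loader: discover whichever DeltaShimProvider implementation is
// registered on the classpath and ask it for the version-specific shims.
object DeltaShimLoaderSketch {
  def load(): DeltaShims = {
    val providers = ServiceLoader.load(classOf[DeltaShimProvider]).asScala.toList
    providers.headOption
      .map(_.createShim)
      .getOrElse(throw new IllegalStateException("no DeltaShimProvider registered"))
  }
}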
New file:
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.sql.shims.delta20
+
+import org.apache.gluten.sql.shims.DeltaShims
+
+class Delta20Shims extends DeltaShims {}

backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala (+2 -2)

@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.delta

-import org.apache.gluten.backendsapi.clickhouse.CHConf
+import org.apache.gluten.backendsapi.clickhouse.CHConfig

 import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
@@ -110,7 +110,7 @@ class ClickhouseOptimisticTransaction(
     spark.conf.getAll.foreach(
       entry => {
         if (
-          CHConf.startWithSettingsPrefix(entry._1)
+          CHConfig.startWithSettingsPrefix(entry._1)
           || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
         ) {
           options += (entry._1 -> entry._2)
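The rename from CHConf to CHConfig leaves the filtering logic itself unchanged: session configuration entries are copied into the write options only if they carry the ClickHouse settings prefix or set Delta's optimize min-file-size. A standalone sketch of that filter, with the helper names taken from the diff and everything else assumed for illustration:

import scala.collection.mutable

import org.apache.gluten.backendsapi.clickhouse.CHConfig
import org.apache.spark.sql.delta.sources.DeltaSQLConf

// Sketch: keep only the session conf entries the native write should see.
def collectWriteOptions(sessionConf: Map[String, String]): Map[String, String] = {
  val options = mutable.Map[String, String]()
  sessionConf.foreach { case (key, value) =>
    if (CHConfig.startWithSettingsPrefix(key) ||
        key.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)) {
      options += (key -> value)
    }
  }
  options.toMap
}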

backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala (+3 -1)

@@ -29,12 +29,14 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDi
 import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
 import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.collection.BitSet
+
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.types.StructType

 import java.{util => ju}
+
 import scala.collection.JavaConverters._

 @SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
New file:
@@ -0,0 +1 @@
+org.apache.gluten.sql.shims.delta23.Delta23ShimProvider
New file:
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.sql.shims.delta23
+
+import org.apache.gluten.sql.shims.{DeltaShimProvider, DeltaShims}
+
+class Delta23ShimProvider extends DeltaShimProvider {
+
+  override def createShim: DeltaShims = {
+    new Delta23Shims()
+  }
+
+}
New file:
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.sql.shims.delta23
+
+import org.apache.gluten.sql.shims.DeltaShims
+
+class Delta23Shims extends DeltaShims {}

backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala (+2 -2)

@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.delta

-import org.apache.gluten.backendsapi.clickhouse.CHConf
+import org.apache.gluten.backendsapi.clickhouse.CHConfig

 import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
@@ -111,7 +111,7 @@ class ClickhouseOptimisticTransaction(
     spark.conf.getAll.foreach(
       entry => {
         if (
-          CHConf.startWithSettingsPrefix(entry._1)
+          CHConfig.startWithSettingsPrefix(entry._1)
           || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
         ) {
           options += (entry._1 -> entry._2)

backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala (+3 -1)

@@ -29,12 +29,14 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDi
 import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
 import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.collection.BitSet
+
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.types.StructType

 import java.{util => ju}
+
 import scala.collection.JavaConverters._

 @SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
New file:
@@ -0,0 +1 @@
+org.apache.gluten.sql.shims.delta32.Delta32ShimProvider
New file:
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.sql.shims.delta32
+
+import org.apache.gluten.sql.shims.{DeltaShimProvider, DeltaShims}
+
+class Delta32ShimProvider extends DeltaShimProvider {
+
+  override def createShim: DeltaShims = {
+    new Delta32Shims()
+  }
+
+}

gluten-ut/common/src/test/scala/org/apache/gluten/utils/SystemParameters.scala renamed to backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala (+10 -16)

@@ -14,25 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.utils
+package org.apache.gluten.sql.shims.delta32

-object SystemParameters {
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.sql.shims.DeltaShims

-  val CLICKHOUSE_LIB_PATH_KEY = "clickhouse.lib.path"
-  val CLICKHOUSE_LIB_PATH_DEFAULT_VALUE = "/usr/local/clickhouse/lib/libch.so"
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.perf.DeltaOptimizedWriterTransformer

-  val TPCDS_DATA_PATH_KEY = "tpcds.data.path"
-  val TPCDS_DATA_PATH_DEFAULT_VALUE = "/data/tpcds-data-sf1"
+class Delta32Shims extends DeltaShims {
+  override def supportDeltaOptimizedWriterExec(plan: SparkPlan): Boolean =
+    DeltaOptimizedWriterTransformer.support(plan)

-  def getClickHouseLibPath: String = {
-    System.getProperty(
-      SystemParameters.CLICKHOUSE_LIB_PATH_KEY,
-      SystemParameters.CLICKHOUSE_LIB_PATH_DEFAULT_VALUE)
-  }
-
-  def getTpcdsDataPath: String = {
-    System.getProperty(
-      SystemParameters.TPCDS_DATA_PATH_KEY,
-      SystemParameters.TPCDS_DATA_PATH_DEFAULT_VALUE)
+  override def offloadDeltaOptimizedWriterExec(plan: SparkPlan): GlutenPlan = {
+    DeltaOptimizedWriterTransformer.from(plan)
   }
 }
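The old SystemParameters test helper disappears (its clickhouse.lib.path and tpcds.data.path system properties were also dropped from the Surefire configuration in backends-clickhouse/pom.xml above), and the file now hosts the Delta 3.2 shims. A minimal sketch of how a caller might consult these shims before offloading the Delta optimized-writer node; the wrapper below is an assumption for illustration, not the rule Gluten actually applies:

import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.sql.shims.DeltaShims

import org.apache.spark.sql.execution.SparkPlan

// Sketch: offload the plan through the shim only when the shim reports the
// optimized-writer node as supported; otherwise keep the vanilla plan.
def maybeOffloadOptimizedWriter(shims: DeltaShims, plan: SparkPlan): Option[GlutenPlan] =
  if (shims.supportDeltaOptimizedWriterExec(plan))
    Some(shims.offloadDeltaOptimizedWriterExec(plan))
  else
    None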

backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala (+3 -4)

@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.delta

-import org.apache.gluten.backendsapi.clickhouse.CHConf
+import org.apache.gluten.backendsapi.clickhouse.CHConfig

 import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
@@ -71,7 +71,7 @@ class ClickhouseOptimisticTransaction(
     val nativeWrite = GlutenConfig.get.enableNativeWriter.getOrElse(false)
     if (writingMergeTree) {
       // TODO: update FallbackByBackendSettings for mergetree always return true
-      val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite
+      val onePipeline = nativeWrite && CHConfig.get.enableOnePipelineMergeTreeWrite
       if (onePipeline)
         pipelineWriteFiles(inputData, writeOptions, isOptimize, additionalConstraints)
       else {
@@ -155,7 +155,7 @@ class ClickhouseOptimisticTransaction(
     spark.conf.getAll.foreach(
       entry => {
         if (
-          CHConf.startWithSettingsPrefix(entry._1)
+          CHConfig.startWithSettingsPrefix(entry._1)
           || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
         ) {
           options += (entry._1 -> entry._2)
@@ -267,7 +267,6 @@ class ClickhouseOptimisticTransaction(
     // TODO: val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
     val checkInvariants = empty2NullPlan

-    // TODO: DeltaOptimizedWriterExec
     // No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
     // evenly-balanced data files already.
     val physicalPlan =
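Here the Delta 3.2 write path both adopts the CHConfig name and drops the stale DeltaOptimizedWriterExec TODO (that work now lives in Delta32Shims above). A compact sketch of the gate that decides whether the one-pipeline MergeTree write is taken, built only from the flags visible in this hunk; the wrapper function is illustrative:

import org.apache.gluten.backendsapi.clickhouse.CHConfig
import org.apache.gluten.config.GlutenConfig

// Sketch: the one-pipeline MergeTree write is used only when native writes are
// enabled and the ClickHouse-specific flag (now read via CHConfig) is set.
def useOnePipelineMergeTreeWrite(writingMergeTree: Boolean): Boolean = {
  val nativeWrite = GlutenConfig.get.enableNativeWriter.getOrElse(false)
  writingMergeTree && nativeWrite && CHConfig.get.enableOnePipelineMergeTreeWrite
}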

backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala (+3 -1)

@@ -33,11 +33,13 @@ import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePart
 import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
+
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.types.StructType

 import java.{util => ju}
+
 import scala.collection.JavaConverters._

 @SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
