
Commit c2bf8f0
Authored Dec 20, 2024

[VL] Delta support / Hudi support as Gluten components (#8282)

1 parent 4e0aab0 commit c2bf8f0

File tree

24 files changed, +206 -226 lines
 

‎backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala

+23
@@ -25,6 +25,7 @@ import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc}
+import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.FileFormat

@@ -198,6 +200,27 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
     }
   }

+  override def getSubstraitReadFileFormatV1(
+      fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+    fileFormat.getClass.getSimpleName match {
+      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
+      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
+  override def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat = {
+    scan.getClass.getSimpleName match {
+      case "OrcScan" => ReadFileFormat.OrcReadFormat
+      case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+      case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
   override def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
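Each backend now owns the mapping from a DataSource V1 FileFormat (or a V2 Scan) class to a Substrait read format, instead of gluten-substrait hard-coding it. A minimal, self-contained sketch of the dispatch-by-simple-class-name pattern these hooks rely on; the classes below are local stand-ins, not the real Spark, Delta, or Gluten types:

// Sketch only: ParquetFileFormat / DeltaMergeTreeFileFormat here are stand-in classes;
// the real hooks match on getClass.getSimpleName of Spark's formats in the same way.
object ReadFormatDispatchSketch {
  sealed trait ReadFileFormat
  case object ParquetReadFormat extends ReadFileFormat
  case object MergeTreeReadFormat extends ReadFileFormat
  case object UnknownFormat extends ReadFileFormat

  final class ParquetFileFormat
  final class DeltaMergeTreeFileFormat

  def substraitReadFormat(fileFormat: AnyRef): ReadFileFormat =
    fileFormat.getClass.getSimpleName match {
      case "ParquetFileFormat" | "DeltaParquetFileFormat" => ParquetReadFormat
      case "DeltaMergeTreeFileFormat" => MergeTreeReadFormat
      case _ => UnknownFormat
    }

  def main(args: Array[String]): Unit = {
    println(substraitReadFormat(new ParquetFileFormat))        // ParquetReadFormat
    println(substraitReadFormat(new DeltaMergeTreeFileFormat)) // MergeTreeReadFormat
    println(substraitReadFormat("some other value"))           // UnknownFormat
  }
}

Matching on the class's simple name keeps the shared module free of compile-time dependencies on Delta or ClickHouse format classes.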

‎backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala

-1
@@ -93,7 +93,6 @@ object CHRuleApi {

     // Legacy: Post-transform rules.
     injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
-    injector.injectPostTransform(c => intercept(RewriteTransformer.apply(c.session)))
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)

‎backends-velox/src-delta/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxDeltaComponent

Whitespace-only changes.
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.component
+
+import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.execution.OffloadDeltaScan
+import org.apache.gluten.extension.DeltaPostTransformRules
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+
+class VeloxDeltaComponent extends Component {
+  override def name(): String = "velox-delta"
+  override def buildInfo(): Component.BuildInfo =
+    Component.BuildInfo("VeloxDelta", "N/A", "N/A", "N/A")
+  override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
+  override def injectRules(injector: Injector): Unit = {
+    val legacy = injector.gluten.legacy
+    val ras = injector.gluten.ras
+    legacy.injectTransform {
+      c =>
+        val offload = Seq(OffloadDeltaScan())
+        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
+    }
+    ras.injectRasRule {
+      c =>
+        RasOffload.Rule(
+          RasOffload.from[FileSourceScanExec](OffloadDeltaScan()),
+          Validators.newValidator(c.glutenConf),
+          Nil)
+    }
+    DeltaPostTransformRules.rules.foreach {
+      r =>
+        legacy.injectPostTransform(_ => r)
+        ras.injectPostTransform(_ => r)
+    }
+  }
+}

‎backends-velox/src-hudi/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxHudiComponent

Whitespace-only changes.
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.component
+
+import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.execution.OffloadHudiScan
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+
+class VeloxHudiComponent extends Component {
+  override def name(): String = "velox-hudi"
+  override def buildInfo(): Component.BuildInfo =
+    Component.BuildInfo("VeloxHudi", "N/A", "N/A", "N/A")
+  override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
+  override def injectRules(injector: Injector): Unit = {
+    val legacy = injector.gluten.legacy
+    val ras = injector.gluten.ras
+    legacy.injectTransform {
+      c =>
+        val offload = Seq(OffloadHudiScan())
+        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
+    }
+    ras.injectRasRule {
+      c =>
+        RasOffload.Rule(
+          RasOffload.from[FileSourceScanExec](OffloadHudiScan()),
+          Validators.newValidator(c.glutenConf),
+          Nil)
+    }
+  }
+}
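Both component classes follow the same template, so wiring up another table format would mostly mean one more class of this shape. A hedged sketch of such a hypothetical component follows; VeloxFooComponent and its no-op OffloadFooScan rule are invented for illustration and assume the Gluten classpath, while the Component, Injector, HeuristicTransform, RasOffload and Validators APIs are the ones used in the two new files above:

package org.apache.gluten.component

import org.apache.gluten.backendsapi.velox.VeloxBackend
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector

import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Hypothetical stand-in for an OffloadDeltaScan/OffloadHudiScan-style rule;
// a real rule would rewrite matching scans instead of passing everything through.
case class OffloadFooScan() extends OffloadSingleNode {
  override def offload(plan: SparkPlan): SparkPlan = plan
}

// Hypothetical component mirroring VeloxDeltaComponent and VeloxHudiComponent above.
class VeloxFooComponent extends Component {
  override def name(): String = "velox-foo"
  override def buildInfo(): Component.BuildInfo =
    Component.BuildInfo("VeloxFoo", "N/A", "N/A", "N/A")
  override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
  override def injectRules(injector: Injector): Unit = {
    val legacy = injector.gluten.legacy
    val ras = injector.gluten.ras
    legacy.injectTransform {
      c =>
        val offload = Seq(OffloadFooScan())
        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
    }
    ras.injectRasRule {
      c =>
        RasOffload.Rule(
          RasOffload.from[FileSourceScanExec](OffloadFooScan()),
          Validators.newValidator(c.glutenConf),
          Nil)
    }
  }
}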

‎backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala

+23-1
@@ -27,6 +27,7 @@ import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc}
 import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}
 import org.apache.gluten.utils._

@@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, De
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Percentile}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

@@ -173,7 +175,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
         }
         validateTypes(typeValidator)
       }
-      case _ => Some(s"Unsupported file format for $format.")
+      case _ => Some(s"Unsupported file format $format.")
     }
   }

@@ -194,6 +196,26 @@ object VeloxBackendSettings extends BackendSettingsApi {
       .toSeq
   }

+  override def getSubstraitReadFileFormatV1(
+      fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+    fileFormat.getClass.getSimpleName match {
+      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
+      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
+  override def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat = {
+    scan.getClass.getSimpleName match {
+      case "OrcScan" => ReadFileFormat.OrcReadFormat
+      case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+      case "DwrfScan" => ReadFileFormat.DwrfReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
   override def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],

‎backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala

-2
@@ -85,7 +85,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => UnionTransformerRule())
     injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
     injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
-    injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)

@@ -163,7 +162,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => UnionTransformerRule())
     injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
     injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
-    injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)

‎gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java

+1-1
@@ -38,7 +38,7 @@
 /**
  * A trivial memory consumer implementation used by Gluten.
  *
- * @deprecated Use {@link TreeMemoryConsumers#shared()} instead.
+ * @deprecated Use {@link TreeMemoryConsumers} instead.
  */
 @Deprecated
 public class RegularMemoryConsumer extends MemoryConsumer

‎gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister

-1
This file was deleted.

‎gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.extension.columnar.RewriteTransformerRules

-1
This file was deleted.

gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala → gluten-delta/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala

+8-7
@@ -16,14 +16,15 @@
  */
 package org.apache.gluten.execution

-import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode

-class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

-  override val scanClassName: String = "HoodieParquetFileFormat"
-
-  override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    HudiScanTransformer(batchScan)
+case class OffloadDeltaScan() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = plan match {
+    case scan: FileSourceScanExec
+        if scan.relation.fileFormat.getClass.getName == "org.apache.spark.sql.delta.DeltaParquetFileFormat" =>
+      DeltaScanTransformer(scan)
+    case other => other
   }
 }

gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala → gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala

+5-8
@@ -17,8 +17,7 @@
 package org.apache.gluten.extension

 import org.apache.gluten.execution.{DeltaFilterExecTransformer, DeltaProjectExecTransformer, DeltaScanTransformer, ProjectExecTransformer}
-import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, filterRule, projectRule, pushDownInputFileExprRule}
-import org.apache.gluten.extension.columnar.RewriteTransformerRules
+import org.apache.gluten.extension.columnar.transition.RemoveTransitions

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName}

@@ -30,12 +29,10 @@ import org.apache.spark.sql.execution.datasources.FileFormat

 import scala.collection.mutable.ListBuffer

-class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] =
-    columnMappingRule :: filterRule :: projectRule :: pushDownInputFileExprRule :: Nil
-}
-
-object DeltaRewriteTransformerRules {
+object DeltaPostTransformRules {
+  def rules: Seq[Rule[SparkPlan]] =
+    RemoveTransitions :: columnMappingRule :: filterRule :: projectRule ::
+      pushDownInputFileExprRule :: Nil

   private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] =
     TreeNodeTag[String]("org.apache.gluten.delta.column.mapping")

‎gluten-hudi/src-hudi/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister

-1
This file was deleted.

gluten-delta/src-delta/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala → gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala

+13-7
@@ -14,16 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution

-import org.apache.spark.sql.execution.FileSourceScanExec
+package org.apache.gluten.execution

-class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode

-  override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"
+import org.apache.spark.sql.execution.SparkPlan

-  override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    DeltaScanTransformer(batchScan)
+/** Since https://github.com/apache/incubator-gluten/pull/6049. */
+case class OffloadHudiScan() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = {
+    plan match {
+      // Hudi has multiple file format definitions whose names end with "HoodieParquetFileFormat".
+      case scan: org.apache.spark.sql.execution.FileSourceScanExec
+          if scan.relation.fileFormat.getClass.getName.endsWith("HoodieParquetFileFormat") =>
+        HudiScanTransformer(scan)
+      case other => other
+    }
   }
 }


‎gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala

+6
@@ -19,11 +19,13 @@ package org.apache.gluten.backendsapi
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.types.StructField

@@ -39,6 +41,10 @@ trait BackendSettingsApi {
       rootPaths: Seq[String],
       properties: Map[String, String]): ValidationResult = ValidationResult.succeeded

+  def getSubstraitReadFileFormatV1(fileFormat: FileFormat): LocalFilesNode.ReadFileFormat
+
+  def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat
+
   def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
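With these two methods on BackendSettingsApi, the shared gluten-substrait code no longer needs backend-specific case branches: each backend supplies its own mapping and the call sites (FileSourceScanExecTransformerBase and BatchScanExecTransformerBase, further down in this commit) simply delegate through the active settings. A minimal, self-contained sketch of that delegation pattern, using stand-in trait and objects rather than the real Gluten classes:

// Stand-ins for BackendSettingsApi and two backends; the real trait also covers
// the DataSource V2 Scan variant via getSubstraitReadFileFormatV2.
object BackendDelegationSketch {
  sealed trait ReadFileFormat
  case object ParquetReadFormat extends ReadFileFormat
  case object MergeTreeReadFormat extends ReadFileFormat
  case object UnknownFormat extends ReadFileFormat

  trait BackendSettings {
    def substraitReadFormat(fileFormatClassName: String): ReadFileFormat
  }

  object VeloxLikeSettings extends BackendSettings {
    def substraitReadFormat(name: String): ReadFileFormat = name match {
      case "ParquetFileFormat" => ParquetReadFormat
      case _ => UnknownFormat
    }
  }

  object ClickHouseLikeSettings extends BackendSettings {
    def substraitReadFormat(name: String): ReadFileFormat = name match {
      case "ParquetFileFormat" | "DeltaParquetFileFormat" => ParquetReadFormat
      case "DeltaMergeTreeFileFormat" => MergeTreeReadFormat
      case _ => UnknownFormat
    }
  }

  // Shared code sees only the trait, the way the scan transformers now call
  // BackendsApiManager.getSettings rather than matching class names themselves.
  def resolve(active: BackendSettings, fileFormatClassName: String): ReadFileFormat =
    active.substraitReadFormat(fileFormatClassName)

  def main(args: Array[String]): Unit = {
    println(resolve(VeloxLikeSettings, "DeltaMergeTreeFileFormat"))      // UnknownFormat
    println(resolve(ClickHouseLikeSettings, "DeltaMergeTreeFileFormat")) // MergeTreeReadFormat
  }
}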

‎gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala

+2-7
@@ -175,13 +175,8 @@ abstract class BatchScanExecTransformerBase(
   @transient protected lazy val filteredFlattenPartitions: Seq[InputPartition] =
     filteredPartitions.flatten

-  @transient override lazy val fileFormat: ReadFileFormat = scan.getClass.getSimpleName match {
-    case "OrcScan" => ReadFileFormat.OrcReadFormat
-    case "ParquetScan" => ReadFileFormat.ParquetReadFormat
-    case "DwrfScan" => ReadFileFormat.DwrfReadFormat
-    case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
-    case _ => ReadFileFormat.UnknownFormat
-  }
+  @transient override lazy val fileFormat: ReadFileFormat =
+    BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)

   override def simpleString(maxFields: Int): String = {
     val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)

‎gluten-substrait/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala

-55
This file was deleted.

‎gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala

+1-9
@@ -190,15 +190,7 @@ abstract class FileSourceScanExecTransformerBase(
   }

   @transient override lazy val fileFormat: ReadFileFormat =
-    relation.fileFormat.getClass.getSimpleName match {
-      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
-      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
-      case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
-      case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
-      case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
-      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
-      case _ => ReadFileFormat.UnknownFormat
-    }
+    BackendsApiManager.getSettings.getSubstraitReadFileFormatV1(relation.fileFormat)

   override def simpleString(maxFields: Int): String = {
     val metadataEntries = metadata.toSeq.sorted.map {

‎gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala

+16-63
@@ -21,76 +21,29 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

-import java.util.ServiceLoader
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
-
 object ScanTransformerFactory {

-  private val scanTransformerMap = new ConcurrentHashMap[String, Class[_]]()
-
   def createFileSourceScanTransformer(
       scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    val fileFormat = scanExec.relation.fileFormat
-    lookupDataSourceScanTransformer(fileFormat.getClass.getName) match {
-      case Some(clz) =>
-        clz
-          .getDeclaredConstructor()
-          .newInstance()
-          .asInstanceOf[DataSourceScanTransformerRegister]
-          .createDataSourceTransformer(scanExec)
-      case _ =>
-        FileSourceScanExecTransformer(
-          scanExec.relation,
-          scanExec.output,
-          scanExec.requiredSchema,
-          scanExec.partitionFilters,
-          scanExec.optionalBucketSet,
-          scanExec.optionalNumCoalescedBuckets,
-          scanExec.dataFilters,
-          scanExec.tableIdentifier,
-          scanExec.disableBucketedScan
-        )
-    }
+    FileSourceScanExecTransformer(
+      scanExec.relation,
+      scanExec.output,
+      scanExec.requiredSchema,
+      scanExec.partitionFilters,
+      scanExec.optionalBucketSet,
+      scanExec.optionalNumCoalescedBuckets,
+      scanExec.dataFilters,
+      scanExec.tableIdentifier,
+      scanExec.disableBucketedScan
+    )
   }

   def createBatchScanTransformer(batchScanExec: BatchScanExec): BatchScanExecTransformerBase = {
-    val scan = batchScanExec.scan
-    lookupDataSourceScanTransformer(scan.getClass.getName) match {
-      case Some(clz) =>
-        clz
-          .getDeclaredConstructor()
-          .newInstance()
-          .asInstanceOf[DataSourceScanTransformerRegister]
-          .createDataSourceV2Transformer(batchScanExec)
-      case _ =>
-        BatchScanExecTransformer(
-          batchScanExec.output,
-          batchScanExec.scan,
-          batchScanExec.runtimeFilters,
-          table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
-        )
-    }
-  }
-
-  private def lookupDataSourceScanTransformer(scanClassName: String): Option[Class[_]] = {
-    val clz = scanTransformerMap.computeIfAbsent(
-      scanClassName,
-      _ => {
-        val loader = Option(Thread.currentThread().getContextClassLoader)
-          .getOrElse(getClass.getClassLoader)
-        val serviceLoader = ServiceLoader.load(classOf[DataSourceScanTransformerRegister], loader)
-        serviceLoader.asScala
-          .filter(service => scanClassName.contains(service.scanClassName))
-          .toList match {
-          case head :: Nil =>
-            // there is exactly one registered alias
-            head.getClass
-          case _ => null
-        }
-      }
-    )
-    Option(clz)
+    BatchScanExecTransformer(
+      batchScanExec.output,
+      batchScanExec.scan,
+      batchScanExec.runtimeFilters,
+      table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
+    )
   }
 }
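The removed lookup was the last user of the DataSourceScanTransformerRegister SPI, which was discovered at runtime through java.util.ServiceLoader; Delta and Hudi scan offloading is now injected by the new components instead. For context, a minimal, self-contained sketch of that ServiceLoader mechanism, using a hypothetical GreeterService interface rather than any Gluten API:

import java.util.ServiceLoader

import scala.collection.JavaConverters._

// Hypothetical SPI; implementations are listed by fully qualified class name in a
// META-INF/services/GreeterService resource file on the classpath.
trait GreeterService {
  def greet(name: String): String
}

object ServiceLoaderSketch {
  def main(args: Array[String]): Unit = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    // Yields every registered implementation, or nothing when no provider file is
    // present, which is the fallback branch the removed factory code also had.
    val services = ServiceLoader.load(classOf[GreeterService], loader).asScala.toList
    services.foreach(s => println(s.greet("Gluten")))
  }
}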

‎gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RewriteTransformer.scala

-53
This file was deleted.

‎gluten-ut/common/pom.xml

-1
@@ -30,7 +30,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-resources-plugin</artifactId>
-        <version>3.0.1</version>
       </plugin>
       <plugin>
         <groupId>net.alchim31.maven</groupId>

‎gluten-ut/spark32/pom.xml

-1
@@ -71,7 +71,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-resources-plugin</artifactId>
-        <version>3.0.1</version>
       </plugin>
       <plugin>
         <groupId>net.alchim31.maven</groupId>

‎pom.xml

+2-7
@@ -340,8 +340,8 @@
     <iceberg.version>1.5.0</iceberg.version>
     <delta.package.name>delta-spark</delta.package.name>
     <delta.version>3.2.0</delta.version>
-    <delta.binary.version>32</delta.binary.version>
-    <hudi.version>0.15.0</hudi.version>
+    <delta.binary.version>32</delta.binary.version>
+    <hudi.version>0.15.0</hudi.version>
     <fasterxml.version>2.15.1</fasterxml.version>
     <hadoop.version>3.3.4</hadoop.version>
     <antlr4.version>4.9.3</antlr4.version>

@@ -1200,11 +1200,6 @@
         <version>${fasterxml.version}</version>
         <scope>provided</scope>
       </dependency>
-      <dependency>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-source-plugin</artifactId>
-        <version>3.2.1</version>
-      </dependency>
     </dependencies>
   </dependencyManagement>
