diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala index 77160f49ce58..f4735ff9eccb 100644 --- a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala +++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala @@ -19,9 +19,19 @@ package org.apache.gluten.sql.shims.delta32 import org.apache.gluten.execution.GlutenPlan import org.apache.gluten.sql.shims.DeltaShims +import org.apache.spark.sql.delta.DeltaParquetFileFormat +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor +import org.apache.spark.sql.delta.util.JsonUtils import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.perf.DeltaOptimizedWriterTransformer +import org.apache.hadoop.fs.Path + +import java.util.{HashMap => JHashMap, Map => JMap} + +import scala.collection.JavaConverters._ + class Delta32Shims extends DeltaShims { override def supportDeltaOptimizedWriterExec(plan: SparkPlan): Boolean = DeltaOptimizedWriterTransformer.support(plan) @@ -29,4 +39,39 @@ class Delta32Shims extends DeltaShims { override def offloadDeltaOptimizedWriterExec(plan: SparkPlan): GlutenPlan = { DeltaOptimizedWriterTransformer.from(plan) } + + /** + * Decode the ZeroMQ Base85 encoded file path. + * + * TODO: the native side needs to support ZeroMQ Base85 + */ + override def convertRowIndexFilterIdEncoded( + partitionColsCnt: Int, + file: PartitionedFile, + otherConstantMetadataColumnValues: JMap[String, Object]): JMap[String, Object] = { + val newOtherConstantMetadataColumnValues: JMap[String, Object] = + new JHashMap[String, Object] + for ((k, v) <- otherConstantMetadataColumnValues.asScala) { + if (k.equalsIgnoreCase(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED)) { + val decoded = JsonUtils.fromJson[DeletionVectorDescriptor](v.toString) + var filePath = new Path(file.filePath.toString()).getParent + for (_ <- 0 until partitionColsCnt) { + filePath = filePath.getParent + } + val decodedPath = decoded.absolutePath(filePath) + val newDeletionVectorDescriptor = decoded.copy( + decoded.storageType, + decodedPath.toUri.toASCIIString, + decoded.offset, + decoded.sizeInBytes, + decoded.cardinality, + decoded.maxRowIndex + ) + newOtherConstantMetadataColumnValues.put(k, JsonUtils.toJson(newDeletionVectorDescriptor)) + } else { + newOtherConstantMetadataColumnValues.put(k, v) + } + } + newOtherConstantMetadataColumnValues + } } diff --git a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala new file mode 100644 index 000000000000..0e0b01d12fa7 --- /dev/null +++ b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.gluten.delta + +import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.delta.files.TahoeFileIndex +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +// Some sqls' line length exceeds 100 +// scalastyle:off line.size.limit + +class GlutenDeltaParquetDeletionVectorSuite + extends GlutenClickHouseTPCHAbstractSuite + with AdaptiveSparkPlanHelper { + + override protected val needCopyParquetToTablePath = true + + override protected val tablesPath: String = basePath + "/tpch-data" + override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" + override protected val queriesResults: String = rootPath + "mergetree-queries-output" + + // import org.apache.gluten.backendsapi.clickhouse.CHConfig._ + + /** Run Gluten + ClickHouse Backend with SortShuffleManager */ + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.io.compression.codec", "LZ4") + .set("spark.sql.shuffle.partitions", "5") + .set("spark.sql.autoBroadcastJoinThreshold", "10MB") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.files.maxPartitionBytes", "20000000") + .set("spark.sql.storeAssignmentPolicy", "legacy") + // .setCHConfig("use_local_format", true) + .set("spark.databricks.delta.retentionDurationCheck.enabled", "false") + } + + override protected def createTPCHNotNullTables(): Unit = { + createNotNullTPCHTablesInParquet(tablesPath) + } + + private val q1SchemaString: String = + s""" l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string""".stripMargin + + test("test parquet table delete with the delta DV") { + spark.sql(s""" + |set spark.gluten.enabled=false; + |""".stripMargin) + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_delta_parquet_delete_dv; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_delta_parquet_delete_dv + |($q1SchemaString) + |USING delta + |TBLPROPERTIES (delta.enableDeletionVectors='true') + |LOCATION '$basePath/lineitem_delta_parquet_delete_dv' + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_delta_parquet_delete_dv + | select /*+ REPARTITION(6) */ * from lineitem + |""".stripMargin) + + val df1 = spark.sql(s""" + | delete from lineitem_delta_parquet_delete_dv + | where mod(l_orderkey, 3) = 1 and l_orderkey < 100 + |""".stripMargin) + spark.sql(s""" + |set spark.gluten.enabled=true; + |""".stripMargin) + + val df = spark.sql(s""" + | select sum(l_linenumber) from lineitem_delta_parquet_delete_dv + |""".stripMargin) + val result = df.collect() + assert( + result.apply(0).get(0) === 1802335 + ) + val scanExec = 
collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + val parquetScan = scanExec.head + val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex] + val addFiles = fileIndex.matchingFiles(Nil, Nil) + assert(addFiles.size === 6) + + spark.sql(s""" + |set spark.gluten.enabled=false; + |""".stripMargin) + spark.sql(s""" + | delete from lineitem_delta_parquet_delete_dv where mod(l_orderkey, 3) = 2 + |""".stripMargin) + spark.sql(s""" + |set spark.gluten.enabled=true; + |""".stripMargin) + + val df3 = spark.sql(s""" + | select sum(l_linenumber) from lineitem_delta_parquet_delete_dv + |""".stripMargin) + assert( + df3.collect().apply(0).get(0) === 1200560 + ) + } + + test("test parquet partition table delete with the delta DV") { + withSQLConf(("spark.sql.sources.partitionOverwriteMode", "dynamic")) { + spark.sql(s""" + |set spark.gluten.enabled=false; + |""".stripMargin) + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_delta_partition_parquet_delete_dv; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_delta_partition_parquet_delete_dv + |($q1SchemaString) + |USING delta + |PARTITIONED BY (l_returnflag) + |TBLPROPERTIES (delta.enableDeletionVectors='true') + |LOCATION '$basePath/lineitem_delta_partition_parquet_delete_dv' + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_delta_partition_parquet_delete_dv + | select /*+ REPARTITION(6) */ * from lineitem + |""".stripMargin) + + val df1 = spark.sql(s""" + | delete from lineitem_delta_partition_parquet_delete_dv + | where mod(l_orderkey, 3) = 1 + |""".stripMargin) + spark.sql(s""" + |set spark.gluten.enabled=true; + |""".stripMargin) + + val df = + spark.sql(s""" + | select sum(l_linenumber) from lineitem_delta_partition_parquet_delete_dv + |""".stripMargin) + val result = df.collect() + assert( + result.apply(0).get(0) === 1201486 + ) + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.nonEmpty) + } + } +} +// scalastyle:off line.size.limit diff --git a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala index 032054c412fb..09de110817e6 100644 --- a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala +++ b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala @@ -19,6 +19,9 @@ package org.apache.gluten.sql.shims import org.apache.gluten.execution.GlutenPlan import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.PartitionedFile + +import java.util.{HashMap => JHashMap, Map => JMap} sealed abstract class ShimDescriptor @@ -29,4 +32,10 @@ trait DeltaShims { throw new UnsupportedOperationException( s"Can't transform ColumnarDeltaOptimizedWriterExec from ${plan.getClass.getSimpleName}") } + + def convertRowIndexFilterIdEncoded( + partitionColsCnt: Int, + file: PartitionedFile, + otherConstantMetadataColumnValues: JMap[String, Object]): JMap[String, Object] = + new JHashMap[String, Object]() } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 308b3fa4f5c7..7de4e1e8324c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.execution._ import org.apache.gluten.expression.ConverterUtils import org.apache.gluten.logging.LogLevelUtil import org.apache.gluten.metrics.IMetrics -import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.sql.shims.{DeltaShimLoader, SparkShimLoader} import org.apache.gluten.substrait.plan.PlanNode import org.apache.gluten.substrait.rel._ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat @@ -159,6 +159,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { val modificationTimes = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]] val metadataColumns = new JArrayList[JMap[String, String]] + val otherMetadataColumns = new JArrayList[JMap[String, Object]] f.files.foreach { file => paths.add(new URI(file.filePath.toString()).toASCIIString) @@ -203,6 +204,14 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { fileSizes.add(0) modificationTimes.add(0) } + + val otherConstantMetadataColumnValues = + DeltaShimLoader.getDeltaShims.convertRowIndexFilterIdEncoded( + partitionColumn.size(), + file, + SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file) + ) + otherMetadataColumns.add(otherConstantMetadataColumnValues) } val preferredLocations = CHAffinity.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations()) @@ -217,7 +226,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { metadataColumns, fileFormat, preferredLocations.toList.asJava, - mapAsJavaMap(properties) + mapAsJavaMap(properties), + otherMetadataColumns ) case _ => throw new UnsupportedOperationException(s"Unsupported input partition: $partition.") diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala index 412d8773b9e0..39f86429e2a6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala @@ -70,18 +70,19 @@ case class CHInputPartitionsUtil( SparkShimLoader.getSparkShims.getFileStatus(partition).flatMap { file => // getPath() is very expensive so we only want to call it once in this block: - val filePath = file.getPath + val filePath = file._1.getPath if (shouldProcess(filePath)) { val isSplitable = relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath) SparkShimLoader.getSparkShims.splitFiles( relation.sparkSession, - file, + file._1, filePath, isSplitable, maxSplitBytes, - partition.values + partition.values, + file._2 ) } else { Seq.empty diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala index c39c68524d15..0a4659591e27 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala @@ -121,7 +121,8 @@ case class GlutenCacheFilesCommand( new JArrayList[JMap[String, String]](), ReadFileFormat.ParquetReadFormat, // ignore format in backend new JArrayList[String](), - new JHashMap[String, 
String]() + new JHashMap[String, String](), + new JArrayList[JMap[String, Object]]() ) (executorId, localFile) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 1be7b8d7356d..68f8b0e8a50a 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -65,7 +65,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { fileSizes, modificationTimes, partitionColumns, - metadataColumns) = + metadataColumns, + otherMetadataColumns) = constructSplitInfo(partitionSchema, f.files, metadataColumnNames) val preferredLocations = SoftAffinity.getFilePartitionLocations(f) @@ -80,7 +81,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { metadataColumns, fileFormat, preferredLocations.toList.asJava, - mapAsJavaMap(properties) + mapAsJavaMap(properties), + otherMetadataColumns ) case _ => throw new UnsupportedOperationException(s"Unsupported input partition.") @@ -104,7 +106,15 @@ class VeloxIteratorApi extends IteratorApi with Logging { }.toArray val locations = partitions.flatMap(p => SoftAffinity.getFilePartitionLocations(p.asInstanceOf[FilePartition])) - val (paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns) = + val ( + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns, + otherMetadataColumns) = constructSplitInfo(partitionSchema, partitionFiles, metadataColumnNames) LocalFilesBuilder.makeLocalFiles( partitionIndex, @@ -117,7 +127,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { metadataColumns, fileFormat, locations.toList.asJava, - mapAsJavaMap(properties) + mapAsJavaMap(properties), + otherMetadataColumns ) } @@ -151,6 +162,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { val modificationTimes = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]] val metadataColumns = new JArrayList[JMap[String, String]] + val otherMetadataColumns = new JArrayList[JMap[String, Object]] files.foreach { file => paths.add(unescapePathName(file.filePath.toString)) @@ -190,8 +202,18 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionColumn.put(schema.names(i), partitionColumnValue) } partitionColumns.add(partitionColumn) + otherMetadataColumns.add( + SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file)) } - (paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns) + ( + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns, + otherMetadataColumns) } override def injectWriteFilesTempPath(path: String, fileName: String): Unit = { diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt b/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt index 5783898bc650..42487234175e 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt +++ b/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt @@ -27,12 +27,18 @@ target_compile_options( if(ENABLE_HDFS) target_link_libraries( substrait_source - PUBLIC boost::headers_only ch_contrib::protobuf clickhouse_common_io - ch_contrib::hdfs substrait ch_contrib::roaring) + PUBLIC boost::headers_only + ch_contrib::protobuf + clickhouse_common_io + ch_contrib::hdfs + substrait + 
ch_contrib::roaring + ch_contrib::rapidjson) else() target_link_libraries( - substrait_source PUBLIC boost::headers_only ch_contrib::protobuf - clickhouse_common_io substrait ch_contrib::roaring) + substrait_source + PUBLIC boost::headers_only ch_contrib::protobuf clickhouse_common_io + substrait ch_contrib::roaring ch_contrib::rapidjson) endif() target_include_directories( substrait_source SYSTEM BEFORE diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp index d0c7a06e0e5e..58dd911354c7 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp @@ -14,14 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "DeltaDVRoaringBitmapArray.h" + +#include #include #include -#include -#include -#include #include #include -#include +#include +#include +#include namespace DB { @@ -37,66 +39,72 @@ namespace local_engine { using namespace DB; -std::pair DeltaDVRoaringBitmapArray::decompose_high_low_bytes(UInt64 value) { +std::pair DeltaDVRoaringBitmapArray::decompose_high_low_bytes(UInt64 value) +{ return {static_cast(value >> 32), static_cast(value & 0xFFFFFFFF)}; } -UInt64 DeltaDVRoaringBitmapArray::compose_from_high_low_bytes(UInt32 high, UInt32 low) { +UInt64 DeltaDVRoaringBitmapArray::compose_from_high_low_bytes(UInt32 high, UInt32 low) +{ return (static_cast(high) << 32) | low; } -DeltaDVRoaringBitmapArray::DeltaDVRoaringBitmapArray( - const DB::ContextPtr & context_) : context(context_) +DeltaDVRoaringBitmapArray::DeltaDVRoaringBitmapArray(const DB::ContextPtr & context_) : context(context_) { } -void DeltaDVRoaringBitmapArray::rb_read(const String & file_path, const Int32 offset, const Int32 data_size) +void DeltaDVRoaringBitmapArray::rb_read(const String & file_path, Int32 offset, Int32 data_size) { - // TODO: use `ReadBufferBuilderFactory::instance().createBuilder` to create hdfs/local/s3 ReadBuffer + substrait::ReadRel::LocalFiles::FileOrFiles file_info; + file_info.set_uri_file(file_path); + file_info.set_start(offset); + file_info.set_length(data_size); const Poco::URI file_uri(file_path); - ReadBufferFromFile in(file_uri.getPath()); + ReadBufferBuilderPtr read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context); + auto * in = dynamic_cast(read_buffer_builder->build(file_info).release()); + if (in == nullptr) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to create a valid SeekableReadBuffer."); - in.seek(offset, SEEK_SET); + in->seek(offset, SEEK_SET); int size; - readBinaryBigEndian(size, in); + readBinaryBigEndian(size, *in); if (data_size != size) throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "The size of the deletion vector is mismatch."); - int checksum_value = static_cast(crc32_z(0L, reinterpret_cast(in.position()), size)); + int checksum_value = static_cast(crc32_z(0L, reinterpret_cast(in->position()), size)); int magic_num; - readBinaryLittleEndian(magic_num, in); + readBinaryLittleEndian(magic_num, *in); if (magic_num != 1681511377) throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "The magic num is mismatch."); int64_t bitmap_array_size; - readBinaryLittleEndian(bitmap_array_size, in); + readBinaryLittleEndian(bitmap_array_size, *in); 
roaring_bitmap_array.reserve(bitmap_array_size); for (size_t i = 0; i < bitmap_array_size; ++i) { int bitmap_index; - readBinaryLittleEndian(bitmap_index, in); - roaring::Roaring r = roaring::Roaring::read(in.position()); + readBinaryLittleEndian(bitmap_index, *in); + roaring::Roaring r = roaring::Roaring::read(in->position()); size_t current_bitmap_size = r.getSizeInBytes(); - in.ignore(current_bitmap_size); + in->ignore(current_bitmap_size); roaring_bitmap_array.push_back(r); } int expected_checksum; - readBinaryBigEndian(expected_checksum, in); + readBinaryBigEndian(expected_checksum, *in); if (expected_checksum != checksum_value) throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Checksum mismatch."); + } UInt64 DeltaDVRoaringBitmapArray::rb_size() const { UInt64 sum = 0; for (const auto & r : roaring_bitmap_array) - { sum += r.cardinality(); - } return sum; } @@ -126,44 +134,41 @@ void DeltaDVRoaringBitmapArray::rb_add(Int64 x) assert(value >= 0 && value <= MAX_REPRESENTABLE_VALUE); auto [high, low] = decompose_high_low_bytes(value); if (high >= roaring_bitmap_array.size()) - { rb_extend_bitmaps(high + 1); - } roaring_bitmap_array[high].add(low); } void DeltaDVRoaringBitmapArray::rb_extend_bitmaps(Int32 new_length) { - if (roaring_bitmap_array.size() >= new_length) return; + if (roaring_bitmap_array.size() >= new_length) + return; roaring_bitmap_array.resize(new_length); } void DeltaDVRoaringBitmapArray::rb_shrink_bitmaps(Int32 new_length) { - if (roaring_bitmap_array.size() <= new_length) return; + if (roaring_bitmap_array.size() <= new_length) + return; roaring_bitmap_array.resize(new_length); } void DeltaDVRoaringBitmapArray::rb_merge(const DeltaDVRoaringBitmapArray & that) { - return rb_or(that); + rb_or(that); } void DeltaDVRoaringBitmapArray::rb_or(const DeltaDVRoaringBitmapArray & that) { - if (roaring_bitmap_array.size() < that.roaring_bitmap_array.size()) { + if (roaring_bitmap_array.size() < that.roaring_bitmap_array.size()) rb_extend_bitmaps(that.roaring_bitmap_array.size()); - } const Int32 count = that.roaring_bitmap_array.size(); for (Int32 i = 0; i < count; ++i) - { roaring_bitmap_array[i] |= that.roaring_bitmap_array[i]; - } } bool DeltaDVRoaringBitmapArray::operator==(const DeltaDVRoaringBitmapArray & other) const { - if (this == &other) { + if (this == &other) return true; - } + return roaring_bitmap_array == other.roaring_bitmap_array; } } \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h index f8315a1cc568..5b9fd93f585b 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h +++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h @@ -14,9 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include -#include -#include +#pragma once + +#include +#include +#include #include namespace local_engine @@ -27,9 +29,8 @@ namespace local_engine */ class DeltaDVRoaringBitmapArray : private boost::noncopyable { -private: - const Int64 MAX_REPRESENTABLE_VALUE = - (static_cast(INT32_MAX - 1) << 32) | (static_cast(INT32_MIN) & 0xFFFFFFFFL); + static constexpr Int64 MAX_REPRESENTABLE_VALUE + = (static_cast(INT32_MAX - 1) << 32) | (static_cast(INT32_MIN) & 0xFFFFFFFFL); DB::ContextPtr context; std::vector roaring_bitmap_array; @@ -37,13 +38,13 @@ class DeltaDVRoaringBitmapArray : private boost::noncopyable static UInt64 compose_from_high_low_bytes(UInt32 high, UInt32 low); void rb_extend_bitmaps(Int32 new_length); void rb_shrink_bitmaps(Int32 new_length); -public: - DeltaDVRoaringBitmapArray(const DB::ContextPtr & context_); +public: + explicit DeltaDVRoaringBitmapArray(const DB::ContextPtr & context_); ~DeltaDVRoaringBitmapArray() = default; - bool operator==(const DeltaDVRoaringBitmapArray& other) const; + bool operator==(const DeltaDVRoaringBitmapArray & other) const; UInt64 rb_size() const; - void rb_read(const String & file_path, const Int32 offset, const Int32 data_size); + void rb_read(const String & file_path, Int32 offset, Int32 data_size); bool rb_contains(Int64 x) const; bool rb_is_empty() const; void rb_clear(); diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaReader.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaReader.cpp new file mode 100644 index 000000000000..75843c422419 --- /dev/null +++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaReader.cpp @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "DeltaReader.h" + +#include +#include +#include +#include +#include + +using namespace DB; + +namespace local_engine::delta +{ + +std::unique_ptr DeltaReader::create( + const FormatFilePtr & file_, + const Block & to_read_header_, + const Block & output_header_, + const FormatFile::InputFormatPtr & input_format_, + const String & row_index_ids_encoded, + const String & row_index_filter_type) +{ + assert(row_index_filter_type == DeltaDVBitmapConfig::DELTA_ROW_INDEX_FILTER_TYPE_IF_CONTAINED); + + auto bitmap_config_ = std::make_shared(); + rapidjson::Document doc; + doc.Parse(row_index_ids_encoded.c_str()); + assert(!doc.HasParseError()); + if (doc.HasMember("storageType") && doc["storageType"].IsString()) + bitmap_config_->storage_type = doc["storageType"].GetString(); + if (doc.HasMember("pathOrInlineDv") && doc["pathOrInlineDv"].IsString()) + bitmap_config_->path_or_inline_dv = doc["pathOrInlineDv"].GetString(); + if (doc.HasMember("offset") && doc["offset"].IsInt()) + bitmap_config_->offset = doc["offset"].GetInt(); + if (doc.HasMember("sizeInBytes") && doc["sizeInBytes"].IsInt()) + bitmap_config_->size_in_bytes = doc["sizeInBytes"].GetInt(); + if (doc.HasMember("cardinality") && doc["cardinality"].IsInt64()) + bitmap_config_->cardinality = doc["cardinality"].GetInt64(); + if (doc.HasMember("maxRowIndex") && doc["maxRowIndex"].IsInt64()) + bitmap_config_->max_row_index = doc["maxRowIndex"].GetInt64(); + + return std::make_unique(file_, to_read_header_, output_header_, input_format_, bitmap_config_); +} + +DeltaReader::DeltaReader( + const FormatFilePtr & file_, + const Block & to_read_header_, + const Block & output_header_, + const FormatFile::InputFormatPtr & input_format_, + const std::shared_ptr & bitmap_config_) + : NormalFileReader(file_, to_read_header_, output_header_, input_format_) + , bitmap_config(bitmap_config_) +{ + bitmap_array = std::make_unique(file->getContext()); + bitmap_array->rb_read(bitmap_config->path_or_inline_dv, bitmap_config->offset, bitmap_config->size_in_bytes); +} + +Chunk DeltaReader::doPull() +{ + if (!bitmap_array) + return NormalFileReader::doPull(); + + while (true) + { + Chunk chunk = NormalFileReader::doPull(); + if (chunk.getNumRows() == 0) + return chunk; + + deleteRowsByDV(chunk); + + if (chunk.getNumRows() != 0) + return chunk; + } +} + +void DeltaReader::deleteRowsByDV(Chunk & chunk) const +{ + assert(bitmap_array); + size_t num_rows = chunk.getNumRows(); + size_t deleted_row_pos = readHeader.getPositionByName(DeltaParquetVirtualMeta::DELTA_INTERNAL_IS_ROW_DELETED); + size_t tmp_row_id_pos_output = readHeader.getPositionByName(ParquetVirtualMeta::TMP_ROWINDEX); + DB::DataTypePtr deleted_row_type = DeltaParquetVirtualMeta::getMetaColumnType(readHeader); + size_t tmp_row_id_pos = chunk.getNumColumns() - 1; + if (tmp_row_id_pos_output < tmp_row_id_pos) + tmp_row_id_pos = tmp_row_id_pos_output; + + auto deleted_row_column_nest = DB::ColumnInt8::create(num_rows); + auto & vec = deleted_row_column_nest->getData(); + + for (int i = 0; i < num_rows; i++) + vec[i] = bitmap_array->rb_contains(chunk.getColumns()[tmp_row_id_pos]->get64(i)); + + DB::ColumnPtr deleted_row_column; + if (deleted_row_type->isNullable()) + { + auto nullmap_column = DB::ColumnUInt8::create(deleted_row_column_nest->size(), 0); + deleted_row_column = DB::ColumnNullable::create(std::move(deleted_row_column_nest), std::move(nullmap_column)); + } + else + deleted_row_column = std::move(deleted_row_column_nest); + + if (deleted_row_pos < chunk.getNumColumns()) + 
chunk.addColumn(deleted_row_pos, std::move(deleted_row_column)); + else + chunk.addColumn(std::move(deleted_row_column)); +} + +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaReader.h b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaReader.h new file mode 100644 index 000000000000..04d2e65acf12 --- /dev/null +++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaReader.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace local_engine::delta +{ + +struct DeltaDVBitmapConfig +{ + inline static const String DELTA_ROW_INDEX_FILTER_TYPE = "row_index_filter_type"; + inline static const String DELTA_ROW_INDEX_FILTER_ID_ENCODED = "row_index_filter_id_encoded"; + inline static const String DELTA_ROW_INDEX_FILTER_TYPE_IF_CONTAINED = "IF_CONTAINED"; + inline static const String DELTA_ROW_INDEX_FILTER_TYPE_IF_NOT_CONTAINED = "IF_NOT_CONTAINED"; + + String storage_type; + String path_or_inline_dv; + Int32 offset = 0; + Int32 size_in_bytes = 0; + Int64 cardinality = 0; + Int64 max_row_index = 0; +}; + +class DeltaReader final : public NormalFileReader +{ + std::shared_ptr bitmap_config; + std::unique_ptr bitmap_array; + +public: + static std::unique_ptr create( + const FormatFilePtr & file_, + const DB::Block & to_read_header_, + const DB::Block & output_header_, + const FormatFile::InputFormatPtr & input_format_, + const String & row_index_ids_encoded, + const String & row_index_filter_type); + + DeltaReader( + const FormatFilePtr & file_, + const DB::Block & to_read_header_, + const DB::Block & output_header_, + const FormatFile::InputFormatPtr & input_format_, + const std::shared_ptr & bitmap_config_ = nullptr); + +protected: + DB::Chunk doPull() override; + + void deleteRowsByDV(DB::Chunk & chunk) const; +}; + +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp index 14e9925da66c..c513906f55a4 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp @@ -27,6 +27,8 @@ #include #include #include +#include +#include namespace DB { @@ -286,7 +288,26 @@ std::unique_ptr createNormalFileReader( return iceberg::IcebergReader::create(file, to_read_header_, output_header_, createInputFormat); auto input_format = createInputFormat(to_read_header_); - return input_format == nullptr ? 
nullptr : std::make_unique(file, to_read_header_, output_header_, input_format); + + if (!input_format) + return nullptr; + + if (file->getFileInfo().other_const_metadata_columns_size()) + { + String row_index_ids_encoded; + String row_index_filter_type; + for (const auto & column : file->getFileInfo().other_const_metadata_columns()) + { + if (column.key() == delta::DeltaDVBitmapConfig::DELTA_ROW_INDEX_FILTER_ID_ENCODED) + row_index_ids_encoded = toString(column.value()); + if (column.key() == delta::DeltaDVBitmapConfig::DELTA_ROW_INDEX_FILTER_TYPE) + row_index_filter_type = toString(column.value()); + } + if (!row_index_ids_encoded.empty() && !row_index_filter_type.empty()) + return delta::DeltaReader::create(file, to_read_header_, output_header_, input_format, row_index_ids_encoded, row_index_filter_type); + } + + return std::make_unique(file, to_read_header_, output_header_, input_format); } } std::unique_ptr BaseReader::create( diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp index be5ed3dd27a0..ec2bc6b8e890 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp @@ -18,8 +18,6 @@ #if USE_PARQUET -#include - #include #include #include @@ -30,7 +28,7 @@ #include #include #include -#include +#include namespace DB { @@ -116,7 +114,7 @@ class ParquetInputFormat : public FormatFile::InputFormat if (row_index_reader && num_rows) { auto row_index_column = row_index_reader->readBatch(num_rows); - assert(outputHeader.columns() == readHeader.columns() + 1); + assert(outputHeader.columns() > readHeader.columns()); size_t column_pos = outputHeader.getPositionByName(TMP_ROWINDEX); if (column_pos < chunk.getNumColumns()) chunk.addColumn(column_pos, std::move(row_index_column)); @@ -150,9 +148,10 @@ FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const Block & he bool readRowIndex = hasMetaColumns(header); bool usePageIndexReader = (use_pageindex_reader || readRowIndex) && onlyHasFlatType(header); + auto format_settings = getFormatSettings(context); Block output_header = header; - Block read_header = removeMetaColumns(header); + Block read_header = DeltaParquetVirtualMeta::removeMetaColumns(removeMetaColumns(header)); ParquetMetaBuilder metaBuilder{ .collectPageIndex = usePageIndexReader || readRowIndex, diff --git a/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java index 18233306222e..aed9ec21e73a 100644 --- a/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java @@ -49,7 +49,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode { new ArrayList<>(), fileFormat, preferredLocations, - new HashMap<>()); + new HashMap<>(), + new ArrayList<>()); this.deleteFilesList = deleteFilesList; } diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java index a58f5e043503..ebf96f3bc6ff 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java @@ -33,7 +33,8 
@@ public static LocalFilesNode makeLocalFiles( List> metadataColumns, LocalFilesNode.ReadFileFormat fileFormat, List preferredLocations, - Map properties) { + Map properties, + List> otherMetadataColumns) { return new LocalFilesNode( index, paths, @@ -45,7 +46,8 @@ public static LocalFilesNode makeLocalFiles( metadataColumns, fileFormat, preferredLocations, - properties); + properties, + otherMetadataColumns); } public static LocalFilesNode makeLocalFiles(String iterPath) { diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index bebc6b1e95ef..f3faee09e742 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -18,6 +18,7 @@ import org.apache.gluten.config.GlutenConfig; import org.apache.gluten.expression.ConverterUtils; +import org.apache.gluten.substrait.utils.SubstraitUtil; import io.substrait.proto.NamedStruct; import io.substrait.proto.ReadRel; @@ -38,6 +39,7 @@ public class LocalFilesNode implements SplitInfo { private final List modificationTimes = new ArrayList<>(); private final List> partitionColumns = new ArrayList<>(); private final List> metadataColumns = new ArrayList<>(); + private final List> otherMetadataColumns = new ArrayList<>(); private final List preferredLocations = new ArrayList<>(); // The format of file to read. @@ -69,7 +71,8 @@ public enum ReadFileFormat { List> metadataColumns, ReadFileFormat fileFormat, List preferredLocations, - Map properties) { + Map properties, + List> otherMetadataColumns) { this.index = index; this.paths.addAll(paths); this.starts.addAll(starts); @@ -81,6 +84,7 @@ public enum ReadFileFormat { this.metadataColumns.addAll(metadataColumns); this.preferredLocations.addAll(preferredLocations); this.fileReadProperties = properties; + this.otherMetadataColumns.addAll(otherMetadataColumns); } LocalFilesNode(String iterPath) { @@ -194,6 +198,19 @@ public ReadRel.LocalFiles toProtobuf() { NamedStruct namedStruct = buildNamedStruct(); fileBuilder.setSchema(namedStruct); + if (!otherMetadataColumns.isEmpty()) { + Map otherMetadatas = otherMetadataColumns.get(i); + if (!otherMetadatas.isEmpty()) { + otherMetadatas.forEach( + (key, value) -> { + ReadRel.LocalFiles.FileOrFiles.otherConstantMetadataColumnValues.Builder builder = + ReadRel.LocalFiles.FileOrFiles.otherConstantMetadataColumnValues.newBuilder(); + builder.setKey(key).setValue(SubstraitUtil.convertJavaObjectToAny(value)); + fileBuilder.addOtherConstMetadataColumns(builder.build()); + }); + } + } + switch (fileFormat) { case ParquetReadFormat: ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions = diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/utils/SubstraitUtil.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/utils/SubstraitUtil.java new file mode 100644 index 000000000000..3bdd475ec029 --- /dev/null +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/utils/SubstraitUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.utils; + +import org.apache.gluten.backendsapi.BackendsApiManager; + +import com.google.protobuf.Any; +import com.google.protobuf.DoubleValue; +import com.google.protobuf.Int32Value; +import com.google.protobuf.Int64Value; +import com.google.protobuf.Message; +import com.google.protobuf.StringValue; + +public class SubstraitUtil { + + private SubstraitUtil() {} + + public static Any convertJavaObjectToAny(Object obj) { + if (obj == null) return null; + Message msg = null; + if (obj instanceof String) { + msg = StringValue.newBuilder().setValue(obj.toString()).build(); + } else if (obj instanceof Integer) { + msg = Int32Value.newBuilder().setValue(Integer.valueOf(obj.toString())).build(); + } else if (obj instanceof Long) { + msg = Int64Value.newBuilder().setValue(Long.valueOf(obj.toString())).build(); + } else if (obj instanceof Double) { + msg = DoubleValue.newBuilder().setValue(Double.valueOf(obj.toString())).build(); + } else { + // TODO: generate the message according to the object type + msg = StringValue.newBuilder().setValue(obj.toString()).build(); + } + + if (msg == null) return null; + return BackendsApiManager.getTransformerApiInstance().packPBMessage(msg); + } +} diff --git a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto index 8c1468117b4f..2d6472605725 100644 --- a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto @@ -232,6 +232,12 @@ message ReadRel { int64 modificationTime = 2; } fileProperties properties = 20; + + message otherConstantMetadataColumnValues { + string key = 1; + google.protobuf.Any value = 2; + } + repeated otherConstantMetadataColumnValues other_const_metadata_columns = 21; } } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala index fa0823e1c6f1..f52e704e1fbb 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala @@ -58,16 +58,18 @@ case class InputPartitionsUtil( SparkShimLoader.getSparkShims.getFileStatus(partition).flatMap { file => // getPath() is very expensive so we only want to call it once in this block: - val filePath = file.getPath + val filePath = file._1.getPath val isSplitable = SparkShimLoader.getSparkShims.isFileSplittable(relation, filePath, requiredSchema) SparkShimLoader.getSparkShims.splitFiles( sparkSession = relation.sparkSession, - file = file, + file = file._1, filePath = filePath, isSplitable = isSplitable, maxSplitBytes = maxSplitBytes, - partitionValues = partition.values) + partitionValues = partition.values, + metadata = file._2 + ) } } 
.sortBy(_.length)(implicitly[Ordering[Long]].reverse) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala index 3a65d6f559ea..b35f8f9d93e9 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala @@ -141,9 +141,9 @@ class HivePartitionConverter(hadoopConf: Configuration, session: SparkSession) f => SparkShimLoader.getSparkShims.splitFiles( session, - f, - f.getPath, - isSplitable = canBeSplit(f.getPath), + f._1, + f._1.getPath, + isSplitable = canBeSplit(f._1.getPath), maxSplitBytes, partition.values ) diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index f8c060bc1862..9a2d62a048d5 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -49,12 +49,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.fs.LocatedFileStatus +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} import org.apache.parquet.schema.MessageType import java.util.{Map => JMap, Properties} +import scala.collection.JavaConverters._ import scala.reflect.ClassTag sealed abstract class ShimDescriptor @@ -193,7 +193,7 @@ trait SparkShims { // Because above, this feature is only supported after spark 3.3 def supportDuplicateReadingTracking: Boolean - def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] + def getFileStatus(partition: PartitionDirectory): Seq[(FileStatus, Map[String, Any])] def isFileSplittable(relation: HadoopFsRelation, filePath: Path, sparkSchema: StructType): Boolean @@ -207,7 +207,8 @@ trait SparkShims { filePath: Path, isSplitable: Boolean, maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] + partitionValues: InternalRow, + metadata: Map[String, Any] = Map.empty): Seq[PartitionedFile] def structFromAttributes(attrs: Seq[Attribute]): StructType @@ -297,4 +298,6 @@ trait SparkShims { def isColumnarLimitExecSupported(): Boolean + def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = + Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]] } diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala index 8289ffce0e23..1c327df656ce 100644 --- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala +++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala @@ -196,7 +196,8 @@ class Spark32Shims extends SparkShims { override def supportDuplicateReadingTracking: Boolean = false - def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] = partition.files + def getFileStatus(partition: PartitionDirectory): Seq[(FileStatus, Map[String, Any])] = + partition.files.map(f => (f, Map.empty[String, Any])) def isFileSplittable( relation: HadoopFsRelation, @@ -213,7 +214,8 @@ class Spark32Shims extends SparkShims { filePath: Path, isSplitable: Boolean, maxSplitBytes: 
Long, - partitionValues: InternalRow): Seq[PartitionedFile] = { + partitionValues: InternalRow, + metadata: Map[String, Any] = Map.empty): Seq[PartitionedFile] = { PartitionedFileUtil.splitFiles( sparkSession, file, diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index c8627f5d5a82..94b426a54d21 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -288,7 +288,8 @@ class Spark33Shims extends SparkShims { override def supportDuplicateReadingTracking: Boolean = true - def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] = partition.files + def getFileStatus(partition: PartitionDirectory): Seq[(FileStatus, Map[String, Any])] = + partition.files.map(f => (f, Map.empty[String, Any])) def isFileSplittable( relation: HadoopFsRelation, @@ -305,7 +306,8 @@ class Spark33Shims extends SparkShims { filePath: Path, isSplitable: Boolean, maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = { + partitionValues: InternalRow, + metadata: Map[String, Any] = Map.empty): Seq[PartitionedFile] = { PartitionedFileUtil.splitFiles( sparkSession, file, diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index 31f4a4f13c32..f230bf737523 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -345,7 +345,8 @@ class Spark34Shims extends SparkShims { override def supportDuplicateReadingTracking: Boolean = true - def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] = partition.files + def getFileStatus(partition: PartitionDirectory): Seq[(FileStatus, Map[String, Any])] = + partition.files.map(f => (f, Map.empty[String, Any])) def isFileSplittable( relation: HadoopFsRelation, @@ -384,7 +385,8 @@ class Spark34Shims extends SparkShims { filePath: Path, isSplitable: Boolean, maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = { + partitionValues: InternalRow, + metadata: Map[String, Any] = Map.empty): Seq[PartitionedFile] = { PartitionedFileUtil.splitFiles( sparkSession, file, diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index e370e1204cea..2203e08d3ed0 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -67,6 +67,7 @@ import org.apache.parquet.schema.MessageType import java.time.ZoneOffset import java.util.{HashMap => JHashMap, Map => JMap, Properties} +import scala.collection.JavaConverters._ import scala.reflect.ClassTag class Spark35Shims extends SparkShims { @@ -375,8 +376,8 @@ class Spark35Shims extends SparkShims { override def supportDuplicateReadingTracking: Boolean = true - def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] = - partition.files.map(_.fileStatus) + def getFileStatus(partition: PartitionDirectory): Seq[(FileStatus, Map[String, Any])] = + partition.files.map(f => (f.fileStatus, f.metadata)) def isFileSplittable( 
relation: HadoopFsRelation, @@ -387,7 +388,8 @@ class Spark35Shims extends SparkShims { } def isRowIndexMetadataColumn(name: String): Boolean = - name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME + name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME || + name.equalsIgnoreCase("__delta_internal_is_row_deleted") def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = { sparkSchema.fields.zipWithIndex.find { @@ -411,10 +413,11 @@ class Spark35Shims extends SparkShims { filePath: Path, isSplitable: Boolean, maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = { + partitionValues: InternalRow, + metadata: Map[String, Any] = Map.empty): Seq[PartitionedFile] = { PartitionedFileUtil.splitFiles( sparkSession, - FileStatusWithMetadata(file), + FileStatusWithMetadata(file, metadata), isSplitable, maxSplitBytes, partitionValues) @@ -669,4 +672,6 @@ class Spark35Shims extends SparkShims { override def isColumnarLimitExecSupported(): Boolean = false + override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = + file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]] }
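Note on the bitmap layout in DeltaDVRoaringBitmapArray.cpp above: a 64-bit row index is split into a high 32-bit word that selects a bitmap in the array and a low 32-bit word that is stored inside that bitmap, mirroring Delta's RoaringBitmapArray. A minimal, self-contained Scala sketch of the same split/compose arithmetic (the object and method names here are illustrative, not part of the patch):

```scala
object RowIndexCodec {
  /** Split a 64-bit row index into (bitmap index, value stored inside that bitmap). */
  def decompose(rowIndex: Long): (Int, Int) =
    ((rowIndex >>> 32).toInt, (rowIndex & 0xFFFFFFFFL).toInt)

  /** Recompose the 64-bit row index from its high/low 32-bit words. */
  def compose(high: Int, low: Int): Long =
    (high.toLong << 32) | (low.toLong & 0xFFFFFFFFL)

  def main(args: Array[String]): Unit = {
    val rowIndex = (3L << 32) | 42L // row 42, handled by the 4th bitmap in the array
    val (high, low) = decompose(rowIndex)
    assert(high == 3 && low == 42)
    assert(compose(high, low) == rowIndex) // round-trips
  }
}
```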
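Note on the on-disk format that rb_read walks: as far as the code above shows, the blob referenced by pathOrInlineDv/offset/sizeInBytes starts with a big-endian payload size (which must equal sizeInBytes), followed by the little-endian magic number 1681511377, a little-endian 64-bit bitmap count, one (little-endian key, serialized Roaring bitmap bytes) pair per entry, and a big-endian CRC32 over the payload. A hedged Scala sketch that checks only this framing without deserializing the bitmaps; the helper name and the assumption that the checksum sits immediately after the payload are mine:

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.file.{Files, Paths}
import java.util.zip.CRC32

object DeletionVectorBlob {
  /** Validate the framing that DeltaDVRoaringBitmapArray::rb_read checks. */
  def readHeader(path: String, offset: Int, expectedSize: Int): Unit = {
    val bytes = Files.readAllBytes(Paths.get(path))

    // 4-byte big-endian payload size, compared against sizeInBytes from the descriptor
    val size = ByteBuffer.wrap(bytes, offset, 4).order(ByteOrder.BIG_ENDIAN).getInt
    require(size == expectedSize, s"deletion vector size mismatch: $size vs $expectedSize")

    val payload = ByteBuffer.wrap(bytes, offset + 4, size).order(ByteOrder.LITTLE_ENDIAN)
    val magic = payload.getInt
    require(magic == 1681511377, s"unexpected magic number: $magic")
    // bitmap count; each entry is a little-endian key followed by Roaring bitmap bytes
    val bitmapCount = payload.getLong
    println(s"deletion vector holds $bitmapCount bitmap(s)")

    // CRC32 over the `size` payload bytes, stored big-endian after the payload
    val crc = new CRC32
    crc.update(bytes, offset + 4, size)
    val stored = ByteBuffer.wrap(bytes, offset + 4 + size, 4).order(ByteOrder.BIG_ENDIAN).getInt
    require(stored == crc.getValue.toInt, "deletion vector checksum mismatch")
  }
}
```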
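Note on convertRowIndexFilterIdEncoded in Delta32Shims: the serialized DeletionVectorDescriptor carries a path relative to the table root, while PartitionedFile.filePath points at a data file that may sit one directory deeper per partition column, so the shim walks back up before resolving. A stripped-down sketch of that path arithmetic using only Hadoop's Path; the real resolution goes through Delta's DeletionVectorDescriptor.absolutePath, for which a plain child resolve stands in here:

```scala
import org.apache.hadoop.fs.Path

object DvPathResolver {
  /**
   * Walk from a data file back up to the table root by dropping the file name
   * plus one directory per partition column, then resolve the relative DV path.
   */
  def resolve(dataFilePath: String, partitionColsCnt: Int, relativeDvPath: String): Path = {
    var tableRoot = new Path(dataFilePath).getParent        // drop the file name
    for (_ <- 0 until partitionColsCnt)                     // drop e.g. l_returnflag=N/
      tableRoot = tableRoot.getParent
    new Path(tableRoot, relativeDvPath)                     // DV file lives under the table root
  }
}
```

For example, resolve("/tbl/l_returnflag=N/part-0.parquet", 1, "dv-x.bin") yields /tbl/dv-x.bin, which matches how the partitioned-table test above expects deletion vectors to be located.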