Commit d5e5544

Add compression codec extension to velox written parquet file (#8000)
This aligns Velox-written parquet file names with vanilla Spark's, e.g. gluten-part-b35dab49-3eb9-434b-abb9-7de2bc180a06.snappy.parquet.
1 parent 67ba92b commit d5e5544
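
The example name decomposes as "gluten-part-" + a random UUID + an optional codec suffix + ".parquet". A quick, hedged illustration of that shape (the regex and assertions below are mine, not part of the patch):

    // Illustrative only: the pattern new Gluten parquet file names should follow.
    val glutenParquetName = """gluten-part-[0-9a-f-]{36}(\.[a-z0-9]+)?\.parquet"""
    assert("gluten-part-b35dab49-3eb9-434b-abb9-7de2bc180a06.snappy.parquet".matches(glutenParquetName))
    // "none"/"uncompressed" codecs add no suffix:
    assert("gluten-part-b35dab49-3eb9-434b-abb9-7de2bc180a06.parquet".matches(glutenParquetName))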

2 files changed: +54 -5 lines changed


backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala

+19 -2

@@ -31,6 +31,22 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"
 
+  // The parquet compression codec extensions
+  private val parquetCompressionCodecExtensions = Map(
+    "none" -> "",
+    "uncompressed" -> "",
+    "snappy" -> ".snappy",
+    "gzip" -> ".gz",
+    "lzo" -> ".lzo",
+    "lz4" -> ".lz4",
+    "brotli" -> ".br",
+    "zstd" -> ".zstd"
+  )
+
+  private def getParquetFileExtension(codec: String): String = {
+    s"${parquetCompressionCodecExtensions(codec)}.parquet"
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     createTPCHNotNullTables()

@@ -49,8 +65,8 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
         spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
       }
       assert(
-        testAppender.loggingEvents.exists(
-          _.getMessage.toString.contains("Use Gluten parquet write for hive")) == false)
+        !testAppender.loggingEvents.exists(
+          _.getMessage.toString.contains("Use Gluten parquet write for hive")))
     }
   }

@@ -77,6 +93,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
       parquetFiles.forall {
         file =>
           val path = new Path(f.getCanonicalPath, file)
+          assert(file.endsWith(getParquetFileExtension(codec)))
           val in = HadoopInputFile.fromPath(path, spark.sessionState.newHadoopConf())
           Utils.tryWithResource(ParquetFileReader.open(in)) {
             reader =>
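
The new assertion ties each written file name back to the session's compression codec. A minimal standalone sketch of the same end-to-end check outside the suite (session setup, codec, and output path are placeholders):

    import java.io.File
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

    val outDir = "/tmp/velox-parquet-write-check" // placeholder output path
    spark.range(10).write.mode("overwrite").parquet(outDir)

    // With this commit, every data file should end in ".snappy.parquet"
    // (gzip would yield ".gz.parquet"; none/uncompressed just ".parquet").
    val names = new File(outDir).listFiles().map(_.getName).filter(_.endsWith(".parquet"))
    assert(names.nonEmpty && names.forall(_.endsWith(".snappy.parquet")))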

cpp/velox/substrait/SubstraitToVeloxPlan.cc

+35 -3

@@ -493,13 +493,43 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   }
 }
 
+std::string makeUuid() {
+  return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
+}
+
+std::string compressionFileNameSuffix(common::CompressionKind kind) {
+  switch (static_cast<int32_t>(kind)) {
+    case common::CompressionKind_ZLIB:
+      return ".zlib";
+    case common::CompressionKind_SNAPPY:
+      return ".snappy";
+    case common::CompressionKind_LZO:
+      return ".lzo";
+    case common::CompressionKind_ZSTD:
+      return ".zstd";
+    case common::CompressionKind_LZ4:
+      return ".lz4";
+    case common::CompressionKind_GZIP:
+      return ".gz";
+    case common::CompressionKind_NONE:
+    default:
+      return "";
+  }
+}
+
 std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
     const std::string& targetDirectory,
+    dwio::common::FileFormat fileFormat,
+    common::CompressionKind compression,
     const std::optional<std::string>& writeDirectory = std::nullopt,
     const connector::hive::LocationHandle::TableType& tableType =
         connector::hive::LocationHandle::TableType::kExisting) {
+  std::string targetFileName = "";
+  if (fileFormat == dwio::common::FileFormat::PARQUET) {
+    targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(), compressionFileNameSuffix(compression), ".parquet");
+  }
   return std::make_shared<connector::hive::LocationHandle>(
-      targetDirectory, writeDirectory.value_or(targetDirectory), tableType);
+      targetDirectory, writeDirectory.value_or(targetDirectory), tableType, targetFileName);
 }
 
 std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandle(

@@ -615,6 +645,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
 
   // Do not hard-code connector ID and allow for connectors other than Hive.
   static const std::string kHiveConnectorId = "test-hive";
+  // Currently only support parquet format.
+  dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;
 
   return std::make_shared<core::TableWriteNode>(
       nextPlanNodeId(),

@@ -628,8 +660,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
           inputType->children(),
           partitionedKey,
           nullptr /*bucketProperty*/,
-          makeLocationHandle(writePath),
-          dwio::common::FileFormat::PARQUET, // Currently only support parquet format.
+          makeLocationHandle(writePath, fileFormat, compressionCodec),
+          fileFormat,
           compressionCodec)),
       (!partitionedKey.empty()),
       exec::TableWriteTraits::outputType(nullptr),
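
The file name itself is assembled in makeLocationHandle from three parts. A rough Scala mirror of the C++ helpers, useful for eyeballing the format (the real code uses boost::uuids and fmt::format; makeUuid and targetFileName here are stand-ins):

    import java.util.UUID

    def makeUuid(): String = UUID.randomUUID().toString

    // Mirrors the C++ switch; NONE and any unlisted kind (e.g. BROTLI) fall back to "".
    def compressionFileNameSuffix(kind: String): String = kind match {
      case "ZLIB" => ".zlib"
      case "SNAPPY" => ".snappy"
      case "LZO" => ".lzo"
      case "ZSTD" => ".zstd"
      case "LZ4" => ".lz4"
      case "GZIP" => ".gz"
      case _ => ""
    }

    def targetFileName(kind: String): String =
      s"gluten-part-${makeUuid()}${compressionFileNameSuffix(kind)}.parquet"

    // targetFileName("ZSTD") => e.g. "gluten-part-<uuid>.zstd.parquet"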
