Skip to content

Commit 67b025c

Browse files
committed
fix(iceberg): Date partition value parse issue
1 parent 54c6d9e commit 67b025c

File tree

8 files changed

+117
-23
lines changed

8 files changed

+117
-23
lines changed

velox/connectors/hive/HiveConnectorUtil.cpp

+13-5
Original file line numberDiff line numberDiff line change
@@ -632,14 +632,20 @@ void configureRowReaderOptions(
632632
namespace {
633633

634634
bool applyPartitionFilter(
635+
SplitReader::TableFormat tableFormat,
635636
const TypePtr& type,
636637
const std::string& partitionValue,
637638
common::Filter* filter) {
638639
if (type->isDate()) {
639-
const auto result = util::fromDateString(
640-
StringView(partitionValue), util::ParseMode::kPrestoCast);
641-
VELOX_CHECK(!result.hasError());
642-
return applyFilter(*filter, result.value());
640+
int32_t result = 0;
641+
// Iceberg partition values are already in daysSinceEpoch, no need to
642+
// convert.
643+
if (tableFormat == SplitReader::TableFormat::kIceberg) {
644+
result = std::stoi(partitionValue);
645+
} else {
646+
result = DATE()->toDays(static_cast<folly::StringPiece>(partitionValue));
647+
}
648+
return applyFilter(*filter, result);
643649
}
644650

645651
switch (type->kind()) {
@@ -681,7 +687,8 @@ bool testFilters(
681687
const std::unordered_map<std::string, std::optional<std::string>>&
682688
partitionKeys,
683689
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
684-
partitionKeysHandle) {
690+
partitionKeysHandle,
691+
SplitReader::TableFormat tableFormat) {
685692
const auto totalRows = reader->numberOfRows();
686693
const auto& fileTypeWithId = reader->typeWithId();
687694
const auto& rowType = reader->rowType();
@@ -699,6 +706,7 @@ bool testFilters(
699706

700707
// This is a non-null partition key
701708
return applyPartitionFilter(
709+
tableFormat,
702710
handlesIter->second->dataType(),
703711
iter->second.value(),
704712
child->filter());

velox/connectors/hive/HiveConnectorUtil.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include "velox/connectors/Connector.h"
2222
#include "velox/connectors/hive/FileHandle.h"
23+
#include "velox/connectors/hive/SplitReader.h"
2324
#include "velox/dwio/common/BufferedInput.h"
2425
#include "velox/dwio/common/Reader.h"
2526

@@ -95,7 +96,8 @@ bool testFilters(
9596
const std::unordered_map<std::string, std::optional<std::string>>&
9697
partitionKey,
9798
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
98-
partitionKeysHandle);
99+
partitionKeysHandle,
100+
SplitReader::TableFormat tableFormat = SplitReader::TableFormat::kHive);
99101

100102
std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
101103
const FileHandle& fileHandle,

velox/connectors/hive/SplitReader.cpp

+22-7
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,23 @@ VectorPtr newConstantFromString(
3535
const std::optional<std::string>& value,
3636
vector_size_t size,
3737
velox::memory::MemoryPool* pool,
38-
const std::string& sessionTimezone) {
38+
const std::string& sessionTimezone,
39+
const std::optional<SplitReader::TableFormat>& tableFormat) {
3940
using T = typename TypeTraits<kind>::NativeType;
4041
if (!value.has_value()) {
4142
return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
4243
}
4344

4445
if (type->isDate()) {
45-
auto days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
46+
int32_t days = 0;
47+
// For Iceberg, the date partition values are already in daysSinceEpoch
48+
// form.
49+
if (tableFormat.has_value() &&
50+
tableFormat.value() == SplitReader::TableFormat::kIceberg) {
51+
days = std::stoi(value.value());
52+
} else {
53+
days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
54+
}
4655
return std::make_shared<ConstantVector<int32_t>>(
4756
pool, size, false, type, std::move(days));
4857
}
@@ -101,7 +110,8 @@ std::unique_ptr<SplitReader> SplitReader::create(
101110
ioStats,
102111
fileHandleFactory,
103112
executor,
104-
scanSpec));
113+
scanSpec,
114+
TableFormat::kHive));
105115
}
106116
}
107117

@@ -116,12 +126,14 @@ SplitReader::SplitReader(
116126
const std::shared_ptr<io::IoStatistics>& ioStats,
117127
FileHandleFactory* fileHandleFactory,
118128
folly::Executor* executor,
119-
const std::shared_ptr<common::ScanSpec>& scanSpec)
129+
const std::shared_ptr<common::ScanSpec>& scanSpec,
130+
TableFormat tableFormat)
120131
: hiveSplit_(hiveSplit),
121132
hiveTableHandle_(hiveTableHandle),
122133
partitionKeys_(partitionKeys),
123134
connectorQueryCtx_(connectorQueryCtx),
124135
hiveConfig_(hiveConfig),
136+
tableFormat_(tableFormat),
125137
readerOutputType_(readerOutputType),
126138
ioStats_(ioStats),
127139
fileHandleFactory_(fileHandleFactory),
@@ -272,7 +284,8 @@ bool SplitReader::filterOnStats(
272284
baseReader_.get(),
273285
hiveSplit_->filePath,
274286
hiveSplit_->partitionKeys,
275-
*partitionKeys_)) {
287+
*partitionKeys_,
288+
tableFormat_)) {
276289
return true;
277290
}
278291
++runtimeStats.skippedSplits;
@@ -335,7 +348,8 @@ std::vector<TypePtr> SplitReader::adaptColumns(
335348
iter->second,
336349
1,
337350
connectorQueryCtx_->memoryPool(),
338-
connectorQueryCtx_->sessionTimezone());
351+
connectorQueryCtx_->sessionTimezone(),
352+
{});
339353
childSpec->setConstantValue(constant);
340354
} else if (
341355
childSpec->columnType() == common::ScanSpec::ColumnType::kRegular) {
@@ -389,7 +403,8 @@ void SplitReader::setPartitionValue(
389403
value,
390404
1,
391405
connectorQueryCtx_->memoryPool(),
392-
connectorQueryCtx_->sessionTimezone());
406+
connectorQueryCtx_->sessionTimezone(),
407+
tableFormat_);
393408
spec->setConstantValue(constant);
394409
}
395410

velox/connectors/hive/SplitReader.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ class HiveConfig;
5353

5454
class SplitReader {
5555
public:
56+
/// Table format of the split a SplitReader is reading. The format selects how
/// date partition values are interpreted: for kIceberg the value is parsed as
/// an integer number of days since epoch, otherwise it is converted from a
/// date string.
enum class TableFormat {
  kHive,
  kIceberg,
};
57+
5658
static std::unique_ptr<SplitReader> create(
5759
const std::shared_ptr<hive::HiveConnectorSplit>& hiveSplit,
5860
const std::shared_ptr<const HiveTableHandle>& hiveTableHandle,
@@ -109,7 +111,8 @@ class SplitReader {
109111
const std::shared_ptr<io::IoStatistics>& ioStats,
110112
FileHandleFactory* fileHandleFactory,
111113
folly::Executor* executor,
112-
const std::shared_ptr<common::ScanSpec>& scanSpec);
114+
const std::shared_ptr<common::ScanSpec>& scanSpec,
115+
TableFormat tableFormat);
113116

114117
/// Create the dwio::common::Reader object baseReader_, which will be used to
115118
/// read the data file's metadata and schema
@@ -154,6 +157,7 @@ class SplitReader {
154157
std::shared_ptr<HiveColumnHandle>>* const partitionKeys_;
155158
const ConnectorQueryCtx* connectorQueryCtx_;
156159
const std::shared_ptr<const HiveConfig> hiveConfig_;
160+
TableFormat tableFormat_;
157161

158162
const RowTypePtr readerOutputType_;
159163
const std::shared_ptr<io::IoStatistics> ioStats_;

velox/connectors/hive/iceberg/IcebergSplitReader.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ IcebergSplitReader::IcebergSplitReader(
4646
ioStats,
4747
fileHandleFactory,
4848
executor,
49-
scanSpec),
49+
scanSpec,
50+
SplitReader::TableFormat::kIceberg),
5051
baseReadOffset_(0),
5152
splitOffset_(0),
5253
deleteBitmap_(nullptr),

velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp

+54-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17+
#include "velox/common/base/tests/GTestUtils.h"
1718
#include "velox/common/file/FileSystems.h"
1819
#include "velox/connectors/hive/HiveConnectorSplit.h"
1920
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
@@ -225,6 +226,35 @@ class HiveIcebergTest : public HiveConnectorTestBase {
225226
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
226227
}
227228

229+
void assertQuery(
230+
RowTypePtr rowType,
231+
const std::vector<RowVectorPtr>& dataVectors,
232+
const std::string duckDbSql,
233+
const std::unordered_map<std::string, std::optional<std::string>>
234+
partitionKeys = {},
235+
const std::vector<std::string> filters = {}) {
236+
VELOX_CHECK(!duckDbSql.empty(), "DuckDb sql is empty");
237+
auto dataFilePath = TempFilePath::create();
238+
239+
writeToFile(
240+
dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
241+
std::vector<std::shared_ptr<ConnectorSplit>> splits;
242+
splits.emplace_back(
243+
makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));
244+
245+
std::unordered_set<std::string> partitionColumns;
246+
247+
for (auto partitionKey : partitionKeys) {
248+
partitionColumns.insert(partitionKey.first);
249+
}
250+
251+
auto plan =
252+
PlanBuilder(pool_.get())
253+
.tableScan(rowType, filters, "", nullptr, {}, partitionColumns)
254+
.planNode();
255+
HiveConnectorTestBase::assertQuery(plan, splits, duckDbSql);
256+
}
257+
228258
const static int rowCount = 20000;
229259

230260
private:
@@ -337,8 +367,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {
337367

338368
std::shared_ptr<ConnectorSplit> makeIcebergSplit(
339369
const std::string& dataFilePath,
340-
const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
341-
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
370+
const std::vector<IcebergDeleteFile>& deleteFiles = {},
371+
const std::unordered_map<std::string, std::optional<std::string>>
372+
partitionKeys = {}) {
342373
std::unordered_map<std::string, std::string> customSplitInfo;
343374
customSplitInfo["table_format"] = "hive-iceberg";
344375

@@ -660,4 +691,25 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
660691
assertMultipleSplits({}, 10, 3);
661692
}
662693

694+
// Verifies that an Iceberg split whose date partition value is given as days
// since epoch is read back as the corresponding date, both without a filter
// and with an equality filter on the partition column.
TEST_F(HiveIcebergTest, testPartitionedRead) {
  RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})};
  std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
  // Iceberg API sets partition values for dates to daysSinceEpoch, so
  // in velox, we do not need to convert it to days.
  // Date = 2018-04-06, daysSinceEpoch = 17627
  partitionKeys["ds"] = "17627";

  std::vector<RowVectorPtr> dataVectors;
  // Standard brace-initialized temporaries; the '(T){...}' compound-literal
  // form is a compiler extension in C++.
  VectorPtr c0 = makeFlatVector<int64_t>(std::vector<int64_t>{1});
  VectorPtr ds = makeFlatVector<int32_t>(std::vector<int32_t>{17627});
  dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds}));

  // No filter: the partition value alone must materialize as the date.
  assertQuery(
      rowType, dataVectors, "SELECT 1, '2018-04-06'", partitionKeys, {});

  // Equality filter on the partition column must keep the split.
  std::vector<std::string> filters = {"ds = date'2018-04-06'"};
  assertQuery(
      rowType, dataVectors, "SELECT 1, '2018-04-06'", partitionKeys, filters);
}
714+
663715
} // namespace facebook::velox::connector::hive::iceberg

velox/exec/tests/utils/PlanBuilder.cpp

+8-5
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,16 @@ PlanBuilder& PlanBuilder::tableScan(
7474
const RowTypePtr& dataColumns,
7575
const std::unordered_map<
7676
std::string,
77-
std::shared_ptr<connector::ColumnHandle>>& assignments) {
77+
std::shared_ptr<connector::ColumnHandle>>& assignments,
78+
const std::unordered_set<std::string>& partitionKeys) {
7879
return TableScanBuilder(*this)
7980
.outputType(outputType)
8081
.assignments(assignments)
8182
.subfieldFilters(subfieldFilters)
8283
.remainingFilter(remainingFilter)
8384
.dataColumns(dataColumns)
8485
.assignments(assignments)
86+
.partitionKeys(partitionKeys)
8587
.endTableScan();
8688
}
8789

@@ -170,13 +172,14 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) {
170172
}
171173

172174
if (!hasAssignments) {
175+
auto columnType = HiveColumnHandle::ColumnType::kRegular;
176+
if (partitionKeys_.count(name) > 0) {
177+
columnType = HiveColumnHandle::ColumnType::kPartitionKey;
178+
}
173179
assignments_.insert(
174180
{name,
175181
std::make_shared<HiveColumnHandle>(
176-
hiveColumnName,
177-
HiveColumnHandle::ColumnType::kRegular,
178-
type,
179-
type)});
182+
hiveColumnName, columnType, type, type)});
180183
}
181184
}
182185

velox/exec/tests/utils/PlanBuilder.h

+10-1
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,16 @@ class PlanBuilder {
131131
/// you define the output types only. See 'missingColumns' test in
132132
/// 'TableScanTest'.
133133
/// @param assignments Optional ColumnHandles.
134+
/// @param partitionKeys Optional partition keys.
134135
PlanBuilder& tableScan(
135136
const RowTypePtr& outputType,
136137
const std::vector<std::string>& subfieldFilters = {},
137138
const std::string& remainingFilter = "",
138139
const RowTypePtr& dataColumns = nullptr,
139140
const std::unordered_map<
140141
std::string,
141-
std::shared_ptr<connector::ColumnHandle>>& assignments = {});
142+
std::shared_ptr<connector::ColumnHandle>>& assignments = {},
143+
const std::unordered_set<std::string>& partitionKeys = {});
142144

143145
/// Add a TableScanNode to scan a Hive table.
144146
///
@@ -277,6 +279,12 @@ class PlanBuilder {
277279
return *this;
278280
}
279281

282+
/// Sets the names of the columns to treat as partition keys.
/// @param partitionKeys Column names; taken by value and moved into the
/// builder.
/// @return This builder, to allow call chaining.
TableScanBuilder& partitionKeys(
    std::unordered_set<std::string> partitionKeys) {
  partitionKeys_ = std::move(partitionKeys);
  return *this;
}
287+
280288
/// Stop the TableScanBuilder.
281289
PlanBuilder& endTableScan() {
282290
planBuilder_.planNode_ = build(planBuilder_.nextPlanNodeId());
@@ -298,6 +306,7 @@ class PlanBuilder {
298306
std::shared_ptr<connector::ConnectorTableHandle> tableHandle_;
299307
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
300308
assignments_;
309+
std::unordered_set<std::string> partitionKeys_;
301310
};
302311

303312
/// Start a TableScanBuilder.

0 commit comments

Comments
 (0)