Skip to content

Commit ff840fc

Browse files
committed
fix(iceberg): Date partition value parse issue
1 parent ce273fa commit ff840fc

File tree

8 files changed

+114
-23
lines changed

8 files changed

+114
-23
lines changed

velox/connectors/hive/HiveConnectorUtil.cpp

+13-5
Original file line number | Diff line number | Diff line change
@@ -634,12 +634,18 @@ namespace {
634634
bool applyPartitionFilter(
635635
const TypePtr& type,
636636
const std::string& partitionValue,
637+
bool isPartitionValueDaysSinceEpoch,
637638
common::Filter* filter) {
638639
if (type->isDate()) {
639-
const auto result = util::fromDateString(
640-
StringView(partitionValue), util::ParseMode::kPrestoCast);
641-
VELOX_CHECK(!result.hasError());
642-
return applyFilter(*filter, result.value());
640+
int32_t result = 0;
641+
// Iceberg partition values are already in daysSinceEpoch, no need to
642+
// convert.
643+
if (isPartitionValueDaysSinceEpoch) {
644+
result = folly::to<int32_t>(partitionValue);
645+
} else {
646+
result = DATE()->toDays(static_cast<folly::StringPiece>(partitionValue));
647+
}
648+
return applyFilter(*filter, result);
643649
}
644650

645651
switch (type->kind()) {
@@ -681,7 +687,8 @@ bool testFilters(
681687
const std::unordered_map<std::string, std::optional<std::string>>&
682688
partitionKeys,
683689
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
684-
partitionKeysHandle) {
690+
partitionKeysHandle,
691+
bool isPartitionValueDaysSinceEpoch) {
685692
const auto totalRows = reader->numberOfRows();
686693
const auto& fileTypeWithId = reader->typeWithId();
687694
const auto& rowType = reader->rowType();
@@ -701,6 +708,7 @@ bool testFilters(
701708
return applyPartitionFilter(
702709
handlesIter->second->dataType(),
703710
iter->second.value(),
711+
isPartitionValueDaysSinceEpoch,
704712
child->filter());
705713
}
706714
// Column is missing, most likely due to schema evolution. Or it's a

velox/connectors/hive/HiveConnectorUtil.h

+3-1
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020

2121
#include "velox/connectors/Connector.h"
2222
#include "velox/connectors/hive/FileHandle.h"
23+
#include "velox/connectors/hive/SplitReader.h"
2324
#include "velox/dwio/common/BufferedInput.h"
2425
#include "velox/dwio/common/Reader.h"
2526

@@ -97,7 +98,8 @@ bool testFilters(
9798
const std::unordered_map<std::string, std::optional<std::string>>&
9899
partitionKey,
99100
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
100-
partitionKeysHandle);
101+
partitionKeysHandle,
102+
bool isPartitionValueDaysSinceEpoch = false);
101103

102104
std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
103105
const FileHandle& fileHandle,

velox/connectors/hive/SplitReader.cpp

+21-7
Original file line number | Diff line number | Diff line change
@@ -35,14 +35,22 @@ VectorPtr newConstantFromString(
3535
const std::optional<std::string>& value,
3636
vector_size_t size,
3737
velox::memory::MemoryPool* pool,
38-
const std::string& sessionTimezone) {
38+
const std::string& sessionTimezone,
39+
bool isPartitionValueDaysSinceEpoch) {
3940
using T = typename TypeTraits<kind>::NativeType;
4041
if (!value.has_value()) {
4142
return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
4243
}
4344

4445
if (type->isDate()) {
45-
auto days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
46+
int32_t days = 0;
47+
// For Iceberg, the date partition values are already in daysSinceEpoch
48+
// form.
49+
if (isPartitionValueDaysSinceEpoch) {
50+
days = folly::to<int32_t>(value.value());
51+
} else {
52+
days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
53+
}
4654
return std::make_shared<ConstantVector<int32_t>>(
4755
pool, size, false, type, std::move(days));
4856
}
@@ -101,7 +109,8 @@ std::unique_ptr<SplitReader> SplitReader::create(
101109
ioStats,
102110
fileHandleFactory,
103111
executor,
104-
scanSpec));
112+
scanSpec,
113+
false));
105114
}
106115
}
107116

@@ -116,12 +125,14 @@ SplitReader::SplitReader(
116125
const std::shared_ptr<io::IoStatistics>& ioStats,
117126
FileHandleFactory* fileHandleFactory,
118127
folly::Executor* executor,
119-
const std::shared_ptr<common::ScanSpec>& scanSpec)
128+
const std::shared_ptr<common::ScanSpec>& scanSpec,
129+
bool isPartitionValueDaysSinceEpoch)
120130
: hiveSplit_(hiveSplit),
121131
hiveTableHandle_(hiveTableHandle),
122132
partitionKeys_(partitionKeys),
123133
connectorQueryCtx_(connectorQueryCtx),
124134
hiveConfig_(hiveConfig),
135+
isPartitionValueDaysSinceEpoch_(isPartitionValueDaysSinceEpoch),
125136
readerOutputType_(readerOutputType),
126137
ioStats_(ioStats),
127138
fileHandleFactory_(fileHandleFactory),
@@ -272,7 +283,8 @@ bool SplitReader::filterOnStats(
272283
baseReader_.get(),
273284
hiveSplit_->filePath,
274285
hiveSplit_->partitionKeys,
275-
*partitionKeys_)) {
286+
*partitionKeys_,
287+
isPartitionValueDaysSinceEpoch_)) {
276288
return true;
277289
}
278290
++runtimeStats.skippedSplits;
@@ -335,7 +347,8 @@ std::vector<TypePtr> SplitReader::adaptColumns(
335347
iter->second,
336348
1,
337349
connectorQueryCtx_->memoryPool(),
338-
connectorQueryCtx_->sessionTimezone());
350+
connectorQueryCtx_->sessionTimezone(),
351+
{});
339352
childSpec->setConstantValue(constant);
340353
} else if (
341354
childSpec->columnType() == common::ScanSpec::ColumnType::kRegular) {
@@ -389,7 +402,8 @@ void SplitReader::setPartitionValue(
389402
value,
390403
1,
391404
connectorQueryCtx_->memoryPool(),
392-
connectorQueryCtx_->sessionTimezone());
405+
connectorQueryCtx_->sessionTimezone(),
406+
isPartitionValueDaysSinceEpoch_);
393407
spec->setConstantValue(constant);
394408
}
395409

velox/connectors/hive/SplitReader.h

+3-1
Original file line number | Diff line number | Diff line change
@@ -109,7 +109,8 @@ class SplitReader {
109109
const std::shared_ptr<io::IoStatistics>& ioStats,
110110
FileHandleFactory* fileHandleFactory,
111111
folly::Executor* executor,
112-
const std::shared_ptr<common::ScanSpec>& scanSpec);
112+
const std::shared_ptr<common::ScanSpec>& scanSpec,
113+
bool isPartitionValueDaysSinceEpoch);
113114

114115
/// Create the dwio::common::Reader object baseReader_, which will be used to
115116
/// read the data file's metadata and schema
@@ -154,6 +155,7 @@ class SplitReader {
154155
std::shared_ptr<HiveColumnHandle>>* const partitionKeys_;
155156
const ConnectorQueryCtx* connectorQueryCtx_;
156157
const std::shared_ptr<const HiveConfig> hiveConfig_;
158+
bool isPartitionValueDaysSinceEpoch_;
157159

158160
const RowTypePtr readerOutputType_;
159161
const std::shared_ptr<io::IoStatistics> ioStats_;

velox/connectors/hive/iceberg/IcebergSplitReader.cpp

+2-1
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,8 @@ IcebergSplitReader::IcebergSplitReader(
4646
ioStats,
4747
fileHandleFactory,
4848
executor,
49-
scanSpec),
49+
scanSpec,
50+
true),
5051
baseReadOffset_(0),
5152
splitOffset_(0),
5253
deleteBitmap_(nullptr),

velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp

+54-2
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17+
#include "velox/common/base/tests/GTestUtils.h"
1718
#include "velox/common/file/FileSystems.h"
1819
#include "velox/connectors/hive/HiveConnectorSplit.h"
1920
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
@@ -225,6 +226,35 @@ class HiveIcebergTest : public HiveConnectorTestBase {
225226
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
226227
}
227228

229+
void assertQuery(
230+
RowTypePtr rowType,
231+
const std::vector<RowVectorPtr>& dataVectors,
232+
const std::string duckDbSql,
233+
const std::unordered_map<std::string, std::optional<std::string>>
234+
partitionKeys = {},
235+
const std::vector<std::string> filters = {}) {
236+
VELOX_CHECK(!duckDbSql.empty(), "DuckDb sql is empty");
237+
auto dataFilePath = TempFilePath::create();
238+
239+
writeToFile(
240+
dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
241+
std::vector<std::shared_ptr<ConnectorSplit>> splits;
242+
splits.emplace_back(
243+
makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));
244+
245+
std::unordered_set<std::string> partitionColumns;
246+
247+
for (auto partitionKey : partitionKeys) {
248+
partitionColumns.insert(partitionKey.first);
249+
}
250+
251+
auto plan =
252+
PlanBuilder(pool_.get())
253+
.tableScan(rowType, filters, "", nullptr, {}, partitionColumns)
254+
.planNode();
255+
HiveConnectorTestBase::assertQuery(plan, splits, duckDbSql);
256+
}
257+
228258
const static int rowCount = 20000;
229259

230260
private:
@@ -337,8 +367,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {
337367

338368
std::shared_ptr<ConnectorSplit> makeIcebergSplit(
339369
const std::string& dataFilePath,
340-
const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
341-
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
370+
const std::vector<IcebergDeleteFile>& deleteFiles = {},
371+
const std::unordered_map<std::string, std::optional<std::string>>
372+
partitionKeys = {}) {
342373
std::unordered_map<std::string, std::string> customSplitInfo;
343374
customSplitInfo["table_format"] = "hive-iceberg";
344375

@@ -660,4 +691,25 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
660691
assertMultipleSplits({}, 10, 3);
661692
}
662693

694+
TEST_F(HiveIcebergTest, testPartitionedRead) {
695+
RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})};
696+
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
697+
// Iceberg API sets partition values for dates to daysSinceEpoch, so
698+
// in velox, we do not need to convert it to days.
699+
// Date = 2018-04-06, daysSinceEpoch = 17627
700+
partitionKeys["ds"] = "17627";
701+
702+
std::vector<RowVectorPtr> dataVectors;
703+
VectorPtr c0 = makeFlatVector<int64_t>((std::vector<int64_t>){1});
704+
VectorPtr ds = makeFlatVector<int32_t>((std::vector<int32_t>){17627});
705+
dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds}));
706+
707+
assertQuery(
708+
rowType, dataVectors, "SELECT 1, '2018-04-06'", partitionKeys, {});
709+
710+
std::vector<std::string> filters = {"ds = date'2018-04-06'"};
711+
assertQuery(
712+
rowType, dataVectors, "SELECT 1, '2018-04-06'", partitionKeys, filters);
713+
}
714+
663715
} // namespace facebook::velox::connector::hive::iceberg

velox/exec/tests/utils/PlanBuilder.cpp

+8-5
Original file line number | Diff line number | Diff line change
@@ -74,14 +74,16 @@ PlanBuilder& PlanBuilder::tableScan(
7474
const RowTypePtr& dataColumns,
7575
const std::unordered_map<
7676
std::string,
77-
std::shared_ptr<connector::ColumnHandle>>& assignments) {
77+
std::shared_ptr<connector::ColumnHandle>>& assignments,
78+
const std::unordered_set<std::string>& partitionKeys) {
7879
return TableScanBuilder(*this)
7980
.outputType(outputType)
8081
.assignments(assignments)
8182
.subfieldFilters(subfieldFilters)
8283
.remainingFilter(remainingFilter)
8384
.dataColumns(dataColumns)
8485
.assignments(assignments)
86+
.partitionKeys(partitionKeys)
8587
.endTableScan();
8688
}
8789

@@ -170,13 +172,14 @@ core::PlanNodePtr PlanBuilder::TableScanBuilder::build(core::PlanNodeId id) {
170172
}
171173

172174
if (!hasAssignments) {
175+
auto columnType = HiveColumnHandle::ColumnType::kRegular;
176+
if (partitionKeys_.count(name) > 0) {
177+
columnType = HiveColumnHandle::ColumnType::kPartitionKey;
178+
}
173179
assignments_.insert(
174180
{name,
175181
std::make_shared<HiveColumnHandle>(
176-
hiveColumnName,
177-
HiveColumnHandle::ColumnType::kRegular,
178-
type,
179-
type)});
182+
hiveColumnName, columnType, type, type)});
180183
}
181184
}
182185

velox/exec/tests/utils/PlanBuilder.h

+10-1
Original file line number | Diff line number | Diff line change
@@ -131,14 +131,16 @@ class PlanBuilder {
131131
/// you define the output types only. See 'missingColumns' test in
132132
/// 'TableScanTest'.
133133
/// @param assignments Optional ColumnHandles.
134+
/// @param partitionKeys Optional partition keys.
134135
PlanBuilder& tableScan(
135136
const RowTypePtr& outputType,
136137
const std::vector<std::string>& subfieldFilters = {},
137138
const std::string& remainingFilter = "",
138139
const RowTypePtr& dataColumns = nullptr,
139140
const std::unordered_map<
140141
std::string,
141-
std::shared_ptr<connector::ColumnHandle>>& assignments = {});
142+
std::shared_ptr<connector::ColumnHandle>>& assignments = {},
143+
const std::unordered_set<std::string>& partitionKeys = {});
142144

143145
/// Add a TableScanNode to scan a Hive table.
144146
///
@@ -277,6 +279,12 @@ class PlanBuilder {
277279
return *this;
278280
}
279281

282+
TableScanBuilder& partitionKeys(
283+
std::unordered_set<std::string> partitionKeys) {
284+
partitionKeys_ = std::move(partitionKeys);
285+
return *this;
286+
}
287+
280288
/// Stop the TableScanBuilder.
281289
PlanBuilder& endTableScan() {
282290
planBuilder_.planNode_ = build(planBuilder_.nextPlanNodeId());
@@ -298,6 +306,7 @@ class PlanBuilder {
298306
std::shared_ptr<connector::ConnectorTableHandle> tableHandle_;
299307
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
300308
assignments_;
309+
std::unordered_set<std::string> partitionKeys_;
301310
};
302311

303312
/// Start a TableScanBuilder.

0 commit comments

Comments
 (0)