Skip to content

Commit f0381a4

Browse files
committed
Add TableWriterBuilder::insertHandle() and handling.
1 parent f73b7b1 commit f0381a4

File tree

2 files changed

+48
-35
lines changed

2 files changed

+48
-35
lines changed

velox/exec/tests/utils/PlanBuilder.cpp

+39-35
Original file line numberDiff line numberDiff line change
@@ -234,46 +234,50 @@ core::PlanNodePtr PlanBuilder::TableWriterBuilder::build(core::PlanNodeId id) {
234234
// upstream operator.
235235
auto outputType = outputType_ ? outputType_ : upstreamNode->outputType();
236236

237-
// Create column handles.
238-
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
239-
columnHandles;
240-
for (auto i = 0; i < outputType->size(); ++i) {
241-
const auto column = outputType->nameOf(i);
242-
const bool isPartitionKey =
243-
std::find(partitionBy_.begin(), partitionBy_.end(), column) !=
244-
partitionBy_.end();
245-
columnHandles.push_back(std::make_shared<connector::hive::HiveColumnHandle>(
246-
column,
247-
isPartitionKey
248-
? connector::hive::HiveColumnHandle::ColumnType::kPartitionKey
249-
: connector::hive::HiveColumnHandle::ColumnType::kRegular,
250-
outputType->childAt(i),
251-
outputType->childAt(i)));
252-
}
237+
// If insertHandle_ is not specified, build a HiveInsertTableHandle along with
238+
// columnHandles, bucketProperty and locationHandle.
239+
if (!insertHandle_) {
240+
// Create column handles.
241+
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
242+
columnHandles;
243+
for (auto i = 0; i < outputType->size(); ++i) {
244+
const auto column = outputType->nameOf(i);
245+
const bool isPartitionKey =
246+
std::find(partitionBy_.begin(), partitionBy_.end(), column) !=
247+
partitionBy_.end();
248+
columnHandles.push_back(std::make_shared<connector::hive::HiveColumnHandle>(
249+
column,
250+
isPartitionKey
251+
? connector::hive::HiveColumnHandle::ColumnType::kPartitionKey
252+
: connector::hive::HiveColumnHandle::ColumnType::kRegular,
253+
outputType->childAt(i),
254+
outputType->childAt(i)));
255+
}
253256

254-
auto locationHandle = std::make_shared<connector::hive::LocationHandle>(
255-
outputDirectoryPath_,
256-
outputDirectoryPath_,
257-
connector::hive::LocationHandle::TableType::kNew,
258-
outputFileName_);
257+
auto locationHandle = std::make_shared<connector::hive::LocationHandle>(
258+
outputDirectoryPath_,
259+
outputDirectoryPath_,
260+
connector::hive::LocationHandle::TableType::kNew,
261+
outputFileName_);
259262

260-
std::shared_ptr<HiveBucketProperty> bucketProperty;
261-
if (bucketCount_ != 0) {
262-
bucketProperty =
263-
buildHiveBucketProperty(outputType, bucketCount_, bucketedBy_, sortBy_);
264-
}
263+
std::shared_ptr<HiveBucketProperty> bucketProperty;
264+
if (bucketCount_ != 0) {
265+
bucketProperty =
266+
buildHiveBucketProperty(outputType, bucketCount_, bucketedBy_, sortBy_);
267+
}
265268

266-
auto hiveHandle = std::make_shared<connector::hive::HiveInsertTableHandle>(
267-
columnHandles,
268-
locationHandle,
269-
fileFormat_,
270-
bucketProperty,
271-
compressionKind_,
272-
serdeParameters_,
273-
options_);
269+
auto hiveHandle = std::make_shared<connector::hive::HiveInsertTableHandle>(
270+
columnHandles,
271+
locationHandle,
272+
fileFormat_,
273+
bucketProperty,
274+
compressionKind_,
275+
serdeParameters_,
276+
options_);
274277

275-
auto insertHandle =
278+
insertHandle_ =
276279
std::make_shared<core::InsertTableHandle>(connectorId_, hiveHandle);
280+
}
277281

278282
std::shared_ptr<core::AggregationNode> aggregationNode;
279283
if (!aggregates_.empty()) {

velox/exec/tests/utils/PlanBuilder.h

+9
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,14 @@ class PlanBuilder {
347347
return *this;
348348
}
349349

350+
/// @param insertHandle TableInsertHandle (optional). Other builder arguments such as
351+
/// the `connectorId`, `outputDirectoryPath`, `fileFormat` and so on will be ignored.
352+
TableWriterBuilder& insertHandle(
353+
std::shared_ptr<core::InsertTableHandle> insertHandle) {
354+
insertHandle_ = std::move(insertHandle);
355+
return *this;
356+
}
357+
350358
/// @param partitionBy Specifies the partition key columns.
351359
TableWriterBuilder& partitionBy(std::vector<std::string> partitionBy) {
352360
partitionBy_ = std::move(partitionBy);
@@ -423,6 +431,7 @@ class PlanBuilder {
423431
std::string outputDirectoryPath_;
424432
std::string outputFileName_;
425433
std::string connectorId_{kHiveDefaultConnectorId};
434+
shared_ptr<core::InsertTableHandle> insertHandle_;
426435

427436
std::vector<std::string> partitionBy_;
428437
int32_t bucketCount_{0};

0 commit comments

Comments
 (0)