Skip to content

Commit

Permalink
Fix issue with connection lifetime over iterator (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
WillAyd authored Jan 31, 2025
1 parent aa6448c commit 7232cab
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions src/pantab/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,17 @@ static auto SetSchemaTypeFromHyperType(struct ArrowSchema *schema,
}

struct HyperResultIteratorPrivate {
HyperResultIteratorPrivate(
std::unique_ptr<hyperapi::Result> result,
std::unique_ptr<hyperapi::ChunkedResultIterator> iter)
: result_(std::move(result)), iter_(std::move(iter)) {}

HyperResultIteratorPrivate(hyperapi::HyperProcess process,
hyperapi::Connection connection,
std::unique_ptr<hyperapi::Result> result,
hyperapi::ChunkedResultIterator iter)
: process_(std::move(process)), connection_(std::move(connection)),
result_(std::move(result)), iter_(std::move(iter)) {}

const hyperapi::HyperProcess process_;
hyperapi::Connection connection_;
std::unique_ptr<hyperapi::Result> result_;
std::unique_ptr<hyperapi::ChunkedResultIterator> iter_;
hyperapi::ChunkedResultIterator iter_;
struct ArrowError error_ {};
};

Expand Down Expand Up @@ -512,7 +516,7 @@ static const auto GetNext = [](struct ArrowArrayStream *stream,

auto end = hyperapi::ChunkedResultIterator{*private_data->result_,
hyperapi::IteratorEndTag{}};
if (*private_data->iter_ == end) {
if (private_data->iter_ == end) {
return 0;
}

Expand Down Expand Up @@ -552,7 +556,7 @@ static const auto GetNext = [](struct ArrowArrayStream *stream,
"ArrowArrayStartAppending failed!");
return EINVAL;
}
for (const auto &row : **private_data->iter_) {
for (const auto &row : *private_data->iter_) {
size_t column_idx = 0;
for (const auto &value : row) {
const auto &read_helper = read_helpers[column_idx];
Expand All @@ -565,7 +569,7 @@ static const auto GetNext = [](struct ArrowArrayStream *stream,
return EINVAL;
}
}
++(*private_data->iter_);
++(private_data->iter_);

if (ArrowArrayFinishBuildingDefault(array.get(), nullptr)) {
ArrowErrorSetString(&private_data->error_,
Expand All @@ -591,9 +595,8 @@ auto read_from_hyper_query(
if (!process_params.count("default_database_version"))
process_params["default_database_version"] = "2";

const hyperapi::HyperProcess hyper{
hyperapi::Telemetry::DoNotSendUsageDataToTableau, "",
std::move(process_params)};
hyperapi::HyperProcess hyper{hyperapi::Telemetry::DoNotSendUsageDataToTableau,
"", std::move(process_params)};
hyperapi::Connection connection(hyper.getEndpoint(), path);

if (chunk_size) {
Expand All @@ -604,11 +607,12 @@ auto read_from_hyper_query(
auto hyperResult =
std::make_unique<hyperapi::Result>(connection.executeQuery(query));

auto iter = std::make_unique<hyperapi::ChunkedResultIterator>(
*hyperResult, hyperapi::IteratorBeginTag{});
hyperapi::ChunkedResultIterator iter{*hyperResult,
hyperapi::IteratorBeginTag{}};

auto private_data = gsl::owner<HyperResultIteratorPrivate *>(
new HyperResultIteratorPrivate{std::move(hyperResult), std::move(iter)});
new HyperResultIteratorPrivate{std::move(hyper), std::move(connection),
std::move(hyperResult), std::move(iter)});

auto stream =
gsl::owner<struct ArrowArrayStream *>(new struct ArrowArrayStream);
Expand Down

0 comments on commit 7232cab

Please sign in to comment.