Skip to content

Commit 121b230

Browse files
arhimondrfacebook-github-bot
authored andcommitted
feat: Avoid small batches in Exchange (facebookincubator#12010)
Summary: Pull Request resolved: facebookincubator#12010 Prevent exchange client from unblocking to early. Unblocking to early impedes effectiveness of page merging. When the cost of creating a vector is high (for example for data sets with high number of columns) creating small pages can make queries significantly less efficient. For example it was observed that when network is congested and Exchange buffers are not filled up as fast query may experience CPU efficiency drop up to 4x: T211034421 Reviewed By: xiaoxmeng Differential Revision: D67615570 fbshipit-source-id: a9936d9a95d43045f3bdabc8db5a4638bfef177c
1 parent 5e238f4 commit 121b230

13 files changed

+524
-71
lines changed

velox/core/QueryConfig.h

+14
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,15 @@ class QueryConfig {
113113
static constexpr const char* kMaxMergeExchangeBufferSize =
114114
"merge_exchange.max_buffer_size";
115115

116+
/// The minimum number of bytes to accumulate in the ExchangeQueue
117+
/// before unblocking a consumer. This is used to avoid creating tiny
118+
/// batches which may have a negative impact on performance when the
119+
/// cost of creating vectors is high (for example, when there are many
120+
/// columns). To avoid latency degradation, the exchange client unblocks a
121+
/// consumer when 1% of the data size observed so far is accumulated.
122+
static constexpr const char* kMinExchangeOutputBatchBytes =
123+
"min_exchange_output_batch_bytes";
124+
116125
static constexpr const char* kMaxPartialAggregationMemory =
117126
"max_partial_aggregation_memory";
118127

@@ -594,6 +603,11 @@ class QueryConfig {
594603
return get<uint64_t>(kMaxMergeExchangeBufferSize, kDefault);
595604
}
596605

606+
uint64_t minExchangeOutputBatchBytes() const {
607+
static constexpr uint64_t kDefault = 2UL << 20;
608+
return get<uint64_t>(kMinExchangeOutputBatchBytes, kDefault);
609+
}
610+
597611
uint64_t preferredOutputBatchBytes() const {
598612
static constexpr uint64_t kDefault = 10UL << 20;
599613
return get<uint64_t>(kPreferredOutputBatchBytes, kDefault);

velox/docs/configs.rst

+9-2
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,13 @@ Generic Configuration
8989
- Size of buffer in the exchange client that holds data fetched from other nodes before it is processed.
9090
A larger buffer can increase network throughput for larger clusters and thus decrease query processing time
9191
at the expense of reducing the amount of memory available for other usage.
92+
* - min_exchange_output_batch_bytes
93+
- integer
94+
- 2MB
95+
- The minimum number of bytes to accumulate in the ExchangeQueue before unblocking a consumer. This is used to avoid
96+
creating tiny batches which may have a negative impact on performance when the cost of creating vectors is high
97+
(for example, when there are many columns). To avoid latency degradation, the exchange client unblocks a consumer
98+
when 1% of the data size observed so far is accumulated.
9299
* - merge_exchange.max_buffer_size
93100
- integer
94101
- 128MB
@@ -670,13 +677,13 @@ Each query can override the config by setting corresponding query session proper
670677
- Default AWS secret key to use.
671678
* - hive.s3.endpoint
672679
- string
673-
-
680+
-
674681
- The S3 storage endpoint server. This can be used to connect to an S3-compatible storage system instead of AWS.
675682
* - hive.s3.endpoint.region
676683
- string
677684
- us-east-1
678685
- The S3 storage endpoint server region. Default is set by the AWS SDK. If not configured, region will be attempted
679-
to be parsed from the hive.s3.endpoint value.
686+
to be parsed from the hive.s3.endpoint value.
680687
* - hive.s3.path-style-access
681688
- bool
682689
- false

velox/exec/Exchange.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ Exchange::Exchange(
5252
operatorCtx_->driverCtx()->queryConfig(),
5353
serdeKind_)},
5454
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
55+
driverId_{driverCtx->driverId},
5556
exchangeClient_{std::move(exchangeClient)} {}
5657

5758
void Exchange::addTaskIds(std::vector<std::string>& taskIds) {
@@ -111,8 +112,8 @@ BlockingReason Exchange::isBlocked(ContinueFuture* future) {
111112
}
112113

113114
ContinueFuture dataFuture;
114-
currentPages_ =
115-
exchangeClient_->next(preferredOutputBatchBytes_, &atEnd_, &dataFuture);
115+
currentPages_ = exchangeClient_->next(
116+
driverId_, preferredOutputBatchBytes_, &atEnd_, &dataFuture);
116117
if (!currentPages_.empty() || atEnd_) {
117118
if (atEnd_ && noMoreSplits_) {
118119
const auto numSplits = stats_.rlock()->numSplits;

velox/exec/Exchange.h

+2
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ class Exchange : public SourceOperator {
8989
/// and passing these to ExchangeClient.
9090
const bool processSplits_;
9191

92+
const int driverId_;
93+
9294
bool noMoreSplits_ = false;
9395

9496
std::shared_ptr<ExchangeClient> exchangeClient_;

velox/exec/ExchangeClient.cpp

+11-3
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,14 @@ folly::F14FastMap<std::string, RuntimeMetric> ExchangeClient::stats() const {
118118
return stats;
119119
}
120120

121-
std::vector<std::unique_ptr<SerializedPage>>
122-
ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
121+
std::vector<std::unique_ptr<SerializedPage>> ExchangeClient::next(
122+
int consumerId,
123+
uint32_t maxBytes,
124+
bool* atEnd,
125+
ContinueFuture* future) {
123126
std::vector<RequestSpec> requestSpecs;
124127
std::vector<std::unique_ptr<SerializedPage>> pages;
128+
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
125129
{
126130
std::lock_guard<std::mutex> l(queue_->mutex());
127131
if (closed_) {
@@ -130,7 +134,8 @@ ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
130134
}
131135

132136
*atEnd = false;
133-
pages = queue_->dequeueLocked(maxBytes, atEnd, future);
137+
pages = queue_->dequeueLocked(
138+
consumerId, maxBytes, atEnd, future, &stalePromise);
134139
if (*atEnd) {
135140
return pages;
136141
}
@@ -143,6 +148,9 @@ ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
143148
}
144149

145150
// Outside of lock
151+
if (stalePromise.valid()) {
152+
stalePromise.setValue();
153+
}
146154
request(std::move(requestSpecs));
147155
return pages;
148156
}

velox/exec/ExchangeClient.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,18 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
3333
std::string taskId,
3434
int destination,
3535
int64_t maxQueuedBytes,
36+
int32_t numberOfConsumers,
37+
uint64_t minOutputBatchBytes,
3638
memory::MemoryPool* pool,
3739
folly::Executor* executor)
3840
: taskId_{std::move(taskId)},
3941
destination_(destination),
4042
maxQueuedBytes_{maxQueuedBytes},
4143
pool_(pool),
4244
executor_(executor),
43-
queue_(std::make_shared<ExchangeQueue>()) {
45+
queue_(std::make_shared<ExchangeQueue>(
46+
numberOfConsumers,
47+
minOutputBatchBytes)) {
4448
VELOX_CHECK_NOT_NULL(pool_);
4549
VELOX_CHECK_NOT_NULL(executor_);
4650
// NOTE: the executor is used to run async response callback from the
@@ -91,7 +95,7 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
9195
/// The data may be compressed, in which case 'maxBytes' applies to compressed
9296
/// size.
9397
std::vector<std::unique_ptr<SerializedPage>>
94-
next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future);
98+
next(int consumerId, uint32_t maxBytes, bool* atEnd, ContinueFuture* future);
9599

96100
std::string toString() const;
97101

velox/exec/ExchangeQueue.cpp

+50-6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616
#include "velox/exec/ExchangeQueue.h"
17+
#include <algorithm>
1718

1819
namespace facebook::velox::exec {
1920

@@ -64,6 +65,15 @@ void ExchangeQueue::close() {
6465
clearPromises(promises);
6566
}
6667

68+
int64_t ExchangeQueue::minOutputBatchBytesLocked() const {
69+
// always allow to unblock when at end
70+
if (atEnd_) {
71+
return 0;
72+
}
73+
// At most 1% of received bytes so far to minimize latency for small exchanges
74+
return std::min<int64_t>(minOutputBatchBytes_, receivedBytes_ / 100);
75+
}
76+
6777
void ExchangeQueue::enqueueLocked(
6878
std::unique_ptr<SerializedPage>&& page,
6979
std::vector<ContinuePromise>& promises) {
@@ -86,17 +96,45 @@ void ExchangeQueue::enqueueLocked(
8696
receivedBytes_ += page->size();
8797

8898
queue_.push_back(std::move(page));
89-
if (!promises_.empty()) {
99+
const auto minBatchSize = minOutputBatchBytesLocked();
100+
while (!promises_.empty()) {
101+
VELOX_CHECK_LE(promises_.size(), numberOfConsumers_);
102+
const int32_t unblockedConsumers = numberOfConsumers_ - promises_.size();
103+
const int64_t unasignedBytes =
104+
totalBytes_ - unblockedConsumers * minBatchSize;
105+
if (unasignedBytes < minBatchSize) {
106+
break;
107+
}
90108
// Resume one of the waiting drivers.
91-
promises.push_back(std::move(promises_.back()));
92-
promises_.pop_back();
109+
auto it = promises_.begin();
110+
promises.push_back(std::move(it->second));
111+
promises_.erase(it);
93112
}
94113
}
95114

115+
void ExchangeQueue::addPromiseLocked(
116+
int consumerId,
117+
ContinueFuture* future,
118+
ContinuePromise* stalePromise) {
119+
ContinuePromise promise{"ExchangeQueue::dequeue"};
120+
*future = promise.getSemiFuture();
121+
auto it = promises_.find(consumerId);
122+
if (it != promises_.end()) {
123+
// resolve stale promises outside the lock to avoid broken promises
124+
*stalePromise = std::move(it->second);
125+
it->second = std::move(promise);
126+
} else {
127+
promises_[consumerId] = std::move(promise);
128+
}
129+
VELOX_CHECK_LE(promises_.size(), numberOfConsumers_);
130+
}
131+
96132
std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(
133+
int consumerId,
97134
uint32_t maxBytes,
98135
bool* atEnd,
99-
ContinueFuture* future) {
136+
ContinueFuture* future,
137+
ContinuePromise* stalePromise) {
100138
VELOX_CHECK_NOT_NULL(future);
101139
if (!error_.empty()) {
102140
*atEnd = true;
@@ -105,15 +143,21 @@ std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(
105143

106144
*atEnd = false;
107145

146+
// If we don't have enough bytes to return, we wait for more data to be
147+
// available
148+
if (totalBytes_ < minOutputBatchBytesLocked()) {
149+
addPromiseLocked(consumerId, future, stalePromise);
150+
return {};
151+
}
152+
108153
std::vector<std::unique_ptr<SerializedPage>> pages;
109154
uint32_t pageBytes = 0;
110155
for (;;) {
111156
if (queue_.empty()) {
112157
if (atEnd_) {
113158
*atEnd = true;
114159
} else if (pages.empty()) {
115-
promises_.emplace_back("ExchangeQueue::dequeue");
116-
*future = promises_.back().getSemiFuture();
160+
addPromiseLocked(consumerId, future, stalePromise);
117161
}
118162
return pages;
119163
}

velox/exec/ExchangeQueue.h

+46-3
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,18 @@ class SerializedPage {
8181
/// for input.
8282
class ExchangeQueue {
8383
public:
84+
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
85+
explicit ExchangeQueue() : ExchangeQueue(1, 0) {}
86+
#endif
87+
88+
explicit ExchangeQueue(
89+
int32_t numberOfConsumers,
90+
uint64_t minOutputBatchBytes)
91+
: numberOfConsumers_{numberOfConsumers},
92+
minOutputBatchBytes_{minOutputBatchBytes} {
93+
VELOX_CHECK_GE(numberOfConsumers, 1);
94+
}
95+
8496
~ExchangeQueue() {
8597
clearAllPromises();
8698
}
@@ -119,8 +131,20 @@ class ExchangeQueue {
119131
///
120132
/// The data may be compressed, in which case 'maxBytes' applies to compressed
121133
/// size.
134+
std::vector<std::unique_ptr<SerializedPage>> dequeueLocked(
135+
int consumerId,
136+
uint32_t maxBytes,
137+
bool* atEnd,
138+
ContinueFuture* future,
139+
ContinuePromise* stalePromise);
140+
141+
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
122142
std::vector<std::unique_ptr<SerializedPage>>
123-
dequeueLocked(uint32_t maxBytes, bool* atEnd, ContinueFuture* future);
143+
dequeueLocked(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
144+
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
145+
return dequeueLocked(0, maxBytes, atEnd, future, &stalePromise);
146+
}
147+
#endif
124148

125149
/// Returns the total bytes held by SerializedPages in 'this'.
126150
int64_t totalBytes() const {
@@ -166,6 +190,11 @@ class ExchangeQueue {
166190
return {};
167191
}
168192

193+
void addPromiseLocked(
194+
int consumerId,
195+
ContinueFuture* future,
196+
ContinuePromise* stalePromise);
197+
169198
void clearAllPromises() {
170199
std::vector<ContinuePromise> promises;
171200
{
@@ -176,7 +205,14 @@ class ExchangeQueue {
176205
}
177206

178207
std::vector<ContinuePromise> clearAllPromisesLocked() {
179-
return std::move(promises_);
208+
std::vector<ContinuePromise> promises(promises_.size());
209+
auto it = promises_.begin();
210+
while (it != promises_.end()) {
211+
promises.push_back(std::move(it->second));
212+
it = promises_.erase(it);
213+
}
214+
VELOX_CHECK(promises_.empty());
215+
return promises;
180216
}
181217

182218
static void clearPromises(std::vector<ContinuePromise>& promises) {
@@ -185,14 +221,21 @@ class ExchangeQueue {
185221
}
186222
}
187223

224+
int64_t minOutputBatchBytesLocked() const;
225+
226+
const int32_t numberOfConsumers_;
227+
const uint64_t minOutputBatchBytes_;
228+
188229
int numCompleted_{0};
189230
int numSources_{0};
190231
bool noMoreSources_{false};
191232
bool atEnd_{false};
192233

193234
std::mutex mutex_;
194235
std::deque<std::unique_ptr<SerializedPage>> queue_;
195-
std::vector<ContinuePromise> promises_;
236+
// The map from consumer id to the waiting promise
237+
folly::F14FastMap<int, ContinuePromise> promises_;
238+
196239
// When set, all promises will be realized and the next dequeue will
197240
// throw an exception with this message.
198241
std::string error_;

velox/exec/MergeSource.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ class MergeExchangeSource : public MergeSource {
128128
mergeExchange->taskId(),
129129
destination,
130130
maxQueuedBytes,
131+
1,
132+
// Deliver right away to avoid blocking other sources
133+
0,
131134
pool,
132135
executor)) {
133136
client_->addRemoteTaskId(taskId);
@@ -146,7 +149,7 @@ class MergeExchangeSource : public MergeSource {
146149
}
147150

148151
if (!currentPage_) {
149-
auto pages = client_->next(1, &atEnd_, future);
152+
auto pages = client_->next(0, 1, &atEnd_, future);
150153
VELOX_CHECK_LE(pages.size(), 1);
151154
currentPage_ = pages.empty() ? nullptr : std::move(pages.front());
152155

velox/exec/Task.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,8 @@ void Task::initializePartitionOutput() {
970970
// exchange client for each merge source to fetch data as we can't mix
971971
// the data from different sources for merging.
972972
if (auto exchangeNodeId = factory->needsExchangeClient()) {
973-
createExchangeClientLocked(pipeline, exchangeNodeId.value());
973+
createExchangeClientLocked(
974+
pipeline, exchangeNodeId.value(), factory->numDrivers);
974975
}
975976
}
976977
}
@@ -2982,7 +2983,8 @@ bool Task::pauseRequested(ContinueFuture* future) {
29822983

29832984
void Task::createExchangeClientLocked(
29842985
int32_t pipelineId,
2985-
const core::PlanNodeId& planNodeId) {
2986+
const core::PlanNodeId& planNodeId,
2987+
int32_t numberOfConsumers) {
29862988
VELOX_CHECK_NULL(
29872989
getExchangeClientLocked(pipelineId),
29882990
"Exchange client has been created at pipeline: {} for planNode: {}",
@@ -2998,6 +3000,8 @@ void Task::createExchangeClientLocked(
29983000
taskId_,
29993001
destination_,
30003002
queryCtx()->queryConfig().maxExchangeBufferSize(),
3003+
numberOfConsumers,
3004+
queryCtx()->queryConfig().minExchangeOutputBatchBytes(),
30013005
addExchangeClientPool(planNodeId, pipelineId),
30023006
queryCtx()->executor());
30033007
exchangeClientByPlanNode_.emplace(planNodeId, exchangeClients_[pipelineId]);

velox/exec/Task.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,8 @@ class Task : public std::enable_shared_from_this<Task> {
10031003
// pipeline.
10041004
void createExchangeClientLocked(
10051005
int32_t pipelineId,
1006-
const core::PlanNodeId& planNodeId);
1006+
const core::PlanNodeId& planNodeId,
1007+
int32_t numberOfConsumers);
10071008

10081009
// Get a shared reference to the exchange client with the specified exchange
10091010
// plan node 'planNodeId'. The function returns null if there is no client

0 commit comments

Comments
 (0)