
Commit 8798a2f

xiaoxmeng authored and facebook-github-bot committed
feat: Support prefetch in index lookup join (#12611)
Summary: This PR adds prefetch support to the index lookup join, controlled by a query config. The index join operator can prefetch up to the configured limit of input batches, which enables:
(1) parallel lookups at the backend, either (1-1) executed in parallel when there are multiple backend shards, or (1-2) batched by the backend across multiple requests to improve throughput;
(2) pipelining of the table scan and the index lookup within the same driver pipeline. The table scan executes synchronously while the index lookup executes asynchronously. This achieves pipelining without relying on an exchange, which might cause non-deterministic execution; a Meta-internal use case needs deterministic execution for checkpointing.

In Meta-internal testing, this achieves a 2x throughput improvement (measured in rows per second) with 33% memory overhead when prefetching up to 4 batches.

The follow-up is to add memory-based prefetch throttling to integrate with the Meta-internal ML use case, and memory pool wiring to ease performance (memory overhead) analysis.

Reviewed By: wenqiwooo

Differential Revision: D70909786
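The pipelining described in the summary can be illustrated with a minimal, standalone sketch. It is not the Velox implementation: `Batch`, `LookupFn`, and `runLookupJoin` are hypothetical names, `std::async` stands in for the asynchronous index lookup, and the sketch assumes that a limit of N prefetch batches means at most N + 1 lookups in flight (so a limit of zero processes one batch at a time).

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <future>
#include <vector>

// Hypothetical stand-ins for illustration; these are not Velox types.
using Batch = std::vector<int64_t>;
using LookupFn = std::function<Batch(Batch)>;

// Consumes input batches synchronously (standing in for the table scan) and
// issues one asynchronous lookup per batch. At most maxPrefetchBatches + 1
// lookups are in flight. Results are collected in input order, so the output
// order does not depend on thread timing.
std::vector<Batch> runLookupJoin(
    const std::vector<Batch>& inputs,
    const LookupFn& lookup,
    uint32_t maxPrefetchBatches) {
  std::deque<std::future<Batch>> inFlight;
  std::vector<Batch> outputs;
  const size_t maxInFlight = static_cast<size_t>(maxPrefetchBatches) + 1;

  for (const auto& input : inputs) {
    // Throttle: wait for the oldest lookup before issuing another one.
    if (inFlight.size() == maxInFlight) {
      outputs.push_back(inFlight.front().get());
      inFlight.pop_front();
    }
    inFlight.push_back(std::async(std::launch::async, lookup, input));
  }

  // Drain the remaining lookups, still in input order.
  while (!inFlight.empty()) {
    outputs.push_back(inFlight.front().get());
    inFlight.pop_front();
  }
  return outputs;
}
```

Calling `runLookupJoin(inputs, lookup, 0)` degenerates to one lookup at a time, while a larger limit lets later lookups overlap with earlier ones. Because results are always taken from the front of the queue, the output order is the same for any limit, which is the property that matters for deterministic execution.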
1 parent 17d5f83 commit 8798a2f

8 files changed: +410 -131 lines changed

velox/core/QueryConfig.h (+9)
@@ -511,6 +511,11 @@ class QueryConfig {
   static constexpr const char* kThrowExceptionOnDuplicateMapKeys =
       "throw_exception_on_duplicate_map_keys";
 
+  /// Specifies the max number of input batches to prefetch to do index lookup
+  /// ahead. If it is zero, then process one input batch at a time.
+  static constexpr const char* kIndexLookupJoinMaxPrefetchBatches =
+      "index_lookup_join_max_prefetch_batches";
+
   bool selectiveNimbleReaderEnabled() const {
     return get<bool>(kSelectiveNimbleReaderEnabled, false);
   }
@@ -932,6 +937,10 @@ class QueryConfig {
     return get<double>(kTableScanScaleUpMemoryUsageRatio, 0.7);
   }
 
+  uint32_t indexLookupJoinMaxPrefetchBatches() const {
+    return get<uint32_t>(kIndexLookupJoinMaxPrefetchBatches, 0);
+  }
+
   std::string shuffleCompressionKind() const {
     return get<std::string>(kShuffleCompressionKind, "none");
   }
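
The accessor pattern in the hunk above (a string key plus a typed getter with a fallback default) can be sketched in isolation as follows. This is a simplified, hypothetical stand-in and not the actual `QueryConfig` class: `SimpleConfig` and its parsing via `std::istringstream` are assumptions made for illustration; only the key string is taken from the diff.

```cpp
#include <cstdint>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>

// Key string copied from the diff above.
constexpr const char* kIndexLookupJoinMaxPrefetchBatches =
    "index_lookup_join_max_prefetch_batches";

// Hypothetical, simplified string-keyed config; not the Velox QueryConfig.
class SimpleConfig {
 public:
  explicit SimpleConfig(std::unordered_map<std::string, std::string> values)
      : values_(std::move(values)) {}

  // Returns the value parsed as T, or defaultValue if the key is unset or
  // the stored string cannot be parsed as T.
  template <typename T>
  T get(const std::string& key, const T& defaultValue) const {
    auto it = values_.find(key);
    if (it == values_.end()) {
      return defaultValue;
    }
    std::istringstream in(it->second);
    T value{};
    in >> value;
    return in.fail() ? defaultValue : value;
  }

 private:
  std::unordered_map<std::string, std::string> values_;
};
```

With this sketch, a session that sets `index_lookup_join_max_prefetch_batches` to `"4"` would read back `4` through `get<uint32_t>`, and a session that leaves it unset would get whatever default the caller passes in.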

velox/docs/configs.rst (+5)
@@ -166,6 +166,11 @@ Generic Configuration
      - false
      - By default, if a key is found in multiple given maps, that key's value in the resulting map comes from the last one of those maps.
        If true, throws exception when duplicate keys are found. This configuration is needed by Spark functions `CreateMap`, `MapFromArrays`, `MapFromEntries`, `StringToMap`, `MapConcat`, `TransformKeys`.
+   * - index_lookup_join_max_prefetch_batches
+     - integer
+     - 0
+     - Specifies the max number of input batches to prefetch to do index lookup ahead. If it is zero,
+       then process one input batch at a time.
 
 .. _expression-evaluation-conf:
 