Skip to content

Commit a092070

Browse files
kagamiorifacebook-github-bot
authored andcommitted
Extend ApproxDistinctResultVerifier for WindowFuzzer (facebookincubator#8953)
Summary: Pull Request resolved: facebookincubator#8953 Extend ApproxDistinctResultVerifier to verify approx_distinct in window operations. This diff adds an internal aggregation function `$internal$count_distinct` to be used in ApproxDistinctResultVerifier for window fuzzer. Reviewed By: kgpai Differential Revision: D54497464 fbshipit-source-id: 9fd0ba618945e106dec3e4474abef6370398a3ab
1 parent 5906234 commit a092070

12 files changed

+621
-30
lines changed

velox/exec/SetAccumulator.h

+89
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ struct SetAccumulator {
7878
vector_size_t index,
7979
const DecodedVector& values,
8080
HashStringAllocator* allocator) {
81+
VELOX_DCHECK(!arrayVector.isNullAt(index));
8182
const auto size = arrayVector.sizeAt(index);
8283
const auto offset = arrayVector.offsetAt(index);
8384

@@ -86,6 +87,33 @@ struct SetAccumulator {
8687
}
8788
}
8889

90+
/// Adds non-null value if new. No-op if the value is NULL or was added
91+
/// before.
92+
void addNonNullValue(
93+
const DecodedVector& decoded,
94+
vector_size_t index,
95+
HashStringAllocator* /*allocator*/) {
96+
const auto cnt = uniqueValues.size();
97+
if (!decoded.isNullAt(index)) {
98+
uniqueValues.insert({decoded.valueAt<T>(index), cnt});
99+
}
100+
}
101+
102+
/// Adds new non-null values from an array.
103+
void addNonNullValues(
104+
const ArrayVector& arrayVector,
105+
vector_size_t index,
106+
const DecodedVector& values,
107+
HashStringAllocator* allocator) {
108+
VELOX_DCHECK(!arrayVector.isNullAt(index));
109+
const auto size = arrayVector.sizeAt(index);
110+
const auto offset = arrayVector.offsetAt(index);
111+
112+
for (auto i = 0; i < size; ++i) {
113+
addNonNullValue(values, offset + i, allocator);
114+
}
115+
}
116+
89117
/// Returns number of unique values including null.
90118
size_t size() const {
91119
return uniqueValues.size() + (nullIndex.has_value() ? 1 : 0);
@@ -150,6 +178,7 @@ struct StringViewSetAccumulator {
150178
vector_size_t index,
151179
const DecodedVector& values,
152180
HashStringAllocator* allocator) {
181+
VELOX_DCHECK(!arrayVector.isNullAt(index));
153182
const auto size = arrayVector.sizeAt(index);
154183
const auto offset = arrayVector.offsetAt(index);
155184

@@ -158,6 +187,37 @@ struct StringViewSetAccumulator {
158187
}
159188
}
160189

190+
void addNonNullValue(
191+
const DecodedVector& decoded,
192+
vector_size_t index,
193+
HashStringAllocator* allocator) {
194+
const auto cnt = base.uniqueValues.size();
195+
if (!decoded.isNullAt(index)) {
196+
auto value = decoded.valueAt<StringView>(index);
197+
if (!value.isInline()) {
198+
if (base.uniqueValues.contains(value)) {
199+
return;
200+
}
201+
value = strings.append(value, *allocator);
202+
}
203+
base.uniqueValues.insert({value, cnt});
204+
}
205+
}
206+
207+
void addNonNullValues(
208+
const ArrayVector& arrayVector,
209+
vector_size_t index,
210+
const DecodedVector& values,
211+
HashStringAllocator* allocator) {
212+
VELOX_DCHECK(!arrayVector.isNullAt(index));
213+
const auto size = arrayVector.sizeAt(index);
214+
const auto offset = arrayVector.offsetAt(index);
215+
216+
for (auto i = 0; i < size; ++i) {
217+
addNonNullValue(values, offset + i, allocator);
218+
}
219+
}
220+
161221
size_t size() const {
162222
return base.size();
163223
}
@@ -218,6 +278,7 @@ struct ComplexTypeSetAccumulator {
218278
vector_size_t index,
219279
const DecodedVector& values,
220280
HashStringAllocator* allocator) {
281+
VELOX_DCHECK(!arrayVector.isNullAt(index));
221282
const auto size = arrayVector.sizeAt(index);
222283
const auto offset = arrayVector.offsetAt(index);
223284

@@ -226,6 +287,34 @@ struct ComplexTypeSetAccumulator {
226287
}
227288
}
228289

290+
void addNonNullValue(
291+
const DecodedVector& decoded,
292+
vector_size_t index,
293+
HashStringAllocator* allocator) {
294+
const auto cnt = base.uniqueValues.size();
295+
if (!decoded.isNullAt(index)) {
296+
auto entry = values.append(decoded, index, allocator);
297+
298+
if (!base.uniqueValues.insert({entry, cnt}).second) {
299+
values.removeLast(entry);
300+
}
301+
}
302+
}
303+
304+
void addNonNullValues(
305+
const ArrayVector& arrayVector,
306+
vector_size_t index,
307+
const DecodedVector& values,
308+
HashStringAllocator* allocator) {
309+
VELOX_DCHECK(!arrayVector.isNullAt(index));
310+
const auto size = arrayVector.sizeAt(index);
311+
const auto offset = arrayVector.offsetAt(index);
312+
313+
for (auto i = 0; i < size; ++i) {
314+
addNonNullValue(values, offset + i, allocator);
315+
}
316+
}
317+
229318
size_t size() const {
230319
return base.size();
231320
}

velox/exec/fuzzer/ResultVerifier.h

+13
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,19 @@ class ResultVerifier {
5454
const core::AggregationNode::Aggregate& aggregate,
5555
const std::string& aggregateName) = 0;
5656

57+
/// Called once on a window operation before possibly multiple calls to the
58+
/// 'compare' or 'verify' APIs. to specify the input data, window partition-by
59+
/// keys, the window function, the window frame, and the name of the column
60+
/// that will store the window function results.
61+
virtual void initializeWindow(
62+
const std::vector<RowVectorPtr>& /*input*/,
63+
const std::vector<std::string>& /*partitionByKeys*/,
64+
const core::WindowNode::Function& /*function*/,
65+
const std::string& /*frame*/,
66+
const std::string& /*windowName*/) {
67+
VELOX_NYI();
68+
}
69+
5770
/// Compares results of two logically equivalent Velox plans or a Velox plan
5871
/// and a reference DB query.
5972
///

velox/exec/fuzzer/WindowFuzzer.cpp

+42-10
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,10 @@ void WindowFuzzer::go() {
220220

221221
const bool customVerification =
222222
customVerificationFunctions_.count(signature.name) != 0;
223+
std::shared_ptr<ResultVerifier> customVerifier = nullptr;
224+
if (customVerification) {
225+
customVerifier = customVerificationFunctions_.at(signature.name);
226+
}
223227
const bool requireSortedInput =
224228
orderDependentFunctions_.count(signature.name) != 0;
225229

@@ -257,6 +261,7 @@ void WindowFuzzer::go() {
257261
call,
258262
input,
259263
customVerification,
264+
customVerifier,
260265
FLAGS_enable_window_reference_verification);
261266
if (failed) {
262267
signatureWithStats.second.numFailed++;
@@ -292,6 +297,7 @@ void WindowFuzzer::testAlternativePlans(
292297
const std::string& functionCall,
293298
const std::vector<RowVectorPtr>& input,
294299
bool customVerification,
300+
const std::shared_ptr<ResultVerifier>& customVerifier,
295301
const velox::test::ResultOrError& expected) {
296302
std::vector<AggregationFuzzerBase::PlanWithSplits> plans;
297303

@@ -348,28 +354,47 @@ void WindowFuzzer::testAlternativePlans(
348354

349355
for (const auto& plan : plans) {
350356
testPlan(
351-
plan,
352-
false,
353-
false,
354-
customVerification,
355-
/*customVerifiers*/ {},
356-
expected);
357+
plan, false, false, customVerification, {customVerifier}, expected);
357358
}
358359
}
359360

361+
namespace {
362+
void initializeVerifier(
363+
const core::PlanNodePtr& plan,
364+
const std::shared_ptr<ResultVerifier>& customVerifier,
365+
const std::vector<RowVectorPtr>& input,
366+
const std::vector<std::string>& partitionKeys,
367+
const std::string& frame) {
368+
const auto& windowNode =
369+
std::dynamic_pointer_cast<const core::WindowNode>(plan);
370+
customVerifier->initializeWindow(
371+
input, partitionKeys, windowNode->windowFunctions()[0], frame, "w0");
372+
}
373+
} // namespace
374+
360375
bool WindowFuzzer::verifyWindow(
361376
const std::vector<std::string>& partitionKeys,
362377
const std::vector<SortingKeyAndOrder>& sortingKeysAndOrders,
363378
const std::string& frameClause,
364379
const std::string& functionCall,
365380
const std::vector<RowVectorPtr>& input,
366381
bool customVerification,
382+
const std::shared_ptr<ResultVerifier>& customVerifier,
367383
bool enableWindowVerification) {
368384
auto frame = getFrame(partitionKeys, sortingKeysAndOrders, frameClause);
369385
auto plan = PlanBuilder()
370386
.values(input)
371387
.window({fmt::format("{} over ({})", functionCall, frame)})
372388
.planNode();
389+
if (customVerifier) {
390+
initializeVerifier(plan, customVerifier, input, partitionKeys, frame);
391+
}
392+
SCOPE_EXIT {
393+
if (customVerifier) {
394+
customVerifier->reset();
395+
}
396+
};
397+
373398
if (persistAndRunOnce_) {
374399
persistReproInfo({{plan, {}}}, reproPersistPath_);
375400
}
@@ -381,8 +406,8 @@ bool WindowFuzzer::verifyWindow(
381406
++stats_.numFailed;
382407
}
383408

384-
if (!customVerification && enableWindowVerification) {
385-
if (resultOrError.result) {
409+
if (!customVerification) {
410+
if (resultOrError.result && enableWindowVerification) {
386411
auto referenceResult = computeReferenceResults(plan, input);
387412
stats_.updateReferenceQueryStats(referenceResult.second);
388413
if (auto expectedResult = referenceResult.first) {
@@ -399,9 +424,15 @@ bool WindowFuzzer::verifyWindow(
399424
}
400425
}
401426
} else {
402-
// TODO: support custom verification.
403-
LOG(INFO) << "Verification skipped";
427+
LOG(INFO) << "Verification through custom verifier";
404428
++stats_.numVerificationSkipped;
429+
430+
if (customVerifier && resultOrError.result) {
431+
VELOX_CHECK(
432+
customVerifier->supportsVerify(),
433+
"Window fuzzer only uses custom verify() methods.");
434+
customVerifier->verify(resultOrError.result);
435+
}
405436
}
406437

407438
testAlternativePlans(
@@ -411,6 +442,7 @@ bool WindowFuzzer::verifyWindow(
411442
functionCall,
412443
input,
413444
customVerification,
445+
customVerifier,
414446
resultOrError);
415447

416448
return resultOrError.exceptionPtr != nullptr;

velox/exec/fuzzer/WindowFuzzer.h

+2
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ class WindowFuzzer : public AggregationFuzzerBase {
118118
const std::string& functionCall,
119119
const std::vector<RowVectorPtr>& input,
120120
bool customVerification,
121+
const std::shared_ptr<ResultVerifier>& customVerifier,
121122
bool enableWindowVerification);
122123

123124
void testAlternativePlans(
@@ -127,6 +128,7 @@ class WindowFuzzer : public AggregationFuzzerBase {
127128
const std::string& functionCall,
128129
const std::vector<RowVectorPtr>& input,
129130
bool customVerification,
131+
const std::shared_ptr<ResultVerifier>& customVerifier,
130132
const velox::test::ResultOrError& expected);
131133

132134
const std::unordered_set<std::string> orderDependentFunctions_;

velox/functions/prestosql/aggregates/RegisterAggregateFunctions.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,10 @@ void registerAllAggregateFunctions(
184184
registerVarianceAggregates(prefix, withCompanionFunctions, overwrite);
185185
}
186186

187+
extern void registerCountDistinctAggregate(const std::string& prefix);
188+
189+
void registerInternalAggregateFunctions(const std::string& prefix) {
190+
registerCountDistinctAggregate(prefix);
191+
}
192+
187193
} // namespace facebook::velox::aggregate::prestosql

velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h

+4
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,8 @@ void registerAllAggregateFunctions(
3030
bool onlyPrestoSignatures = false,
3131
bool overwrite = true);
3232

33+
/// Register internal aggregation functions only for testing.
34+
/// \param prefix : Prefix for the aggregate functions.
35+
void registerInternalAggregateFunctions(const std::string& prefix);
36+
3337
} // namespace facebook::velox::aggregate::prestosql

0 commit comments

Comments
 (0)