From b457e0fcf1068e57682ef1ddd69884cddf111ee1 Mon Sep 17 00:00:00 2001 From: NEUpanning Date: Thu, 6 Mar 2025 17:05:21 +0800 Subject: [PATCH 1/5] initial --- velox/core/QueryConfig.h | 10 + velox/docs/configs.rst | 5 + .../aggregates/CentralMomentsAggregate.cpp | 251 ++++++++++++++---- .../tests/CentralMomentsAggregationTest.cpp | 30 +++ 4 files changed, 249 insertions(+), 47 deletions(-) diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 7041126e2153..d0bfe7db114e 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -318,6 +318,12 @@ class QueryConfig { static constexpr const char* kSparkLegacyDateFormatter = "spark.legacy_date_formatter"; + /// if true, statistical aggregation function includes skewness, kurtosis, + /// will return std::numeric_limits::quiet_NaN() instead of NULL when + /// DivideByZero occurs during expression evaluation. + static constexpr const char* kSparkLegacyStatisticalAggregate = + "spark.legacy_statistical_aggregate"; + /// The number of local parallel table writer operators per task. static constexpr const char* kTaskWriterCount = "task_writer_count"; @@ -731,6 +737,10 @@ class QueryConfig { return get(kSparkLegacyDateFormatter, false); } + bool sparkLegacyStatisticalAggregate() const { + return get(kSparkLegacyStatisticalAggregate, false); + } + bool exprTrackCpuUsage() const { return get(kExprTrackCpuUsage, false); } diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index d16dbb3ecef7..516f2d05ce96 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -733,6 +733,11 @@ Spark-specific Configuration - Joda date formatter performs strict checking of its input and uses different pattern string. - For example, the 2015-07-22 10:00:00 timestamp cannot be parse if pattern is yyyy-MM-dd because the parser does not consume whole input. - Another example is that the 'W' pattern, which means week in month, is not supported. For more differences, see :issue:`10354`. + * - spark.legacy_statistical_aggregate + - bool + - false + - if true, statistical aggregation function includes skewness, kurtosis will return std::numeric_limits::quiet_NaN() + - instead of NULL when DivideByZero occurs during expression evaluation. 
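For illustration only (not part of the patch): a minimal standalone sketch of the skewness finalization rule this flag controls, assuming the accumulator fields count, m2 and m3 of CentralMomentsAccumulator; the helper name skewnessResult and the use of std::optional to stand in for SQL NULL are hypothetical.

#include <cmath>
#include <cstdint>
#include <limits>
#include <optional>

// skewness = sqrt(n) * m3 / m2^1.5, as in SkewnessResultAccessor::result().
// std::optional<double> models SQL NULL; the bool mirrors
// spark.legacy_statistical_aggregate.
std::optional<double> skewnessResult(
    int64_t count,
    double m2,
    double m3,
    bool legacyStatisticalAggregate) {
  if (count < 1) {
    return std::nullopt; // No input rows: NULL in both modes.
  }
  if (m2 == 0) {
    if (legacyStatisticalAggregate) {
      return std::numeric_limits<double>::quiet_NaN(); // Legacy mode: NaN.
    }
    return std::nullopt; // Default mode: divide-by-zero yields NULL.
  }
  return std::sqrt(static_cast<double>(count)) * m3 / std::pow(m2, 1.5);
}

For the constant input {1, 1} used in the new tests, m2 is 0, so the default mode returns NULL while the legacy mode returns NaN.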
Tracing -------- diff --git a/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp b/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp index 90a0ade6f0e7..e1d6e0c5498f 100644 --- a/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp +++ b/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp @@ -15,28 +15,43 @@ */ #include "velox/functions/sparksql/aggregates/CentralMomentsAggregate.h" +#include #include "velox/functions/lib/aggregates/CentralMomentsAggregatesBase.h" namespace facebook::velox::functions::aggregate::sparksql { namespace { +template struct SkewnessResultAccessor { static bool hasResult(const CentralMomentsAccumulator& accumulator) { - return accumulator.count() >= 1 && accumulator.m2() != 0; + if constexpr (nullOnDivideByZero) { + return accumulator.count() >= 1 && accumulator.m2() != 0; + } + return accumulator.count() >= 1; } static double result(const CentralMomentsAccumulator& accumulator) { + if (accumulator.m2() == 0) { + return std::numeric_limits::quiet_NaN(); + } return std::sqrt(accumulator.count()) * accumulator.m3() / std::pow(accumulator.m2(), 1.5); } }; +template struct KurtosisResultAccessor { static bool hasResult(const CentralMomentsAccumulator& accumulator) { - return accumulator.count() >= 1 && accumulator.m2() != 0; + if constexpr (nullOnDivideByZero) { + return accumulator.count() >= 1 && accumulator.m2() != 0; + } + return accumulator.count() >= 1; } static double result(const CentralMomentsAccumulator& accumulator) { + if (accumulator.m2() == 0) { + return std::numeric_limits::quiet_NaN(); + } double count = accumulator.count(); double m2 = accumulator.m2(); double m4 = accumulator.m4(); @@ -44,11 +59,7 @@ struct KurtosisResultAccessor { } }; -template -exec::AggregateRegistrationResult registerCentralMoments( - const std::string& name, - bool withCompanionFunctions, - bool overwrite) { +std::vector> getSignatures() { std::vector> signatures; std::vector inputTypes = { "smallint", "integer", "bigint", "real", "double"}; @@ -60,6 +71,115 @@ exec::AggregateRegistrationResult registerCentralMoments( .argumentType(inputType) .build()); } + return signatures; +} + +exec::AggregateRegistrationResult registerSkewness( + const std::string& name, + bool withCompanionFunctions, + bool overwrite) { + std::vector> signatures = + getSignatures(); + + return exec::registerAggregateFunction( + name, + std::move(signatures), + [name]( + core::AggregationNode::Step step, + const std::vector& argTypes, + const TypePtr& resultType, + const core::QueryConfig& config) -> std::unique_ptr { + VELOX_CHECK_LE( + argTypes.size(), 1, "{} takes at most one argument", name); + const auto& inputType = argTypes[0]; + if (config.sparkLegacyStatisticalAggregate()) { + if (exec::isRawInput(step)) { + switch (inputType->kind()) { + case TypeKind::SMALLINT: + return std::make_unique>>(resultType); + case TypeKind::INTEGER: + return std::make_unique>>(resultType); + case TypeKind::BIGINT: + return std::make_unique>>(resultType); + case TypeKind::DOUBLE: + return std::make_unique>>(resultType); + case TypeKind::REAL: + return std::make_unique>>(resultType); + default: + VELOX_UNSUPPORTED( + "Unsupported input type: {}. 
" + "Expected SMALLINT, INTEGER, BIGINT, DOUBLE or REAL.", + inputType->toString()); + } + } else { + checkAccumulatorRowType( + inputType, + "Input type for final aggregation must be " + "(count:bigint, m1:double, m2:double, m3:double, m4:double) struct"); + return std::make_unique>>(resultType); + } + } else { + if (exec::isRawInput(step)) { + switch (inputType->kind()) { + case TypeKind::SMALLINT: + return std::make_unique>>(resultType); + case TypeKind::INTEGER: + return std::make_unique>>(resultType); + case TypeKind::BIGINT: + return std::make_unique>>(resultType); + case TypeKind::DOUBLE: + return std::make_unique>>(resultType); + case TypeKind::REAL: + return std::make_unique>>(resultType); + default: + VELOX_UNSUPPORTED( + "Unsupported input type: {}. " + "Expected SMALLINT, INTEGER, BIGINT, DOUBLE or REAL.", + inputType->toString()); + } + } else { + checkAccumulatorRowType( + inputType, + "Input type for final aggregation must be " + "(count:bigint, m1:double, m2:double, m3:double, m4:double) struct"); + return std::make_unique>>(resultType); + } + } + }, + withCompanionFunctions, + overwrite); +} + +exec::AggregateRegistrationResult registerKurtosis( + const std::string& name, + bool withCompanionFunctions, + bool overwrite) { + std::vector> signatures = + getSignatures(); return exec::registerAggregateFunction( name, @@ -68,47 +188,86 @@ exec::AggregateRegistrationResult registerCentralMoments( core::AggregationNode::Step step, const std::vector& argTypes, const TypePtr& resultType, - const core::QueryConfig& /*config*/) - -> std::unique_ptr { + const core::QueryConfig& config) -> std::unique_ptr { VELOX_CHECK_LE( argTypes.size(), 1, "{} takes at most one argument", name); const auto& inputType = argTypes[0]; - if (exec::isRawInput(step)) { - switch (inputType->kind()) { - case TypeKind::SMALLINT: - return std::make_unique< - CentralMomentsAggregatesBase>( - resultType); - case TypeKind::INTEGER: - return std::make_unique< - CentralMomentsAggregatesBase>( - resultType); - case TypeKind::BIGINT: - return std::make_unique< - CentralMomentsAggregatesBase>( - resultType); - case TypeKind::DOUBLE: - return std::make_unique< - CentralMomentsAggregatesBase>( - resultType); - case TypeKind::REAL: - return std::make_unique< - CentralMomentsAggregatesBase>( - resultType); - default: - VELOX_UNSUPPORTED( - "Unsupported input type: {}. " - "Expected SMALLINT, INTEGER, BIGINT, DOUBLE or REAL.", - inputType->toString()); + if (config.sparkLegacyStatisticalAggregate()) { + if (exec::isRawInput(step)) { + switch (inputType->kind()) { + case TypeKind::SMALLINT: + return std::make_unique>>(resultType); + case TypeKind::INTEGER: + return std::make_unique>>(resultType); + case TypeKind::BIGINT: + return std::make_unique>>(resultType); + case TypeKind::DOUBLE: + return std::make_unique>>(resultType); + case TypeKind::REAL: + return std::make_unique>>(resultType); + default: + VELOX_UNSUPPORTED( + "Unsupported input type: {}. 
" + "Expected SMALLINT, INTEGER, BIGINT, DOUBLE or REAL.", + inputType->toString()); + } + } else { + checkAccumulatorRowType( + inputType, + "Input type for final aggregation must be " + "(count:bigint, m1:double, m2:double, m3:double, m4:double) struct"); + return std::make_unique>>(resultType); } } else { - checkAccumulatorRowType( - inputType, - "Input type for final aggregation must be " - "(count:bigint, m1:double, m2:double, m3:double, m4:double) struct"); - return std::make_unique>(resultType); + if (exec::isRawInput(step)) { + switch (inputType->kind()) { + case TypeKind::SMALLINT: + return std::make_unique>>(resultType); + case TypeKind::INTEGER: + return std::make_unique>>(resultType); + case TypeKind::BIGINT: + return std::make_unique>>(resultType); + case TypeKind::DOUBLE: + return std::make_unique>>(resultType); + case TypeKind::REAL: + return std::make_unique>>(resultType); + default: + VELOX_UNSUPPORTED( + "Unsupported input type: {}. " + "Expected SMALLINT, INTEGER, BIGINT, DOUBLE or REAL.", + inputType->toString()); + } + } else { + checkAccumulatorRowType( + inputType, + "Input type for final aggregation must be " + "(count:bigint, m1:double, m2:double, m3:double, m4:double) struct"); + return std::make_unique>>(resultType); + } } }, withCompanionFunctions, @@ -120,10 +279,8 @@ void registerCentralMomentsAggregate( const std::string& prefix, bool withCompanionFunctions, bool overwrite) { - registerCentralMoments( - prefix + "skewness", withCompanionFunctions, overwrite); - registerCentralMoments( - prefix + "kurtosis", withCompanionFunctions, overwrite); + registerSkewness(prefix + "skewness", withCompanionFunctions, overwrite); + registerKurtosis(prefix + "kurtosis", withCompanionFunctions, overwrite); } } // namespace facebook::velox::functions::aggregate::sparksql diff --git a/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp b/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp index 9f5dd9b3efd8..883d42cc5203 100644 --- a/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp +++ b/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp @@ -40,6 +40,16 @@ class CentralMomentsAggregationTest : public AggregationTestBase { builder.singleAggregation({}, {fmt::format("spark_{}(c0)", agg)}); AssertQueryBuilder(builder.planNode()).assertResults({expected}); } + + void testLegacyCenteralMomentsAggResult( + const std::string& agg, + const RowVectorPtr& input, + const RowVectorPtr& expected) { + PlanBuilder builder(pool()); + builder.values({input}); + builder.singleAggregation({}, {fmt::format("spark_{}(c0)", agg)}); + AssertQueryBuilder(builder.planNode()).config("spark.legacy_statistical_aggregate", "true").assertResults({expected}); + } }; TEST_F(CentralMomentsAggregationTest, skewnessHasResult) { @@ -54,6 +64,19 @@ TEST_F(CentralMomentsAggregationTest, skewnessHasResult) { expected = makeRowVector({makeNullableFlatVector( std::vector>{std::nullopt})}); testCenteralMomentsAggResult(agg, input, expected); + + // Output NULL when m2 equals 0. + input = makeRowVector({makeFlatVector({1, 1})}); + expected = makeRowVector({makeNullableFlatVector( + std::vector>{std::nullopt})}); + testCenteralMomentsAggResult(agg, input, expected); + + // Output NaN when m2 equals 0 for legacy aggregate. 
+ input = makeRowVector({makeFlatVector({1, 1})}); + expected = makeRowVector( + {makeNullableFlatVector(std::vector>{ + std::numeric_limits::quiet_NaN()})}); + testLegacyCenteralMomentsAggResult(agg, input, expected); } TEST_F(CentralMomentsAggregationTest, pearsonKurtosis) { @@ -78,6 +101,13 @@ TEST_F(CentralMomentsAggregationTest, pearsonKurtosis) { expected = makeRowVector({makeNullableFlatVector( std::vector>{std::nullopt})}); testCenteralMomentsAggResult(agg, input, expected); + + // Output NaN when m2 equals 0 for legacy aggregate. + input = makeRowVector({makeFlatVector({1, 1})}); + expected = makeRowVector( + {makeNullableFlatVector(std::vector>{ + std::numeric_limits::quiet_NaN()})}); + testLegacyCenteralMomentsAggResult(agg, input, expected); } } // namespace From c466e7a2bf089583b86df3461730c0f92af01775 Mon Sep 17 00:00:00 2001 From: NEUpanning Date: Thu, 6 Mar 2025 17:19:43 +0800 Subject: [PATCH 2/5] reformat --- .../aggregates/tests/CentralMomentsAggregationTest.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp b/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp index 883d42cc5203..08e355c4e5b9 100644 --- a/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp +++ b/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp @@ -48,7 +48,9 @@ class CentralMomentsAggregationTest : public AggregationTestBase { PlanBuilder builder(pool()); builder.values({input}); builder.singleAggregation({}, {fmt::format("spark_{}(c0)", agg)}); - AssertQueryBuilder(builder.planNode()).config("spark.legacy_statistical_aggregate", "true").assertResults({expected}); + AssertQueryBuilder(builder.planNode()) + .config("spark.legacy_statistical_aggregate", "true") + .assertResults({expected}); } }; From 1854ba10d78134336c1527f5caf82146b293b21b Mon Sep 17 00:00:00 2001 From: NEUpanning Date: Fri, 7 Mar 2025 13:00:16 +0800 Subject: [PATCH 3/5] cr --- velox/core/QueryConfig.h | 2 +- velox/docs/configs.rst | 2 +- .../aggregates/CentralMomentsAggregate.cpp | 90 +++---------------- .../tests/CentralMomentsAggregationTest.cpp | 18 ++-- 4 files changed, 21 insertions(+), 91 deletions(-) diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 05f4b0723631..58a6509a8255 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -333,7 +333,7 @@ class QueryConfig { static constexpr const char* kSparkLegacyDateFormatter = "spark.legacy_date_formatter"; - /// if true, statistical aggregation function includes skewness, kurtosis, + /// If true, statistical aggregation function includes skewness, kurtosis, /// will return std::numeric_limits::quiet_NaN() instead of NULL when /// DivideByZero occurs during expression evaluation. static constexpr const char* kSparkLegacyStatisticalAggregate = diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index d4a3f38c4270..7702ac026943 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -890,7 +890,7 @@ Spark-specific Configuration * - spark.legacy_statistical_aggregate - bool - false - - if true, statistical aggregation function includes skewness, kurtosis will return std::numeric_limits::quiet_NaN() + - If true, statistical aggregation function includes skewness, kurtosis will return std::numeric_limits::quiet_NaN() - instead of NULL when DivideByZero occurs during expression evaluation. 
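A rough counterpart for kurtosis, again for illustration only: the Pearson excess kurtosis n * m4 / m2^2 - 3 is consistent with the expected values in the tests (for {1, 2}: m2 = 0.5, m4 = 0.125, so 2 * 0.125 / 0.25 - 3 = -2.0), although the exact return expression lives outside the hunks shown here; kurtosisResult and the std::optional NULL stand-in are hypothetical, as in the skewness sketch above.

#include <cstdint>
#include <limits>
#include <optional>

// Pearson excess kurtosis = n * m4 / m2^2 - 3, with the same NULL-versus-NaN
// handling for m2 == 0 that spark.legacy_statistical_aggregate controls.
std::optional<double> kurtosisResult(
    int64_t count,
    double m2,
    double m4,
    bool legacyStatisticalAggregate) {
  if (count < 1) {
    return std::nullopt; // No input rows: NULL in both modes.
  }
  if (m2 == 0) {
    if (legacyStatisticalAggregate) {
      return std::numeric_limits<double>::quiet_NaN(); // Legacy mode: NaN.
    }
    return std::nullopt; // Default mode: divide-by-zero yields NULL.
  }
  const double n = static_cast<double>(count);
  return n * m4 / (m2 * m2) - 3.0;
}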
Tracing diff --git a/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp b/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp index e1d6e0c5498f..c7a16a43f338 100644 --- a/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp +++ b/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp @@ -61,16 +61,12 @@ struct KurtosisResultAccessor { std::vector> getSignatures() { std::vector> signatures; - std::vector inputTypes = { - "smallint", "integer", "bigint", "real", "double"}; - for (const auto& inputType : inputTypes) { - signatures.push_back( - exec::AggregateFunctionSignatureBuilder() - .returnType("double") - .intermediateType(CentralMomentsIntermediateResult::type()) - .argumentType(inputType) - .build()); - } + signatures.push_back( + exec::AggregateFunctionSignatureBuilder() + .returnType("double") + .intermediateType(CentralMomentsIntermediateResult::type()) + .argumentType("double") + .build()); return signatures; } @@ -89,36 +85,19 @@ exec::AggregateRegistrationResult registerSkewness( const std::vector& argTypes, const TypePtr& resultType, const core::QueryConfig& config) -> std::unique_ptr { - VELOX_CHECK_LE( - argTypes.size(), 1, "{} takes at most one argument", name); + VELOX_CHECK_EQ(argTypes.size(), 1, "{} takes only one argument", name); const auto& inputType = argTypes[0]; if (config.sparkLegacyStatisticalAggregate()) { if (exec::isRawInput(step)) { switch (inputType->kind()) { - case TypeKind::SMALLINT: - return std::make_unique>>(resultType); - case TypeKind::INTEGER: - return std::make_unique>>(resultType); - case TypeKind::BIGINT: - return std::make_unique>>(resultType); case TypeKind::DOUBLE: return std::make_unique>>(resultType); - case TypeKind::REAL: - return std::make_unique>>(resultType); default: VELOX_UNSUPPORTED( "Unsupported input type: {}. " - "Expected SMALLINT, INTEGER, BIGINT, DOUBLE or REAL.", + "Expected DOUBLE.", inputType->toString()); } } else { @@ -133,30 +112,14 @@ exec::AggregateRegistrationResult registerSkewness( } else { if (exec::isRawInput(step)) { switch (inputType->kind()) { - case TypeKind::SMALLINT: - return std::make_unique>>(resultType); - case TypeKind::INTEGER: - return std::make_unique>>(resultType); - case TypeKind::BIGINT: - return std::make_unique>>(resultType); case TypeKind::DOUBLE: return std::make_unique>>(resultType); - case TypeKind::REAL: - return std::make_unique>>(resultType); default: VELOX_UNSUPPORTED( "Unsupported input type: {}. " - "Expected SMALLINT, INTEGER, BIGINT, DOUBLE or REAL.", + "Expected DOUBLE.", inputType->toString()); } } else { @@ -189,32 +152,15 @@ exec::AggregateRegistrationResult registerKurtosis( const std::vector& argTypes, const TypePtr& resultType, const core::QueryConfig& config) -> std::unique_ptr { - VELOX_CHECK_LE( - argTypes.size(), 1, "{} takes at most one argument", name); + VELOX_CHECK_EQ(argTypes.size(), 1, "{} takes only one argument", name); const auto& inputType = argTypes[0]; if (config.sparkLegacyStatisticalAggregate()) { if (exec::isRawInput(step)) { switch (inputType->kind()) { - case TypeKind::SMALLINT: - return std::make_unique>>(resultType); - case TypeKind::INTEGER: - return std::make_unique>>(resultType); - case TypeKind::BIGINT: - return std::make_unique>>(resultType); case TypeKind::DOUBLE: return std::make_unique>>(resultType); - case TypeKind::REAL: - return std::make_unique>>(resultType); default: VELOX_UNSUPPORTED( "Unsupported input type: {}. 
" @@ -233,26 +179,10 @@ exec::AggregateRegistrationResult registerKurtosis( } else { if (exec::isRawInput(step)) { switch (inputType->kind()) { - case TypeKind::SMALLINT: - return std::make_unique>>(resultType); - case TypeKind::INTEGER: - return std::make_unique>>(resultType); - case TypeKind::BIGINT: - return std::make_unique>>(resultType); case TypeKind::DOUBLE: return std::make_unique>>(resultType); - case TypeKind::REAL: - return std::make_unique>>(resultType); default: VELOX_UNSUPPORTED( "Unsupported input type: {}. " diff --git a/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp b/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp index 08e355c4e5b9..eef28d501c7c 100644 --- a/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp +++ b/velox/functions/sparksql/aggregates/tests/CentralMomentsAggregationTest.cpp @@ -56,25 +56,25 @@ class CentralMomentsAggregationTest : public AggregationTestBase { TEST_F(CentralMomentsAggregationTest, skewnessHasResult) { auto agg = "skewness"; - auto input = makeRowVector({makeFlatVector({1, 2})}); + auto input = makeRowVector({makeFlatVector({1, 2})}); // Even when the count is 2, Spark still produces output. auto expected = makeRowVector({makeFlatVector(std::vector{0.0})}); testCenteralMomentsAggResult(agg, input, expected); - input = makeRowVector({makeFlatVector({1, 1})}); + input = makeRowVector({makeFlatVector({1, 1})}); expected = makeRowVector({makeNullableFlatVector( std::vector>{std::nullopt})}); testCenteralMomentsAggResult(agg, input, expected); // Output NULL when m2 equals 0. - input = makeRowVector({makeFlatVector({1, 1})}); + input = makeRowVector({makeFlatVector({1, 1})}); expected = makeRowVector({makeNullableFlatVector( std::vector>{std::nullopt})}); testCenteralMomentsAggResult(agg, input, expected); // Output NaN when m2 equals 0 for legacy aggregate. - input = makeRowVector({makeFlatVector({1, 1})}); + input = makeRowVector({makeFlatVector({1, 1})}); expected = makeRowVector( {makeNullableFlatVector(std::vector>{ std::numeric_limits::quiet_NaN()})}); @@ -83,29 +83,29 @@ TEST_F(CentralMomentsAggregationTest, skewnessHasResult) { TEST_F(CentralMomentsAggregationTest, pearsonKurtosis) { auto agg = "kurtosis"; - auto input = makeRowVector({makeFlatVector({1, 10, 100, 10, 1})}); + auto input = makeRowVector({makeFlatVector({1, 10, 100, 10, 1})}); auto expected = makeRowVector( {makeFlatVector(std::vector{0.19432323191699075})}); testCenteralMomentsAggResult(agg, input, expected); - input = makeRowVector({makeFlatVector({-10, -20, 100, 1000})}); + input = makeRowVector({makeFlatVector({-10, -20, 100, 1000})}); expected = makeRowVector( {makeFlatVector(std::vector{-0.7014368047529627})}); testCenteralMomentsAggResult(agg, input, expected); // Even when the count is 2, Spark still produces non-null result. - input = makeRowVector({makeFlatVector({1, 2})}); + input = makeRowVector({makeFlatVector({1, 2})}); expected = makeRowVector({makeFlatVector(std::vector{-2.0})}); testCenteralMomentsAggResult(agg, input, expected); // Output NULL when m2 equals 0. - input = makeRowVector({makeFlatVector({1, 1})}); + input = makeRowVector({makeFlatVector({1, 1})}); expected = makeRowVector({makeNullableFlatVector( std::vector>{std::nullopt})}); testCenteralMomentsAggResult(agg, input, expected); // Output NaN when m2 equals 0 for legacy aggregate. 
- input = makeRowVector({makeFlatVector({1, 1})}); + input = makeRowVector({makeFlatVector({1, 1})}); expected = makeRowVector( {makeNullableFlatVector(std::vector>{ std::numeric_limits::quiet_NaN()})}); From 404f31c287664b2d3d4cfdbbb291be05ecc52ebd Mon Sep 17 00:00:00 2001 From: NEUpanning Date: Fri, 7 Mar 2025 19:48:07 +0800 Subject: [PATCH 4/5] cr --- velox/docs/configs.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 7702ac026943..ea482e7459f0 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -891,7 +891,9 @@ Spark-specific Configuration - bool - false - If true, statistical aggregation function includes skewness, kurtosis will return std::numeric_limits::quiet_NaN() - - instead of NULL when DivideByZero occurs during expression evaluation. + - instead of NULL when DivideByZero occurs during expression evaluation. It is worth noting that Spark statistical aggregation functions + - including stddev, stddev_samp, variance, var_samp, covar_samp, corr should also respect this configuration, + - although they have not been supported yet. Tracing -------- From 1a8347e25a599925aa52bda6ad7fc276c55854c5 Mon Sep 17 00:00:00 2001 From: NEUpanning Date: Mon, 10 Mar 2025 11:19:32 +0800 Subject: [PATCH 5/5] cr --- velox/core/QueryConfig.h | 4 +- velox/docs/configs.rst | 2 +- .../aggregates/CentralMomentsAggregate.cpp | 52 ++++++++++++------- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 58a6509a8255..ed565bb40f4f 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -334,8 +334,8 @@ class QueryConfig { "spark.legacy_date_formatter"; /// If true, statistical aggregation function includes skewness, kurtosis, - /// will return std::numeric_limits::quiet_NaN() instead of NULL when - /// DivideByZero occurs during expression evaluation. + /// will return NaN instead of NULL when dividing by zero during expression + /// evaluation. static constexpr const char* kSparkLegacyStatisticalAggregate = "spark.legacy_statistical_aggregate"; diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index ea482e7459f0..7f0a72eb41f1 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -891,7 +891,7 @@ Spark-specific Configuration - bool - false - If true, statistical aggregation function includes skewness, kurtosis will return std::numeric_limits::quiet_NaN() - - instead of NULL when DivideByZero occurs during expression evaluation. It is worth noting that Spark statistical aggregation functions + - instead of NULL when dividing by zero during expression evaluation. It is worth noting that Spark statistical aggregation functions - including stddev, stddev_samp, variance, var_samp, covar_samp, corr should also respect this configuration, - although they have not been supported yet. diff --git a/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp b/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp index c7a16a43f338..7afe2bf71395 100644 --- a/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp +++ b/velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp @@ -21,6 +21,10 @@ namespace facebook::velox::functions::aggregate::sparksql { namespace { +// Calculate the skewness value from m2, count and m3. +// +// @tparam nullOnDivideByZero If true, return NULL instead of NaN when dividing +// by zero during the calculating. 
template struct SkewnessResultAccessor { static bool hasResult(const CentralMomentsAccumulator& accumulator) { @@ -32,6 +36,9 @@ struct SkewnessResultAccessor { static double result(const CentralMomentsAccumulator& accumulator) { if (accumulator.m2() == 0) { + VELOX_USER_CHECK( + !nullOnDivideByZero, + "If NaN is returned when m2 is 0, nullOnDivideByZero must be false"); return std::numeric_limits::quiet_NaN(); } return std::sqrt(accumulator.count()) * accumulator.m3() / @@ -39,6 +46,10 @@ struct SkewnessResultAccessor { } }; +// Calculate the kurtosis value from m2, count and m4. +// +// @tparam nullOnDivideByZero If true, return NULL instead of NaN when dividing +// by zero during the calculating. template struct KurtosisResultAccessor { static bool hasResult(const CentralMomentsAccumulator& accumulator) { @@ -50,6 +61,9 @@ struct KurtosisResultAccessor { static double result(const CentralMomentsAccumulator& accumulator) { if (accumulator.m2() == 0) { + VELOX_USER_CHECK( + !nullOnDivideByZero, + "If NaN is returned when m2 is 0, nullOnDivideByZero must be false"); return std::numeric_limits::quiet_NaN(); } double count = accumulator.count(); @@ -89,16 +103,15 @@ exec::AggregateRegistrationResult registerSkewness( const auto& inputType = argTypes[0]; if (config.sparkLegacyStatisticalAggregate()) { if (exec::isRawInput(step)) { - switch (inputType->kind()) { - case TypeKind::DOUBLE: - return std::make_unique>>(resultType); - default: - VELOX_UNSUPPORTED( - "Unsupported input type: {}. " - "Expected DOUBLE.", - inputType->toString()); + if (inputType->kind() == TypeKind::DOUBLE) { + return std::make_unique>>(resultType); + } else { + VELOX_UNSUPPORTED( + "Unsupported input type: {}. " + "Expected DOUBLE.", + inputType->toString()); } } else { checkAccumulatorRowType( @@ -111,16 +124,15 @@ exec::AggregateRegistrationResult registerSkewness( } } else { if (exec::isRawInput(step)) { - switch (inputType->kind()) { - case TypeKind::DOUBLE: - return std::make_unique>>(resultType); - default: - VELOX_UNSUPPORTED( - "Unsupported input type: {}. " - "Expected DOUBLE.", - inputType->toString()); + if (inputType->kind() == TypeKind::DOUBLE) { + return std::make_unique>>(resultType); + } else { + VELOX_UNSUPPORTED( + "Unsupported input type: {}. " + "Expected DOUBLE.", + inputType->toString()); } } else { checkAccumulatorRowType(