Skip to content

Commit b86d9e7

Browse files
BIGOBIGO
BIGO
authored and
BIGO
committed
Merge branch 'opt_nested_funcs_1' into opt_nested_funcs
2 parents 44b21c7 + 5a78249 commit b86d9e7

File tree

3 files changed

+198
-20
lines changed

3 files changed

+198
-20
lines changed

cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -142,4 +142,8 @@ REGISTER_FUNCTION(FlattenJSONStringOnRequiredFunction)
142142
{
143143
factory.registerFunction<FlattenJSONStringOnRequiredFunction>();
144144
}
145+
REGISTER_FUNCTION(FlattenJSONStringOnCollapsedFunctions)
146+
{
147+
factory.registerFunction<FlattenJSONStringOnCollapsedFunctions>();
148+
}
145149
}

cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h

+164-16
Original file line numberDiff line numberDiff line change
@@ -637,13 +637,6 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
637637
GetJsonObjectImpl<DB::DummyJSONParser, DB::DefaultJSONStringSerializer<DB::DummyJSONParser::Element>>>(arguments);
638638
}
639639

640-
private:
641-
DB::ContextPtr context;
642-
/// If too many rows cannot be parsed by simdjson directly, we will normalize the json text at first;
643-
mutable bool is_most_normal_json_text = true;
644-
mutable size_t total_parsed_rows = 0;
645-
mutable size_t total_normalized_rows = 0;
646-
647640
template <typename JSONParser>
648641
bool safeParseJson(std::string_view str, JSONParser & parser, JSONParser::Element & doc) const
649642
{
@@ -675,6 +668,52 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
675668
return is_doc_ok;
676669
}
677670

671+
virtual void insertResultToColumn(
672+
const DB::MutableColumns & res,
673+
DB::SimdJSONParser::Element & document,
674+
std::vector<std::shared_ptr<DB::GeneratorJSONPath<DB::SimdJSONParser>>> & generator_json_paths,
675+
DB::SimdJSONParser &,
676+
GetJsonObjectImpl<DB::SimdJSONParser, DB::JSONStringSerializer<DB::SimdJSONParser::Element, DB::SimdJSONElementFormatter>> & impl) const
677+
{
678+
for (size_t j = 0; j < res.size(); ++j)
679+
{
680+
generator_json_paths[j]->reinitialize();
681+
if (!impl.insertResultToColumn(*res[j], document, *generator_json_paths[j], true))
682+
{
683+
res[j]->insertDefault();
684+
}
685+
}
686+
}
687+
688+
virtual void insertResultToColumn(
689+
const DB::MutableColumns & res,
690+
DB::DummyJSONParser::Element & document,
691+
std::vector<std::shared_ptr<DB::GeneratorJSONPath<DB::DummyJSONParser>>> & generator_json_paths,
692+
DB::DummyJSONParser &,
693+
GetJsonObjectImpl<DB::DummyJSONParser, DB::DefaultJSONStringSerializer<DB::DummyJSONParser::Element>> & impl) const
694+
{
695+
for (size_t j = 0; j < res.size(); ++j)
696+
{
697+
generator_json_paths[j]->reinitialize();
698+
if (!impl.insertResultToColumn(*res[j], document, *generator_json_paths[j], true))
699+
{
700+
res[j]->insertDefault();
701+
}
702+
}
703+
}
704+
705+
virtual DB::ColumnPtr makeResultToReturn(DB::MutableColumns & res) const
706+
{
707+
return DB::ColumnTuple::create(std::move(res));
708+
}
709+
710+
private:
711+
DB::ContextPtr context;
712+
/// If too many rows cannot be parsed by simdjson directly, we will normalize the json text at first;
713+
mutable bool is_most_normal_json_text = true;
714+
mutable size_t total_parsed_rows = 0;
715+
mutable size_t total_normalized_rows = 0;
716+
678717
template <typename JSONParser, typename Impl>
679718
DB::ColumnPtr innerExecuteImpl(const DB::ColumnsWithTypeAndName & arguments) const
680719
{
@@ -773,14 +812,7 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
773812
}
774813
if (document_ok)
775814
{
776-
for (size_t j = 0; j < tuple_size; ++j)
777-
{
778-
generator_json_paths[j]->reinitialize();
779-
if (!impl.insertResultToColumn(*tuple_columns[j], document, *generator_json_paths[j], true))
780-
{
781-
tuple_columns[j]->insertDefault();
782-
}
783-
}
815+
insertResultToColumn(tuple_columns, document, generator_json_paths, parser, impl);
784816
}
785817
else
786818
{
@@ -791,7 +823,123 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
791823
}
792824
}
793825

794-
return DB::ColumnTuple::create(std::move(tuple_columns));
826+
return makeResultToReturn(tuple_columns);
827+
}
828+
};
829+
830+
class FlattenJSONStringOnCollapsedFunctions : public FlattenJSONStringOnRequiredFunction
831+
{
832+
public:
833+
static constexpr auto name = "FlattenJSONStringOnCollapsedFunctions";
834+
static DB::FunctionPtr create(const DB::ContextPtr & context) { return std::make_shared<FlattenJSONStringOnCollapsedFunctions>(context); }
835+
explicit FlattenJSONStringOnCollapsedFunctions(DB::ContextPtr context_) : FlattenJSONStringOnRequiredFunction(context_) { }
836+
~FlattenJSONStringOnCollapsedFunctions() override = default;
837+
String getName() const override { return name; }
838+
839+
void insertResultToColumn(
840+
const DB::MutableColumns & res,
841+
DB::SimdJSONParser::Element & document,
842+
std::vector<std::shared_ptr<DB::GeneratorJSONPath<DB::SimdJSONParser>>> & generator_json_paths,
843+
DB::SimdJSONParser & parser,
844+
GetJsonObjectImpl<DB::SimdJSONParser, DB::JSONStringSerializer<DB::SimdJSONParser::Element, DB::SimdJSONElementFormatter>> & impl) const override
845+
{
846+
using JSONStringSerializer = DB::JSONStringSerializer<DB::SimdJSONParser::Element, DB::SimdJSONElementFormatter>;
847+
insertResultToColumnImpl<DB::SimdJSONParser, JSONStringSerializer>(res, document, generator_json_paths, parser, impl);
848+
}
849+
850+
void insertResultToColumn(
851+
const DB::MutableColumns & res,
852+
DB::DummyJSONParser::Element & document,
853+
std::vector<std::shared_ptr<DB::GeneratorJSONPath<DB::DummyJSONParser>>> & generator_json_paths,
854+
DB::DummyJSONParser & parser,
855+
GetJsonObjectImpl<DB::DummyJSONParser, DB::DefaultJSONStringSerializer<DB::DummyJSONParser::Element>> & impl) const override
856+
{
857+
using JSONStringSerializer = DB::DefaultJSONStringSerializer<DB::DummyJSONParser::Element>;
858+
insertResultToColumnImpl<DB::DummyJSONParser, JSONStringSerializer>(res, document, generator_json_paths, parser, impl);
859+
}
860+
861+
DB::ColumnPtr makeResultToReturn(DB::MutableColumns & res) const override
862+
{
863+
DB::Columns final_cols;
864+
final_cols.emplace_back(std::move(res[res.size() - 1]));
865+
return DB::ColumnTuple::create(std::move(final_cols));
866+
}
867+
868+
private:
869+
template<typename JSONParser, typename JSONStringSerializer>
870+
void insertResultToColumnImpl(
871+
const DB::MutableColumns & res,
872+
typename JSONParser::Element & document,
873+
std::vector<std::shared_ptr<DB::GeneratorJSONPath<JSONParser>>> & generator_json_paths,
874+
JSONParser & parser,
875+
GetJsonObjectImpl<JSONParser, JSONStringSerializer> &) const
876+
{
877+
size_t json_path_pos = 0;
878+
size_t last_col_index = res.size() - 1;
879+
if (!insertResultToColumnImpl<JSONParser, JSONStringSerializer>(*res[last_col_index], document, generator_json_paths, json_path_pos))
880+
{
881+
if (document.isString())
882+
{
883+
typename JSONParser::Element t;
884+
bool parsed = safeParseJson(document.getString(), parser, t);
885+
if (!parsed)
886+
res[last_col_index]->insertDefault();
887+
else if(!insertResultToColumnImpl<JSONParser, JSONStringSerializer>(*res[last_col_index], t, generator_json_paths, json_path_pos))
888+
{
889+
res[last_col_index]->insertDefault();
890+
}
891+
}
892+
else
893+
res[last_col_index]->insertDefault();
894+
}
895+
}
896+
897+
template<typename JSONParser, typename JSONStringSerializer>
898+
bool insertResultToColumnImpl(DB::IColumn & dest, typename JSONParser::Element & root, std::vector<std::shared_ptr<DB::GeneratorJSONPath<JSONParser>>> & generator_json_paths, size_t & json_path_pos) const
899+
{
900+
DB::VisitorStatus status = DB::VisitorStatus::Ok;
901+
bool success = false;
902+
for (size_t i = json_path_pos; i < generator_json_paths.size(); ++i)
903+
{
904+
std::shared_ptr<DB::GeneratorJSONPath<JSONParser>> generator_json_path = generator_json_paths[i];
905+
generator_json_path->reinitialize();
906+
status = DB::VisitorStatus::Ok;
907+
while (status != DB::VisitorStatus::Exhausted)
908+
{
909+
status = generator_json_path->getNextItem(root);
910+
if (status == DB::VisitorStatus::Ok)
911+
{
912+
success = true;
913+
}
914+
else if (status == DB::VisitorStatus::Error)
915+
{
916+
success = false;
917+
}
918+
}
919+
json_path_pos = i;
920+
if (!success)
921+
{
922+
break;
923+
}
924+
}
925+
if (!success)
926+
{
927+
return false;
928+
}
929+
DB::ColumnNullable & nullable_col_str = assert_cast<DB::ColumnNullable &>(dest);
930+
DB::ColumnString * col_str = assert_cast<DB::ColumnString *>(&nullable_col_str.getNestedColumn());
931+
JSONStringSerializer serializer(*col_str);
932+
nullable_col_str.getNullMapData().push_back(0);
933+
if (root.isString())
934+
{
935+
serializer.addRawString(root.getString());
936+
}
937+
else
938+
{
939+
serializer.addElement(root);
940+
}
941+
serializer.commit();
942+
return true;
795943
}
796944
};
797945

cpp-ch/local-engine/Parser/scalar_function_parser/getJSONObject.cpp

+30-4
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,26 @@ class GetJSONObjectParser : public FunctionParser
5959
DB::ActionsDAG & actions_dag) const override
6060
{
6161
const auto & args = substrait_func.arguments();
62-
if (args.size() != 2)
62+
if (args.size() < 2)
6363
{
64-
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Function {} requires 2 arguments", getCHFunctionName(substrait_func));
64+
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Function {} requires at least 2 arguments", getCHFunctionName(substrait_func));
6565
}
66+
std::string json_paths = "";
67+
for (size_t i = 1; i < args.size(); ++i)
68+
{
69+
if (args[i].value().has_literal())
70+
{
71+
const auto & literal_expr = args[i].value().literal();
72+
if (literal_expr.has_string())
73+
{
74+
std::string path = literal_expr.string();
75+
json_paths += path;
76+
if (i != args.size() - 1)
77+
json_paths += "|";
78+
}
79+
}
80+
}
81+
6682
if (args[0].value().has_scalar_function()
6783
&& args[0].value().scalar_function().function_reference() == SelfDefinedFunctionReference::GET_JSON_OBJECT)
6884
{
@@ -72,8 +88,16 @@ class GetJSONObjectParser : public FunctionParser
7288
{
7389
const auto flatten_function_pb = args[0].value().scalar_function();
7490
const auto * flatten_arg0 = parseExpression(actions_dag, flatten_function_pb.arguments(0).value());
75-
const auto * flatten_arg1 = parseExpression(actions_dag, flatten_function_pb.arguments(1).value());
76-
flatten_json_column_node = toFunctionNode(actions_dag, FlattenJSONStringOnRequiredFunction::name, flatten_json_column_name, {flatten_arg0, flatten_arg1});
91+
if (args.size() > 2)
92+
{
93+
const auto * flatten_path_arg = addColumnToActionsDAG(actions_dag, std::make_shared<DB::DataTypeString>(), json_paths);
94+
flatten_json_column_node = toFunctionNode(actions_dag, FlattenJSONStringOnCollapsedFunctions::name, flatten_json_column_name, {flatten_arg0, flatten_path_arg});
95+
}
96+
else
97+
{
98+
const auto * flatten_arg1 = parseExpression(actions_dag, flatten_function_pb.arguments(1).value());
99+
flatten_json_column_node = toFunctionNode(actions_dag, FlattenJSONStringOnRequiredFunction::name, flatten_json_column_name, {flatten_arg0, flatten_arg1});
100+
}
77101
actions_dag.addOrReplaceInOutputs(*flatten_json_column_node);
78102
}
79103
return {flatten_json_column_node, parseExpression(actions_dag, args[1].value())};
@@ -85,10 +109,12 @@ class GetJSONObjectParser : public FunctionParser
85109
}
86110

87111
private:
112+
88113
static String getFlatterJsonColumnName(const substrait::Expression & arg)
89114
{
90115
return arg.ShortDebugString();
91116
}
117+
92118
};
93119

94120
static FunctionParserRegister<GetJSONObjectParser> register_get_json_object_parser;

0 commit comments

Comments
 (0)