Skip to content

Commit

Permalink
ttweaks to get the calls query working, including migrating the call …
Browse files Browse the repository at this point in the history
…table
  • Loading branch information
gtarpenning committed Jan 31, 2025
1 parent 051171d commit f641f84
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 25 deletions.
5 changes: 3 additions & 2 deletions weave/trace_server/calls_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,10 +672,11 @@ def _as_sql_base_format(
)

where_filter_sql = combine_conditions(
having_conditions_sql
+ [c.as_sql(pb, table_alias, no_agg=True) for c in self.query_conditions],
[c.as_sql(pb, table_alias, no_agg=True) for c in self.query_conditions],
"AND",
)
if where_filter_sql:
where_filter_sql = "AND " + where_filter_sql

order_by_sql_no_agg = ""
if len(self.order_fields) > 0:
Expand Down
4 changes: 2 additions & 2 deletions weave/trace_server/clickhouse_trace_server_batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -1947,8 +1947,8 @@ def _ch_call_dict_to_call_schema_dict(ch_call_dict: dict) -> dict:
"op_name": ch_call_dict.get("op_name"),
"started_at": started_at,
"ended_at": ended_at,
"attributes": _dict_dump_to_dict(ch_call_dict.get("attributes_dump", "{}")),
"inputs": _dict_dump_to_dict(ch_call_dict.get("inputs_dump", "{}")),
"attributes": _dict_dump_to_dict(ch_call_dict.get("attributes_dump") or {}),
"inputs": _dict_dump_to_dict(ch_call_dict.get("inputs_dump") or {}),
"output": _nullable_any_dump_to_any(ch_call_dict.get("output_dump")),
"summary": make_derived_summary_fields(
summary=summary or {},
Expand Down
10 changes: 2 additions & 8 deletions weave/trace_server/migrations/011_display_name_simple.down.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
ALTER TABLE calls_merged
DROP COLUMN display_name;

ALTER TABLE calls_merged
ADD COLUMN display_name AggregateFunction(argMax, Nullable(String), DateTime64(3));

ALTER TABLE calls_merged_stats
DROP COLUMN display_name;
MODIFY COLUMN display_name AggregateFunction(argMax, Nullable(String), DateTime64(3));

ALTER TABLE calls_merged_stats
ADD COLUMN display_name AggregateFunction(argMax, Nullable(String), DateTime64(3));
MODIFY COLUMN display_name AggregateFunction(argMax, Nullable(String), DateTime64(3));

ALTER TABLE calls_merged_view MODIFY QUERY
SELECT project_id,
Expand Down
12 changes: 1 addition & 11 deletions weave/trace_server/migrations/011_display_name_simple.up.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
ALTER TABLE calls_merged
DROP COLUMN display_name;

ALTER TABLE calls_merged
ADD COLUMN display_name SimpleAggregateFunction(any, Nullable(String));

ALTER TABLE calls_merged_stats
DROP COLUMN display_name;

ALTER TABLE calls_merged_stats
ADD COLUMN display_name SimpleAggregateFunction(any, Nullable(String));
ALTER TABLE calls_merged MODIFY COLUMN display_name SimpleAggregateFunction(any, Nullable(String));

ALTER TABLE calls_merged_view MODIFY QUERY
SELECT project_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

ALTER TABLE calls_merged MODIFY SETTING min_bytes_for_wide_part = 10485760, min_age_to_force_merge_seconds = 0;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

ALTER TABLE calls_merged
MODIFY SETTING min_bytes_for_wide_part = 0, min_age_to_force_merge_seconds = 10;

OPTIMIZE TABLE calls_merged FINAL;
4 changes: 2 additions & 2 deletions weave/trace_server/trace_server_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def hydrate_calls_with_feedback(

def make_derived_summary_fields(
summary: dict[str, Any],
op_name: str,
op_name: Optional[str],
started_at: Optional[datetime.datetime] = None,
ended_at: Optional[datetime.datetime] = None,
exception: Optional[str] = None,
Expand Down Expand Up @@ -101,7 +101,7 @@ def make_derived_summary_fields(

if display_name:
weave_summary["display_name"] = display_name
else:
elif op_name:
if ri.string_will_be_interpreted_as_ref(op_name):
op = ri.parse_internal_uri(op_name)
if isinstance(op, ri.InternalObjectRef):
Expand Down

0 comments on commit f641f84

Please sign in to comment.