[RFC] Streaming Aggregation - A Memory-Efficient Approach #16774

Open
bowenlan-amzn opened this issue Dec 3, 2024 · 3 comments
Labels
Roadmap:Search Project-wide roadmap label Search:Aggregations Search:Performance v3.0.0 Issues and PRs related to version 3.0.0

bowenlan-amzn commented Dec 3, 2024

Abstract

This RFC proposes an enhancement to OpenSearch's aggregation framework through the integration of the newly introduced streaming transport capabilities. The enhancement transitions the existing aggregation model to a streaming paradigm, where partial aggregation results are transmitted continuously to the coordinator. This approach redistributes memory load from data nodes to coordinator nodes, resulting in improved cluster stability and resource utilization. Furthermore, this enhancement facilitates future horizontal scaling of aggregation computations through the introduction of intermediate processing workers.

Challenge

The existing aggregation framework distributes requests to data nodes that hold relevant shards. Each data node must maintain partial aggregation results in memory until processing completes, creating several operational challenges:

  1. Memory Constraints: Data nodes must maintain substantial in-memory structures for partial aggregation results, particularly when performing terms aggregations on high-cardinality fields. This forces users to either restrict query scope or over-provision hardware resources on data nodes.
  2. Garbage Collection Overhead: Large in-memory data structures trigger frequent garbage collection cycles, consuming CPU resources and degrading performance of other operations on data nodes. This can create a cascading effect: GC pauses extend response times, causing request queuing, which further increases memory demands.
  3. Resource Contention: Memory-intensive aggregations compete with other critical operations for resources on data nodes, leading to unpredictable cluster performance and potential service degradation.

Opportunity

The proposed streaming model eliminates the need for data nodes to accumulate results by implementing controlled streaming of partial results. This transformation offers several benefits:

  1. Liberation of Data Nodes: The streaming model caps peak memory usage on data nodes to the configured streaming buffer size, providing predictable resource allocation and improved isolation between operations.
  2. Enhanced Stability: Stream processing prevents data nodes from being overwhelmed by sudden spikes in aggregation workloads with unpredictable memory patterns.
  3. Flexible and Cost-Effective Scaling: This change enables independent scaling of the coordinator fleet, which typically comprises less than 10% of the cluster and is often under-utilized in terms of heap usage. For example, you can vertically scale coordinators onto memory-optimized hardware with a large heap to handle aggregations, while reducing heap on data nodes, where page cache is more important. This would balance resource utilization across the cluster, improving overall efficiency.

Proposed Solution

Stream Producer (Data Node)

  • Implement Arrow-based memory representation for partial aggregation buckets
  • Replace bulk partial result accumulation with incremental streaming to coordinator
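
The producer side can be pictured with a small Python sketch. It is not OpenSearch code: the function name `stream_term_counts`, the `send` callback, and the `max_buffered_terms` limit are all hypothetical, and plain dictionaries stand in for Arrow batches. It only illustrates that flushing partial counts whenever the buffer fills keeps peak memory bounded by the buffer size.

```python
from collections import Counter
from typing import Callable, Dict, Iterable


def stream_term_counts(
    terms: Iterable[str],
    send: Callable[[Dict[str, int]], None],
    max_buffered_terms: int = 1000,
) -> int:
    """Count terms, flushing partial counts whenever the buffer is full.

    Hypothetical helper: `send` stands in for the streaming transport.
    Returns the number of batches sent.
    """
    buffer: Counter = Counter()
    batches = 0
    for term in terms:
        buffer[term] += 1
        # Cap memory: once the buffer holds too many distinct terms,
        # ship the partial counts and start over with an empty buffer.
        if len(buffer) >= max_buffered_terms:
            send(dict(buffer))
            buffer.clear()
            batches += 1
    if buffer:  # flush whatever is left when the input is exhausted
        send(dict(buffer))
        batches += 1
    return batches


received = []
sent = stream_term_counts(["a", "b", "a", "c", "b", "a"], received.append, max_buffered_terms=2)
```

Because the same term can appear in several batches, the receiving side has to sum the counts per term before it can produce a final result.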

Stream Consumer (Coordinator Node)

  • Implement a streaming merger to efficiently buffer and merge streamed partial results
  • Add back-pressure mechanisms to prevent overwhelming the coordinator
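
As a rough illustration of the consumer side, the sketch below merges streamed partial counts while applying back-pressure through a bounded queue. Everything in it is an assumption made for illustration: the name `merge_streamed_counts`, the `max_in_flight` parameter, threads standing in for data nodes, and dictionaries standing in for Arrow batches. A producer blocks on `put` while the queue is full, so the coordinator never holds more than `max_in_flight` unmerged batches.

```python
import queue
import threading
from collections import Counter
from typing import Dict, List, Sequence, Tuple


def merge_streamed_counts(
    batches_per_shard: Sequence[Sequence[Dict[str, int]]],
    top_n: int,
    max_in_flight: int = 2,
) -> List[Tuple[str, int]]:
    """Merge partial term counts from several simulated shard streams."""
    inbox: queue.Queue = queue.Queue(maxsize=max_in_flight)

    def produce(batches: Sequence[Dict[str, int]]) -> None:
        for batch in batches:
            inbox.put(batch)  # blocks while the coordinator is behind
        inbox.put(None)  # end-of-stream marker for this shard

    threads = [threading.Thread(target=produce, args=(b,)) for b in batches_per_shard]
    for t in threads:
        t.start()

    merged: Counter = Counter()
    open_streams = len(threads)
    while open_streams:
        batch = inbox.get()
        if batch is None:
            open_streams -= 1
        else:
            merged.update(batch)  # sums counts per term
    for t in threads:
        t.join()
    return merged.most_common(top_n)


top_terms = merge_streamed_counts(
    [[{"a": 2, "b": 1}, {"a": 1}], [{"b": 3, "c": 1}]], top_n=2
)
```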

Execution Planner (Coordinator Node)

  • Refactor the existing routing and transport code to suit streaming communication
  • Size per-request buffers adaptively based on statistics such as system load, throughput and latency
---
title: Data Flow Diagram
---
flowchart TB
%% Legend
subgraph Legend["Legend"]
    direction LR
    NewComponent["New"] ~~~ OldComponent["Existing"]
    style NewComponent fill:#bbf,stroke-width:0px,font-size:9pt,width:30px,height:30px
    style OldComponent fill:#bfb,stroke-width:0px,font-size:9pt,width:30px,height:30px
end
subgraph Coordinator["Coordinator Node"]
    QueryPlanner["Execution Planner"]
    StreamConsumer["Stream Consumer"]
    StreamMerger["Stream Merger"]
end
subgraph DataNode1["Data Node 1"]
    Scanner1["Searcher"]
    LocalAgg1["Aggregator"]
    Stream1["Stream Producer"]
end
subgraph DataNode2["Data Node 2"]
    Scanner2["Searcher"]
    LocalAgg2["Aggregator"]
    Stream2["Stream Producer"]
end
%% Query Flow
Query["Aggregation Query"] --> QueryPlanner
QueryPlanner -->|shard request| Scanner1
QueryPlanner -->|shard request| Scanner2
%% Data Node 1 Flow
Scanner1 --> LocalAgg1
LocalAgg1 -->|results in arrow format| Stream1
Stream1 --> StreamConsumer
%% Data Node 2 Flow
Scanner2 --> LocalAgg2
LocalAgg2 -->|results in arrow format| Stream2
Stream2 --> StreamConsumer
%% Coordinator Processing
StreamConsumer --> StreamMerger
StreamMerger --> Result["Final Result"]
%% Styling
classDef new fill:#bbf,stroke:#333,stroke-width:2px
classDef old fill:#bfb,stroke:#333,stroke-width:2px
class QueryPlanner,StreamMerger,StreamConsumer new
class Stream1,Stream2 new
class Scanner1,LocalAgg1,Scanner2,LocalAgg2 old
class Query,Result old

Compatibility Considerations

We plan to start with terms bucket aggregation and stats metric aggregation, and evaluate the approach before extending this to more aggregation types.

  • Streaming aggregation operates between nodes within the cluster and should be compatible with existing aggregation APIs
  • Configuration options will allow streaming to be enabled or disabled per request, so users can compare it with the old aggregation implementation

Success Criteria

We plan to work on terms bucket aggregation and stats metric aggregation first.
We plan to benchmark with the nyc_taxis and big5 datasets and a simulated real-world workload of queries with aggregations. The targets are:

  1. Memory usage and GC pause time reduction on data nodes under heavy aggregation load (target: 30~50% reduction)
  2. Improved latency for query with streamed aggregation (target: 30~50% improvement)
  3. Enhanced cluster stability under heavy aggregation loads (target: 100% improvement on red-line QPS)

Call for Feedback

We welcome community feedback on:

  1. The overall approach and architecture
  2. Additional use cases to consider
  3. Any related optimizations or enhancements


@Vikasht34

Enhancing Streaming Aggregation with Push-Down Aggregation Architecture

This RFC presents a great approach to improving aggregation efficiency by streaming partial results from data nodes to the coordinator. While this method helps in reducing memory overhead on data nodes, I’d like to propose an additional Push-Down Aggregation Strategy, similar to what Presto/Trino employs, which could further optimize performance.

How Push-Down Aggregation Works

Instead of streaming raw partial results to the coordinator, data nodes can compute aggregations locally before sending only pre-aggregated results. This means:

  • Data nodes perform local aggregations (e.g., SUM, COUNT, AVG) before sending data.
  • The coordinator merges these precomputed results, rather than reprocessing raw data.
  • This minimizes network traffic and reduces memory pressure on the coordinator.
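
A minimal Python sketch of the merge step described in this list follows. The class `PartialStats` and the helper `local_stats` are hypothetical names, not part of OpenSearch or Trino. Each node ships a (count, sum) pair rather than raw values, and the coordinator derives AVG only after merging, since averaging per-node averages would weight nodes incorrectly.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class PartialStats:
    """Pre-aggregated state a data node would send (hypothetical)."""
    count: int = 0
    total: float = 0.0

    def merge(self, other: "PartialStats") -> "PartialStats":
        # COUNT and SUM are decomposable: merging is plain addition.
        return PartialStats(self.count + other.count, self.total + other.total)

    @property
    def avg(self) -> Optional[float]:
        # AVG is derived from the merged SUM and COUNT, never from averages.
        return self.total / self.count if self.count else None


def local_stats(values: Iterable[float]) -> PartialStats:
    """Aggregate raw values on the node that holds them."""
    values = list(values)
    return PartialStats(len(values), float(sum(values)))


node_a = local_stats([1, 2, 3])  # local average is 2.0
node_b = local_stats([10])       # local average is 10.0
merged = node_a.merge(node_b)    # merged.avg is 16 / 4, not (2.0 + 10.0) / 2
```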

Comparison: Streaming vs. Push-Down

| Feature | Streaming Aggregation (RFC) | Push-Down Aggregation |
| --- | --- | --- |
| Where Aggregation Happens? | Data nodes send incremental results; final aggregation at coordinator | Data nodes compute partial aggregations locally |
| Memory Utilization | Reduces memory on data nodes, increases on coordinator | Reduces memory on both data nodes and coordinator |
| Network Efficiency | Streams continuous data to coordinator | Sends only aggregated results (less data transfer) |
| Best Use Case | Real-time monitoring, dynamic aggregations | High-cardinality aggregations, batch queries |

Potential Hybrid Model

Instead of choosing either streaming aggregation or push-down aggregation, OpenSearch could use a hybrid approach:

  • Push-down aggregation for simple operations like SUM, AVG, COUNT.
  • Streaming aggregation when aggregations cannot be fully computed on data nodes.
  • Adaptive execution: If memory is a constraint, push more computation to data nodes.

Next Steps

Would love to hear thoughts on integrating push-down aggregation optimizations into the streaming aggregation framework. This could help OpenSearch scale better under heavy aggregation workloads while maintaining real-time responsiveness.

Reference: Presto/Trino’s push-down aggregation model: Trino Optimizer Docs

@navneet1v
Contributor

@bowenlan-amzn thanks for putting up this RFC. Streaming-based aggregation is a good idea, but I think it will be most useful with bucket aggregations. Do you see any use case where it would be useful for metric aggregations?

@harshavamsi
Contributor

Update on streaming aggregations

I used the streaming APIs provided as part of #16679 to implement custom aggregation logic starting with terms aggregation and then extending the same approach to cardinality aggregations. Sharing some preliminary results here.

We ran a comprehensive benchmark workload on the big5 dataset using the following terms aggregation query:

{
    "size": 0,
    "aggs": {
        "station": {
            "terms": {
                "field": "aws.cloudwatch.log_stream",
                "size": 500
            }
        }
    }
}

This represents a common aggregation use case that is intensive on a cluster. We disabled the optimization that relies on the simple term lookup since the optimization only kicks in if there is no top level query.

Latency comparison, green is streaming

Image

Throughput comparison, green is streaming

Image

CPU comparison, left is streaming

Image

Heap comparison, left is streaming

Image

Results analysis

These results show that streaming significantly outperforms baseline on both latency and throughput. As the number of search clients ramps up, streaming is not affected, and at peak search clients streaming latency is over 3x better than baseline. The latency benefit can be explained by the lack of serialization and deserialization of the aggregation results, zero-copy transfer over the wire using Apache Arrow Flight, and the efficient Apache Arrow storage format, which allows SIMD execution. Although peak CPU utilization is approximately the same, CPU during streaming is spiky. The spikes can be explained by the fact that each data node performs the aggregation and sends the results back to the coordinator incrementally; while a data node waits for its next aggregation task, its CPU drops. Streaming also uses significantly less heap, which can be explained by the fact that no stale aggregation objects are created in memory. These results point to an impressive aggregation use case for streaming.

Testing with cardinality aggregations

I also tested streaming with cardinality aggregations to see how metric aggregation types would do in the streaming world. I chose cardinality as it was the simplest to implement. The cardinality aggregation has multiple collector implementations and picks one depending on the amount of memory available. The direct collector is a really slow implementation, while the ordinals collector consumes more memory since it needs to see every ordinal. I was able to reproduce the ordinals collector with streaming. I chose a high cardinality field and a super high cardinality field to test how streaming would do against baseline.

{
    "body": {
        "size": 0,
        "aggs": {
            "agent": {
                "cardinality": {
                    "field": "agent.name"
                }
            }
        }
    }
}

and, with a super high cardinality field:

{
    "body": {
        "size": 0,
        "aggs": {
            "agent": {
                "cardinality": {
                    "field": "event.id"
                }
            }
        }
    }
}

CPU utilization, streaming on the left

Image

Heap, streaming on the left

Image

Latency with super high cardinality, streaming in green

Image

Latency with high cardinality, streaming in green

Image

Results analysis

Similar to terms aggregation, cardinality aggregations perform better with streaming. We see a significant latency difference with super high cardinality because baseline uses the direct collector by default, while streaming uses the ordinals collector. Even though streaming uses the ordinals collector, its heap utilization is lower, which makes this approach really efficient. This essentially means that we can always use the more efficient ordinals collector without running into memory issues.
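
To make the difference between the two collectors concrete, here is a simplified Python sketch. It rests on assumptions not stated in this thread: that the direct collector hashes the value of every document it visits, and that the ordinals collector only marks which ordinals it has seen and hashes each distinct term once at the end. An exact set replaces the probabilistic sketch a real implementation would use, and the names `direct_collect` and `ordinals_collect` are hypothetical.

```python
import hashlib
from typing import Sequence, Tuple


def _hash(value: str) -> int:
    """64-bit hash of a term (stand-in for the real hash function)."""
    return int.from_bytes(hashlib.blake2b(value.encode(), digest_size=8).digest(), "big")


def direct_collect(doc_values: Sequence[str]) -> Tuple[int, int]:
    """Hash every document's value. Returns (distinct count, hash calls)."""
    seen = set()
    hash_calls = 0
    for value in doc_values:
        seen.add(_hash(value))
        hash_calls += 1
    return len(seen), hash_calls


def ordinals_collect(doc_ordinals: Sequence[int], dictionary: Sequence[str]) -> Tuple[int, int]:
    """Mark visited ordinals, then hash each distinct term once."""
    visited = bytearray(len(dictionary))  # one flag per ordinal: memory grows with cardinality
    for ordinal in doc_ordinals:
        visited[ordinal] = 1  # cheap per-document work, no hashing
    seen = set()
    hash_calls = 0
    for ordinal, flag in enumerate(visited):
        if flag:
            seen.add(_hash(dictionary[ordinal]))
            hash_calls += 1
    return len(seen), hash_calls


dictionary = ["a", "b", "c"]            # sorted unique terms of the field
doc_ordinals = [0, 1, 0, 0, 2, 1]       # one ordinal per document
doc_values = [dictionary[o] for o in doc_ordinals]
direct_result = direct_collect(doc_values)
ordinals_result = ordinals_collect(doc_ordinals, dictionary)
```

Both functions report the same distinct count, but the direct variant hashes once per document while the ordinals variant hashes once per distinct term and pays for the per-ordinal flags instead.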

Next steps

Given that we have sufficient evidence of the benefit of streaming aggregations, we will productionize each type to work with streaming, starting with terms and cardinality aggregations. Sub-aggregation support is currently a work in progress and will be the major point of work before we can proceed.
