Skip to content

Commit 2b346d1

Browse files
committed
feat(s2n-quic-core): add aggregate metrics support
1 parent 65d55a4 commit 2b346d1

File tree

20 files changed

+3566
-957
lines changed

20 files changed

+3566
-957
lines changed

dc/s2n-quic-dc/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ exclude = ["corpus.tar.gz"]
1212

1313
[features]
1414
default = ["tokio"]
15-
testing = ["bolero-generator", "s2n-quic-core/testing"]
15+
testing = ["bolero-generator", "s2n-quic-core/testing", "tracing-subscriber"]
1616
tokio = ["tokio/io-util", "tokio/net", "tokio/rt-multi-thread", "tokio/time"]
1717

1818
[dependencies]
@@ -40,6 +40,7 @@ slotmap = "1"
4040
thiserror = "1"
4141
tokio = { version = "1", default-features = false, features = ["sync"] }
4242
tracing = "0.1"
43+
tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true }
4344
zerocopy = { version = "0.7", features = ["derive"] }
4445
zeroize = "1"
4546
parking_lot = "0.12"
@@ -51,6 +52,7 @@ insta = "1"
5152
s2n-codec = { path = "../../common/s2n-codec", features = ["testing"] }
5253
s2n-quic-core = { path = "../../quic/s2n-quic-core", features = ["testing"] }
5354
tokio = { version = "1", features = ["full"] }
55+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
5456

5557
[lints.rust.unexpected_cfgs]
5658
level = "warn"

dc/s2n-quic-dc/src/event.rs

+10
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,13 @@ impl Meta for api::EndpointMeta {
2828

2929
mod generated;
3030
pub use generated::*;
31+
32+
pub mod metrics {
33+
pub use crate::event::generated::metrics::*;
34+
pub use s2n_quic_core::event::metrics::Recorder;
35+
36+
pub mod aggregate {
37+
pub use crate::event::generated::metrics::aggregate::*;
38+
pub use s2n_quic_core::event::metrics::aggregate::{info, probe, Info, Recorder, Registry};
39+
}
40+
}

dc/s2n-quic-dc/src/event/generated.rs

+1-76
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
// changes should be made there.
77

88
use super::*;
9+
pub(crate) mod metrics;
910
pub mod api {
1011
#![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"]
1112
use super::*;
@@ -2673,82 +2674,6 @@ mod traits {
26732674
}
26742675
}
26752676
}
2676-
pub mod metrics {
2677-
use super::*;
2678-
use core::sync::atomic::{AtomicU32, Ordering};
2679-
use s2n_quic_core::event::metrics::Recorder;
2680-
#[derive(Debug)]
2681-
pub struct Subscriber<S: super::Subscriber>
2682-
where
2683-
S::ConnectionContext: Recorder,
2684-
{
2685-
subscriber: S,
2686-
}
2687-
impl<S: super::Subscriber> Subscriber<S>
2688-
where
2689-
S::ConnectionContext: Recorder,
2690-
{
2691-
pub fn new(subscriber: S) -> Self {
2692-
Self { subscriber }
2693-
}
2694-
}
2695-
pub struct Context<R: Recorder> {
2696-
recorder: R,
2697-
application_write: AtomicU32,
2698-
application_read: AtomicU32,
2699-
}
2700-
impl<S: super::Subscriber> super::Subscriber for Subscriber<S>
2701-
where
2702-
S::ConnectionContext: Recorder,
2703-
{
2704-
type ConnectionContext = Context<S::ConnectionContext>;
2705-
fn create_connection_context(
2706-
&self,
2707-
meta: &api::ConnectionMeta,
2708-
info: &api::ConnectionInfo,
2709-
) -> Self::ConnectionContext {
2710-
Context {
2711-
recorder: self.subscriber.create_connection_context(meta, info),
2712-
application_write: AtomicU32::new(0),
2713-
application_read: AtomicU32::new(0),
2714-
}
2715-
}
2716-
#[inline]
2717-
fn on_application_write(
2718-
&self,
2719-
context: &Self::ConnectionContext,
2720-
meta: &api::ConnectionMeta,
2721-
event: &api::ApplicationWrite,
2722-
) {
2723-
context.application_write.fetch_add(1, Ordering::Relaxed);
2724-
self.subscriber
2725-
.on_application_write(&context.recorder, meta, event);
2726-
}
2727-
#[inline]
2728-
fn on_application_read(
2729-
&self,
2730-
context: &Self::ConnectionContext,
2731-
meta: &api::ConnectionMeta,
2732-
event: &api::ApplicationRead,
2733-
) {
2734-
context.application_read.fetch_add(1, Ordering::Relaxed);
2735-
self.subscriber
2736-
.on_application_read(&context.recorder, meta, event);
2737-
}
2738-
}
2739-
impl<R: Recorder> Drop for Context<R> {
2740-
fn drop(&mut self) {
2741-
self.recorder.increment_counter(
2742-
"application_write",
2743-
self.application_write.load(Ordering::Relaxed) as _,
2744-
);
2745-
self.recorder.increment_counter(
2746-
"application_read",
2747-
self.application_read.load(Ordering::Relaxed) as _,
2748-
);
2749-
}
2750-
}
2751-
}
27522677
#[cfg(any(test, feature = "testing"))]
27532678
pub mod testing {
27542679
use super::*;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// DO NOT MODIFY THIS FILE
5+
// This file was generated with the `s2n-quic-events` crate and any required
6+
// changes should be made there.
7+
8+
use crate::event::{self, api, metrics::Recorder};
9+
use core::sync::atomic::{AtomicU32, Ordering};
10+
pub(crate) mod aggregate;
11+
#[derive(Debug)]
12+
pub struct Subscriber<S: event::Subscriber>
13+
where
14+
S::ConnectionContext: Recorder,
15+
{
16+
subscriber: S,
17+
}
18+
impl<S: event::Subscriber> Subscriber<S>
19+
where
20+
S::ConnectionContext: Recorder,
21+
{
22+
pub fn new(subscriber: S) -> Self {
23+
Self { subscriber }
24+
}
25+
}
26+
pub struct Context<R: Recorder> {
27+
recorder: R,
28+
application_write: AtomicU32,
29+
application_read: AtomicU32,
30+
}
31+
impl<S: event::Subscriber> event::Subscriber for Subscriber<S>
32+
where
33+
S::ConnectionContext: Recorder,
34+
{
35+
type ConnectionContext = Context<S::ConnectionContext>;
36+
fn create_connection_context(
37+
&self,
38+
meta: &api::ConnectionMeta,
39+
info: &api::ConnectionInfo,
40+
) -> Self::ConnectionContext {
41+
Context {
42+
recorder: self.subscriber.create_connection_context(meta, info),
43+
application_write: AtomicU32::new(0),
44+
application_read: AtomicU32::new(0),
45+
}
46+
}
47+
#[inline]
48+
fn on_application_write(
49+
&self,
50+
context: &Self::ConnectionContext,
51+
meta: &api::ConnectionMeta,
52+
event: &api::ApplicationWrite,
53+
) {
54+
context.application_write.fetch_add(1, Ordering::Relaxed);
55+
self.subscriber
56+
.on_application_write(&context.recorder, meta, event);
57+
}
58+
#[inline]
59+
fn on_application_read(
60+
&self,
61+
context: &Self::ConnectionContext,
62+
meta: &api::ConnectionMeta,
63+
event: &api::ApplicationRead,
64+
) {
65+
context.application_read.fetch_add(1, Ordering::Relaxed);
66+
self.subscriber
67+
.on_application_read(&context.recorder, meta, event);
68+
}
69+
}
70+
impl<R: Recorder> Drop for Context<R> {
71+
fn drop(&mut self) {
72+
self.recorder.increment_counter(
73+
"application_write",
74+
self.application_write.load(Ordering::Relaxed) as _,
75+
);
76+
self.recorder.increment_counter(
77+
"application_read",
78+
self.application_read.load(Ordering::Relaxed) as _,
79+
);
80+
}
81+
}

0 commit comments

Comments
 (0)