Skip to content

Commit 6369ef3

Browse files
authored
feat(s2n-quic-core): add aggregate metrics support (#2364)
1 parent 65d55a4 commit 6369ef3

File tree

29 files changed

+5260
-979
lines changed

29 files changed

+5260
-979
lines changed

dc/s2n-quic-dc/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ exclude = ["corpus.tar.gz"]
1212

1313
[features]
1414
default = ["tokio"]
15-
testing = ["bolero-generator", "s2n-quic-core/testing"]
15+
testing = ["bolero-generator", "s2n-quic-core/testing", "tracing-subscriber"]
1616
tokio = ["tokio/io-util", "tokio/net", "tokio/rt-multi-thread", "tokio/time"]
1717

1818
[dependencies]
@@ -40,6 +40,7 @@ slotmap = "1"
4040
thiserror = "1"
4141
tokio = { version = "1", default-features = false, features = ["sync"] }
4242
tracing = "0.1"
43+
tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true }
4344
zerocopy = { version = "0.7", features = ["derive"] }
4445
zeroize = "1"
4546
parking_lot = "0.12"
@@ -51,6 +52,7 @@ insta = "1"
5152
s2n-codec = { path = "../../common/s2n-codec", features = ["testing"] }
5253
s2n-quic-core = { path = "../../quic/s2n-quic-core", features = ["testing"] }
5354
tokio = { version = "1", features = ["full"] }
55+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
5456

5557
[lints.rust.unexpected_cfgs]
5658
level = "warn"

dc/s2n-quic-dc/events/connection.rs

+6
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,23 @@
44
#[event("application:write")]
55
pub struct ApplicationWrite {
66
/// The number of bytes that the application tried to write
7+
#[measure("provided", "b")]
78
total_len: usize,
89

910
/// The amount that was written
11+
#[measure("committed", "b")]
12+
#[counter("committed.total", "b")]
1013
write_len: usize,
1114
}
1215

1316
#[event("application:read")]
1417
pub struct ApplicationRead {
1518
/// The number of bytes that the application tried to read
19+
#[measure("capacity", "b")]
1620
capacity: usize,
1721

1822
/// The amount that was read
23+
#[measure("committed", "b")]
24+
#[counter("committed.total", "b")]
1925
read_len: usize,
2026
}

dc/s2n-quic-dc/events/map.rs

+4
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55
#[subject(endpoint)]
66
struct PathSecretMapInitialized {
77
/// The capacity of the path secret map
8+
#[measure("capacity")]
89
capacity: usize,
910
}
1011

1112
#[event("path_secret_map:uninitialized")]
1213
#[subject(endpoint)]
1314
struct PathSecretMapUninitialized {
1415
/// The capacity of the path secret map
16+
#[measure("capacity")]
1517
capacity: usize,
1618

1719
/// The number of entries in the map
20+
#[measure("entries")]
1821
entries: usize,
1922
}
2023

@@ -128,6 +131,7 @@ struct ReplayPotentiallyDetected<'a> {
128131

129132
key_id: u64,
130133

134+
#[measure("gap")]
131135
gap: u64,
132136
}
133137

dc/s2n-quic-dc/src/event.rs

+17
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,20 @@ impl Meta for api::EndpointMeta {
2828

2929
mod generated;
3030
pub use generated::*;
31+
32+
pub mod metrics {
33+
pub use crate::event::generated::metrics::*;
34+
pub use s2n_quic_core::event::metrics::Recorder;
35+
36+
pub mod aggregate {
37+
pub use crate::event::generated::metrics::aggregate::*;
38+
pub use s2n_quic_core::event::metrics::aggregate::{
39+
info, AsMetric, Info, Recorder, Registry,
40+
};
41+
42+
pub mod probe {
43+
pub use crate::event::generated::metrics::probe::*;
44+
pub use s2n_quic_core::event::metrics::aggregate::probe::dynamic;
45+
}
46+
}
47+
}

dc/s2n-quic-dc/src/event/generated.rs

+1-76
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
// changes should be made there.
77

88
use super::*;
9+
pub(crate) mod metrics;
910
pub mod api {
1011
#![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"]
1112
use super::*;
@@ -2673,82 +2674,6 @@ mod traits {
26732674
}
26742675
}
26752676
}
2676-
pub mod metrics {
2677-
use super::*;
2678-
use core::sync::atomic::{AtomicU32, Ordering};
2679-
use s2n_quic_core::event::metrics::Recorder;
2680-
#[derive(Debug)]
2681-
pub struct Subscriber<S: super::Subscriber>
2682-
where
2683-
S::ConnectionContext: Recorder,
2684-
{
2685-
subscriber: S,
2686-
}
2687-
impl<S: super::Subscriber> Subscriber<S>
2688-
where
2689-
S::ConnectionContext: Recorder,
2690-
{
2691-
pub fn new(subscriber: S) -> Self {
2692-
Self { subscriber }
2693-
}
2694-
}
2695-
pub struct Context<R: Recorder> {
2696-
recorder: R,
2697-
application_write: AtomicU32,
2698-
application_read: AtomicU32,
2699-
}
2700-
impl<S: super::Subscriber> super::Subscriber for Subscriber<S>
2701-
where
2702-
S::ConnectionContext: Recorder,
2703-
{
2704-
type ConnectionContext = Context<S::ConnectionContext>;
2705-
fn create_connection_context(
2706-
&self,
2707-
meta: &api::ConnectionMeta,
2708-
info: &api::ConnectionInfo,
2709-
) -> Self::ConnectionContext {
2710-
Context {
2711-
recorder: self.subscriber.create_connection_context(meta, info),
2712-
application_write: AtomicU32::new(0),
2713-
application_read: AtomicU32::new(0),
2714-
}
2715-
}
2716-
#[inline]
2717-
fn on_application_write(
2718-
&self,
2719-
context: &Self::ConnectionContext,
2720-
meta: &api::ConnectionMeta,
2721-
event: &api::ApplicationWrite,
2722-
) {
2723-
context.application_write.fetch_add(1, Ordering::Relaxed);
2724-
self.subscriber
2725-
.on_application_write(&context.recorder, meta, event);
2726-
}
2727-
#[inline]
2728-
fn on_application_read(
2729-
&self,
2730-
context: &Self::ConnectionContext,
2731-
meta: &api::ConnectionMeta,
2732-
event: &api::ApplicationRead,
2733-
) {
2734-
context.application_read.fetch_add(1, Ordering::Relaxed);
2735-
self.subscriber
2736-
.on_application_read(&context.recorder, meta, event);
2737-
}
2738-
}
2739-
impl<R: Recorder> Drop for Context<R> {
2740-
fn drop(&mut self) {
2741-
self.recorder.increment_counter(
2742-
"application_write",
2743-
self.application_write.load(Ordering::Relaxed) as _,
2744-
);
2745-
self.recorder.increment_counter(
2746-
"application_read",
2747-
self.application_read.load(Ordering::Relaxed) as _,
2748-
);
2749-
}
2750-
}
2751-
}
27522677
#[cfg(any(test, feature = "testing"))]
27532678
pub mod testing {
27542679
use super::*;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// DO NOT MODIFY THIS FILE
5+
// This file was generated with the `s2n-quic-events` crate and any required
6+
// changes should be made there.
7+
8+
use crate::event::{self, api, metrics::Recorder};
9+
use core::sync::atomic::{AtomicU32, Ordering};
10+
pub(crate) mod aggregate;
11+
pub(crate) mod probe;
12+
#[derive(Debug)]
13+
pub struct Subscriber<S: event::Subscriber>
14+
where
15+
S::ConnectionContext: Recorder,
16+
{
17+
subscriber: S,
18+
}
19+
impl<S: event::Subscriber> Subscriber<S>
20+
where
21+
S::ConnectionContext: Recorder,
22+
{
23+
pub fn new(subscriber: S) -> Self {
24+
Self { subscriber }
25+
}
26+
}
27+
pub struct Context<R: Recorder> {
28+
recorder: R,
29+
application_write: AtomicU32,
30+
application_read: AtomicU32,
31+
}
32+
impl<S: event::Subscriber> event::Subscriber for Subscriber<S>
33+
where
34+
S::ConnectionContext: Recorder,
35+
{
36+
type ConnectionContext = Context<S::ConnectionContext>;
37+
fn create_connection_context(
38+
&self,
39+
meta: &api::ConnectionMeta,
40+
info: &api::ConnectionInfo,
41+
) -> Self::ConnectionContext {
42+
Context {
43+
recorder: self.subscriber.create_connection_context(meta, info),
44+
application_write: AtomicU32::new(0),
45+
application_read: AtomicU32::new(0),
46+
}
47+
}
48+
#[inline]
49+
fn on_application_write(
50+
&self,
51+
context: &Self::ConnectionContext,
52+
meta: &api::ConnectionMeta,
53+
event: &api::ApplicationWrite,
54+
) {
55+
context.application_write.fetch_add(1, Ordering::Relaxed);
56+
self.subscriber
57+
.on_application_write(&context.recorder, meta, event);
58+
}
59+
#[inline]
60+
fn on_application_read(
61+
&self,
62+
context: &Self::ConnectionContext,
63+
meta: &api::ConnectionMeta,
64+
event: &api::ApplicationRead,
65+
) {
66+
context.application_read.fetch_add(1, Ordering::Relaxed);
67+
self.subscriber
68+
.on_application_read(&context.recorder, meta, event);
69+
}
70+
}
71+
impl<R: Recorder> Drop for Context<R> {
72+
fn drop(&mut self) {
73+
self.recorder.increment_counter(
74+
"application_write",
75+
self.application_write.load(Ordering::Relaxed) as _,
76+
);
77+
self.recorder.increment_counter(
78+
"application_read",
79+
self.application_read.load(Ordering::Relaxed) as _,
80+
);
81+
}
82+
}

0 commit comments

Comments
 (0)