Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't send receiver reports for SSRC during simulcast probe #623

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions interceptor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub mod twcc;

pub use error::Error;

/// Attribute indicating the stream is probing incoming packets.
pub const ATTR_READ_PROBE: usize = 2295978936;

/// Attributes are a generic key/value store used by interceptors
pub type Attributes = HashMap<usize, usize>;

Expand Down
16 changes: 11 additions & 5 deletions interceptor/src/report/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ impl ReceiverReport {
m.values().cloned().collect()
};
for stream in streams {
let pkt = stream.generate_report(now);

let a = Attributes::new();
if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
log::warn!("failed sending: {}", err);
if let Some(pkt) = stream.generate_report(now) {
let a = Attributes::new();
if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
log::warn!("failed sending: {}", err);
}
}
}
}
Expand Down Expand Up @@ -186,11 +186,17 @@ impl Interceptor for ReceiverReport {
info: &StreamInfo,
reader: Arc<dyn RTPReader + Send + Sync>,
) -> Arc<dyn RTPReader + Send + Sync> {
let wait_for_probe = info
.attributes
.get(&crate::ATTR_READ_PROBE)
.is_some_and(|v| *v != 0);

let stream = Arc::new(ReceiverStream::new(
info.ssrc,
info.clock_rate,
reader,
self.internal.now.clone(),
wait_for_probe,
));
{
let mut streams = self.internal.streams.lock().await;
Expand Down
26 changes: 20 additions & 6 deletions interceptor/src/report/receiver/receiver_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct ReceiverStreamInternal {

packets: Vec<u64>,
started: bool,
wait_for_probe: bool,
seq_num_cycles: u16,
last_seq_num: i32,
last_report_seq_num: i32,
Expand Down Expand Up @@ -40,7 +41,7 @@ impl ReceiverStreamInternal {
(self.packets[pos / 64] & (1 << (pos % 64))) != 0
}

fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet) {
fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet, is_probe: bool) {
if !self.started {
// first frame
self.started = true;
Expand Down Expand Up @@ -79,6 +80,7 @@ impl ReceiverStreamInternal {

self.last_rtp_time_rtp = pkt.header.timestamp;
self.last_rtp_time_time = now;
self.wait_for_probe &= is_probe;
}

fn process_sender_report(&mut self, now: SystemTime, sr: &rtcp::sender_report::SenderReport) {
Expand Down Expand Up @@ -158,6 +160,7 @@ impl ReceiverStream {
clock_rate: u32,
reader: Arc<dyn RTPReader + Send + Sync>,
now: Option<FnTimeGen>,
wait_for_probe: bool,
) -> Self {
let receiver_ssrc = rand::random::<u32>();
ReceiverStream {
Expand All @@ -171,6 +174,7 @@ impl ReceiverStream {

packets: vec![0u64; 128],
started: false,
wait_for_probe,
seq_num_cycles: 0,
last_seq_num: 0,
last_report_seq_num: 0,
Expand All @@ -184,9 +188,9 @@ impl ReceiverStream {
}
}

pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) {
pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet, is_probe: bool) {
let mut internal = self.internal.lock();
internal.process_rtp(now, pkt);
internal.process_rtp(now, pkt, is_probe);
}

pub(crate) fn process_sender_report(
Expand All @@ -198,9 +202,17 @@ impl ReceiverStream {
internal.process_sender_report(now, sr);
}

pub(crate) fn generate_report(&self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport {
pub(crate) fn generate_report(
&self,
now: SystemTime,
) -> Option<rtcp::receiver_report::ReceiverReport> {
let mut internal = self.internal.lock();
internal.generate_report(now)

if internal.wait_for_probe {
return None;
}

Some(internal.generate_report(now))
}
}

Expand All @@ -213,14 +225,16 @@ impl RTPReader for ReceiverStream {
buf: &mut [u8],
a: &Attributes,
) -> Result<(rtp::packet::Packet, Attributes)> {
let is_probe = a.get(&crate::ATTR_READ_PROBE).is_some_and(|v| *v != 0);

let (pkt, attr) = self.parent_rtp_reader.read(buf, a).await?;

let now = if let Some(f) = &self.now {
f()
} else {
SystemTime::now()
};
self.process_rtp(now, &pkt);
self.process_rtp(now, &pkt, is_probe);

Ok((pkt, attr))
}
Expand Down
67 changes: 67 additions & 0 deletions interceptor/src/report/receiver/receiver_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,73 @@ async fn test_receiver_interceptor_before_any_packet() -> Result<()> {
Ok(())
}

#[tokio::test(start_paused = true)]
async fn test_receiver_interceptor_read_probe() -> Result<()> {
let mt = Arc::new(MockTime::default());
let time_gen = {
let mt = Arc::clone(&mt);
Arc::new(move || mt.now())
};

let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
.with_interval(Duration::from_millis(50))
.with_now_fn(time_gen)
.build("")?;

let stream = MockStream::new(
&StreamInfo {
ssrc: 123456,
clock_rate: 90000,
attributes: [(crate::ATTR_READ_PROBE, 1)].into_iter().collect(),
..Default::default()
},
icpr,
)
.await;

// no report initially
tokio::time::timeout(Duration::from_millis(60), stream.written_rtcp())
.await
.expect_err("expected no report");

stream
.receive_rtp(rtp::packet::Packet {
header: rtp::header::Header {
sequence_number: 7,
..Default::default()
},
..Default::default()
})
.await;

let pkts = stream.written_rtcp().await.unwrap();
assert_eq!(pkts.len(), 1);
if let Some(rr) = pkts[0]
.as_any()
.downcast_ref::<rtcp::receiver_report::ReceiverReport>()
{
assert_eq!(rr.reports.len(), 1);
assert_eq!(
rr.reports[0],
rtcp::reception_report::ReceptionReport {
ssrc: 123456,
last_sequence_number: 7,
last_sender_report: 0,
fraction_lost: 0,
total_lost: 0,
delay: 0,
jitter: 0,
}
)
} else {
panic!();
}

stream.close().await?;

Ok(())
}

#[tokio::test]
async fn test_receiver_interceptor_after_rtp_packets() -> Result<()> {
let mt = Arc::new(MockTime::default());
Expand Down
8 changes: 7 additions & 1 deletion webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1060,14 +1060,20 @@ impl PeerConnectionInternal {
None => return Err(Error::ErrInterceptorNotBind),
};

let stream_info = create_stream_info(
let mut stream_info = create_stream_info(
"".to_owned(),
ssrc,
params.codecs[0].payload_type,
params.codecs[0].capability.clone(),
&params.header_extensions,
None,
);

// indicate this stream starts with probing
stream_info
.attributes
.insert(interceptor::ATTR_READ_PROBE, 1);

let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self
.dtls_transport
.streams_for_ssrc(ssrc, &stream_info, &icpr)
Expand Down
Loading