Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul stats: decouple events from stats #1400

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ mod tests {
}

struct CoreHttpTrackerServices {
pub http_stats_event_sender: Arc<Option<Box<dyn bittorrent_http_tracker_core::statistics::event::sender::Sender>>>,
pub http_stats_event_sender: Arc<Option<Box<dyn bittorrent_http_tracker_core::event::sender::Sender>>>,
}

fn initialize_private_tracker() -> (CoreTrackerServices, CoreHttpTrackerServices) {
Expand Down
9 changes: 5 additions & 4 deletions packages/http-tracker-core/benches/helpers/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct CoreTrackerServices {
}

pub struct CoreHttpTrackerServices {
pub http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
pub http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
}

pub fn initialize_core_tracker_services() -> (CoreTrackerServices, CoreHttpTrackerServices) {
Expand Down Expand Up @@ -105,14 +105,15 @@ pub fn sample_info_hash() -> InfoHash {
.expect("String should be a valid info hash")
}

use bittorrent_http_tracker_core::statistics;
use bittorrent_http_tracker_core::event::Event;
use bittorrent_http_tracker_core::{event, statistics};
use futures::future::BoxFuture;
use mockall::mock;
use tokio::sync::broadcast::error::SendError;

mock! {
HttpStatsEventSender {}
impl statistics::event::sender::Sender for HttpStatsEventSender {
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<statistics::event::Event> > > > ;
impl event::sender::Sender for HttpStatsEventSender {
fn send_event(&self, event: Event) -> BoxFuture<'static,Option<Result<usize,SendError<Event> > > > ;
}
}
4 changes: 2 additions & 2 deletions packages/http-tracker-core/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use torrust_tracker_configuration::{Core, HttpTracker};

use crate::services::announce::AnnounceService;
use crate::services::scrape::ScrapeService;
use crate::statistics;
use crate::{event, statistics};

pub struct HttpTrackerCoreContainer {
// todo: replace with TrackerCoreContainer
Expand All @@ -20,7 +20,7 @@ pub struct HttpTrackerCoreContainer {
pub authentication_service: Arc<AuthenticationService>,

pub http_tracker_config: Arc<HttpTracker>,
pub http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
pub http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
pub http_stats_repository: Arc<statistics::repository::Repository>,
pub announce_service: Arc<AnnounceService>,
pub scrape_service: Arc<ScrapeService>,
Expand Down
59 changes: 59 additions & 0 deletions packages/http-tracker-core/src/event/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::net::{IpAddr, SocketAddr};

pub mod sender;

/// A HTTP core event.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Event {
TcpAnnounce { connection: ConnectionContext },
TcpScrape { connection: ConnectionContext },
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ConnectionContext {
client: ClientConnectionContext,
server: ServerConnectionContext,
}

impl ConnectionContext {
#[must_use]
pub fn new(client_ip_addr: IpAddr, opt_client_port: Option<u16>, server_socket_addr: SocketAddr) -> Self {
Self {
client: ClientConnectionContext {
ip_addr: client_ip_addr,
port: opt_client_port,
},
server: ServerConnectionContext {
socket_addr: server_socket_addr,
},
}
}

#[must_use]
pub fn client_ip_addr(&self) -> IpAddr {
self.client.ip_addr
}

#[must_use]
pub fn client_port(&self) -> Option<u16> {
self.client.port
}

#[must_use]
pub fn server_socket_addr(&self) -> SocketAddr {
self.server.socket_addr
}
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ClientConnectionContext {
ip_addr: IpAddr,

/// It's provided if you use the `torrust-axum-http-tracker-server` crate.
port: Option<u16>,
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ServerConnectionContext {
socket_addr: SocketAddr,
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,35 @@ use tokio::sync::broadcast::error::SendError;

use super::Event;

/// A trait to allow sending statistics events
const CHANNEL_CAPACITY: usize = 1024;

/// A trait for sending sending.
#[cfg_attr(test, automock)]
pub trait Sender: Sync + Send {
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>>;
}

/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation.
///
/// It uses a channel sender to send the statistic events. The channel is created by a
/// [`statistics::Keeper`](crate::statistics::keeper::Keeper)
#[allow(clippy::module_name_repetitions)]
pub struct ChannelSender {
/// An event sender implementation using a broadcast channel.
pub struct Broadcaster {
pub(crate) sender: broadcast::Sender<Event>,
}

impl Sender for ChannelSender {
impl Sender for Broadcaster {
fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>> {
async move { Some(self.sender.send(event)) }.boxed()
}
}

impl Default for Broadcaster {
fn default() -> Self {
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
Self { sender }
}
}

impl Broadcaster {
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
}
1 change: 1 addition & 0 deletions packages/http-tracker-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod container;
pub mod event;
pub mod services;
pub mod statistics;

Expand Down
42 changes: 22 additions & 20 deletions packages/http-tracker-core/src/services/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! It delegates the `announce` logic to the [`AnnounceHandler`] and it returns
//! the [`AnnounceData`].
//!
//! It also sends an [`http_tracker_core::statistics::event::Event`]
//! It also sends an [`http_tracker_core::event::Event`]
//! because events are specific for the HTTP tracker.
use std::net::{IpAddr, SocketAddr};
use std::panic::Location;
Expand All @@ -22,7 +22,8 @@ use bittorrent_tracker_core::whitelist;
use torrust_tracker_configuration::Core;
use torrust_tracker_primitives::core::AnnounceData;

use crate::statistics;
use crate::event;
use crate::event::Event;

/// The HTTP tracker `announce` service.
///
Expand All @@ -35,7 +36,7 @@ pub struct AnnounceService {
announce_handler: Arc<AnnounceHandler>,
authentication_service: Arc<AuthenticationService>,
whitelist_authorization: Arc<whitelist::authorization::WhitelistAuthorization>,
opt_http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
opt_http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
}

impl AnnounceService {
Expand All @@ -45,7 +46,7 @@ impl AnnounceService {
announce_handler: Arc<AnnounceHandler>,
authentication_service: Arc<AuthenticationService>,
whitelist_authorization: Arc<whitelist::authorization::WhitelistAuthorization>,
opt_http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
opt_http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
) -> Self {
Self {
core_config,
Expand Down Expand Up @@ -86,7 +87,7 @@ impl AnnounceService {
.announce(&announce_request.info_hash, &mut peer, &remote_client_ip, &peers_wanted)
.await?;

self.send_stats_event(remote_client_ip, opt_remote_client_port, *server_socket_addr)
self.send_event(remote_client_ip, opt_remote_client_port, *server_socket_addr)
.await;

Ok(announce_data)
Expand Down Expand Up @@ -137,11 +138,11 @@ impl AnnounceService {
}
}

async fn send_stats_event(&self, peer_ip: IpAddr, opt_peer_ip_port: Option<u16>, server_socket_addr: SocketAddr) {
async fn send_event(&self, peer_ip: IpAddr, opt_peer_ip_port: Option<u16>, server_socket_addr: SocketAddr) {
if let Some(http_stats_event_sender) = self.opt_http_stats_event_sender.as_deref() {
http_stats_event_sender
.send_event(statistics::event::Event::TcpAnnounce {
connection: statistics::event::ConnectionContext::new(peer_ip, opt_peer_ip_port, server_socket_addr),
.send_event(Event::TcpAnnounce {
connection: event::ConnectionContext::new(peer_ip, opt_peer_ip_port, server_socket_addr),
})
.await;
}
Expand Down Expand Up @@ -227,7 +228,7 @@ mod tests {
}

struct CoreHttpTrackerServices {
pub http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>>,
pub http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
}

fn initialize_core_tracker_services() -> (CoreTrackerServices, CoreHttpTrackerServices) {
Expand Down Expand Up @@ -317,13 +318,14 @@ mod tests {
use mockall::mock;
use tokio::sync::broadcast::error::SendError;

use crate::statistics;
use crate::event::Event;
use crate::tests::sample_info_hash;
use crate::{event, statistics};

mock! {
HttpStatsEventSender {}
impl statistics::event::sender::Sender for HttpStatsEventSender {
fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option<Result<usize,SendError<statistics::event::Event> > > > ;
impl event::sender::Sender for HttpStatsEventSender {
fn send_event(&self, event: Event) -> BoxFuture<'static,Option<Result<usize,SendError<Event> > > > ;
}
}

Expand All @@ -340,13 +342,13 @@ mod tests {
use torrust_tracker_test_helpers::configuration;

use super::{sample_peer_using_ipv4, sample_peer_using_ipv6};
use crate::event;
use crate::event::{ConnectionContext, Event};
use crate::services::announce::tests::{
initialize_core_tracker_services, initialize_core_tracker_services_with_config, sample_announce_request_for_peer,
sample_peer, MockHttpStatsEventSender,
};
use crate::services::announce::AnnounceService;
use crate::statistics;
use crate::statistics::event::ConnectionContext;

#[tokio::test]
async fn it_should_return_the_announce_data() {
Expand Down Expand Up @@ -391,12 +393,12 @@ mod tests {
let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new();
http_stats_event_sender_mock
.expect_send_event()
.with(eq(statistics::event::Event::TcpAnnounce {
.with(eq(Event::TcpAnnounce {
connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), Some(8080), server_socket_addr),
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
let http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>> =
Arc::new(Some(Box::new(http_stats_event_sender_mock)));

let (core_tracker_services, mut core_http_tracker_services) = initialize_core_tracker_services();
Expand Down Expand Up @@ -447,12 +449,12 @@ mod tests {
let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new();
http_stats_event_sender_mock
.expect_send_event()
.with(eq(statistics::event::Event::TcpAnnounce {
.with(eq(Event::TcpAnnounce {
connection: ConnectionContext::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), Some(8080), server_socket_addr),
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
let http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>> =
Arc::new(Some(Box::new(http_stats_event_sender_mock)));

let (core_tracker_services, mut core_http_tracker_services) =
Expand Down Expand Up @@ -486,7 +488,7 @@ mod tests {
let mut http_stats_event_sender_mock = MockHttpStatsEventSender::new();
http_stats_event_sender_mock
.expect_send_event()
.with(eq(statistics::event::Event::TcpAnnounce {
.with(eq(Event::TcpAnnounce {
connection: ConnectionContext::new(
IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969)),
Some(8080),
Expand All @@ -495,7 +497,7 @@ mod tests {
}))
.times(1)
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
let http_stats_event_sender: Arc<Option<Box<dyn statistics::event::sender::Sender>>> =
let http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>> =
Arc::new(Some(Box::new(http_stats_event_sender_mock)));

let (core_tracker_services, mut core_http_tracker_services) = initialize_core_tracker_services();
Expand Down
Loading
Loading