Overhaul stats: Emit events from the torrent-repository package #1358

Open
2 tasks
josecelano opened this issue Mar 5, 2025 · 0 comments
Labels
- Developer - Torrust Improvement Experience Code Cleanup / Refactoring Tidying and Making Neat Enhancement / Feature Request Something New

Context

I've recently been working on issue #1264.

The issue was a bug with the number of downloads for a given torrent in stats.

If you enable persistence in the configuration, the number of downloads for every torrent is persisted in the database.

The tracker must:

  • Update the value in the database every time the counter is increased.
  • Load the value from the database every time a torrent is added to the torrent repository (active torrent) because a peer is announcing that torrent.

Torrents are removed from the in-memory torrent repository when:

  • The tracker is restarted.
  • The torrent is removed because it does not have any peer (peerless torrent). You can enable a cronjob to clean peerless torrents periodically.
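The lifecycle above can be sketched roughly as follows. This is only an illustration, not the tracker code: `DownloadsDb`, `InMemoryRepo` and `demo` are hypothetical names, the database is a plain `HashMap`, and the rule about which announces count as a download is reduced to a `completed` flag.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the 20-byte info hash.
type InfoHash = [u8; 20];

/// Hypothetical stand-in for the database table of persisted counters.
#[derive(Default)]
struct DownloadsDb {
    rows: HashMap<InfoHash, u32>,
}

impl DownloadsDb {
    fn load(&self, info_hash: &InfoHash) -> Option<u32> {
        self.rows.get(info_hash).copied()
    }

    fn increase_number_of_downloads(&mut self, info_hash: &InfoHash) {
        *self.rows.entry(*info_hash).or_insert(0) += 1;
    }
}

/// Hypothetical in-memory repository: info hash -> downloads counter.
#[derive(Default)]
struct InMemoryRepo {
    downloaded: HashMap<InfoHash, u32>,
}

impl InMemoryRepo {
    /// Registers an announce. If the torrent is not active, it is added
    /// and its counter starts from the persisted value (when there is one).
    fn announce(&mut self, db: &mut DownloadsDb, info_hash: &InfoHash, completed: bool) -> u32 {
        let persisted = db.load(info_hash);
        let counter = self
            .downloaded
            .entry(*info_hash)
            .or_insert_with(|| persisted.unwrap_or(0));

        if completed {
            // Every increase is written to the database too.
            *counter += 1;
            db.increase_number_of_downloads(info_hash);
        }

        *counter
    }

    /// Simulates the peerless-torrent cleanup (or a restart).
    fn remove(&mut self, info_hash: &InfoHash) {
        self.downloaded.remove(info_hash);
    }
}

/// Returns the counter before the torrent is removed and after it is re-added.
fn demo() -> (u32, u32) {
    let mut db = DownloadsDb::default();
    let mut repo = InMemoryRepo::default();
    let info_hash = [0u8; 20];

    repo.announce(&mut db, &info_hash, false); // the torrent becomes active
    let before = repo.announce(&mut db, &info_hash, true); // a download completes

    repo.remove(&info_hash);

    // The torrent is announced again: the counter is restored from the database.
    let after = repo.announce(&mut db, &info_hash, false);

    (before, after)
}
```

In this sketch the counter survives the removal only because it is loaded again from the database when the torrent is re-added.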

Problem

Internally all torrent repository implementations keep torrent metrics like:

  • complete: number of seeders.
  • incomplete: number of leechers.
  • downloaded: the number of peers that have ever completed downloading.
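To make the difference between the three values concrete, here is a minimal sketch. It assumes (this is not stated above) that a seeder is a peer with nothing left to download; `Peer` is reduced to a single hypothetical `left` field and `swarm_metadata` is an illustrative helper, not the real method.

```rust
/// Hypothetical, simplified peer: only the bytes left to download.
struct Peer {
    left: u64,
}

#[derive(Debug, Default, PartialEq)]
struct SwarmMetadata {
    /// Peers currently in the swarm that have the whole torrent.
    complete: u32,
    /// Peers currently in the swarm that are still downloading.
    incomplete: u32,
    /// Historical counter: it does not depend on the current peer list.
    downloaded: u32,
}

fn swarm_metadata(peers: &[Peer], downloaded: u32) -> SwarmMetadata {
    // Assumption: a seeder is a peer with zero bytes left.
    let complete = peers.iter().filter(|peer| peer.left == 0).count() as u32;
    let incomplete = peers.len() as u32 - complete;

    SwarmMetadata {
        complete,
        incomplete,
        downloaded,
    }
}
```

`complete` and `incomplete` can always be recomputed from the peer list, while `downloaded` is a counter that has to be stored somewhere, which is why it is the only value involved in persistence.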

The following is the implementation we are using in production:

  • I've omitted some parts.
  • We keep the other implementations only to document the discarded options and to be able to run the benchmarks again with newer versions.
pub struct CrossbeamSkipList<T> {
    pub torrents: SkipMap<InfoHash, T>,
}

impl Repository<EntryMutexStd> for CrossbeamSkipList<EntryMutexStd>
where
    EntryMutexStd: EntrySync,
    EntrySingle: Entry,
{
    fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer, opt_persistent_torrent: Option<PersistentTorrent>) -> bool {
        if let Some(existing_entry) = self.torrents.get(info_hash) {
            existing_entry.value().upsert_peer(peer)
        } else {
            let new_entry = if let Some(number_of_downloads) = opt_persistent_torrent {
                EntryMutexStd::new(
                    EntrySingle {
                        swarm: PeerList::default(),
                        downloaded: number_of_downloads,
                    }
                    .into(),
                )
            } else {
                EntryMutexStd::default()
            };

            let inserted_entry = self.torrents.get_or_insert(*info_hash, new_entry);

            inserted_entry.value().upsert_peer(peer)
        }
    }

    // ...

    fn get_metrics(&self) -> TorrentsMetrics {
        let mut metrics = TorrentsMetrics::default();

        for entry in &self.torrents {
            let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata();
            metrics.complete += u64::from(stats.complete);
            metrics.downloaded += u64::from(stats.downloaded);
            metrics.incomplete += u64::from(stats.incomplete);
            metrics.torrents += 1;
        }

        metrics
    }

    // ...

    fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
        for entry in &self.torrents {
            entry.value().remove_inactive_peers(current_cutoff);
        }
    }

    fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
        for entry in &self.torrents {
            if entry.value().meets_retaining_policy(policy) {
                continue;
            }

            entry.remove();
        }
    }
}

As you can see, the upsert_peer method returns a value that indicates whether the number of downloads was increased.

That returned boolean is used by the tracker core announce handler to decide whether the downloads counter in the database has to be increased.

pub async fn announce(
    &self,
    info_hash: &InfoHash,
    peer: &mut peer::Peer,
    remote_client_ip: &IpAddr,
    peers_wanted: &PeersWanted,
) -> Result<AnnounceData, AnnounceError> {
    self.whitelist_authorization.authorize(info_hash).await?;

    let opt_persistent_torrent = if self.config.tracker_policy.persistent_torrent_completed_stat {
        self.db_torrent_repository.load(info_hash)?
    } else {
        None
    };

    peer.change_ip(&assign_ip_address_to_peer(remote_client_ip, self.config.net.external_ip));

    let number_of_downloads_increased =
        self.in_memory_torrent_repository
            .upsert_peer(info_hash, peer, opt_persistent_torrent);

    if self.config.tracker_policy.persistent_torrent_completed_stat && number_of_downloads_increased {
        self.db_torrent_repository.increase_number_of_downloads(info_hash)?;
    }

    Ok(self.build_announce_data(info_hash, peer, peers_wanted))
}

From my point of view, there is a code smell there. Ideally, the upsert_peer command should not return anything (command-query separation principle).

However, we need to communicate to the upper layers that something happened. What happened is that a peer announced it had completed a download (after announcing it had started).

Solution: add domain events to the torrent-repository package

Instead of communicating with the upper layers via the return value, we could emit an event (focused on the type of the announce event) from the torrent-repository:

pub enum AnnounceEvent {
    Started { info_hash: InfoHash, peer: Peer },
    Stopped { info_hash: InfoHash, peer: Peer },
    Complete { info_hash: InfoHash, peer: Peer, number_of_downloads_increased: bool },
    None { info_hash: InfoHash, peer: Peer },
}

Another alternative for the events (focused on changes to the peer list) could be:

// or PeerListEvent
pub enum TorrentEvent {
    PeerAdded {
        info_hash: InfoHash,
        peer: Peer,
        is_seeder: bool,
    },
    PeerRemoved {
        info_hash: InfoHash,
        peer_id: PeerId,
    },
    DownloadCompleted {
        info_hash: InfoHash,
        peer: Peer,
        total_downloads: u64,
    },
}

Maybe we can emit more than one type.

NOTICE: the event includes the peer data, which can be useful for listeners (in-process listeners or external services, like the Index, if we emit the event as an integration event in the future).

The event is the same one we have at the delivery layer but with different information.

We can trigger the event in Entry::upsert_peer or in Repository<EntryRwLockParkingLot>::upsert_peer. I would do it in the repo.

Entry::upsert_peer could return the event instead of the boolean returned by the current implementation:

fn upsert_peer(&mut self, peer: &peer::Peer) -> bool {
    let mut number_of_downloads_increased: bool = false;

    match peer::ReadInfo::get_event(peer) {
        AnnounceEvent::Stopped => {
            drop(self.swarm.remove(&peer::ReadInfo::get_id(peer)));
        }
        AnnounceEvent::Completed => {
            let previous = self.swarm.upsert(Arc::new(*peer));
            // Count the download only if the peer was previously known and had not already completed.
            if previous.is_some_and(|p| p.event != AnnounceEvent::Completed) {
                self.downloaded += 1;
                number_of_downloads_increased = true;
            }
        }
        _ => {
            // `Started` event (first announced event) or
            // `None` event (announcements done at regular intervals).
            drop(self.swarm.upsert(Arc::new(*peer)));
        }
    }

    number_of_downloads_increased
}
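A possible shape for the event-returning version, as a sketch only. The types are simplified stand-ins (a `HashMap` instead of the real peer list, a reduced `Peer`), the `Option<TorrentEvent>` return type is an assumption, and the events do not carry the `info_hash` because in this sketch the entry does not know it (the repository would add it before emitting).

```rust
use std::collections::HashMap;

type PeerId = [u8; 20];

#[derive(Clone, Copy, Debug, PartialEq)]
enum AnnounceEvent {
    Started,
    Stopped,
    Completed,
    None,
}

/// Hypothetical, reduced peer.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Peer {
    id: PeerId,
    event: AnnounceEvent,
}

#[derive(Debug, PartialEq)]
enum TorrentEvent {
    PeerAdded { peer: Peer },
    PeerRemoved { peer_id: PeerId },
    DownloadCompleted { peer: Peer, total_downloads: u64 },
}

#[derive(Default)]
struct Entry {
    swarm: HashMap<PeerId, Peer>,
    downloaded: u64,
}

impl Entry {
    /// Returns what happened (if anything) instead of a boolean.
    fn upsert_peer(&mut self, peer: &Peer) -> Option<TorrentEvent> {
        match peer.event {
            AnnounceEvent::Stopped => self
                .swarm
                .remove(&peer.id)
                .map(|removed| TorrentEvent::PeerRemoved { peer_id: removed.id }),
            AnnounceEvent::Completed => match self.swarm.insert(peer.id, *peer) {
                // Known peer that had not completed yet: count the download.
                Some(previous) if previous.event != AnnounceEvent::Completed => {
                    self.downloaded += 1;
                    Some(TorrentEvent::DownloadCompleted {
                        peer: *peer,
                        total_downloads: self.downloaded,
                    })
                }
                // Known peer that had already completed: nothing new happened.
                Some(_) => None,
                // Unknown peer announcing `Completed`: added, but not counted.
                None => Some(TorrentEvent::PeerAdded { peer: *peer }),
            },
            // `Started` or `None`: only a new peer is reported.
            _ => match self.swarm.insert(peer.id, *peer) {
                None => Some(TorrentEvent::PeerAdded { peer: *peer }),
                Some(_) => None,
            },
        }
    }
}
```

The counting rule is the same as in the current code: a download is counted only when a known peer moves to `Completed` for the first time.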

The tracker core could listen to these events and update the downloads counter in the database.
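A rough sketch of such a listener. To keep it dependency-free it uses `std::sync::mpsc` and a plain thread; the tracker is async, so a real implementation would probably use an async channel instead. `DownloadsDb`, `start_listener` and `demo` are hypothetical names, and the event is reduced to the fields the listener needs.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type InfoHash = [u8; 20];

/// Reduced version of the proposed event (peer data omitted).
enum TorrentEvent {
    PeerAdded { info_hash: InfoHash },
    DownloadCompleted { info_hash: InfoHash },
}

/// Hypothetical stand-in for the database torrent repository.
#[derive(Default)]
struct DownloadsDb {
    rows: Mutex<HashMap<InfoHash, u64>>,
}

impl DownloadsDb {
    fn increase_number_of_downloads(&self, info_hash: &InfoHash) {
        let mut rows = self.rows.lock().expect("it should get a lock");
        *rows.entry(*info_hash).or_insert(0) += 1;
    }

    fn load(&self, info_hash: &InfoHash) -> Option<u64> {
        self.rows
            .lock()
            .expect("it should get a lock")
            .get(info_hash)
            .copied()
    }
}

/// Spawns the listener. It runs until every `Sender` has been dropped.
fn start_listener(db: Arc<DownloadsDb>) -> (Sender<TorrentEvent>, JoinHandle<()>) {
    let (sender, receiver) = channel();

    let handle = thread::spawn(move || {
        for event in receiver {
            match event {
                TorrentEvent::DownloadCompleted { info_hash } => {
                    db.increase_number_of_downloads(&info_hash);
                }
                // Other events are not relevant for this listener.
                TorrentEvent::PeerAdded { .. } => {}
            }
        }
    });

    (sender, handle)
}

/// Sends a few events and returns the persisted counter.
fn demo() -> Option<u64> {
    let db = Arc::new(DownloadsDb::default());
    let (sender, handle) = start_listener(Arc::clone(&db));
    let info_hash = [7u8; 20];

    // The torrent repository would send these from `upsert_peer`.
    sender
        .send(TorrentEvent::PeerAdded { info_hash })
        .expect("the listener should be running");
    sender
        .send(TorrentEvent::DownloadCompleted { info_hash })
        .expect("the listener should be running");
    sender
        .send(TorrentEvent::DownloadCompleted { info_hash })
        .expect("the listener should be running");

    // Dropping the sender closes the channel, so the listener loop ends.
    drop(sender);
    handle.join().expect("the listener should not panic");

    db.load(&info_hash)
}
```

With this wiring the announce handler no longer needs to know about the database: the repository only holds the sender.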

Pros:

  • We would remove the logic to persist the downloads counter from the tracker core AnnounceHandler.
  • These events could be used by other parts of the application.
  • These events could also be exposed to third-party systems via the new GraphQL API.

Secondary refactors enabled by this feature

  1. Simplify tracker core AnnounceHandler

The new tracker core AnnounceHandler would be simpler, something like:

pub async fn announce(
    &self,
    info_hash: &InfoHash,
    peer: &mut peer::Peer,
    remote_client_ip: &IpAddr,
    peers_wanted: &PeersWanted,
) -> Result<AnnounceData, AnnounceError> {
    self.whitelist_authorization.authorize(info_hash).await?;

    peer.change_ip(&assign_ip_address_to_peer(remote_client_ip, self.config.net.external_ip));

    self.in_memory_torrent_repository.upsert_peer(info_hash, peer);

    Ok(self.build_announce_data(info_hash, peer, peers_wanted))
}

  2. Simplify the torrent repository

The torrent repo keeps track of these metrics:

pub struct TorrentsMetrics {
    /// Total number of seeders for all torrents
    pub complete: u64,

    /// Total number of peers that have ever completed downloading for all torrents.
    pub downloaded: u64,

    /// Total number of leechers for all torrents.
    pub incomplete: u64,

    /// Total number of torrents.
    pub torrents: u64,
}

Notice the number of downloads value is used in the API:

{
  "torrents": 402147,
  "seeders": 151234,
  "completed": 106151, <- this value
  "leechers": 269884,
  "tcp4_connections_handled": 34,
  "tcp4_announces_handled": 34,
  "tcp4_scrapes_handled": 0,
  "tcp6_connections_handled": 0,
  "tcp6_announces_handled": 0,
  "tcp6_scrapes_handled": 0,
  "udp_requests_aborted": 893999,
  "udp_requests_banned": 44571419,
  "udp_banned_ips_total": 22819,
  "udp_avg_connect_processing_time_ns": 572207,
  "udp_avg_announce_processing_time_ns": 10269606,
  "udp_avg_scrape_processing_time_ns": 413114,
  "udp4_requests": 493144699,
  "udp4_connections_handled": 163143209,
  "udp4_announces_handled": 266286675,
  "udp4_scrapes_handled": 6344999,
  "udp4_responses": 448572272,
  "udp4_errors_handled": 12738499,
  "udp6_requests": 0,
  "udp6_connections_handled": 0,
  "udp6_announces_handled": 0,
  "udp6_scrapes_handled": 0,
  "udp6_responses": 0,
  "udp6_errors_handled": 0
}

We can change it to avoid loading the data from the database (when persistence is enabled), which means we have to revert this PR.

  • The torrent repo is only responsible for the counter since the torrent was added. The downloaded metric would count only downloads since the torrent was first announced.
  • We do not need to inject the initial counter value from the database.

The API can take this value:

  1. Directly from the persistent torrent repository (querying the database when needed) when persistence is enabled.
  2. From the in-memory torrent repo when persistence is not enabled.

I suggested this here.

IMPORTANT: the in-memory counter would not track the total number of downloads since the tracker started, but only since the torrent was last added to the repo (torrents can be removed when they don't have peers and added again later).
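The two sources for the API value could be selected like this. It is only a sketch: both repository types are hypothetical read-only stand-ins backed by a `HashMap`, and `completed_stat` is an illustrative name, not an existing function.

```rust
use std::collections::HashMap;

type InfoHash = [u8; 20];

/// Hypothetical stand-in for the persistent (database) torrent repository.
struct DbTorrentRepository {
    rows: HashMap<InfoHash, u64>,
}

/// Hypothetical stand-in for the in-memory torrent repository.
struct InMemoryTorrentRepository {
    downloaded: HashMap<InfoHash, u64>,
}

impl DbTorrentRepository {
    /// Downloads for all torrents ever persisted.
    fn total_downloads(&self) -> u64 {
        self.rows.values().sum()
    }
}

impl InMemoryTorrentRepository {
    /// Downloads counted since each active torrent was last added.
    fn total_downloads(&self) -> u64 {
        self.downloaded.values().sum()
    }
}

/// Value for the `completed` field in the API stats.
fn completed_stat(
    persistent_torrent_completed_stat: bool,
    db: &DbTorrentRepository,
    in_memory: &InMemoryTorrentRepository,
) -> u64 {
    if persistent_torrent_completed_stat {
        db.total_downloads()
    } else {
        in_memory.total_downloads()
    }
}
```

With persistence disabled the value is only as complete as the in-memory counters, which is the limitation described in the IMPORTANT note.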

How to enable torrent stats persistence

The option persistent_torrent_completed_stat must be true.

For example, you can force that option by overriding it when you run the tracker:

TORRUST_TRACKER_CONFIG_OVERRIDE_CORE__TRACKER_POLICY__PERSISTENT_TORRENT_COMPLETED_STAT=true cargo run

And this is where the option is located (its path) in the configuration:

{
  "metadata": {
    "app": "torrust-tracker",
    "purpose": "configuration",
    "schema_version": "2.0.0"
  },
  "logging": {
    "threshold": "info"
  },
  "core": {
    "announce_policy": {
      "interval": 120,
      "interval_min": 120
    },
    "database": {
      "driver": "sqlite3",
      "path": "./storage/tracker/lib/database/sqlite3.db"
    },
    "inactive_peer_cleanup_interval": 600,
    "listed": false,
    "net": {
      "external_ip": "0.0.0.0",
      "on_reverse_proxy": false
    },
    "private": false,
    "private_mode": null,
    "tracker_policy": {
      "max_peer_timeout": 900,
      "persistent_torrent_completed_stat": true, <- this
      "remove_peerless_torrents": true
    },
    "tracker_usage_statistics": true
  },
  "udp_trackers": [
    {
      "bind_address": "0.0.0.0:6969",
      "cookie_lifetime": {
        "secs": 120,
        "nanos": 0
      }
    }
  ],
  "http_trackers": [
    {
      "bind_address": "0.0.0.0:7070",
      "tsl_config": null
    }
  ],
  "http_api": {
    "bind_address": "0.0.0.0:1212",
    "tsl_config": null,
    "access_tokens": {
      "admin": "***"
    }
  },
  "health_check_api": {
    "bind_address": "127.0.0.1:1313"
  }
}

TODO

If this issue is implemented, open new issues for the suggested secondary refactors:

  • Open a new issue to simplify tracker core AnnounceHandler.
  • Open a new issue to simplify the torrent repository.

cc @da2ce7

@josecelano josecelano added - Developer - Torrust Improvement Experience Code Cleanup / Refactoring Tidying and Making Neat Enhancement / Feature Request Something New labels Mar 5, 2025
@josecelano josecelano changed the title Overhaul stats events: emit evetns from the torrent-repository package Overhaul stats events: emit events from the torrent-repository package Mar 5, 2025
@josecelano josecelano changed the title Overhaul stats events: emit events from the torrent-repository package Overhaul stats: emit events from the torrent-repository package Mar 12, 2025
@josecelano josecelano self-assigned this Mar 12, 2025
@josecelano josecelano changed the title Overhaul stats: emit events from the torrent-repository package Overhaul stats: Emit events from the torrent-repository package Mar 18, 2025