diff --git a/crates/matrix-sdk/src/client/builder.rs b/crates/matrix-sdk/src/client/builder.rs index 6303641f130..bea2d937fbd 100644 --- a/crates/matrix-sdk/src/client/builder.rs +++ b/crates/matrix-sdk/src/client/builder.rs @@ -29,6 +29,11 @@ use ruma::{ use thiserror::Error; #[cfg(not(target_arch = "wasm32"))] use tokio::sync::OnceCell; +use tracing::{ + debug, + field::{self, debug}, + instrument, span, Level, Span, +}; use url::Url; use super::{Client, ClientInner}; @@ -88,10 +93,19 @@ pub struct ClientBuilder { appservice_mode: bool, server_versions: Option>, handle_refresh_tokens: bool, + root_span: Span, } impl ClientBuilder { pub(crate) fn new() -> Self { + let root_span = span!( + Level::INFO, + "matrix-sdk", + user_id = field::Empty, + device_id = field::Empty, + ed25519_key = field::Empty + ); + Self { homeserver_cfg: None, http_cfg: None, @@ -101,6 +115,7 @@ impl ClientBuilder { appservice_mode: false, server_versions: None, handle_refresh_tokens: false, + root_span, } } @@ -326,8 +341,12 @@ impl ClientBuilder { /// server discovery request is made which can fail; if you didn't set /// [`server_versions(false)`][Self::server_versions], that amounts to /// another request that can fail + #[instrument(skip_all, parent = &self.root_span, target = "matrix_sdk::client", fields(homeserver))] pub async fn build(self) -> Result { + debug!("Starting to build the Client"); + let homeserver_cfg = self.homeserver_cfg.ok_or(ClientBuildError::MissingHomeserver)?; + Span::current().record("homeserver", debug(&homeserver_cfg)); let inner_http_client = match self.http_cfg.unwrap_or_default() { #[allow(unused_mut)] @@ -364,6 +383,8 @@ impl ClientBuilder { let homeserver = match homeserver_cfg { HomeserverConfig::Url(url) => url, HomeserverConfig::ServerName(server_name) => { + debug!("Trying to discover the homeserver"); + let homeserver = homeserver_from_name(&server_name); let well_known = http_client .send( @@ -387,6 +408,7 @@ impl ClientBuilder { if let Some(proxy) = well_known.sliding_sync_proxy.map(|p| p.url) { sliding_sync_proxy = Url::parse(&proxy).ok(); } + debug!(homserver_url = well_known.homeserver.base_url, "Discovered the homeserver"); well_known.homeserver.base_url } @@ -421,7 +443,15 @@ impl ClientBuilder { refresh_token_lock: Mutex::new(Ok(())), }); - Ok(Client { inner }) + debug!("Done building the Client"); + + // We drop the root span here so it gets pushed to the subscribers, i.e. it gets + // only uploaded to a OpenTelemetry collector if the span gets dropped. + // We still want it around so other methods that get called by this + // client instance are connected to it, so we clone. + drop(self.root_span.clone()); + + Ok(Client { inner, root_span: self.root_span }) } } diff --git a/crates/matrix-sdk/src/client/login_builder.rs b/crates/matrix-sdk/src/client/login_builder.rs index 8bfc6130c91..efc4c602a28 100644 --- a/crates/matrix-sdk/src/client/login_builder.rs +++ b/crates/matrix-sdk/src/client/login_builder.rs @@ -142,6 +142,7 @@ impl LoginBuilder { /// Instead of calling this function and `.await`ing its return value, you /// can also `.await` the `LoginBuilder` directly. #[instrument( + parent = &self.client.root_span, target = "matrix_sdk::client", name = "login", skip_all, @@ -278,7 +279,13 @@ where /// /// Instead of calling this function and `.await`ing its return value, you /// can also `.await` the `SsoLoginBuilder` directly. - #[instrument(target = "matrix_sdk::client", name = "login", skip_all, fields(method = "sso"))] + #[instrument( + parent = &self.client.root_span, + target = "matrix_sdk::client", + name = "login", + skip_all, + fields(method = "sso"), + )] pub async fn send(self) -> Result { use std::{ convert::Infallible, diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index f0fc8b8d341..7d40f9e2720 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -69,7 +69,7 @@ use serde::de::DeserializeOwned; use tokio::sync::OnceCell; #[cfg(feature = "e2e-encryption")] use tracing::error; -use tracing::{debug, info, instrument}; +use tracing::{debug, field::display, info, instrument, trace, Instrument, Span}; use url::Url; #[cfg(feature = "e2e-encryption")] @@ -128,6 +128,7 @@ pub enum LoopCtrl { #[derive(Clone)] pub struct Client { pub(crate) inner: Arc, + pub(crate) root_span: Span, } pub(crate) struct ClientInner { @@ -1152,6 +1153,15 @@ impl Client { } } + self.root_span + .record("user_id", display(&response.user_id)) + .record("device_id", display(&response.device_id)); + + #[cfg(feature = "e2e-encryption")] + if let Some(key) = self.encryption().ed25519_key().await { + self.root_span.record("ed25519_key", key); + } + self.inner.base_client.receive_login_response(response).await?; Ok(()) @@ -1216,10 +1226,27 @@ impl Client { /// ``` /// /// [`login`]: #method.login + #[instrument(skip_all, parent = &self.root_span)] pub async fn restore_session(&self, session: Session) -> Result<()> { + debug!("Restoring session"); + let (meta, tokens) = session.into_parts(); + + self.root_span + .record("user_id", display(&meta.user_id)) + .record("device_id", display(&meta.device_id)); + self.base_client().set_session_tokens(tokens); - Ok(self.base_client().set_session_meta(meta).await?) + self.base_client().set_session_meta(meta).await?; + + #[cfg(feature = "e2e-encryption")] + if let Some(key) = self.encryption().ed25519_key().await { + self.root_span.record("ed25519_key", key); + } + + debug!("Done restoring session"); + + Ok(()) } /// Refresh the access token. @@ -1401,7 +1428,7 @@ impl Client { /// client.register(request).await; /// # }) /// ``` - #[instrument(skip_all)] + #[instrument(skip_all, parent = &self.root_span)] pub async fn register( &self, request: register::v3::Request, @@ -1464,7 +1491,7 @@ impl Client { /// /// let response = client.sync_once(sync_settings).await.unwrap(); /// # }); - #[instrument(skip(self, definition))] + #[instrument(skip(self, definition), parent = &self.root_span)] pub async fn get_or_upload_filter( &self, filter_name: &str, @@ -2157,7 +2184,7 @@ impl Client { /// .await; /// }) /// ``` - #[instrument(skip(self, callback))] + #[instrument(skip_all, parent = &self.root_span)] pub async fn sync_with_callback( &self, sync_settings: crate::config::SyncSettings, @@ -2254,11 +2281,15 @@ impl Client { } loop { + trace!("Syncing"); let result = self.sync_loop_helper(&mut sync_settings).await; + trace!("Running callback"); if callback(result).await? == LoopCtrl::Break { + trace!("Callback told us to stop"); break; } + trace!("Done running callback"); Client::delay_sync(&mut last_sync_time).await } @@ -2308,7 +2339,7 @@ impl Client { /// /// # anyhow::Ok(()) }); /// ``` - #[instrument(skip(self))] + #[instrument(skip(self), parent = &self.root_span)] pub async fn sync_stream( &self, mut sync_settings: crate::config::SyncSettings, @@ -2319,9 +2350,11 @@ impl Client { sync_settings.token = self.sync_token().await; } + let parent_span = Span::current(); + async_stream::stream! { loop { - yield self.sync_loop_helper(&mut sync_settings).await; + yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await; Client::delay_sync(&mut last_sync_time).await } diff --git a/crates/matrix-sdk/src/room/joined.rs b/crates/matrix-sdk/src/room/joined.rs index 3f6ca8dea18..869e76c7c10 100644 --- a/crates/matrix-sdk/src/room/joined.rs +++ b/crates/matrix-sdk/src/room/joined.rs @@ -31,9 +31,7 @@ use ruma::{ EventId, OwnedTransactionId, TransactionId, UserId, }; use serde_json::Value; -use tracing::debug; -#[cfg(feature = "e2e-encryption")] -use tracing::instrument; +use tracing::{debug, instrument}; use super::Left; use crate::{ @@ -84,6 +82,7 @@ impl Joined { } /// Leave this room. + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn leave(&self) -> Result { self.inner.leave().await } @@ -95,6 +94,7 @@ impl Joined { /// * `user_id` - The user to ban with `UserId`. /// /// * `reason` - The reason for banning this user. + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> { let request = assign!( ban_user::v3::Request::new(self.inner.room_id().to_owned(), user_id.to_owned()), @@ -112,6 +112,7 @@ impl Joined { /// room. /// /// * `reason` - Optional reason why the room member is being kicked out. + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> { let request = assign!( kick_user::v3::Request::new(self.inner.room_id().to_owned(), user_id.to_owned()), @@ -126,6 +127,7 @@ impl Joined { /// # Arguments /// /// * `user_id` - The `UserId` of the user to invite to the room. + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> { let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() }; @@ -140,6 +142,7 @@ impl Joined { /// # Arguments /// /// * `invite_id` - A third party id of a user to invite to the room. + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> { let recipient = InvitationRecipient::ThirdPartyId(invite_id); let request = invite_user::v3::Request::new(self.inner.room_id().to_owned(), recipient); @@ -207,28 +210,36 @@ impl Joined { }; if send { - let typing = if typing { - self.client - .inner - .typing_notice_times - .insert(self.inner.room_id().to_owned(), Instant::now()); - Typing::Yes(TYPING_NOTICE_TIMEOUT) - } else { - self.client.inner.typing_notice_times.remove(self.inner.room_id()); - Typing::No - }; - - let request = TypingRequest::new( - self.inner.own_user_id().to_owned(), - self.inner.room_id().to_owned(), - typing, - ); - self.client.send(request, None).await?; + self.send_typing_notice(typing).await?; } Ok(()) } + #[instrument(name = "typing_notice", skip(self), parent = &self.client.root_span)] + async fn send_typing_notice(&self, typing: bool) -> Result<()> { + let typing = if typing { + self.client + .inner + .typing_notice_times + .insert(self.inner.room_id().to_owned(), Instant::now()); + Typing::Yes(TYPING_NOTICE_TIMEOUT) + } else { + self.client.inner.typing_notice_times.remove(self.inner.room_id()); + Typing::No + }; + + let request = TypingRequest::new( + self.inner.own_user_id().to_owned(), + self.inner.room_id().to_owned(), + typing, + ); + + self.client.send(request, None).await?; + + Ok(()) + } + /// Send a request to set a read receipt, notifying this room that the user /// has read a specific event and *some* - but maybe not all - events before /// it. @@ -240,6 +251,7 @@ impl Joined { /// /// * `event_id` - The `EventId` specifies the event to set the read receipt /// on. + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn read_receipt(&self, event_id: &EventId) -> Result<()> { let request = create_receipt::v3::Request::new( self.inner.room_id().to_owned(), @@ -263,6 +275,7 @@ impl Joined { /// /// * read_receipt - An `EventId` to specify the event to set the read /// receipt on. + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn read_marker( &self, fully_read: &EventId, @@ -308,6 +321,7 @@ impl Joined { /// } /// # anyhow::Ok(()) }); /// ``` + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn enable_encryption(&self) -> Result<()> { use ruma::{ events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm, @@ -408,6 +422,7 @@ impl Joined { /// Warning: This waits until a sync happens and does not return if no sync /// is happening! It can also return early when the room is not a joined /// room anymore! + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn sync_up(&self) { while !self.is_synced() && self.room_type() == RoomType::Joined { self.client.inner.sync_beat.listen().wait_timeout(Duration::from_secs(1)); @@ -678,6 +693,7 @@ impl Joined { /// } /// # anyhow::Ok(()) }); /// ``` + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn send_attachment( &self, body: &str, @@ -847,6 +863,7 @@ impl Joined { /// joined_room.send_state_event(content).await?; /// # anyhow::Ok(()) }; /// ``` + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn send_state_event( &self, content: impl StateEventContent, @@ -946,6 +963,7 @@ impl Joined { /// } /// # anyhow::Ok(()) }); /// ``` + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn send_state_event_raw( &self, content: Value, @@ -996,6 +1014,7 @@ impl Joined { /// } /// # anyhow::Ok(()) }); /// ``` + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn redact( &self, event_id: &EventId, diff --git a/crates/matrix-sdk/src/room/timeline/mod.rs b/crates/matrix-sdk/src/room/timeline/mod.rs index 672a137efa0..fac0ed6f645 100644 --- a/crates/matrix-sdk/src/room/timeline/mod.rs +++ b/crates/matrix-sdk/src/room/timeline/mod.rs @@ -277,7 +277,7 @@ impl Timeline { /// /// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned /// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent - #[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))] + #[instrument(skip(self, content), parent = &self.inner.room().client.root_span, fields(room_id = ?self.room().room_id()))] pub async fn send(&self, content: AnyMessageLikeEventContent, txn_id: Option<&TransactionId>) { let txn_id = txn_id.map_or_else(TransactionId::new, ToOwned::to_owned); self.inner.handle_local_event(txn_id.clone(), content.clone()).await; diff --git a/crates/matrix-sdk/src/sliding_sync.rs b/crates/matrix-sdk/src/sliding_sync.rs index ad032d68f5f..ca05386d96b 100644 --- a/crates/matrix-sdk/src/sliding_sync.rs +++ b/crates/matrix-sdk/src/sliding_sync.rs @@ -49,7 +49,7 @@ use ruma::{ }; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tracing::{debug, error, instrument, trace, warn}; +use tracing::{debug, error, info_span, instrument, trace, warn, Instrument, Span}; use url::Url; #[cfg(feature = "experimental-timeline")] @@ -261,6 +261,7 @@ impl SlidingSyncRoom { /// /// Use `Timeline::latest_event` instead if you already have a timeline for /// this `SlidingSyncRoom`. + #[instrument(skip_all, parent = &self.client.root_span)] pub async fn latest_event(&self) -> Option { self.timeline_builder()?.build().await.latest_event() } @@ -1143,6 +1144,7 @@ impl SlidingSync { /// Create the inner stream for the view. /// /// Run this stream to receive new updates from the server. + #[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)] pub fn stream(&self) -> impl Stream> + '_ { let mut views = { let mut views = BTreeMap::new(); @@ -1154,12 +1156,17 @@ impl SlidingSync { }; debug!(?self.extensions, "Setting view stream going"); + let stream_span = Span::current(); async_stream::stream! { loop { - debug!(?self.extensions, "Sync loop running"); + let sync_span = info_span!(parent: &stream_span, "sync_once"); - match self.sync_once(&mut views).await { + sync_span.in_scope(|| { + debug!(?self.extensions, "Sync loop running"); + }); + + match self.sync_once(&mut views).instrument(sync_span.clone()).await { Ok(Some(updates)) => { self.failure_count.store(0, Ordering::SeqCst); yield Ok(updates) @@ -1171,19 +1178,21 @@ impl SlidingSync { if e.client_api_error_kind() == Some(&ErrorKind::UnknownPos) { // session expired, let's reset if self.failure_count.fetch_add(1, Ordering::SeqCst) >= 3 { - error!("session expired three times in a row"); + sync_span.in_scope(|| error!("session expired three times in a row")); yield Err(e.into()); break } - warn!("Session expired. Restarting sliding sync."); - *self.pos.lock_mut() = None; + sync_span.in_scope(|| { + warn!("Session expired. Restarting sliding sync."); + *self.pos.lock_mut() = None; - // reset our extensions to the last known good ones. - *self.extensions.lock().unwrap() = self.sent_extensions.lock().unwrap().take(); + // reset our extensions to the last known good ones. + *self.extensions.lock().unwrap() = self.sent_extensions.lock().unwrap().take(); - debug!(?self.extensions, "Resetting view stream"); + debug!(?self.extensions, "Resetting view stream"); + }); } yield Err(e.into());