Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a root span to the client #1447

Merged
merged 4 commits into from
Feb 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion crates/matrix-sdk/src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -29,6 +29,11 @@ use ruma::{
use thiserror::Error;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::OnceCell;
use tracing::{
debug,
field::{self, debug},
instrument, span, Level, Span,
};
use url::Url;

use super::{Client, ClientInner};
@@ -88,10 +93,19 @@ pub struct ClientBuilder {
appservice_mode: bool,
server_versions: Option<Box<[MatrixVersion]>>,
handle_refresh_tokens: bool,
root_span: Span,
}

impl ClientBuilder {
pub(crate) fn new() -> Self {
let root_span = span!(
Level::INFO,
"matrix-sdk",
user_id = field::Empty,
device_id = field::Empty,
ed25519_key = field::Empty
);

Self {
homeserver_cfg: None,
http_cfg: None,
@@ -101,6 +115,7 @@ impl ClientBuilder {
appservice_mode: false,
server_versions: None,
handle_refresh_tokens: false,
root_span,
}
}

@@ -326,8 +341,12 @@ impl ClientBuilder {
/// server discovery request is made which can fail; if you didn't set
/// [`server_versions(false)`][Self::server_versions], that amounts to
/// another request that can fail
#[instrument(skip_all, parent = &self.root_span, target = "matrix_sdk::client", fields(homeserver))]
pub async fn build(self) -> Result<Client, ClientBuildError> {
debug!("Starting to build the Client");

let homeserver_cfg = self.homeserver_cfg.ok_or(ClientBuildError::MissingHomeserver)?;
Span::current().record("homeserver", debug(&homeserver_cfg));

let inner_http_client = match self.http_cfg.unwrap_or_default() {
#[allow(unused_mut)]
@@ -364,6 +383,8 @@ impl ClientBuilder {
let homeserver = match homeserver_cfg {
HomeserverConfig::Url(url) => url,
HomeserverConfig::ServerName(server_name) => {
debug!("Trying to discover the homeserver");

let homeserver = homeserver_from_name(&server_name);
let well_known = http_client
.send(
@@ -387,6 +408,7 @@ impl ClientBuilder {
if let Some(proxy) = well_known.sliding_sync_proxy.map(|p| p.url) {
sliding_sync_proxy = Url::parse(&proxy).ok();
}
debug!(homserver_url = well_known.homeserver.base_url, "Discovered the homeserver");

well_known.homeserver.base_url
}
@@ -421,7 +443,15 @@ impl ClientBuilder {
refresh_token_lock: Mutex::new(Ok(())),
});

Ok(Client { inner })
debug!("Done building the Client");

// We drop the root span here so it gets pushed to the subscribers, i.e. it gets
// only uploaded to a OpenTelemetry collector if the span gets dropped.
// We still want it around so other methods that get called by this
// client instance are connected to it, so we clone.
drop(self.root_span.clone());

Ok(Client { inner, root_span: self.root_span })
}
}

9 changes: 8 additions & 1 deletion crates/matrix-sdk/src/client/login_builder.rs
Original file line number Diff line number Diff line change
@@ -142,6 +142,7 @@ impl LoginBuilder {
/// Instead of calling this function and `.await`ing its return value, you
/// can also `.await` the `LoginBuilder` directly.
#[instrument(
parent = &self.client.root_span,
target = "matrix_sdk::client",
name = "login",
skip_all,
@@ -278,7 +279,13 @@ where
///
/// Instead of calling this function and `.await`ing its return value, you
/// can also `.await` the `SsoLoginBuilder` directly.
#[instrument(target = "matrix_sdk::client", name = "login", skip_all, fields(method = "sso"))]
#[instrument(
parent = &self.client.root_span,
target = "matrix_sdk::client",
name = "login",
skip_all,
fields(method = "sso"),
)]
pub async fn send(self) -> Result<login::v3::Response> {
use std::{
convert::Infallible,
47 changes: 40 additions & 7 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -69,7 +69,7 @@ use serde::de::DeserializeOwned;
use tokio::sync::OnceCell;
#[cfg(feature = "e2e-encryption")]
use tracing::error;
use tracing::{debug, info, instrument};
use tracing::{debug, field::display, info, instrument, trace, Instrument, Span};
use url::Url;

#[cfg(feature = "e2e-encryption")]
@@ -128,6 +128,7 @@ pub enum LoopCtrl {
#[derive(Clone)]
pub struct Client {
pub(crate) inner: Arc<ClientInner>,
pub(crate) root_span: Span,
}

pub(crate) struct ClientInner {
@@ -1152,6 +1153,15 @@ impl Client {
}
}

self.root_span
.record("user_id", display(&response.user_id))
.record("device_id", display(&response.device_id));

#[cfg(feature = "e2e-encryption")]
if let Some(key) = self.encryption().ed25519_key().await {
self.root_span.record("ed25519_key", key);
}

self.inner.base_client.receive_login_response(response).await?;

Ok(())
@@ -1216,10 +1226,27 @@ impl Client {
/// ```
///
/// [`login`]: #method.login
#[instrument(skip_all, parent = &self.root_span)]
pub async fn restore_session(&self, session: Session) -> Result<()> {
debug!("Restoring session");

let (meta, tokens) = session.into_parts();

self.root_span
.record("user_id", display(&meta.user_id))
.record("device_id", display(&meta.device_id));

self.base_client().set_session_tokens(tokens);
Ok(self.base_client().set_session_meta(meta).await?)
self.base_client().set_session_meta(meta).await?;

#[cfg(feature = "e2e-encryption")]
if let Some(key) = self.encryption().ed25519_key().await {
self.root_span.record("ed25519_key", key);
}

debug!("Done restoring session");

Ok(())
}

/// Refresh the access token.
@@ -1401,7 +1428,7 @@ impl Client {
/// client.register(request).await;
/// # })
/// ```
#[instrument(skip_all)]
#[instrument(skip_all, parent = &self.root_span)]
pub async fn register(
&self,
request: register::v3::Request,
@@ -1464,7 +1491,7 @@ impl Client {
///
/// let response = client.sync_once(sync_settings).await.unwrap();
/// # });
#[instrument(skip(self, definition))]
#[instrument(skip(self, definition), parent = &self.root_span)]
pub async fn get_or_upload_filter(
&self,
filter_name: &str,
@@ -2157,7 +2184,7 @@ impl Client {
/// .await;
/// })
/// ```
#[instrument(skip(self, callback))]
#[instrument(skip_all, parent = &self.root_span)]
pub async fn sync_with_callback<C>(
Comment on lines +2187 to 2188
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a method that gets called from other sync methods. Using parent = here will make that relation invisible in logs, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's called from the sync() method, which doesn't create a span itself.

&self,
sync_settings: crate::config::SyncSettings,
@@ -2254,11 +2281,15 @@ impl Client {
}

loop {
trace!("Syncing");
let result = self.sync_loop_helper(&mut sync_settings).await;

trace!("Running callback");
if callback(result).await? == LoopCtrl::Break {
trace!("Callback told us to stop");
break;
}
trace!("Done running callback");

Client::delay_sync(&mut last_sync_time).await
}
@@ -2308,7 +2339,7 @@ impl Client {
///
/// # anyhow::Ok(()) });
/// ```
#[instrument(skip(self))]
#[instrument(skip(self), parent = &self.root_span)]
pub async fn sync_stream(
&self,
mut sync_settings: crate::config::SyncSettings,
@@ -2319,9 +2350,11 @@ impl Client {
sync_settings.token = self.sync_token().await;
}

let parent_span = Span::current();

async_stream::stream! {
loop {
yield self.sync_loop_helper(&mut sync_settings).await;
yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;

Client::delay_sync(&mut last_sync_time).await
}
59 changes: 39 additions & 20 deletions crates/matrix-sdk/src/room/joined.rs
Original file line number Diff line number Diff line change
@@ -31,9 +31,7 @@ use ruma::{
EventId, OwnedTransactionId, TransactionId, UserId,
};
use serde_json::Value;
use tracing::debug;
#[cfg(feature = "e2e-encryption")]
use tracing::instrument;
use tracing::{debug, instrument};

use super::Left;
use crate::{
@@ -84,6 +82,7 @@ impl Joined {
}

/// Leave this room.
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn leave(&self) -> Result<Left> {
self.inner.leave().await
}
@@ -95,6 +94,7 @@ impl Joined {
/// * `user_id` - The user to ban with `UserId`.
///
/// * `reason` - The reason for banning this user.
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
let request = assign!(
ban_user::v3::Request::new(self.inner.room_id().to_owned(), user_id.to_owned()),
@@ -112,6 +112,7 @@ impl Joined {
/// room.
///
/// * `reason` - Optional reason why the room member is being kicked out.
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
let request = assign!(
kick_user::v3::Request::new(self.inner.room_id().to_owned(), user_id.to_owned()),
@@ -126,6 +127,7 @@ impl Joined {
/// # Arguments
///
/// * `user_id` - The `UserId` of the user to invite to the room.
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };

@@ -140,6 +142,7 @@ impl Joined {
/// # Arguments
///
/// * `invite_id` - A third party id of a user to invite to the room.
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
let recipient = InvitationRecipient::ThirdPartyId(invite_id);
let request = invite_user::v3::Request::new(self.inner.room_id().to_owned(), recipient);
@@ -207,28 +210,36 @@ impl Joined {
};

if send {
let typing = if typing {
self.client
.inner
.typing_notice_times
.insert(self.inner.room_id().to_owned(), Instant::now());
Typing::Yes(TYPING_NOTICE_TIMEOUT)
} else {
self.client.inner.typing_notice_times.remove(self.inner.room_id());
Typing::No
};

let request = TypingRequest::new(
self.inner.own_user_id().to_owned(),
self.inner.room_id().to_owned(),
typing,
);
self.client.send(request, None).await?;
self.send_typing_notice(typing).await?;
}

Ok(())
}

#[instrument(name = "typing_notice", skip(self), parent = &self.client.root_span)]
async fn send_typing_notice(&self, typing: bool) -> Result<()> {
let typing = if typing {
self.client
.inner
.typing_notice_times
.insert(self.inner.room_id().to_owned(), Instant::now());
Typing::Yes(TYPING_NOTICE_TIMEOUT)
} else {
self.client.inner.typing_notice_times.remove(self.inner.room_id());
Typing::No
};

let request = TypingRequest::new(
self.inner.own_user_id().to_owned(),
self.inner.room_id().to_owned(),
typing,
);

self.client.send(request, None).await?;

Ok(())
}

/// Send a request to set a read receipt, notifying this room that the user
/// has read a specific event and *some* - but maybe not all - events before
/// it.
@@ -240,6 +251,7 @@ impl Joined {
///
/// * `event_id` - The `EventId` specifies the event to set the read receipt
/// on.
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn read_receipt(&self, event_id: &EventId) -> Result<()> {
let request = create_receipt::v3::Request::new(
self.inner.room_id().to_owned(),
@@ -263,6 +275,7 @@ impl Joined {
///
/// * read_receipt - An `EventId` to specify the event to set the read
/// receipt on.
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn read_marker(
&self,
fully_read: &EventId,
@@ -308,6 +321,7 @@ impl Joined {
/// }
/// # anyhow::Ok(()) });
/// ```
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn enable_encryption(&self) -> Result<()> {
use ruma::{
events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm,
@@ -408,6 +422,7 @@ impl Joined {
/// Warning: This waits until a sync happens and does not return if no sync
/// is happening! It can also return early when the room is not a joined
/// room anymore!
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn sync_up(&self) {
while !self.is_synced() && self.room_type() == RoomType::Joined {
self.client.inner.sync_beat.listen().wait_timeout(Duration::from_secs(1));
@@ -678,6 +693,7 @@ impl Joined {
/// }
/// # anyhow::Ok(()) });
/// ```
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn send_attachment(
&self,
body: &str,
@@ -847,6 +863,7 @@ impl Joined {
/// joined_room.send_state_event(content).await?;
/// # anyhow::Ok(()) };
/// ```
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn send_state_event(
&self,
content: impl StateEventContent<StateKey = EmptyStateKey>,
@@ -946,6 +963,7 @@ impl Joined {
/// }
/// # anyhow::Ok(()) });
/// ```
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn send_state_event_raw(
&self,
content: Value,
@@ -996,6 +1014,7 @@ impl Joined {
/// }
/// # anyhow::Ok(()) });
/// ```
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn redact(
&self,
event_id: &EventId,
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/room/timeline/mod.rs
Original file line number Diff line number Diff line change
@@ -277,7 +277,7 @@ impl Timeline {
///
/// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned
/// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent
#[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
#[instrument(skip(self, content), parent = &self.inner.room().client.root_span, fields(room_id = ?self.room().room_id()))]
pub async fn send(&self, content: AnyMessageLikeEventContent, txn_id: Option<&TransactionId>) {
let txn_id = txn_id.map_or_else(TransactionId::new, ToOwned::to_owned);
self.inner.handle_local_event(txn_id.clone(), content.clone()).await;
27 changes: 18 additions & 9 deletions crates/matrix-sdk/src/sliding_sync.rs
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ use ruma::{
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, error, instrument, trace, warn};
use tracing::{debug, error, info_span, instrument, trace, warn, Instrument, Span};
use url::Url;

#[cfg(feature = "experimental-timeline")]
@@ -261,6 +261,7 @@ impl SlidingSyncRoom {
///
/// Use `Timeline::latest_event` instead if you already have a timeline for
/// this `SlidingSyncRoom`.
#[instrument(skip_all, parent = &self.client.root_span)]
pub async fn latest_event(&self) -> Option<EventTimelineItem> {
self.timeline_builder()?.build().await.latest_event()
}
@@ -1143,6 +1144,7 @@ impl SlidingSync {
/// Create the inner stream for the view.
///
/// Run this stream to receive new updates from the server.
#[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)]
pub fn stream(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
let mut views = {
let mut views = BTreeMap::new();
@@ -1154,12 +1156,17 @@ impl SlidingSync {
};

debug!(?self.extensions, "Setting view stream going");
let stream_span = Span::current();

async_stream::stream! {
loop {
debug!(?self.extensions, "Sync loop running");
let sync_span = info_span!(parent: &stream_span, "sync_once");

match self.sync_once(&mut views).await {
sync_span.in_scope(|| {
debug!(?self.extensions, "Sync loop running");
});

match self.sync_once(&mut views).instrument(sync_span.clone()).await {
Ok(Some(updates)) => {
self.failure_count.store(0, Ordering::SeqCst);
yield Ok(updates)
@@ -1171,19 +1178,21 @@ impl SlidingSync {
if e.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
// session expired, let's reset
if self.failure_count.fetch_add(1, Ordering::SeqCst) >= 3 {
error!("session expired three times in a row");
sync_span.in_scope(|| error!("session expired three times in a row"));
yield Err(e.into());

break
}

warn!("Session expired. Restarting sliding sync.");
*self.pos.lock_mut() = None;
sync_span.in_scope(|| {
warn!("Session expired. Restarting sliding sync.");
*self.pos.lock_mut() = None;

// reset our extensions to the last known good ones.
*self.extensions.lock().unwrap() = self.sent_extensions.lock().unwrap().take();
// reset our extensions to the last known good ones.
*self.extensions.lock().unwrap() = self.sent_extensions.lock().unwrap().take();

debug!(?self.extensions, "Resetting view stream");
debug!(?self.extensions, "Resetting view stream");
});
}

yield Err(e.into());