Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the sliding sync stream method #1446

Merged
merged 2 commits into from
Feb 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 98 additions & 86 deletions crates/matrix-sdk/src/sliding_sync.rs
Original file line number Diff line number Diff line change
@@ -948,6 +948,90 @@ impl SlidingSync {
Ok(update)
}

async fn sync_once(
&self,
views: &mut BTreeMap<String, SlidingSyncViewRequestGenerator>,
) -> Result<Option<UpdateSummary>> {
let mut requests = BTreeMap::new();
let mut to_remove = Vec::new();

for (name, generator) in views.iter_mut() {
if let Some(request) = generator.next() {
requests.insert(name.clone(), request);
} else {
to_remove.push(name.clone());
}
}

for n in to_remove {
views.remove(&n);
}

if views.is_empty() {
return Ok(None);
}

let pos = self.pos.get_cloned();
let delta_token = self.delta_token.get_cloned();
let room_subscriptions = self.subscriptions.lock_ref().clone();
let unsubscribe_rooms = {
let unsubs = self.unsubscribe.lock_ref().to_vec();
if !unsubs.is_empty() {
self.unsubscribe.lock_mut().clear();
}
unsubs
};
let timeout = Duration::from_secs(30);

// implement stickiness by only sending extensions if they have
// changed since the last time we sent them
let extensions = {
let extensions = self.extensions.lock().unwrap();
if *extensions == *self.sent_extensions.lock().unwrap() {
None
} else {
extensions.clone()
}
};

let request = assign!(v4::Request::new(), {
lists: requests,
pos,
delta_token,
timeout: Some(timeout),
room_subscriptions,
unsubscribe_rooms,
extensions: extensions.clone().unwrap_or_default(),
});
debug!("requesting");

// 30s for the long poll + 30s for network delays
let request_config = RequestConfig::default().timeout(timeout + Duration::from_secs(30));
let request = self.client.send_with_homeserver(
request,
Some(request_config),
self.homeserver.as_ref().map(ToString::to_string),
);

#[cfg(feature = "e2e-encryption")]
let response = {
let (e2ee_uploads, resp) =
futures_util::join!(self.client.send_outgoing_requests(), request);
if let Err(e) = e2ee_uploads {
error!(error = ?e, "Error while sending outgoing E2EE requests");
}
resp
}?;
#[cfg(not(feature = "e2e-encryption"))]
let response = request.await?;
debug!("received");

let updates = self.handle_response(response, extensions, views).await?;
debug!("handled");

Ok(Some(updates))
}

/// Create the inner stream for the view.
///
/// Run this stream to receive new updates from the server.
@@ -960,94 +1044,31 @@ impl SlidingSync {
}
views
};
let client = self.client.clone();

debug!(?self.extensions, "Setting view stream going");
async_stream::stream! {

async_stream::stream! {
loop {
debug!(?self.extensions, "Sync loop running");

let mut requests = BTreeMap::new();
let mut to_remove = Vec::new();

for (name, generator) in views.iter_mut() {
if let Some(request) = generator.next() {
requests.insert(name.clone(), request);
} else {
to_remove.push(name.clone());
}
}
for n in to_remove {
views.remove(&n);
}

if views.is_empty() {
return
}

let pos = self.pos.get_cloned();
let delta_token = self.delta_token.get_cloned();
let room_subscriptions = self.subscriptions.lock_ref().clone();
let unsubscribe_rooms = {
let unsubs = self.unsubscribe.lock_ref().to_vec();
if !unsubs.is_empty() {
self.unsubscribe.lock_mut().clear();
}
unsubs
};
let timeout = Duration::from_secs(30);

// implement stickiness by only sending extensions if they have
// changed since the last time we sent them
let extensions = {
let extensions = self.extensions.lock().unwrap();
if *extensions == *self.sent_extensions.lock().unwrap() {
None
} else {
extensions.clone()
}
};

let req = assign!(v4::Request::new(), {
lists: requests,
pos,
delta_token,
timeout: Some(timeout),
room_subscriptions,
unsubscribe_rooms,
extensions: extensions.clone().unwrap_or_default(),
});
debug!("requesting");

// 30s for the long poll + 30s for network delays
let request_config = RequestConfig::default().timeout(timeout + Duration::from_secs(30));
let req = client.send_with_homeserver(req, Some(request_config), self.homeserver.as_ref().map(ToString::to_string));

#[cfg(feature = "e2e-encryption")]
let resp_res = {
let (e2ee_uploads, resp) = futures_util::join!(client.send_outgoing_requests(), req);
if let Err(e) = e2ee_uploads {
error!(error = ?e, "Error while sending outgoing E2EE requests");
}
resp
};
#[cfg(not(feature = "e2e-encryption"))]
let resp_res = req.await;

let resp = match resp_res {
Ok(r) => {
match self.sync_once(&mut views).await {
Ok(Some(updates)) => {
self.failure_count.store(0, Ordering::SeqCst);
r
yield Ok(updates)
},
Ok(None) => {
break;
}
Err(e) => {
if e.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
// session expired, let's reset
if self.failure_count.fetch_add(1, Ordering::SeqCst) >= 3 {
error!("session expired three times in a row");
yield Err(e.into());

break
}

warn!("Session expired. Restarting sliding sync.");
*self.pos.lock_mut() = None;

@@ -1056,22 +1077,12 @@ impl SlidingSync {

debug!(?self.extensions, "Resetting view stream");
}
yield Err(e.into());
continue
}
};

debug!("received");

let updates = match self.handle_response(resp, extensions, &mut views).await {
Ok(r) => r,
Err(e) => {
yield Err(e.into());

continue
}
};
debug!("handled");
yield Ok(updates);
}
}
}
}
@@ -1088,9 +1099,10 @@ impl SlidingSync {
/// # block_on(async {
/// # let homeserver = Url::parse("http://example.com")?;
/// let client = Client::new(homeserver).await?;
/// let sliding_sync = client.sliding_sync().default_with_fullsync().build()?;
/// let sliding_sync =
/// client.sliding_sync().await.add_fullsync_view().build().await?;
///
/// # })
/// # anyhow::Ok(()) });
/// ```
#[derive(Clone, Debug, Builder)]
#[builder(build_fn(name = "finish_build"), pattern = "owned", derive(Clone, Debug))]