@@ -938,6 +938,90 @@ impl SlidingSync {
938
938
Ok ( update)
939
939
}
940
940
941
+ async fn sync_once (
942
+ & self ,
943
+ views : & mut BTreeMap < String , SlidingSyncViewRequestGenerator > ,
944
+ ) -> Result < Option < UpdateSummary > > {
945
+ let mut requests = BTreeMap :: new ( ) ;
946
+ let mut to_remove = Vec :: new ( ) ;
947
+
948
+ for ( name, generator) in views. iter_mut ( ) {
949
+ if let Some ( request) = generator. next ( ) {
950
+ requests. insert ( name. clone ( ) , request) ;
951
+ } else {
952
+ to_remove. push ( name. clone ( ) ) ;
953
+ }
954
+ }
955
+
956
+ for n in to_remove {
957
+ views. remove ( & n) ;
958
+ }
959
+
960
+ if views. is_empty ( ) {
961
+ return Ok ( None ) ;
962
+ }
963
+
964
+ let pos = self . pos . get_cloned ( ) ;
965
+ let delta_token = self . delta_token . get_cloned ( ) ;
966
+ let room_subscriptions = self . subscriptions . lock_ref ( ) . clone ( ) ;
967
+ let unsubscribe_rooms = {
968
+ let unsubs = self . unsubscribe . lock_ref ( ) . to_vec ( ) ;
969
+ if !unsubs. is_empty ( ) {
970
+ self . unsubscribe . lock_mut ( ) . clear ( ) ;
971
+ }
972
+ unsubs
973
+ } ;
974
+ let timeout = Duration :: from_secs ( 30 ) ;
975
+
976
+ // implement stickiness by only sending extensions if they have
977
+ // changed since the last time we sent them
978
+ let extensions = {
979
+ let extensions = self . extensions . lock ( ) . unwrap ( ) ;
980
+ if * extensions == * self . sent_extensions . lock ( ) . unwrap ( ) {
981
+ None
982
+ } else {
983
+ extensions. clone ( )
984
+ }
985
+ } ;
986
+
987
+ let request = assign ! ( v4:: Request :: new( ) , {
988
+ lists: requests,
989
+ pos,
990
+ delta_token,
991
+ timeout: Some ( timeout) ,
992
+ room_subscriptions,
993
+ unsubscribe_rooms,
994
+ extensions: extensions. clone( ) . unwrap_or_default( ) ,
995
+ } ) ;
996
+ debug ! ( "requesting" ) ;
997
+
998
+ // 30s for the long poll + 30s for network delays
999
+ let request_config = RequestConfig :: default ( ) . timeout ( timeout + Duration :: from_secs ( 30 ) ) ;
1000
+ let request = self . client . send_with_homeserver (
1001
+ request,
1002
+ Some ( request_config) ,
1003
+ self . homeserver . as_ref ( ) . map ( ToString :: to_string) ,
1004
+ ) ;
1005
+
1006
+ #[ cfg( feature = "e2e-encryption" ) ]
1007
+ let response = {
1008
+ let ( e2ee_uploads, resp) =
1009
+ futures_util:: join!( self . client. send_outgoing_requests( ) , request) ;
1010
+ if let Err ( e) = e2ee_uploads {
1011
+ error ! ( error = ?e, "Error while sending outgoing E2EE requests" ) ;
1012
+ }
1013
+ resp
1014
+ } ?;
1015
+ #[ cfg( not( feature = "e2e-encryption" ) ) ]
1016
+ let response = request. await ?;
1017
+ debug ! ( "received" ) ;
1018
+
1019
+ let updates = self . handle_response ( response, extensions, views) . await ?;
1020
+ debug ! ( "handled" ) ;
1021
+
1022
+ Ok ( Some ( updates) )
1023
+ }
1024
+
941
1025
/// Create the inner stream for the view.
942
1026
///
943
1027
/// Run this stream to receive new updates from the server.
@@ -950,94 +1034,31 @@ impl SlidingSync {
950
1034
}
951
1035
views
952
1036
} ;
953
- let client = self . client . clone ( ) ;
954
1037
955
1038
debug ! ( ?self . extensions, "Setting view stream going" ) ;
956
- async_stream:: stream! {
957
1039
1040
+ async_stream:: stream! {
958
1041
loop {
959
1042
debug!( ?self . extensions, "Sync loop running" ) ;
960
1043
961
- let mut requests = BTreeMap :: new( ) ;
962
- let mut to_remove = Vec :: new( ) ;
963
-
964
- for ( name, generator) in views. iter_mut( ) {
965
- if let Some ( request) = generator. next( ) {
966
- requests. insert( name. clone( ) , request) ;
967
- } else {
968
- to_remove. push( name. clone( ) ) ;
969
- }
970
- }
971
- for n in to_remove {
972
- views. remove( & n) ;
973
- }
974
-
975
- if views. is_empty( ) {
976
- return
977
- }
978
-
979
- let pos = self . pos. get_cloned( ) ;
980
- let delta_token = self . delta_token. get_cloned( ) ;
981
- let room_subscriptions = self . subscriptions. lock_ref( ) . clone( ) ;
982
- let unsubscribe_rooms = {
983
- let unsubs = self . unsubscribe. lock_ref( ) . to_vec( ) ;
984
- if !unsubs. is_empty( ) {
985
- self . unsubscribe. lock_mut( ) . clear( ) ;
986
- }
987
- unsubs
988
- } ;
989
- let timeout = Duration :: from_secs( 30 ) ;
990
-
991
- // implement stickiness by only sending extensions if they have
992
- // changed since the last time we sent them
993
- let extensions = {
994
- let extensions = self . extensions. lock( ) . unwrap( ) ;
995
- if * extensions == * self . sent_extensions. lock( ) . unwrap( ) {
996
- None
997
- } else {
998
- extensions. clone( )
999
- }
1000
- } ;
1001
-
1002
- let req = assign!( v4:: Request :: new( ) , {
1003
- lists: requests,
1004
- pos,
1005
- delta_token,
1006
- timeout: Some ( timeout) ,
1007
- room_subscriptions,
1008
- unsubscribe_rooms,
1009
- extensions: extensions. clone( ) . unwrap_or_default( ) ,
1010
- } ) ;
1011
- debug!( "requesting" ) ;
1012
-
1013
- // 30s for the long poll + 30s for network delays
1014
- let request_config = RequestConfig :: default ( ) . timeout( timeout + Duration :: from_secs( 30 ) ) ;
1015
- let req = client. send_with_homeserver( req, Some ( request_config) , self . homeserver. as_ref( ) . map( ToString :: to_string) ) ;
1016
-
1017
- #[ cfg( feature = "e2e-encryption" ) ]
1018
- let resp_res = {
1019
- let ( e2ee_uploads, resp) = futures_util:: join!( client. send_outgoing_requests( ) , req) ;
1020
- if let Err ( e) = e2ee_uploads {
1021
- error!( error = ?e, "Error while sending outgoing E2EE requests" ) ;
1022
- }
1023
- resp
1024
- } ;
1025
- #[ cfg( not( feature = "e2e-encryption" ) ) ]
1026
- let resp_res = req. await ;
1027
-
1028
- let resp = match resp_res {
1029
- Ok ( r) => {
1044
+ match self . sync_once( & mut views) . await {
1045
+ Ok ( Some ( updates) ) => {
1030
1046
self . failure_count. store( 0 , Ordering :: SeqCst ) ;
1031
- r
1047
+ yield Ok ( updates )
1032
1048
} ,
1049
+ Ok ( None ) => {
1050
+ break ;
1051
+ }
1033
1052
Err ( e) => {
1034
1053
if e. client_api_error_kind( ) == Some ( & ErrorKind :: UnknownPos ) {
1035
1054
// session expired, let's reset
1036
1055
if self . failure_count. fetch_add( 1 , Ordering :: SeqCst ) >= 3 {
1037
1056
error!( "session expired three times in a row" ) ;
1038
1057
yield Err ( e. into( ) ) ;
1058
+
1039
1059
break
1040
1060
}
1061
+
1041
1062
warn!( "Session expired. Restarting sliding sync." ) ;
1042
1063
* self . pos. lock_mut( ) = None ;
1043
1064
@@ -1046,22 +1067,12 @@ impl SlidingSync {
1046
1067
1047
1068
debug!( ?self . extensions, "Resetting view stream" ) ;
1048
1069
}
1049
- yield Err ( e. into( ) ) ;
1050
- continue
1051
- }
1052
- } ;
1053
1070
1054
- debug!( "received" ) ;
1055
-
1056
- let updates = match self . handle_response( resp, extensions, & mut views) . await {
1057
- Ok ( r) => r,
1058
- Err ( e) => {
1059
1071
yield Err ( e. into( ) ) ;
1072
+
1060
1073
continue
1061
1074
}
1062
- } ;
1063
- debug!( "handled" ) ;
1064
- yield Ok ( updates) ;
1075
+ }
1065
1076
}
1066
1077
}
1067
1078
}
0 commit comments