Skip to content

Commit 01be5c0

Browse files
committed
please work
Created using spr 1.3.6-beta.1
1 parent d9147e2 commit 01be5c0

File tree

3 files changed

+84
-119
lines changed

3 files changed

+84
-119
lines changed

src/async_traits.rs

+58-88
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ where
4848
Conn: 'static + DieselConnection + R2D2Connection,
4949
Self: Send + Sized + 'static,
5050
{
51-
async fn ping_async(&mut self) -> Result<(), RunError<DieselError>> {
51+
async fn ping_async(&mut self) -> Result<(), RunError> {
5252
self.as_async_conn().run(|conn| conn.ping()).await
5353
}
5454

5555
async fn is_broken_async(&mut self) -> bool {
5656
self.as_async_conn()
57-
.run(|conn| Ok::<bool, ()>(conn.is_broken()))
57+
.run(|conn| Ok::<bool, _>(conn.is_broken()))
5858
.await
5959
.unwrap()
6060
}
@@ -75,42 +75,36 @@ where
7575
fn as_async_conn(&self) -> &Connection<Conn>;
7676

7777
/// Runs the function `f` in an context where blocking is safe.
78-
async fn run<R, E, Func>(&self, f: Func) -> Result<R, RunError<E>>
78+
async fn run<R, Func>(&self, f: Func) -> Result<R, RunError>
7979
where
8080
R: Send + 'static,
81-
E: Send + 'static,
82-
Func: FnOnce(&mut Conn) -> Result<R, E> + Send + 'static,
81+
Func: FnOnce(&mut Conn) -> Result<R, DieselError> + Send + 'static,
8382
{
8483
let connection = self.get_owned_connection();
8584
connection.run_with_connection(f).await
8685
}
8786

8887
#[doc(hidden)]
89-
async fn run_with_connection<R, E, Func>(self, f: Func) -> Result<R, RunError<E>>
88+
async fn run_with_connection<R, Func>(self, f: Func) -> Result<R, RunError>
9089
where
9190
R: Send + 'static,
92-
E: Send + 'static,
93-
Func: FnOnce(&mut Conn) -> Result<R, E> + Send + 'static,
91+
Func: FnOnce(&mut Conn) -> Result<R, DieselError> + Send + 'static,
9492
{
9593
handle_spawn_blocking_error(spawn_blocking(move || f(&mut *self.as_sync_conn())).await)
9694
}
9795

9896
#[doc(hidden)]
99-
async fn run_with_shared_connection<R, E, Func>(
100-
self: &Arc<Self>,
101-
f: Func,
102-
) -> Result<R, RunError<E>>
97+
async fn run_with_shared_connection<R, Func>(self: &Arc<Self>, f: Func) -> Result<R, RunError>
10398
where
10499
R: Send + 'static,
105-
E: Send + 'static,
106-
Func: FnOnce(&mut Conn) -> Result<R, E> + Send + 'static,
100+
Func: FnOnce(&mut Conn) -> Result<R, DieselError> + Send + 'static,
107101
{
108102
let conn = self.clone();
109103
handle_spawn_blocking_error(spawn_blocking(move || f(&mut *conn.as_sync_conn())).await)
110104
}
111105

112106
#[doc(hidden)]
113-
async fn transaction_depth(&self) -> Result<u32, RunError<DieselError>> {
107+
async fn transaction_depth(&self) -> Result<u32, RunError> {
114108
let conn = self.get_owned_connection();
115109

116110
Self::run_with_connection(conn, |conn| {
@@ -130,9 +124,9 @@ where
130124
// This method is a wrapper around that call, with validation that
131125
// we're actually issuing the BEGIN statement here.
132126
#[doc(hidden)]
133-
async fn start_transaction(self: &Arc<Self>) -> Result<(), RunError<DieselError>> {
127+
async fn start_transaction(self: &Arc<Self>) -> Result<(), RunError> {
134128
if self.transaction_depth().await? != 0 {
135-
return Err(RunError::User(DieselError::AlreadyInTransaction));
129+
return Err(RunError::DieselError(DieselError::AlreadyInTransaction));
136130
}
137131
self.run_with_shared_connection(|conn| Conn::TransactionManager::begin_transaction(conn))
138132
.await?;
@@ -145,11 +139,11 @@ where
145139
// This method is a wrapper around that call, with validation that
146140
// we're actually issuing our first SAVEPOINT here.
147141
#[doc(hidden)]
148-
async fn add_retry_savepoint(self: &Arc<Self>) -> Result<(), RunError<DieselError>> {
142+
async fn add_retry_savepoint(self: &Arc<Self>) -> Result<(), RunError> {
149143
match self.transaction_depth().await? {
150-
0 => return Err(RunError::User(DieselError::NotInTransaction)),
144+
0 => return Err(RunError::DieselError(DieselError::NotInTransaction)),
151145
1 => (),
152-
_ => return Err(RunError::User(DieselError::AlreadyInTransaction)),
146+
_ => return Err(RunError::DieselError(DieselError::AlreadyInTransaction)),
153147
};
154148

155149
self.run_with_shared_connection(|conn| Conn::TransactionManager::begin_transaction(conn))
@@ -158,14 +152,14 @@ where
158152
}
159153

160154
#[doc(hidden)]
161-
async fn commit_transaction(self: &Arc<Self>) -> Result<(), RunError<DieselError>> {
155+
async fn commit_transaction(self: &Arc<Self>) -> Result<(), RunError> {
162156
self.run_with_shared_connection(|conn| Conn::TransactionManager::commit_transaction(conn))
163157
.await?;
164158
Ok(())
165159
}
166160

167161
#[doc(hidden)]
168-
async fn rollback_transaction(self: &Arc<Self>) -> Result<(), RunError<DieselError>> {
162+
async fn rollback_transaction(self: &Arc<Self>) -> Result<(), RunError> {
169163
self.run_with_shared_connection(|conn| {
170164
Conn::TransactionManager::rollback_transaction(conn)
171165
})
@@ -184,10 +178,10 @@ where
184178
&'a self,
185179
f: Func,
186180
retry: RetryFunc,
187-
) -> Result<R, RunError<DieselError>>
181+
) -> Result<R, RunError>
188182
where
189183
R: Any + Send + 'static,
190-
Fut: FutureExt<Output = Result<R, RunError<DieselError>>> + Send,
184+
Fut: FutureExt<Output = Result<R, RunError>> + Send,
191185
Func: (Fn(Connection<Conn>) -> Fut) + Send + Sync,
192186
RetryFut: FutureExt<Output = bool> + Send,
193187
RetryFunc: Fn() -> RetryFut + Send + Sync,
@@ -220,13 +214,11 @@ where
220214
#[cfg(feature = "cockroach")]
221215
async fn transaction_async_with_retry_inner(
222216
&self,
223-
f: &(dyn Fn(
224-
Connection<Conn>,
225-
) -> BoxFuture<'_, Result<Box<dyn Any + Send>, RunError<DieselError>>>
217+
f: &(dyn Fn(Connection<Conn>) -> BoxFuture<'_, Result<Box<dyn Any + Send>, RunError>>
226218
+ Send
227219
+ Sync),
228220
retry: &(dyn Fn() -> BoxFuture<'_, bool> + Send + Sync),
229-
) -> Result<Box<dyn Any + Send>, RunError<DieselError>> {
221+
) -> Result<Box<dyn Any + Send>, RunError> {
230222
// Check out a connection once, and use it for the duration of the
231223
// operation.
232224
let conn = Arc::new(self.get_owned_connection());
@@ -264,7 +256,7 @@ where
264256
// We're still in the transaction, but we at least
265257
// tried to ROLLBACK to our savepoint.
266258
let retried = match &err {
267-
RunError::User(err) => retryable_error(err) && retry().await,
259+
RunError::DieselError(err) => retryable_error(err) && retry().await,
268260
RunError::RuntimeShutdown => false,
269261
};
270262
if retried {
@@ -282,7 +274,7 @@ where
282274
Self::commit_transaction(&conn).await?;
283275
return Ok(value);
284276
}
285-
Err(RunError::User(user_error)) => {
277+
Err(RunError::DieselError(user_error)) => {
286278
// The user-level operation failed: ROLLBACK to the retry
287279
// savepoint.
288280
if let Err(first_rollback_err) = Self::rollback_transaction(&conn).await {
@@ -302,7 +294,7 @@ where
302294

303295
// If we aren't retrying, ROLLBACK the BEGIN statement too.
304296
return match Self::rollback_transaction(&conn).await {
305-
Ok(()) => Err(RunError::User(user_error)),
297+
Ok(()) => Err(RunError::DieselError(user_error)),
306298
Err(err) => Err(err),
307299
};
308300
}
@@ -321,7 +313,7 @@ where
321313
async fn transaction_async<R, E, Func, Fut, 'a>(&'a self, f: Func) -> Result<R, E>
322314
where
323315
R: Send + 'static,
324-
E: From<RunError<DieselError>> + Send + 'static,
316+
E: From<RunError> + Send + 'static,
325317
Fut: Future<Output = Result<R, E>> + Send,
326318
Func: FnOnce(Connection<Conn>) -> Fut + Send,
327319
{
@@ -354,7 +346,7 @@ where
354346
>,
355347
) -> Result<Box<dyn Any + Send>, E>
356348
where
357-
E: From<RunError<DieselError>> + Send + 'static,
349+
E: From<RunError> + Send + 'static,
358350
{
359351
// Check out a connection once, and use it for the duration of the
360352
// operation.
@@ -365,15 +357,8 @@ where
365357
//
366358
// However, it modifies all callsites to instead issue
367359
// known-to-be-synchronous operations from an asynchronous context.
368-
conn.run_with_shared_connection(|conn| {
369-
Conn::TransactionManager::begin_transaction(conn)
370-
.map_err(|err| E::from(RunError::User(err)))
371-
})
372-
.await
373-
.map_err(|err| match err {
374-
RunError::User(err) => err,
375-
RunError::RuntimeShutdown => RunError::RuntimeShutdown.into(),
376-
})?;
360+
conn.run_with_shared_connection(|conn| Conn::TransactionManager::begin_transaction(conn))
361+
.await?;
377362

378363
// TODO: The ideal interface would pass the "async_conn" object to the
379364
// underlying function "f" by reference.
@@ -390,42 +375,29 @@ where
390375
let async_conn = Connection(Self::as_async_conn(&conn).0.clone());
391376
match f(async_conn).await {
392377
Ok(value) => {
393-
match conn
394-
.run_with_shared_connection(|conn| {
395-
Conn::TransactionManager::commit_transaction(conn)
396-
.map_err(|err| E::from(RunError::User(err)))
397-
})
398-
.await
399-
{
400-
Ok(()) => Ok(value),
401-
// XXX: we should try to roll this back
402-
Err(RunError::User(err)) => Err(err),
403-
Err(RunError::RuntimeShutdown) => Err(RunError::RuntimeShutdown.into()),
404-
}
378+
conn.run_with_shared_connection(|conn| {
379+
Conn::TransactionManager::commit_transaction(conn)
380+
})
381+
.await?;
382+
Ok(value)
405383
}
406384
Err(user_error) => {
407-
match conn
408-
.run_with_shared_connection(|conn| {
409-
Conn::TransactionManager::rollback_transaction(conn)
410-
.map_err(|err| E::from(RunError::User(err)))
411-
})
412-
.await
413-
{
414-
Ok(()) => Err(user_error),
415-
Err(RunError::User(err)) => Err(err),
416-
Err(RunError::RuntimeShutdown) => Err(RunError::RuntimeShutdown.into()),
417-
}
385+
conn.run_with_shared_connection(|conn| {
386+
Conn::TransactionManager::rollback_transaction(conn)
387+
})
388+
.await?;
389+
Err(user_error)
418390
}
419391
}
420392
}
421393
}
422394

423-
fn handle_spawn_blocking_error<T, E>(
424-
result: Result<Result<T, E>, JoinError>,
425-
) -> Result<T, RunError<E>> {
395+
fn handle_spawn_blocking_error<T>(
396+
result: Result<Result<T, DieselError>, JoinError>,
397+
) -> Result<T, RunError> {
426398
match result {
427399
Ok(Ok(v)) => Ok(v),
428-
Ok(Err(err)) => Err(RunError::User(err)),
400+
Ok(Err(err)) => Err(RunError::DieselError(err)),
429401
Err(err) => {
430402
if err.is_cancelled() {
431403
// The only way a spawn_blocking task can be marked cancelled
@@ -438,7 +410,11 @@ fn handle_spawn_blocking_error<T, E>(
438410
} else {
439411
// Not possible to reach this as of Tokio 1.40, but maybe in
440412
// future versions.
441-
panic!("unexpected JoinError: {:?}", err);
413+
panic!(
414+
"unexpected JoinError, did a new version of Tokio add \
415+
a source other than panics and cancellations? {:?}",
416+
err
417+
);
442418
}
443419
}
444420
}
@@ -450,26 +426,26 @@ pub trait AsyncRunQueryDsl<Conn, AsyncConn>
450426
where
451427
Conn: 'static + DieselConnection,
452428
{
453-
async fn execute_async(self, asc: &AsyncConn) -> Result<usize, RunError<DieselError>>
429+
async fn execute_async(self, asc: &AsyncConn) -> Result<usize, RunError>
454430
where
455431
Self: ExecuteDsl<Conn>;
456432

457-
async fn load_async<U>(self, asc: &AsyncConn) -> Result<Vec<U>, RunError<DieselError>>
433+
async fn load_async<U>(self, asc: &AsyncConn) -> Result<Vec<U>, RunError>
458434
where
459435
U: Send + 'static,
460436
Self: LoadQuery<'static, Conn, U>;
461437

462-
async fn get_result_async<U>(self, asc: &AsyncConn) -> Result<U, RunError<DieselError>>
438+
async fn get_result_async<U>(self, asc: &AsyncConn) -> Result<U, RunError>
463439
where
464440
U: Send + 'static,
465441
Self: LoadQuery<'static, Conn, U>;
466442

467-
async fn get_results_async<U>(self, asc: &AsyncConn) -> Result<Vec<U>, RunError<DieselError>>
443+
async fn get_results_async<U>(self, asc: &AsyncConn) -> Result<Vec<U>, RunError>
468444
where
469445
U: Send + 'static,
470446
Self: LoadQuery<'static, Conn, U>;
471447

472-
async fn first_async<U>(self, asc: &AsyncConn) -> Result<U, RunError<DieselError>>
448+
async fn first_async<U>(self, asc: &AsyncConn) -> Result<U, RunError>
473449
where
474450
U: Send + 'static,
475451
Self: LimitDsl,
@@ -483,38 +459,38 @@ where
483459
Conn: 'static + DieselConnection,
484460
AsyncConn: Send + Sync + AsyncConnection<Conn>,
485461
{
486-
async fn execute_async(self, asc: &AsyncConn) -> Result<usize, RunError<DieselError>>
462+
async fn execute_async(self, asc: &AsyncConn) -> Result<usize, RunError>
487463
where
488464
Self: ExecuteDsl<Conn>,
489465
{
490466
asc.run(|conn| self.execute(conn)).await
491467
}
492468

493-
async fn load_async<U>(self, asc: &AsyncConn) -> Result<Vec<U>, RunError<DieselError>>
469+
async fn load_async<U>(self, asc: &AsyncConn) -> Result<Vec<U>, RunError>
494470
where
495471
U: Send + 'static,
496472
Self: LoadQuery<'static, Conn, U>,
497473
{
498474
asc.run(|conn| self.load(conn)).await
499475
}
500476

501-
async fn get_result_async<U>(self, asc: &AsyncConn) -> Result<U, RunError<DieselError>>
477+
async fn get_result_async<U>(self, asc: &AsyncConn) -> Result<U, RunError>
502478
where
503479
U: Send + 'static,
504480
Self: LoadQuery<'static, Conn, U>,
505481
{
506482
asc.run(|conn| self.get_result(conn)).await
507483
}
508484

509-
async fn get_results_async<U>(self, asc: &AsyncConn) -> Result<Vec<U>, RunError<DieselError>>
485+
async fn get_results_async<U>(self, asc: &AsyncConn) -> Result<Vec<U>, RunError>
510486
where
511487
U: Send + 'static,
512488
Self: LoadQuery<'static, Conn, U>,
513489
{
514490
asc.run(|conn| self.get_results(conn)).await
515491
}
516492

517-
async fn first_async<U>(self, asc: &AsyncConn) -> Result<U, RunError<DieselError>>
493+
async fn first_async<U>(self, asc: &AsyncConn) -> Result<U, RunError>
518494
where
519495
U: Send + 'static,
520496
Self: LimitDsl,
@@ -529,10 +505,7 @@ pub trait AsyncSaveChangesDsl<Conn, AsyncConn>
529505
where
530506
Conn: 'static + DieselConnection,
531507
{
532-
async fn save_changes_async<Output>(
533-
self,
534-
asc: &AsyncConn,
535-
) -> Result<Output, RunError<DieselError>>
508+
async fn save_changes_async<Output>(self, asc: &AsyncConn) -> Result<Output, RunError>
536509
where
537510
Self: Sized,
538511
Conn: diesel::query_dsl::UpdateAndFetchResults<Self, Output>,
@@ -546,10 +519,7 @@ where
546519
Conn: 'static + DieselConnection,
547520
AsyncConn: Send + Sync + AsyncConnection<Conn>,
548521
{
549-
async fn save_changes_async<Output>(
550-
self,
551-
asc: &AsyncConn,
552-
) -> Result<Output, RunError<DieselError>>
522+
async fn save_changes_async<Output>(self, asc: &AsyncConn) -> Result<Output, RunError>
553523
where
554524
Conn: diesel::query_dsl::UpdateAndFetchResults<Self, Output>,
555525
Output: Send + 'static,

0 commit comments

Comments
 (0)