From 9a483fec9062a62c81b5c82ef1052df8581329a2 Mon Sep 17 00:00:00 2001 From: Lucas Sunsi Abreu Date: Wed, 16 Jul 2025 08:58:09 -0300 Subject: [PATCH 1/4] add reproduction of rollback after commit on serialization error --- tests/transactions.rs | 122 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/tests/transactions.rs b/tests/transactions.rs index b4f44e0..140b623 100644 --- a/tests/transactions.rs +++ b/tests/transactions.rs @@ -104,3 +104,125 @@ async fn concurrent_serializable_transactions_behave_correctly() { res.unwrap_err() ); } + +#[cfg(feature = "postgres")] +#[tokio::test] +async fn commit_with_serialization_failure_already_ends_transaction() { + use diesel::prelude::*; + use diesel_async::{AsyncConnection, RunQueryDsl}; + use std::sync::Arc; + use tokio::sync::Barrier; + + table! { + users3 { + id -> Integer, + } + } + + // create an async connection + let mut conn = super::connection_without_transaction().await; + + struct A(Vec<&'static str>); + impl diesel::connection::Instrumentation for A { + fn on_connection_event(&mut self, event: diesel::connection::InstrumentationEvent<'_>) { + if let diesel::connection::InstrumentationEvent::StartQuery { query, .. } = event { + let q = query.to_string(); + let q = q.split_once(' ').map(|(a, _)| a).unwrap_or(&q); + + if matches!(q, "BEGIN" | "COMMIT" | "ROLLBACK") { + assert_eq!(q, self.0.pop().unwrap()); + } + } + } + } + conn.set_instrumentation(A(vec!["COMMIT", "BEGIN", "COMMIT", "BEGIN"])); + + let mut conn1 = super::connection_without_transaction().await; + + diesel::sql_query("CREATE TABLE IF NOT EXISTS users3 (id int);") + .execute(&mut conn) + .await + .unwrap(); + + let barrier_1 = Arc::new(Barrier::new(2)); + let barrier_2 = Arc::new(Barrier::new(2)); + let barrier_1_for_tx1 = barrier_1.clone(); + let barrier_1_for_tx2 = barrier_1.clone(); + let barrier_2_for_tx1 = barrier_2.clone(); + let barrier_2_for_tx2 = barrier_2.clone(); + + let mut tx = conn.build_transaction().serializable().read_write(); + + let res = tx.run(|conn| { + Box::pin(async { + users3::table.select(users3::id).load::(conn).await?; + + barrier_1_for_tx1.wait().await; + diesel::insert_into(users3::table) + .values(users3::id.eq(1)) + .execute(conn) + .await?; + barrier_2_for_tx1.wait().await; + + Ok::<_, diesel::result::Error>(()) + }) + }); + + let mut tx1 = conn1.build_transaction().serializable().read_write(); + + let res1 = async { + let res = tx1 + .run(|conn| { + Box::pin(async { + users3::table.select(users3::id).load::(conn).await?; + + barrier_1_for_tx2.wait().await; + diesel::insert_into(users3::table) + .values(users3::id.eq(1)) + .execute(conn) + .await?; + + Ok::<_, diesel::result::Error>(()) + }) + }) + .await; + barrier_2_for_tx2.wait().await; + res + }; + + let (res, res1) = tokio::join!(res, res1); + let _ = diesel::sql_query("DROP TABLE users3") + .execute(&mut conn1) + .await; + + assert!( + res1.is_ok(), + "Expected the second transaction to be succussfull, but got an error: {:?}", + res1.unwrap_err() + ); + + assert!(res.is_err(), "Expected the first transaction to fail"); + let err = res.unwrap_err(); + assert!( + matches!( + &err, + diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::SerializationFailure, + _ + ) + ), + "Expected an serialization failure but got another error: {err:?}" + ); + + let mut tx = conn.build_transaction(); + + let res = tx + .run(|_| Box::pin(async { Ok::<_, diesel::result::Error>(()) })) + .await; + + assert!( + res.is_ok(), + "Expect transaction to run fine but got an error: {:?}", + res.unwrap_err() + ); +} From 8792d58b94c6003932e62ca3777467461a4799dd Mon Sep 17 00:00:00 2001 From: Lucas Sunsi Abreu Date: Sun, 20 Jul 2025 08:03:16 -0300 Subject: [PATCH 2/4] fix: use another table name to avoid contention (but maybe it should be synchronized) --- tests/transactions.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/transactions.rs b/tests/transactions.rs index 140b623..a7d0a5c 100644 --- a/tests/transactions.rs +++ b/tests/transactions.rs @@ -114,7 +114,7 @@ async fn commit_with_serialization_failure_already_ends_transaction() { use tokio::sync::Barrier; table! { - users3 { + users4 { id -> Integer, } } @@ -139,7 +139,7 @@ async fn commit_with_serialization_failure_already_ends_transaction() { let mut conn1 = super::connection_without_transaction().await; - diesel::sql_query("CREATE TABLE IF NOT EXISTS users3 (id int);") + diesel::sql_query("CREATE TABLE IF NOT EXISTS users4 (id int);") .execute(&mut conn) .await .unwrap(); @@ -155,11 +155,11 @@ async fn commit_with_serialization_failure_already_ends_transaction() { let res = tx.run(|conn| { Box::pin(async { - users3::table.select(users3::id).load::(conn).await?; + users4::table.select(users4::id).load::(conn).await?; barrier_1_for_tx1.wait().await; - diesel::insert_into(users3::table) - .values(users3::id.eq(1)) + diesel::insert_into(users4::table) + .values(users4::id.eq(1)) .execute(conn) .await?; barrier_2_for_tx1.wait().await; @@ -174,11 +174,11 @@ async fn commit_with_serialization_failure_already_ends_transaction() { let res = tx1 .run(|conn| { Box::pin(async { - users3::table.select(users3::id).load::(conn).await?; + users4::table.select(users4::id).load::(conn).await?; barrier_1_for_tx2.wait().await; - diesel::insert_into(users3::table) - .values(users3::id.eq(1)) + diesel::insert_into(users4::table) + .values(users4::id.eq(1)) .execute(conn) .await?; From a175eb264f3e98e8548a9c4952b316a5a8808744 Mon Sep 17 00:00:00 2001 From: Lucas Sunsi Abreu Date: Wed, 23 Jul 2025 06:54:31 -0300 Subject: [PATCH 3/4] avoid rollback after serialization on commit --- src/pg/mod.rs | 8 +++++--- src/transaction_manager.rs | 21 +++++++++++++++++++-- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/pg/mod.rs b/src/pg/mod.rs index 39811b3..a6fbb07 100644 --- a/src/pg/mod.rs +++ b/src/pg/mod.rs @@ -387,9 +387,11 @@ fn update_transaction_manager_status( if let Err(diesel::result::Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result { - transaction_manager - .status - .set_requires_rollback_maybe_up_to_top_level(true) + if !transaction_manager.is_commit { + transaction_manager + .status + .set_requires_rollback_maybe_up_to_top_level(true); + } } query_result } diff --git a/src/transaction_manager.rs b/src/transaction_manager.rs index 6362498..22115ac 100644 --- a/src/transaction_manager.rs +++ b/src/transaction_manager.rs @@ -146,6 +146,11 @@ pub struct AnsiTransactionManager { // See https://github.com/weiznich/diesel_async/issues/198 for // details pub(crate) is_broken: Arc, + // this boolean flag tracks whether we are currently in this process + // of trying to commit the transaction. this is useful because if we + // are and we get a serialization failure, we might not want to attempt + // a rollback up the chain. + pub(crate) is_commit: bool, } impl AnsiTransactionManager { @@ -355,9 +360,18 @@ where conn.instrumentation() .on_connection_event(InstrumentationEvent::commit_transaction(depth)); - let is_broken = conn.transaction_state().is_broken.clone(); + let is_broken = { + let transaction_state = conn.transaction_state(); + transaction_state.is_commit = true; + transaction_state.is_broken.clone() + }; + + let res = + Self::critical_transaction_block(&is_broken, conn.batch_execute(&commit_sql)).await; + + conn.transaction_state().is_commit = false; - match Self::critical_transaction_block(&is_broken, conn.batch_execute(&commit_sql)).await { + match res { Ok(()) => { match Self::get_transaction_state(conn)? .change_transaction_depth(TransactionDepthChange::DecreaseDepth) @@ -392,6 +406,9 @@ where }); } } + } else { + Self::get_transaction_state(conn)? + .change_transaction_depth(TransactionDepthChange::DecreaseDepth)?; } Err(commit_error) } From 4cdaf87304d5e2af762e6840243a52df1af1f6c9 Mon Sep 17 00:00:00 2001 From: Georg Semmler Date: Fri, 25 Jul 2025 10:07:47 +0000 Subject: [PATCH 4/4] Drop the right table --- tests/transactions.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/transactions.rs b/tests/transactions.rs index a7d0a5c..6de782c 100644 --- a/tests/transactions.rs +++ b/tests/transactions.rs @@ -191,7 +191,7 @@ async fn commit_with_serialization_failure_already_ends_transaction() { }; let (res, res1) = tokio::join!(res, res1); - let _ = diesel::sql_query("DROP TABLE users3") + let _ = diesel::sql_query("DROP TABLE users4") .execute(&mut conn1) .await;