Skip to content

Commit 7f8f4f6

Browse files
authored
allow users to include emit_events in their transactions (#25)
1 parent a3efbbf commit 7f8f4f6

File tree

2 files changed

+145
-1
lines changed

2 files changed

+145
-1
lines changed

src/client.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,47 @@ where
506506
payload: &T,
507507
queue_name: Option<&str>,
508508
) -> anyhow::Result<()> {
509+
self.emit_event_with(&self.pool, event_name, payload, queue_name)
510+
.await
511+
}
512+
513+
/// Emit an event with a custom executor (e.g., a transaction).
514+
///
515+
/// This allows you to atomically emit an event as part of a larger transaction.
516+
///
517+
/// # Example
518+
///
519+
/// ```ignore
520+
/// let mut tx = client.pool().begin().await?;
521+
///
522+
/// sqlx::query("INSERT INTO orders (id) VALUES ($1)")
523+
/// .bind(order_id)
524+
/// .execute(&mut *tx)
525+
/// .await?;
526+
///
527+
/// client.emit_event_with(&mut *tx, "order_created", &order_id, None).await?;
528+
///
529+
/// tx.commit().await?;
530+
/// ```
531+
#[cfg_attr(
532+
feature = "telemetry",
533+
tracing::instrument(
534+
name = "durable.client.emit_event",
535+
skip(self, executor, payload),
536+
fields(queue, event_name = %event_name)
537+
)
538+
)]
539+
pub async fn emit_event_with<'e, T, E>(
540+
&self,
541+
executor: E,
542+
event_name: &str,
543+
payload: &T,
544+
queue_name: Option<&str>,
545+
) -> anyhow::Result<()>
546+
where
547+
T: Serialize,
548+
E: Executor<'e, Database = Postgres>,
549+
{
509550
anyhow::ensure!(!event_name.is_empty(), "event_name must be non-empty");
510551

511552
let queue = queue_name.unwrap_or(&self.queue_name);
@@ -520,7 +561,7 @@ where
520561
.bind(queue)
521562
.bind(event_name)
522563
.bind(&payload_json)
523-
.execute(&self.pool)
564+
.execute(executor)
524565
.await?;
525566

526567
#[cfg(feature = "telemetry")]

tests/event_test.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,3 +590,106 @@ async fn test_emit_from_different_task(pool: PgPool) -> sqlx::Result<()> {
590590

591591
Ok(())
592592
}
593+
594+
// ============================================================================
595+
// Transaction Tests for emit_event
596+
// ============================================================================
597+
598+
/// Test that emit_event_with in a committed transaction persists the event.
599+
#[sqlx::test(migrator = "MIGRATOR")]
600+
async fn test_emit_event_with_transaction_commit(pool: PgPool) -> sqlx::Result<()> {
601+
let client = create_client(pool.clone(), "event_tx_commit").await;
602+
client.create_queue(None).await.unwrap();
603+
604+
// Create a test table
605+
sqlx::query("CREATE TABLE test_event_orders (id UUID PRIMARY KEY, status TEXT)")
606+
.execute(&pool)
607+
.await?;
608+
609+
let order_id = uuid::Uuid::now_v7();
610+
let event_name = format!("order_created_{}", order_id);
611+
612+
// Start a transaction and do both operations
613+
let mut tx = pool.begin().await?;
614+
615+
sqlx::query("INSERT INTO test_event_orders (id, status) VALUES ($1, $2)")
616+
.bind(order_id)
617+
.bind("pending")
618+
.execute(&mut *tx)
619+
.await?;
620+
621+
client
622+
.emit_event_with(&mut *tx, &event_name, &json!({"order_id": order_id}), None)
623+
.await
624+
.expect("Failed to emit event in transaction");
625+
626+
tx.commit().await?;
627+
628+
// Verify both the order and event exist
629+
let order_exists: bool =
630+
sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM test_event_orders WHERE id = $1)")
631+
.bind(order_id)
632+
.fetch_one(&pool)
633+
.await?;
634+
assert!(order_exists, "Order should exist after commit");
635+
636+
let event_exists: bool = sqlx::query_scalar(
637+
"SELECT EXISTS(SELECT 1 FROM durable.e_event_tx_commit WHERE event_name = $1)",
638+
)
639+
.bind(&event_name)
640+
.fetch_one(&pool)
641+
.await?;
642+
assert!(event_exists, "Event should exist after commit");
643+
644+
Ok(())
645+
}
646+
647+
/// Test that emit_event_with in a rolled back transaction does NOT persist the event.
648+
#[sqlx::test(migrator = "MIGRATOR")]
649+
async fn test_emit_event_with_transaction_rollback(pool: PgPool) -> sqlx::Result<()> {
650+
let client = create_client(pool.clone(), "event_tx_rollback").await;
651+
client.create_queue(None).await.unwrap();
652+
653+
// Create a test table
654+
sqlx::query("CREATE TABLE test_event_orders_rb (id UUID PRIMARY KEY, status TEXT)")
655+
.execute(&pool)
656+
.await?;
657+
658+
let order_id = uuid::Uuid::now_v7();
659+
let event_name = format!("order_created_{}", order_id);
660+
661+
// Start a transaction and do both operations, then rollback
662+
let mut tx = pool.begin().await?;
663+
664+
sqlx::query("INSERT INTO test_event_orders_rb (id, status) VALUES ($1, $2)")
665+
.bind(order_id)
666+
.bind("pending")
667+
.execute(&mut *tx)
668+
.await?;
669+
670+
client
671+
.emit_event_with(&mut *tx, &event_name, &json!({"order_id": order_id}), None)
672+
.await
673+
.expect("Failed to emit event in transaction");
674+
675+
// Rollback instead of commit
676+
tx.rollback().await?;
677+
678+
// Verify neither the order nor event exist
679+
let order_exists: bool =
680+
sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM test_event_orders_rb WHERE id = $1)")
681+
.bind(order_id)
682+
.fetch_one(&pool)
683+
.await?;
684+
assert!(!order_exists, "Order should NOT exist after rollback");
685+
686+
let event_exists: bool = sqlx::query_scalar(
687+
"SELECT EXISTS(SELECT 1 FROM durable.e_event_tx_rollback WHERE event_name = $1)",
688+
)
689+
.bind(&event_name)
690+
.fetch_one(&pool)
691+
.await?;
692+
assert!(!event_exists, "Event should NOT exist after rollback");
693+
694+
Ok(())
695+
}

0 commit comments

Comments
 (0)