From dff858d831b928f2644b2c0e7f8f5f81b249bdb5 Mon Sep 17 00:00:00 2001 From: Vlad Zagvozdkin Date: Thu, 25 Apr 2024 17:54:42 +0500 Subject: [PATCH 1/3] add max_retry_backoff option to job runner (Diggsey#19) Credits to https://github.com/StructionSite/sqlxmq/commit/0489b5178ae01ae828c7cbe69d60f298e3e71cb2 --- .../20240424183215_add_max_retry.down.sql | 61 +++++++++++++++++++ .../20240424183215_add_max_retry.up.sql | 61 +++++++++++++++++++ src/lib.rs | 52 ++++++++++++++++ src/runner.rs | 14 ++++- 4 files changed, 187 insertions(+), 1 deletion(-) create mode 100644 migrations/20240424183215_add_max_retry.down.sql create mode 100644 migrations/20240424183215_add_max_retry.up.sql diff --git a/migrations/20240424183215_add_max_retry.down.sql b/migrations/20240424183215_add_max_retry.down.sql new file mode 100644 index 0000000..d8f3728 --- /dev/null +++ b/migrations/20240424183215_add_max_retry.down.sql @@ -0,0 +1,61 @@ +-- Main entry-point for job runner: pulls a batch of messages from the queue. +CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) +RETURNS TABLE( + id UUID, + is_committed BOOLEAN, + name TEXT, + payload_json TEXT, + payload_bytes BYTEA, + retry_backoff INTERVAL, + wait_time INTERVAL +) AS $$ +BEGIN + RETURN QUERY UPDATE mq_msgs + SET + attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, + attempts = mq_msgs.attempts - 1, + retry_backoff = mq_msgs.retry_backoff * 2 + FROM ( + SELECT + msgs.id + FROM mq_active_channels(channel_names, batch_size) AS active_channels + INNER JOIN LATERAL ( + SELECT mq_msgs.id FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND mq_msgs.attempt_at <= NOW() + AND mq_msgs.channel_name = active_channels.name + AND mq_msgs.channel_args = active_channels.args + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + ORDER BY mq_msgs.attempt_at ASC + LIMIT batch_size + ) AS msgs ON TRUE + LIMIT batch_size + ) AS messages_to_update + LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id + WHERE mq_msgs.id = messages_to_update.id + AND mq_msgs.attempt_at <= NOW() + RETURNING + mq_msgs.id, + mq_msgs.commit_interval IS NULL, + mq_payloads.name, + mq_payloads.payload_json::TEXT, + mq_payloads.payload_bytes, + mq_msgs.retry_backoff / 2, + interval '0' AS wait_time; + + IF NOT FOUND THEN + RETURN QUERY SELECT + NULL::UUID, + NULL::BOOLEAN, + NULL::TEXT, + NULL::TEXT, + NULL::BYTEA, + NULL::INTERVAL, + MIN(mq_msgs.attempt_at) - NOW() + FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); + END IF; +END; +$$ LANGUAGE plpgsql; diff --git a/migrations/20240424183215_add_max_retry.up.sql b/migrations/20240424183215_add_max_retry.up.sql new file mode 100644 index 0000000..b8e067e --- /dev/null +++ b/migrations/20240424183215_add_max_retry.up.sql @@ -0,0 +1,61 @@ +-- Main entry-point for job runner: pulls a batch of messages from the queue. +CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1, max_retry_text INTERVAL DEFAULT NULL) +RETURNS TABLE( + id UUID, + is_committed BOOLEAN, + name TEXT, + payload_json TEXT, + payload_bytes BYTEA, + retry_backoff INTERVAL, + wait_time INTERVAL +) AS $$ +BEGIN + RETURN QUERY UPDATE mq_msgs + SET + attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, + attempts = mq_msgs.attempts - 1, + retry_backoff = CASE WHEN max_retry_text IS NULL THEN mq_msgs.retry_backoff * 2 ELSE LEAST(mq_msgs.retry_backoff * 2, max_retry_text) END + FROM ( + SELECT + msgs.id + FROM mq_active_channels(channel_names, batch_size) AS active_channels + INNER JOIN LATERAL ( + SELECT mq_msgs.id FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND mq_msgs.attempt_at <= NOW() + AND mq_msgs.channel_name = active_channels.name + AND mq_msgs.channel_args = active_channels.args + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + ORDER BY mq_msgs.attempt_at ASC + LIMIT batch_size + ) AS msgs ON TRUE + LIMIT batch_size + ) AS messages_to_update + LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id + WHERE mq_msgs.id = messages_to_update.id + AND mq_msgs.attempt_at <= NOW() + RETURNING + mq_msgs.id, + mq_msgs.commit_interval IS NULL, + mq_payloads.name, + mq_payloads.payload_json::TEXT, + mq_payloads.payload_bytes, + mq_msgs.retry_backoff / 2, + interval '0' AS wait_time; + + IF NOT FOUND THEN + RETURN QUERY SELECT + NULL::UUID, + NULL::BOOLEAN, + NULL::TEXT, + NULL::TEXT, + NULL::BYTEA, + NULL::INTERVAL, + MIN(mq_msgs.attempt_at) - NOW() + FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); + END IF; +END; +$$ LANGUAGE plpgsql; diff --git a/src/lib.rs b/src/lib.rs index fc279e4..88a4537 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -558,6 +558,58 @@ mod tests { pause().await; } + #[tokio::test] + async fn it_uses_max_retry_backoff_correctly() { + { + let pool = &*test_pool().await; + + let backoff = default_pause() + 300; + + let counter = Arc::new(AtomicUsize::new(0)); + let counter2 = counter.clone(); + let _runner = JobRunnerOptions::new(pool, move |_job| { + counter2.fetch_add(1, Ordering::SeqCst); + }) + .set_max_retry_backoff(Duration::from_millis(backoff * 4)) + .run() + .await + .unwrap(); + + assert_eq!(counter.load(Ordering::SeqCst), 0); + JobBuilder::new("foo") + .set_retry_backoff(Duration::from_millis(backoff)) + .set_retries(4) + .spawn(pool) + .await + .unwrap(); + + // First attempt + pause().await; + assert_eq!(counter.load(Ordering::SeqCst), 1); + + // Second attempt + pause_ms(backoff).await; + pause().await; + assert_eq!(counter.load(Ordering::SeqCst), 2); + + // Third attempt + pause_ms(backoff * 2).await; + pause().await; + assert_eq!(counter.load(Ordering::SeqCst), 3); + + // Fourth attempt + pause_ms(backoff * 4).await; + pause().await; + assert_eq!(counter.load(Ordering::SeqCst), 4); + + // Fifth attempt with now constant pause + pause_ms(backoff * 4).await; + pause().await; + assert_eq!(counter.load(Ordering::SeqCst), 5); + } + pause().await; + } + #[tokio::test] async fn it_can_checkpoint_jobs() { { diff --git a/src/runner.rs b/src/runner.rs index 056a67a..515d7ab 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -19,6 +19,7 @@ use crate::utils::{Opaque, OwnedHandle}; pub struct JobRunnerOptions { min_concurrency: usize, max_concurrency: usize, + max_retry_backoff: Option, channel_names: Option>, dispatch: Opaque>, pool: Pool, @@ -222,12 +223,22 @@ impl JobRunnerOptions { Self { min_concurrency: 16, max_concurrency: 32, + max_retry_backoff: None, channel_names: None, keep_alive: true, dispatch: Opaque(Arc::new(f)), pool: pool.clone(), } } + /// Set the max retry backoff for this job runner. If the initial retry backoff is 1, + /// and `max_retry_backoff` is 8, then delays between retries are going to be + /// from this sequence: 1, 2, 4, 8, 8, 8... + /// + /// Default is None, meaning no limit + pub fn set_max_retry_backoff(&mut self, duration: Duration) -> &mut Self { + self.max_retry_backoff = Some(duration); + self + } /// Set the concurrency limits for this job runner. When the number of active /// jobs falls below the minimum, the runner will poll for more, up to the maximum. /// @@ -403,9 +414,10 @@ async fn poll_and_dispatch( log::info!("Polling for messages"); let options = &job_runner.options; - let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2)") + let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2, $3)") .bind(&options.channel_names) .bind(batch_size) + .bind(&options.max_retry_backoff) .fetch_all(&options.pool) .await?; From c3fa3a585f1202939e98616c6b5c22e70c1fd44d Mon Sep 17 00:00:00 2001 From: Emelie Graven Date: Thu, 8 Jan 2026 13:20:51 +0100 Subject: [PATCH 2/3] fix: Explicitly use uuid_nil on public schema --- migrations/20210316025847_setup.up.sql | 22 +++++++++---------- ...11013151757_fix_mq_latest_message.down.sql | 4 ++-- ...0211013151757_fix_mq_latest_message.up.sql | 4 ++-- ...0220208120856_fix_concurrent_poll.down.sql | 4 ++-- .../20220208120856_fix_concurrent_poll.up.sql | 4 ++-- ...2907_fix-clear_all-keep-nil-message.up.sql | 4 ++-- .../20240424183215_add_max_retry.down.sql | 4 ++-- .../20240424183215_add_max_retry.up.sql | 4 ++-- .../migrations/20210316025847_setup.up.sql | 22 +++++++++---------- ...11013151757_fix_mq_latest_message.down.sql | 4 ++-- ...0211013151757_fix_mq_latest_message.up.sql | 4 ++-- src/lib.rs | 2 +- 12 files changed, 41 insertions(+), 41 deletions(-) diff --git a/migrations/20210316025847_setup.up.sql b/migrations/20210316025847_setup.up.sql index bf7f8f8..af459b7 100644 --- a/migrations/20210316025847_setup.up.sql +++ b/migrations/20210316025847_setup.up.sql @@ -37,23 +37,23 @@ CREATE TABLE mq_msgs ( channel_name TEXT NOT NULL, channel_args TEXT NOT NULL, commit_interval INTERVAL, - after_message_id UUID DEFAULT uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT + after_message_id UUID DEFAULT public.uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT ); -- Insert dummy message so that the 'nil' UUID can be referenced -INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (uuid_nil(), '', '', NULL); +INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (public.uuid_nil(), '', '', NULL); -- Internal helper function to check that a UUID is neither NULL nor NIL CREATE FUNCTION mq_uuid_exists( id UUID ) RETURNS BOOLEAN AS $$ - SELECT id IS NOT NULL AND id != uuid_nil() + SELECT id IS NOT NULL AND id != public.uuid_nil() $$ LANGUAGE SQL IMMUTABLE; -- Index for polling -CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid_nil() AND NOT mq_uuid_exists(after_message_id); +CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != public.uuid_nil() AND NOT mq_uuid_exists(after_message_id); -- Index for adding messages -CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL; +CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != public.uuid_nil() AND after_message_id IS NOT NULL; -- Index for ensuring strict message order CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id); @@ -76,11 +76,11 @@ RETURNS UUID AS $$ WHERE channel_name = from_channel_name AND channel_args = from_channel_args AND after_message_id IS NOT NULL - AND id != uuid_nil() + AND id != public.uuid_nil() ORDER BY created_at DESC, id DESC LIMIT 1 ), - uuid_nil() + public.uuid_nil() ) $$ LANGUAGE SQL STABLE; @@ -89,7 +89,7 @@ CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT) RETURNS TABLE(name TEXT, args TEXT) AS $$ SELECT channel_name, channel_args FROM mq_msgs - WHERE id != uuid_nil() + WHERE id != public.uuid_nil() AND attempt_at <= NOW() AND (channel_names IS NULL OR channel_name = ANY(channel_names)) AND NOT mq_uuid_exists(after_message_id) @@ -121,7 +121,7 @@ BEGIN FROM mq_active_channels(channel_names, batch_size) AS active_channels INNER JOIN LATERAL ( SELECT * FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND mq_msgs.attempt_at <= NOW() AND mq_msgs.channel_name = active_channels.name AND mq_msgs.channel_args = active_channels.args @@ -152,7 +152,7 @@ BEGIN NULL::INTERVAL, MIN(mq_msgs.attempt_at) - NOW() FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND NOT mq_uuid_exists(mq_msgs.after_message_id) AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); END IF; @@ -234,7 +234,7 @@ BEGIN PERFORM pg_notify(CONCAT('mq_', channel_name), '') FROM mq_msgs WHERE id = ANY(msg_ids) - AND after_message_id = uuid_nil() + AND after_message_id = public.uuid_nil() GROUP BY channel_name; IF FOUND THEN diff --git a/migrations/20211013151757_fix_mq_latest_message.down.sql b/migrations/20211013151757_fix_mq_latest_message.down.sql index d09bd4a..a541abf 100644 --- a/migrations/20211013151757_fix_mq_latest_message.down.sql +++ b/migrations/20211013151757_fix_mq_latest_message.down.sql @@ -6,10 +6,10 @@ RETURNS UUID AS $$ WHERE channel_name = from_channel_name AND channel_args = from_channel_args AND after_message_id IS NOT NULL - AND id != uuid_nil() + AND id != public.uuid_nil() ORDER BY created_at DESC, id DESC LIMIT 1 ), - uuid_nil() + public.uuid_nil() ) $$ LANGUAGE SQL STABLE; diff --git a/migrations/20211013151757_fix_mq_latest_message.up.sql b/migrations/20211013151757_fix_mq_latest_message.up.sql index b987c5e..c9cb6d3 100644 --- a/migrations/20211013151757_fix_mq_latest_message.up.sql +++ b/migrations/20211013151757_fix_mq_latest_message.up.sql @@ -6,7 +6,7 @@ RETURNS UUID AS $$ WHERE channel_name = from_channel_name AND channel_args = from_channel_args AND after_message_id IS NOT NULL - AND id != uuid_nil() + AND id != public.uuid_nil() AND NOT EXISTS( SELECT * FROM mq_msgs AS mq_msgs2 WHERE mq_msgs2.after_message_id = mq_msgs.id @@ -14,6 +14,6 @@ RETURNS UUID AS $$ ORDER BY created_at DESC LIMIT 1 ), - uuid_nil() + public.uuid_nil() ) $$ LANGUAGE SQL STABLE; \ No newline at end of file diff --git a/migrations/20220208120856_fix_concurrent_poll.down.sql b/migrations/20220208120856_fix_concurrent_poll.down.sql index 6cd2d21..fd766c7 100644 --- a/migrations/20220208120856_fix_concurrent_poll.down.sql +++ b/migrations/20220208120856_fix_concurrent_poll.down.sql @@ -21,7 +21,7 @@ BEGIN FROM mq_active_channels(channel_names, batch_size) AS active_channels INNER JOIN LATERAL ( SELECT * FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND mq_msgs.attempt_at <= NOW() AND mq_msgs.channel_name = active_channels.name AND mq_msgs.channel_args = active_channels.args @@ -52,7 +52,7 @@ BEGIN NULL::INTERVAL, MIN(mq_msgs.attempt_at) - NOW() FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND NOT mq_uuid_exists(mq_msgs.after_message_id) AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); END IF; diff --git a/migrations/20220208120856_fix_concurrent_poll.up.sql b/migrations/20220208120856_fix_concurrent_poll.up.sql index cae6151..8cb0cc1 100644 --- a/migrations/20220208120856_fix_concurrent_poll.up.sql +++ b/migrations/20220208120856_fix_concurrent_poll.up.sql @@ -22,7 +22,7 @@ BEGIN FROM mq_active_channels(channel_names, batch_size) AS active_channels INNER JOIN LATERAL ( SELECT mq_msgs.id FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND mq_msgs.attempt_at <= NOW() AND mq_msgs.channel_name = active_channels.name AND mq_msgs.channel_args = active_channels.args @@ -54,7 +54,7 @@ BEGIN NULL::INTERVAL, MIN(mq_msgs.attempt_at) - NOW() FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND NOT mq_uuid_exists(mq_msgs.after_message_id) AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); END IF; diff --git a/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql b/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql index 4dd1f0b..311a63a 100644 --- a/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql +++ b/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql @@ -4,7 +4,7 @@ BEGIN WITH deleted_ids AS ( DELETE FROM mq_msgs WHERE channel_name = ANY(channel_names) - AND id != uuid_nil() + AND id != public.uuid_nil() RETURNING id ) DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); @@ -19,7 +19,7 @@ RETURNS VOID AS $$ BEGIN WITH deleted_ids AS ( DELETE FROM mq_msgs - WHERE id != uuid_nil() + WHERE id != public.uuid_nil() RETURNING id ) DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); diff --git a/migrations/20240424183215_add_max_retry.down.sql b/migrations/20240424183215_add_max_retry.down.sql index d8f3728..ba42fe0 100644 --- a/migrations/20240424183215_add_max_retry.down.sql +++ b/migrations/20240424183215_add_max_retry.down.sql @@ -21,7 +21,7 @@ BEGIN FROM mq_active_channels(channel_names, batch_size) AS active_channels INNER JOIN LATERAL ( SELECT mq_msgs.id FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND mq_msgs.attempt_at <= NOW() AND mq_msgs.channel_name = active_channels.name AND mq_msgs.channel_args = active_channels.args @@ -53,7 +53,7 @@ BEGIN NULL::INTERVAL, MIN(mq_msgs.attempt_at) - NOW() FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND NOT mq_uuid_exists(mq_msgs.after_message_id) AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); END IF; diff --git a/migrations/20240424183215_add_max_retry.up.sql b/migrations/20240424183215_add_max_retry.up.sql index b8e067e..82700d9 100644 --- a/migrations/20240424183215_add_max_retry.up.sql +++ b/migrations/20240424183215_add_max_retry.up.sql @@ -21,7 +21,7 @@ BEGIN FROM mq_active_channels(channel_names, batch_size) AS active_channels INNER JOIN LATERAL ( SELECT mq_msgs.id FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND mq_msgs.attempt_at <= NOW() AND mq_msgs.channel_name = active_channels.name AND mq_msgs.channel_args = active_channels.args @@ -53,7 +53,7 @@ BEGIN NULL::INTERVAL, MIN(mq_msgs.attempt_at) - NOW() FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND NOT mq_uuid_exists(mq_msgs.after_message_id) AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); END IF; diff --git a/sqlxmq_stress/migrations/20210316025847_setup.up.sql b/sqlxmq_stress/migrations/20210316025847_setup.up.sql index bf7f8f8..af459b7 100644 --- a/sqlxmq_stress/migrations/20210316025847_setup.up.sql +++ b/sqlxmq_stress/migrations/20210316025847_setup.up.sql @@ -37,23 +37,23 @@ CREATE TABLE mq_msgs ( channel_name TEXT NOT NULL, channel_args TEXT NOT NULL, commit_interval INTERVAL, - after_message_id UUID DEFAULT uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT + after_message_id UUID DEFAULT public.uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT ); -- Insert dummy message so that the 'nil' UUID can be referenced -INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (uuid_nil(), '', '', NULL); +INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (public.uuid_nil(), '', '', NULL); -- Internal helper function to check that a UUID is neither NULL nor NIL CREATE FUNCTION mq_uuid_exists( id UUID ) RETURNS BOOLEAN AS $$ - SELECT id IS NOT NULL AND id != uuid_nil() + SELECT id IS NOT NULL AND id != public.uuid_nil() $$ LANGUAGE SQL IMMUTABLE; -- Index for polling -CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid_nil() AND NOT mq_uuid_exists(after_message_id); +CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != public.uuid_nil() AND NOT mq_uuid_exists(after_message_id); -- Index for adding messages -CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL; +CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != public.uuid_nil() AND after_message_id IS NOT NULL; -- Index for ensuring strict message order CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id); @@ -76,11 +76,11 @@ RETURNS UUID AS $$ WHERE channel_name = from_channel_name AND channel_args = from_channel_args AND after_message_id IS NOT NULL - AND id != uuid_nil() + AND id != public.uuid_nil() ORDER BY created_at DESC, id DESC LIMIT 1 ), - uuid_nil() + public.uuid_nil() ) $$ LANGUAGE SQL STABLE; @@ -89,7 +89,7 @@ CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT) RETURNS TABLE(name TEXT, args TEXT) AS $$ SELECT channel_name, channel_args FROM mq_msgs - WHERE id != uuid_nil() + WHERE id != public.uuid_nil() AND attempt_at <= NOW() AND (channel_names IS NULL OR channel_name = ANY(channel_names)) AND NOT mq_uuid_exists(after_message_id) @@ -121,7 +121,7 @@ BEGIN FROM mq_active_channels(channel_names, batch_size) AS active_channels INNER JOIN LATERAL ( SELECT * FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND mq_msgs.attempt_at <= NOW() AND mq_msgs.channel_name = active_channels.name AND mq_msgs.channel_args = active_channels.args @@ -152,7 +152,7 @@ BEGIN NULL::INTERVAL, MIN(mq_msgs.attempt_at) - NOW() FROM mq_msgs - WHERE mq_msgs.id != uuid_nil() + WHERE mq_msgs.id != public.uuid_nil() AND NOT mq_uuid_exists(mq_msgs.after_message_id) AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); END IF; @@ -234,7 +234,7 @@ BEGIN PERFORM pg_notify(CONCAT('mq_', channel_name), '') FROM mq_msgs WHERE id = ANY(msg_ids) - AND after_message_id = uuid_nil() + AND after_message_id = public.uuid_nil() GROUP BY channel_name; IF FOUND THEN diff --git a/sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.down.sql b/sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.down.sql index d09bd4a..a541abf 100644 --- a/sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.down.sql +++ b/sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.down.sql @@ -6,10 +6,10 @@ RETURNS UUID AS $$ WHERE channel_name = from_channel_name AND channel_args = from_channel_args AND after_message_id IS NOT NULL - AND id != uuid_nil() + AND id != public.uuid_nil() ORDER BY created_at DESC, id DESC LIMIT 1 ), - uuid_nil() + public.uuid_nil() ) $$ LANGUAGE SQL STABLE; diff --git a/sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.up.sql b/sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.up.sql index b987c5e..c9cb6d3 100644 --- a/sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.up.sql +++ b/sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.up.sql @@ -6,7 +6,7 @@ RETURNS UUID AS $$ WHERE channel_name = from_channel_name AND channel_args = from_channel_args AND after_message_id IS NOT NULL - AND id != uuid_nil() + AND id != public.uuid_nil() AND NOT EXISTS( SELECT * FROM mq_msgs AS mq_msgs2 WHERE mq_msgs2.after_message_id = mq_msgs.id @@ -14,6 +14,6 @@ RETURNS UUID AS $$ ORDER BY created_at DESC LIMIT 1 ), - uuid_nil() + public.uuid_nil() ) $$ LANGUAGE SQL STABLE; \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 88a4537..9e998f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -308,7 +308,7 @@ mod tests { .execute(&pool) .await .unwrap(); - sqlx::query("DELETE FROM mq_msgs WHERE id != uuid_nil()") + sqlx::query("DELETE FROM mq_msgs WHERE id != public.uuid_nil()") .execute(&pool) .await .unwrap(); From 47dfe13fa7f885bf5d0bd46ee6488eb8a2b8de49 Mon Sep 17 00:00:00 2001 From: Emelie Graven Date: Thu, 8 Jan 2026 13:21:13 +0100 Subject: [PATCH 3/3] release: v0.6.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b8faf10..7a40805 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sqlxmq" -version = "0.6.0" +version = "0.6.1" authors = ["Diggory Blake "] edition = "2018" license = "MIT OR Apache-2.0"