Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sqlxmq"
version = "0.6.0"
version = "0.6.1"
authors = ["Diggory Blake <diggsey@googlemail.com>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand Down
22 changes: 11 additions & 11 deletions migrations/20210316025847_setup.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ CREATE TABLE mq_msgs (
channel_name TEXT NOT NULL,
channel_args TEXT NOT NULL,
commit_interval INTERVAL,
after_message_id UUID DEFAULT uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT
after_message_id UUID DEFAULT public.uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT
);

-- Insert dummy message so that the 'nil' UUID can be referenced
INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (uuid_nil(), '', '', NULL);
INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (public.uuid_nil(), '', '', NULL);

-- Internal helper function to check that a UUID is neither NULL nor NIL
CREATE FUNCTION mq_uuid_exists(
id UUID
) RETURNS BOOLEAN AS $$
SELECT id IS NOT NULL AND id != uuid_nil()
SELECT id IS NOT NULL AND id != public.uuid_nil()
$$ LANGUAGE SQL IMMUTABLE;

-- Index for polling
CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid_nil() AND NOT mq_uuid_exists(after_message_id);
CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != public.uuid_nil() AND NOT mq_uuid_exists(after_message_id);
-- Index for adding messages
CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL;
CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != public.uuid_nil() AND after_message_id IS NOT NULL;

-- Index for ensuring strict message order
CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id);
Expand All @@ -76,11 +76,11 @@ RETURNS UUID AS $$
WHERE channel_name = from_channel_name
AND channel_args = from_channel_args
AND after_message_id IS NOT NULL
AND id != uuid_nil()
AND id != public.uuid_nil()
ORDER BY created_at DESC, id DESC
LIMIT 1
),
uuid_nil()
public.uuid_nil()
)
$$ LANGUAGE SQL STABLE;

Expand All @@ -89,7 +89,7 @@ CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT)
RETURNS TABLE(name TEXT, args TEXT) AS $$
SELECT channel_name, channel_args
FROM mq_msgs
WHERE id != uuid_nil()
WHERE id != public.uuid_nil()
AND attempt_at <= NOW()
AND (channel_names IS NULL OR channel_name = ANY(channel_names))
AND NOT mq_uuid_exists(after_message_id)
Expand Down Expand Up @@ -121,7 +121,7 @@ BEGIN
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT * FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
WHERE mq_msgs.id != public.uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
Expand Down Expand Up @@ -152,7 +152,7 @@ BEGIN
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
WHERE mq_msgs.id != public.uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
Expand Down Expand Up @@ -234,7 +234,7 @@ BEGIN
PERFORM pg_notify(CONCAT('mq_', channel_name), '')
FROM mq_msgs
WHERE id = ANY(msg_ids)
AND after_message_id = uuid_nil()
AND after_message_id = public.uuid_nil()
GROUP BY channel_name;

IF FOUND THEN
Expand Down
4 changes: 2 additions & 2 deletions migrations/20211013151757_fix_mq_latest_message.down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ RETURNS UUID AS $$
WHERE channel_name = from_channel_name
AND channel_args = from_channel_args
AND after_message_id IS NOT NULL
AND id != uuid_nil()
AND id != public.uuid_nil()
ORDER BY created_at DESC, id DESC
LIMIT 1
),
uuid_nil()
public.uuid_nil()
)
$$ LANGUAGE SQL STABLE;
4 changes: 2 additions & 2 deletions migrations/20211013151757_fix_mq_latest_message.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ RETURNS UUID AS $$
WHERE channel_name = from_channel_name
AND channel_args = from_channel_args
AND after_message_id IS NOT NULL
AND id != uuid_nil()
AND id != public.uuid_nil()
AND NOT EXISTS(
SELECT * FROM mq_msgs AS mq_msgs2
WHERE mq_msgs2.after_message_id = mq_msgs.id
)
ORDER BY created_at DESC
LIMIT 1
),
uuid_nil()
public.uuid_nil()
)
$$ LANGUAGE SQL STABLE;
4 changes: 2 additions & 2 deletions migrations/20220208120856_fix_concurrent_poll.down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ BEGIN
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT * FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
WHERE mq_msgs.id != public.uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
Expand Down Expand Up @@ -52,7 +52,7 @@ BEGIN
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
WHERE mq_msgs.id != public.uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
Expand Down
4 changes: 2 additions & 2 deletions migrations/20220208120856_fix_concurrent_poll.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ BEGIN
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT mq_msgs.id FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
WHERE mq_msgs.id != public.uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
Expand Down Expand Up @@ -54,7 +54,7 @@ BEGIN
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
WHERE mq_msgs.id != public.uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ BEGIN
WITH deleted_ids AS (
DELETE FROM mq_msgs
WHERE channel_name = ANY(channel_names)
AND id != uuid_nil()
AND id != public.uuid_nil()
RETURNING id
)
DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
Expand All @@ -19,7 +19,7 @@ RETURNS VOID AS $$
BEGIN
WITH deleted_ids AS (
DELETE FROM mq_msgs
WHERE id != uuid_nil()
WHERE id != public.uuid_nil()
RETURNING id
)
DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
Expand Down
61 changes: 61 additions & 0 deletions migrations/20240424183215_add_max_retry.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-- Main entry-point for job runner: pulls a batch of messages from the queue.
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
RETURNS TABLE(
id UUID,
is_committed BOOLEAN,
name TEXT,
payload_json TEXT,
payload_bytes BYTEA,
retry_backoff INTERVAL,
wait_time INTERVAL
) AS $$
BEGIN
RETURN QUERY UPDATE mq_msgs
SET
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
attempts = mq_msgs.attempts - 1,
retry_backoff = mq_msgs.retry_backoff * 2
FROM (
SELECT
msgs.id
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT mq_msgs.id FROM mq_msgs
WHERE mq_msgs.id != public.uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
ORDER BY mq_msgs.attempt_at ASC
LIMIT batch_size
) AS msgs ON TRUE
LIMIT batch_size
) AS messages_to_update
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
WHERE mq_msgs.id = messages_to_update.id
AND mq_msgs.attempt_at <= NOW()
RETURNING
mq_msgs.id,
mq_msgs.commit_interval IS NULL,
mq_payloads.name,
mq_payloads.payload_json::TEXT,
mq_payloads.payload_bytes,
mq_msgs.retry_backoff / 2,
interval '0' AS wait_time;

IF NOT FOUND THEN
RETURN QUERY SELECT
NULL::UUID,
NULL::BOOLEAN,
NULL::TEXT,
NULL::TEXT,
NULL::BYTEA,
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != public.uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
END;
$$ LANGUAGE plpgsql;
61 changes: 61 additions & 0 deletions migrations/20240424183215_add_max_retry.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-- Main entry-point for job runner: pulls a batch of messages from the queue.
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1, max_retry_text INTERVAL DEFAULT NULL)
RETURNS TABLE(
id UUID,
is_committed BOOLEAN,
name TEXT,
payload_json TEXT,
payload_bytes BYTEA,
retry_backoff INTERVAL,
wait_time INTERVAL
) AS $$
BEGIN
RETURN QUERY UPDATE mq_msgs
SET
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
attempts = mq_msgs.attempts - 1,
retry_backoff = CASE WHEN max_retry_text IS NULL THEN mq_msgs.retry_backoff * 2 ELSE LEAST(mq_msgs.retry_backoff * 2, max_retry_text) END
FROM (
SELECT
msgs.id
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT mq_msgs.id FROM mq_msgs
WHERE mq_msgs.id != public.uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
ORDER BY mq_msgs.attempt_at ASC
LIMIT batch_size
) AS msgs ON TRUE
LIMIT batch_size
) AS messages_to_update
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
WHERE mq_msgs.id = messages_to_update.id
AND mq_msgs.attempt_at <= NOW()
RETURNING
mq_msgs.id,
mq_msgs.commit_interval IS NULL,
mq_payloads.name,
mq_payloads.payload_json::TEXT,
mq_payloads.payload_bytes,
mq_msgs.retry_backoff / 2,
interval '0' AS wait_time;

IF NOT FOUND THEN
RETURN QUERY SELECT
NULL::UUID,
NULL::BOOLEAN,
NULL::TEXT,
NULL::TEXT,
NULL::BYTEA,
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != public.uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
END;
$$ LANGUAGE plpgsql;
22 changes: 11 additions & 11 deletions sqlxmq_stress/migrations/20210316025847_setup.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ CREATE TABLE mq_msgs (
channel_name TEXT NOT NULL,
channel_args TEXT NOT NULL,
commit_interval INTERVAL,
after_message_id UUID DEFAULT uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT
after_message_id UUID DEFAULT public.uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT
);

-- Insert dummy message so that the 'nil' UUID can be referenced
INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (uuid_nil(), '', '', NULL);
INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (public.uuid_nil(), '', '', NULL);

-- Internal helper function to check that a UUID is neither NULL nor NIL
CREATE FUNCTION mq_uuid_exists(
id UUID
) RETURNS BOOLEAN AS $$
SELECT id IS NOT NULL AND id != uuid_nil()
SELECT id IS NOT NULL AND id != public.uuid_nil()
$$ LANGUAGE SQL IMMUTABLE;

-- Index for polling
CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid_nil() AND NOT mq_uuid_exists(after_message_id);
CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != public.uuid_nil() AND NOT mq_uuid_exists(after_message_id);
-- Index for adding messages
CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL;
CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != public.uuid_nil() AND after_message_id IS NOT NULL;

-- Index for ensuring strict message order
CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id);
Expand All @@ -76,11 +76,11 @@ RETURNS UUID AS $$
WHERE channel_name = from_channel_name
AND channel_args = from_channel_args
AND after_message_id IS NOT NULL
AND id != uuid_nil()
AND id != public.uuid_nil()
ORDER BY created_at DESC, id DESC
LIMIT 1
),
uuid_nil()
public.uuid_nil()
)
$$ LANGUAGE SQL STABLE;

Expand All @@ -89,7 +89,7 @@ CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT)
RETURNS TABLE(name TEXT, args TEXT) AS $$
SELECT channel_name, channel_args
FROM mq_msgs
WHERE id != uuid_nil()
WHERE id != public.uuid_nil()
AND attempt_at <= NOW()
AND (channel_names IS NULL OR channel_name = ANY(channel_names))
AND NOT mq_uuid_exists(after_message_id)
Expand Down Expand Up @@ -121,7 +121,7 @@ BEGIN
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT * FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
WHERE mq_msgs.id != public.uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
Expand Down Expand Up @@ -152,7 +152,7 @@ BEGIN
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
WHERE mq_msgs.id != public.uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
Expand Down Expand Up @@ -234,7 +234,7 @@ BEGIN
PERFORM pg_notify(CONCAT('mq_', channel_name), '')
FROM mq_msgs
WHERE id = ANY(msg_ids)
AND after_message_id = uuid_nil()
AND after_message_id = public.uuid_nil()
GROUP BY channel_name;

IF FOUND THEN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ RETURNS UUID AS $$
WHERE channel_name = from_channel_name
AND channel_args = from_channel_args
AND after_message_id IS NOT NULL
AND id != uuid_nil()
AND id != public.uuid_nil()
ORDER BY created_at DESC, id DESC
LIMIT 1
),
uuid_nil()
public.uuid_nil()
)
$$ LANGUAGE SQL STABLE;
Loading
Loading