diff --git a/db/migrate/20251008141931_replace_md5_with_hashtext_in_que_job_notify.rb b/db/migrate/20251008141931_replace_md5_with_hashtext_in_que_job_notify.rb
new file mode 100644
index 00000000..0b512d77
--- /dev/null
+++ b/db/migrate/20251008141931_replace_md5_with_hashtext_in_que_job_notify.rb
@@ -0,0 +1,91 @@
+class ReplaceMd5WithHashtextInQueJobNotify < ActiveRecord::Migration[7.0]
+  # Redefines the que_job_notify() trigger function so that lockers are
+  # ordered with hashtext() instead of md5().
+  #
+  # This mirrors the fix in https://github.com/que-rb/que/pull/437
+  # Be careful on Que upgrade in case the final fix differed.
+
+  # Installs the hashtext()-ordered version of que_job_notify().
+  def up
+    replace_que_job_notify("hashtext")
+  end
+
+  # Restores the md5()-ordered version of que_job_notify().
+  def down
+    replace_que_job_notify("md5")
+  end
+
+  private
+
+  # Replaces que_job_notify() inside a Que transaction. Both directions of
+  # this migration run the same SQL and differ only in the hash function,
+  # so the statement is defined once here.
+  #
+  # hash_function - name of the SQL function used to order the lockers
+  #                 ("hashtext" or "md5"). It is interpolated into the SQL
+  #                 verbatim, so only pass trusted literals.
+  def replace_que_job_notify(hash_function)
+    Que.transaction do
+      Que.execute <<~SQL
+        CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$
+          DECLARE
+            locker_pid integer;
+            sort_key json;
+          BEGIN
+            -- Don't do anything if the job is scheduled for a future time.
+            IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
+              RETURN null;
+            END IF;
+
+            -- Pick a locker to notify of the job's insertion, weighted by their number
+            -- of workers. Should bounce pseudorandomly between lockers on each
+            -- invocation, hence the #{hash_function}-ordering, but still touch each one equally,
+            -- hence the modulo using the job_id.
+            SELECT pid
+            INTO locker_pid
+            FROM (
+              SELECT *, last_value(row_number) OVER () + 1 AS count
+              FROM (
+                SELECT *, row_number() OVER () - 1 AS row_number
+                FROM (
+                  SELECT *
+                  FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
+                  WHERE
+                    listening AND
+                    queues @> ARRAY[NEW.queue] AND
+                    ql.job_schema_version = NEW.job_schema_version
+                  ORDER BY #{hash_function}(pid::text || id::text)
+                ) t1
+              ) t2
+            ) t3
+            WHERE NEW.id % count = row_number;
+
+            IF locker_pid IS NOT NULL THEN
+              -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
+              -- rather than throw errors when someone enqueues a big job, just
+              -- broadcast the most pertinent information, and let the locker query for
+              -- the record after it's taken the lock. The worker will have to hit the
+              -- DB in order to make sure the job is still visible anyway.
+              SELECT row_to_json(t)
+              INTO sort_key
+              FROM (
+                SELECT
+                  'job_available' AS message_type,
+                  NEW.queue AS queue,
+                  NEW.priority AS priority,
+                  NEW.id AS id,
+                  -- Make sure we output timestamps as UTC ISO 8601
+                  to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
+              ) t;
+
+              PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
+            END IF;
+
+            RETURN null;
+          END
+        $$
+        LANGUAGE plpgsql;
+      SQL
+    end
+  end
+end
diff --git a/db/structure-10.sql b/db/structure-10.sql
index de6716bb..acb44596 100644
--- a/db/structure-10.sql
+++ b/db/structure-10.sql
@@ -112,7 +112,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger
 
     -- Pick a locker to notify of the job's insertion, weighted by their number
     -- of workers. Should bounce pseudorandomly between lockers on each
-    -- invocation, hence the md5-ordering, but still touch each one equally,
+    -- invocation, hence the hashtext-ordering, but still touch each one equally,
     -- hence the modulo using the job_id.
     SELECT pid
     INTO locker_pid
@@ -127,7 +127,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger
             listening AND
             queues @> ARRAY[NEW.queue] AND
             ql.job_schema_version = NEW.job_schema_version
-          ORDER BY md5(pid::text || id::text)
+          ORDER BY hashtext(pid::text || id::text)
         ) t1
       ) t2
     ) t3
@@ -1527,6 +1527,7 @@ INSERT INTO "schema_migrations" (version) VALUES
 ('20230629131935'),
 ('20230703133544'),
 ('20230703134109'),
-('20230704131552');
+('20230704131552'),
+('20251008141931');
 
 
diff --git a/db/structure.sql b/db/structure.sql
index d24d2102..7cb27aa3 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -114,7 +114,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger
 
     -- Pick a locker to notify of the job's insertion, weighted by their number
     -- of workers. Should bounce pseudorandomly between lockers on each
-    -- invocation, hence the md5-ordering, but still touch each one equally,
+    -- invocation, hence the hashtext-ordering, but still touch each one equally,
     -- hence the modulo using the job_id.
     SELECT pid
     INTO locker_pid
@@ -129,7 +129,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger
             listening AND
             queues @> ARRAY[NEW.queue] AND
             ql.job_schema_version = NEW.job_schema_version
-          ORDER BY md5(pid::text || id::text)
+          ORDER BY hashtext(pid::text || id::text)
         ) t1
       ) t2
     ) t3
@@ -1529,6 +1529,7 @@ INSERT INTO "schema_migrations" (version) VALUES
 ('20230629131935'),
 ('20230703133544'),
 ('20230703134109'),
-('20230704131552');
+('20230704131552'),
+('20251008141931');
 
 