diff --git a/Cargo.lock b/Cargo.lock
index 82422e9..2cc5da4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2066,7 +2066,7 @@ dependencies = [
 
 [[package]]
 name = "opsqueue"
-version = "0.30.1"
+version = "0.30.4"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2120,7 +2120,7 @@ dependencies = [
 
 [[package]]
 name = "opsqueue_python"
-version = "0.30.2"
+version = "0.30.4"
 dependencies = [
  "anyhow",
  "chrono",
diff --git a/libs/opsqueue_python/Cargo.toml b/libs/opsqueue_python/Cargo.toml
index c856d8a..51df4a2 100644
--- a/libs/opsqueue_python/Cargo.toml
+++ b/libs/opsqueue_python/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "opsqueue_python"
-version = "0.30.2"
+version = "0.30.4"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
diff --git a/libs/opsqueue_python/opsqueue_python.nix b/libs/opsqueue_python/opsqueue_python.nix
index dd06c91..e355fda 100644
--- a/libs/opsqueue_python/opsqueue_python.nix
+++ b/libs/opsqueue_python/opsqueue_python.nix
@@ -30,6 +30,7 @@ buildPythonPackage rec {
       ".toml"
       ".lock"
       ".db"
+      ".md"
     ];
   };
 
diff --git a/opsqueue/Cargo.toml b/opsqueue/Cargo.toml
index f210031..724d373 100644
--- a/opsqueue/Cargo.toml
+++ b/opsqueue/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "opsqueue"
-version = "0.30.1"
+version = "0.30.4"
 edition = "2021"
 description = "lightweight batch processing queue for heavy loads"
 repository = "https://github.com/channable/opsqueue"
diff --git a/opsqueue/migrations/20250416081453_track_chunk_size.down.sql b/opsqueue/migrations/20250416081453_track_chunk_size.down.sql
new file mode 100644
index 0000000..c095e57
--- /dev/null
+++ b/opsqueue/migrations/20250416081453_track_chunk_size.down.sql
@@ -0,0 +1,3 @@
+ALTER TABLE submissions DROP COLUMN chunk_size;
+ALTER TABLE submissions_completed DROP COLUMN chunk_size;
+ALTER TABLE submissions_failed DROP COLUMN chunk_size;
diff --git a/opsqueue/migrations/20250416081453_track_chunk_size.sql b/opsqueue/migrations/20250416081453_track_chunk_size.up.sql
similarity index 100%
rename from opsqueue/migrations/20250416081453_track_chunk_size.sql
rename to opsqueue/migrations/20250416081453_track_chunk_size.up.sql
diff --git a/opsqueue/migrations/20250803174028_better_random_order_formula.down.sql b/opsqueue/migrations/20250803174028_better_random_order_formula.down.sql
new file mode 100644
index 0000000..d1913ab
--- /dev/null
+++ b/opsqueue/migrations/20250803174028_better_random_order_formula.down.sql
@@ -0,0 +1,38 @@
+-- 1. Drop all indexes and finally the column itself
+DROP INDEX random_chunks_order;
+DROP INDEX random_chunks_metadata_order;
+DROP INDEX random_chunks_metadata_order2;
+ALTER TABLE chunks DROP COLUMN random_order;
+ALTER TABLE chunks_metadata DROP COLUMN random_order;
+
+-- 2. Recreate the column with its **old, bad** definition
+ALTER TABLE chunks ADD COLUMN random_order INTEGER NOT NULL GENERATED ALWAYS AS (
+    (((submission_id + chunk_index) % 65536) * 40503) % 65536
+  ) VIRTUAL;
+ALTER TABLE chunks_metadata ADD COLUMN random_order INTEGER NOT NULL GENERATED ALWAYS AS (
+  (((submission_id + chunk_index) % 65536) * 40503) % 65536
+) VIRTUAL;
+
+
+-- 3. Recreate all dropped indexes
+CREATE INDEX random_chunks_order ON chunks (
+  random_order
+  , submission_id
+  , chunk_index
+);
+
+CREATE INDEX random_chunks_metadata_order ON chunks_metadata (
+  metadata_key
+  , metadata_value
+  , random_order
+  , submission_id
+  , chunk_index
+);
+
+CREATE INDEX random_chunks_metadata_order2 ON chunks_metadata (
+  metadata_key
+  , random_order
+  , metadata_value
+  , submission_id
+  , chunk_index
+);
diff --git a/opsqueue/migrations/20250803174028_better_random_order_formula.up.sql b/opsqueue/migrations/20250803174028_better_random_order_formula.up.sql
new file mode 100644
index 0000000..98cf1de
--- /dev/null
+++ b/opsqueue/migrations/20250803174028_better_random_order_formula.up.sql
@@ -0,0 +1,43 @@
+-- 1. Drop all indexes and finally the column itself
+DROP INDEX random_chunks_order;
+DROP INDEX random_chunks_metadata_order;
+DROP INDEX random_chunks_metadata_order2;
+ALTER TABLE chunks DROP COLUMN random_order;
+ALTER TABLE chunks_metadata DROP COLUMN random_order;
+
+-- 2. Recreate the column with its new, proper definition
+--
+-- Compared to the original definition, we ensure that the top 42 bits of `submission_id`,
+-- which contain the timestamp part of the snowflake,
+-- always participate in the `random_order`,
+-- since the lower 22 bits are likely to be `0` except when under peak load.
+ALTER TABLE chunks ADD COLUMN random_order INTEGER NOT NULL GENERATED ALWAYS AS (
+    (((submission_id + (submission_id >> 22) + chunk_index) % 65536) * 40503) % 65536
+  ) VIRTUAL;
+ALTER TABLE chunks_metadata ADD COLUMN random_order INTEGER NOT NULL GENERATED ALWAYS AS (
+  (((submission_id + (submission_id >> 22) + chunk_index) % 65536) * 40503) % 65536
+) VIRTUAL;
+
+
+-- 3. Recreate all dropped indexes
+CREATE INDEX random_chunks_order ON chunks (
+  random_order
+  , submission_id
+  , chunk_index
+);
+
+CREATE INDEX random_chunks_metadata_order ON chunks_metadata (
+  metadata_key
+  , metadata_value
+  , random_order
+  , submission_id
+  , chunk_index
+);
+
+CREATE INDEX random_chunks_metadata_order2 ON chunks_metadata (
+  metadata_key
+  , random_order
+  , metadata_value
+  , submission_id
+  , chunk_index
+);
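The comment in the `up` migration above is the core of this change, so here is a worked illustration. Per that comment, `submission_id` is a snowflake: the timestamp sits in the top 42 bits, and under light load the lower 22 bits are all zero. The old formula reduced the ID modulo 65536, which only ever looks at those low bits, so every submission created during a quiet period shuffled its chunks with exactly the same offsets. Adding `submission_id >> 22` folds the timestamp bits into the low 16 bits before the multiply. (The multiplier 40503 is the 16-bit Fibonacci-hashing constant, floor(2^16 / phi); since it is odd, multiplication modulo 2^16 is a bijection, so it scrambles without collisions.) A minimal sketch of the effect; the helper names and example IDs are illustrative, not from the opsqueue codebase:

```rust
// Illustrative model of the two generated-column formulas from the
// migrations above. Assumes the snowflake layout described in the
// migration comment: timestamp in the top 42 bits, sequence bits below.

fn old_random_order(submission_id: u64, chunk_index: u64) -> u64 {
    (((submission_id + chunk_index) % 65536) * 40503) % 65536
}

fn new_random_order(submission_id: u64, chunk_index: u64) -> u64 {
    (((submission_id + (submission_id >> 22) + chunk_index) % 65536) * 40503) % 65536
}

fn main() {
    // Two submissions created at different moments under low load:
    // the lower 22 bits (the non-timestamp part) are all zero.
    let a: u64 = 1_234_567 << 22;
    let b: u64 = 7_654_321 << 22;

    // Old formula: `% 65536` never sees the timestamp bits, so both
    // submissions shuffle their chunks identically.
    assert_eq!(old_random_order(a, 5), old_random_order(b, 5));

    // New formula: `submission_id >> 22` mixes the timestamp back in,
    // so the two submissions get different chunk orders.
    assert_ne!(new_random_order(a, 5), new_random_order(b, 5));
}
```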
diff --git a/opsqueue/opsqueue_example_database_schema.db b/opsqueue/opsqueue_example_database_schema.db
index bae0b8c..bced270 100644
Binary files a/opsqueue/opsqueue_example_database_schema.db and b/opsqueue/opsqueue_example_database_schema.db differ
diff --git a/opsqueue/src/consumer/strategy.rs b/opsqueue/src/consumer/strategy.rs
index c975b5e..ff45e5a 100644
--- a/opsqueue/src/consumer/strategy.rs
+++ b/opsqueue/src/consumer/strategy.rs
@@ -99,7 +99,12 @@ impl Strategy {
         match self {
             Oldest => qb.push("\nORDER BY submission_id ASC"),
             Newest => qb.push("\nORDER BY submission_id DESC"),
-            Random => qb.push("\nORDER BY random_order ASC"),
+            Random => {
+                // It is **very** important that we do not apply extra sorting here.
+                // For the implementation of the 'cutting the deck' technique,
+                // we rely on the order being 'whatever comes out of the UNION ALL'.
+                qb
+            }
             PreferDistinct { .. } => {
                 // **no** change in sort order. PreferDistinct passes the sort order on to the inner strategies that it unions.
                 qb
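For context on the comment in this hunk: 'cutting the deck' means every chunk keeps a fixed pseudo-random position (the generated `random_order` column), and only the cut point is randomized per query. Judging from the EXPLAIN snapshots below, the query fetches rows with `random_order` above a pivot parameter first, then the remaining rows, glued together by a plain `UNION ALL` (`random_order>?` followed by `random_order<?`), which amounts to a rotation of one fixed shuffle. An extra `ORDER BY` would make SQLite merge the two branches back into fully sorted order (the old `MERGE (UNION ALL)` plan) and undo the cut, hence the arm now returns `qb` untouched. A toy model of the idea, assuming this reading of the query; `cut_the_deck` and the sample values are illustrative, not the real query code:

```rust
// Toy model of 'cutting the deck': `random_order` is fixed per chunk;
// only the pivot varies from query to query.
fn cut_the_deck(mut deck: Vec<(u64, &'static str)>, pivot: u64) -> Vec<(u64, &'static str)> {
    // The random_chunks_order index already yields rows sorted by random_order.
    deck.sort();
    // `WHERE random_order > ?` first, then `UNION ALL` the rest: a rotation
    // of the fixed shuffle, as long as nothing re-sorts the concatenation.
    let cut = deck.partition_point(|&(random_order, _)| random_order <= pivot);
    deck.rotate_left(cut);
    deck
}

fn main() {
    // (random_order, chunk) pairs; the order values never change.
    let deck = vec![(5907, "c0"), (46410, "c1"), (21377, "c2"), (61880, "c3")];
    // Two different pivots yield two different rotations of the same deck.
    assert_ne!(cut_the_deck(deck.clone(), 30_000), cut_the_deck(deck, 60_000));
}
```

Because only the pivot changes between polls, consumers see a fresh order on every query, while each individual scan remains a cheap index walk with no sort step.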
@@ -114,6 +119,9 @@ pub type ChunkStream<'a> = BoxStream<'a, Result>;
 #[cfg(test)]
 #[cfg(feature = "server-logic")]
 pub mod test {
+    use crate::common::chunk::ChunkSize;
+    use crate::common::StrategicMetadataMap;
+
     use super::*;
     use itertools::Itertools;
     use sqlx::Row;
@@ -180,11 +188,11 @@ pub mod test {
         assert_streaming_query(qb, &explained);
         insta::assert_snapshot!(explained, @r"
-        1, 0, MERGE (UNION ALL)
-        3, 1, LEFT
-        7, 3, SEARCH chunks USING INDEX random_chunks_order (random_order>?)
-        26, 1, RIGHT
-        30, 26, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
+        22, 1, UNION ALL
+        25, 22, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
[...]
-        30, 5, RIGHT
-        34, 30, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
-        155, 130, RIGHT
-        159, 155, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
+        16, 4, CORRELATED SCALAR SUBQUERY 5
+        20, 16, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        26, 16, LIST SUBQUERY 3
+        28, 26, SCAN json_each VIRTUAL TABLE INDEX 0:
+        63, 3, UNION ALL
+        66, 63, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
+        137, 125, CORRELATED SCALAR SUBQUERY 7
+        141, 137, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        147, 137, LIST SUBQUERY 3
+        149, 147, SCAN json_each VIRTUAL TABLE INDEX 0:
+        184, 124, UNION ALL
+        187, 184, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
[...]
-        32, 7, RIGHT
-        36, 32, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
-        197, 172, RIGHT
-        201, 197, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
-        364, 339, RIGHT
-        368, 364, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
-        529, 504, RIGHT
-        533, 529, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
+        18, 6, CORRELATED SCALAR SUBQUERY 5
+        22, 18, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        28, 18, LIST SUBQUERY 3
+        30, 28, SCAN json_each VIRTUAL TABLE INDEX 0:
+        57, 6, CORRELATED SCALAR SUBQUERY 11
+        61, 57, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        67, 57, LIST SUBQUERY 9
+        69, 67, SCAN json_each VIRTUAL TABLE INDEX 0:
+        104, 5, UNION ALL
+        107, 104, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
+        217, 205, CORRELATED SCALAR SUBQUERY 7
+        221, 217, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        227, 217, LIST SUBQUERY 3
+        229, 227, SCAN json_each VIRTUAL TABLE INDEX 0:
+        256, 205, CORRELATED SCALAR SUBQUERY 11
+        260, 256, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        266, 256, LIST SUBQUERY 9
+        268, 266, SCAN json_each VIRTUAL TABLE INDEX 0:
+        303, 204, UNION ALL
+        306, 303, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
+        418, 406, CORRELATED SCALAR SUBQUERY 5
+        422, 418, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        428, 418, LIST SUBQUERY 3
+        430, 428, SCAN json_each VIRTUAL TABLE INDEX 0:
+        457, 406, CORRELATED SCALAR SUBQUERY 13
+        461, 457, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        467, 457, LIST SUBQUERY 9
+        469, 467, SCAN json_each VIRTUAL TABLE INDEX 0:
+        504, 405, UNION ALL
+        507, 504, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
+        617, 605, CORRELATED SCALAR SUBQUERY 7
+        621, 617, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        627, 617, LIST SUBQUERY 3
+        629, 627, SCAN json_each VIRTUAL TABLE INDEX 0:
+        656, 605, CORRELATED SCALAR SUBQUERY 13
+        660, 656, SEARCH submissions_metadata USING COVERING INDEX lookup_submission_by_metadata (metadata_key=? AND metadata_value=? AND submission_id=?)
+        666, 656, LIST SUBQUERY 9
+        668, 666, SCAN json_each VIRTUAL TABLE INDEX 0:
+        703, 604, UNION ALL
+        706, 703, SEARCH chunks USING INDEX random_chunks_order (random_order<?)
[...]
+        // Insert one submission with 10_000 chunks.
+        let input_chunks: Vec<_> = (0..10_000).map(|x| Some(format!("{x}").into())).collect();
+        crate::common::submission::db::insert_submission_from_chunks(
+            None,
+            input_chunks.clone(),
+            None,
+            StrategicMetadataMap::default(),
+            ChunkSize::default(),
+            &mut conn,
+        )
+        .await
+        .unwrap();
+
+        // Running the same Random query twice should return the chunks
+        // in a different order each time.
+        let mut conn = db_pools.reader_conn().await.unwrap();
+        let mut query_builder = QueryBuilder::default();
+        let vals1: Vec<_> = Strategy::Random
+            .build_query(&mut query_builder, &Default::default())
+            .build_query_as()
+            .fetch(conn.get_inner())
+            .try_collect()
+            .await
+            .unwrap();
+
+        let mut query_builder = QueryBuilder::default();
+        let vals2: Vec<_> = Strategy::Random
+            .build_query(&mut query_builder, &Default::default())
+            .build_query_as()
+            .fetch(conn.get_inner())
+            .try_collect()
+            .await
+            .unwrap();
+
+        assert!(vals1 != vals2)
+    }
 }
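One note on the new test at the end of the diff: it passes only when the two back-to-back `Random` queries draw different cut points. Assuming the pivot is drawn uniformly from the 16-bit `random_order` range (the drawing code is outside this diff), the odds of repeating the pivot, and hence the order, are about 1 in 65,536 per run:

```rust
// Back-of-the-envelope flakiness estimate for the test above, under the
// stated (assumed) uniform-pivot model.
fn main() {
    let p_same_pivot = 1.0_f64 / 65_536.0;
    println!("chance of an identical order: ~{:.4}%", p_same_pivot * 100.0); // ~0.0015%
}
```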