Skip to content

Commit e62e44f

Browse files
committed
Allow allocating ids that are unique in multiple allocators
1 parent a5a58f1 commit e62e44f

File tree

5 files changed

+61
-47
lines changed

5 files changed

+61
-47
lines changed

src/adapter/src/catalog/open.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,6 @@ impl Catalog {
204204
txn.set_system_config_synced_once()?;
205205
}
206206
// Add any new builtin objects and remove old ones.
207-
let new_builtin_collections =
208-
add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
209207
let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
210208
system_cluster: config.builtin_system_cluster_config,
211209
catalog_server_cluster: config.builtin_catalog_server_cluster_config,
@@ -217,11 +215,14 @@ impl Catalog {
217215
&mut txn,
218216
&builtin_bootstrap_cluster_config_map,
219217
)?;
220-
add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
221218
add_new_remove_old_builtin_cluster_replicas_migration(
222219
&mut txn,
223220
&builtin_bootstrap_cluster_config_map,
224221
)?;
222+
223+
let new_builtin_collections =
224+
add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
225+
add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
225226
add_new_remove_old_builtin_roles_migration(&mut txn)?;
226227
remove_invalid_config_param_role_defaults_migration(&mut txn)?;
227228
remove_pending_cluster_replicas_migration(&mut txn)?;

src/catalog/src/durable.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,6 @@ pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
6161
pub const USER_ITEM_ALLOC_KEY: &str = "user";
6262
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
6363
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
64-
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
65-
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
66-
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
67-
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
6864
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
6965
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
7066
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
@@ -74,6 +70,13 @@ pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
7470
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
7571
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
7672

73+
// Note: these ID types are generally merged with the main system and user item keys,
74+
// but are kept separate here for backwards-compatibility reasons.
75+
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
76+
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
77+
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
78+
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
79+
7780
#[derive(Clone, Debug)]
7881
pub struct BootstrapArgs {
7982
pub cluster_replica_size_map: ClusterReplicaSizeMap,
@@ -319,15 +322,15 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
319322
#[mz_ore::instrument(level = "debug")]
320323
async fn allocate_id(
321324
&mut self,
322-
id_type: &str,
325+
id_type: &[&str],
323326
amount: u64,
324327
commit_ts: Timestamp,
325328
) -> Result<Vec<u64>, CatalogError> {
326329
if amount == 0 {
327330
return Ok(Vec::new());
328331
}
329332
let mut txn = self.transaction().await?;
330-
let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
333+
let ids = txn.get_and_increment_id_by(id_type, amount)?;
331334
txn.commit_internal(commit_ts).await?;
332335
Ok(ids)
333336
}
@@ -341,7 +344,7 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
341344
commit_ts: Timestamp,
342345
) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
343346
let ids = self
344-
.allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
347+
.allocate_id(&[USER_ITEM_ALLOC_KEY], amount, commit_ts)
345348
.await?;
346349
let ids = ids
347350
.iter()
@@ -357,7 +360,9 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
357360
&mut self,
358361
commit_ts: Timestamp,
359362
) -> Result<(CatalogItemId, GlobalId), CatalogError> {
360-
let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
363+
let id = self
364+
.allocate_id(&[USER_ITEM_ALLOC_KEY], 1, commit_ts)
365+
.await?;
361366
let id = id.into_element();
362367
Ok((CatalogItemId::User(id), GlobalId::User(id)))
363368
}
@@ -370,7 +375,7 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
370375
commit_ts: Timestamp,
371376
) -> Result<ClusterId, CatalogError> {
372377
let id = self
373-
.allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
378+
.allocate_id(&[USER_CLUSTER_ID_ALLOC_KEY], 1, commit_ts)
374379
.await?
375380
.into_element();
376381
Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)

src/catalog/src/durable/initialize.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ pub(crate) async fn initialize(
602602
// We created a network policy with a prefined ID user(1) and OID. We need
603603
// to increment the id alloc key. It should be safe to assume that there's
604604
// no user(1), as a sanity check, we'll assert this is the case.
605-
let id = tx.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
605+
let id = tx.get_and_increment_id(&[USER_NETWORK_POLICY_ID_ALLOC_KEY])?;
606606
assert!(DEFAULT_USER_NETWORK_POLICY_ID == NetworkPolicyId::User(id));
607607

608608
audit_events.extend([(
@@ -720,7 +720,7 @@ pub(crate) async fn initialize(
720720
// Allocate an ID for each audit log event.
721721
let mut audit_events_with_id = Vec::with_capacity(audit_events.len());
722722
for (ty, obj, details) in audit_events {
723-
let id = tx.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
723+
let id = tx.get_and_increment_id(&[AUDIT_LOG_ID_ALLOC_KEY])?;
724724
audit_events_with_id.push((id, ty, obj, details));
725725
}
726726

src/catalog/src/durable/transaction.rs

Lines changed: 40 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ impl<'a> Transaction<'a> {
230230
privileges: Vec<MzAclItem>,
231231
temporary_oids: &HashSet<u32>,
232232
) -> Result<(DatabaseId, u32), CatalogError> {
233-
let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
233+
let id = self.get_and_increment_id(&[DATABASE_ID_ALLOC_KEY])?;
234234
let id = DatabaseId::User(id);
235235
let oid = self.allocate_oid(temporary_oids)?;
236236
self.insert_database(id, database_name, owner_id, privileges, oid)?;
@@ -268,7 +268,7 @@ impl<'a> Transaction<'a> {
268268
privileges: Vec<MzAclItem>,
269269
temporary_oids: &HashSet<u32>,
270270
) -> Result<(SchemaId, u32), CatalogError> {
271-
let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
271+
let id = self.get_and_increment_id(&[SCHEMA_ID_ALLOC_KEY])?;
272272
let id = SchemaId::User(id);
273273
let oid = self.allocate_oid(temporary_oids)?;
274274
self.insert_schema(
@@ -341,7 +341,7 @@ impl<'a> Transaction<'a> {
341341
vars: RoleVars,
342342
temporary_oids: &HashSet<u32>,
343343
) -> Result<(RoleId, u32), CatalogError> {
344-
let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
344+
let id = self.get_and_increment_id(&[USER_ROLE_ID_ALLOC_KEY])?;
345345
let id = RoleId::User(id);
346346
let oid = self.allocate_oid(temporary_oids)?;
347347
self.insert_role(id, name, attributes, membership, vars, oid)?;
@@ -436,7 +436,7 @@ impl<'a> Transaction<'a> {
436436
config: ClusterConfig,
437437
temporary_oids: &HashSet<u32>,
438438
) -> Result<(), CatalogError> {
439-
let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
439+
let cluster_id = self.get_and_increment_id(&[SYSTEM_CLUSTER_ID_ALLOC_KEY])?;
440440
let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
441441
self.insert_cluster(
442442
cluster_id,
@@ -615,7 +615,7 @@ impl<'a> Transaction<'a> {
615615
temporary_oids: &HashSet<u32>,
616616
) -> Result<NetworkPolicyId, CatalogError> {
617617
let oid = self.allocate_oid(temporary_oids)?;
618-
let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
618+
let id = self.get_and_increment_id(&[USER_NETWORK_POLICY_ID_ALLOC_KEY])?;
619619
let id = NetworkPolicyId::User(id);
620620
self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
621621
}
@@ -733,40 +733,48 @@ impl<'a> Transaction<'a> {
733733
}
734734
}
735735

736-
pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
737-
Ok(self.get_and_increment_id_by(key, 1)?.into_element())
736+
pub fn get_and_increment_id(&mut self, keys: &[&str]) -> Result<u64, CatalogError> {
737+
Ok(self.get_and_increment_id_by(keys, 1)?.into_element())
738738
}
739739

740740
pub fn get_and_increment_id_by(
741741
&mut self,
742-
key: String,
742+
keys: &[&str],
743743
amount: u64,
744744
) -> Result<Vec<u64>, CatalogError> {
745+
let items = self.id_allocator.items();
746+
745747
assert!(
746-
key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
748+
keys != &[SYSTEM_ITEM_ALLOC_KEY] || !self.durable_catalog.is_bootstrap_complete(),
747749
"system item IDs cannot be allocated outside of bootstrap"
748750
);
749751

750-
let current_id = self
751-
.id_allocator
752-
.items()
753-
.get(&IdAllocKey { name: key.clone() })
754-
.unwrap_or_else(|| panic!("{key} id allocator missing"))
755-
.next_id;
752+
let mut current_id = 1;
753+
for key in keys {
754+
let next_id = items
755+
.get(&IdAllocKey {
756+
name: key.to_string(),
757+
})
758+
.unwrap_or_else(|| panic!("{key} id allocator missing"))
759+
.next_id;
760+
761+
current_id = current_id.max(next_id)
762+
}
763+
756764
let next_id = current_id
757765
.checked_add(amount)
758766
.ok_or(SqlCatalogError::IdExhaustion)?;
759-
let prev = self.id_allocator.set(
760-
IdAllocKey { name: key },
761-
Some(IdAllocValue { next_id }),
762-
self.op_id,
763-
)?;
764-
assert_eq!(
765-
prev,
766-
Some(IdAllocValue {
767-
next_id: current_id
768-
})
769-
);
767+
for key in keys {
768+
let prev = self.id_allocator.set(
769+
IdAllocKey {
770+
name: key.to_string(),
771+
},
772+
Some(IdAllocValue { next_id }),
773+
self.op_id,
774+
)?;
775+
assert!(prev.is_some_and(|value| value.next_id <= current_id),);
776+
}
777+
770778
Ok((current_id..next_id).collect())
771779
}
772780

@@ -779,7 +787,7 @@ impl<'a> Transaction<'a> {
779787
"we can only allocate system item IDs during bootstrap"
780788
);
781789
Ok(self
782-
.get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
790+
.get_and_increment_id_by(&[SYSTEM_ITEM_ALLOC_KEY], amount)?
783791
.into_iter()
784792
// TODO(alter_table): Use separate ID allocators.
785793
.map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
@@ -882,29 +890,29 @@ impl<'a> Transaction<'a> {
882890
amount: u64,
883891
) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
884892
Ok(self
885-
.get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
893+
.get_and_increment_id_by(&[USER_ITEM_ALLOC_KEY], amount)?
886894
.into_iter()
887895
// TODO(alter_table): Use separate ID allocators.
888896
.map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
889897
.collect())
890898
}
891899

892900
pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
893-
let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
901+
let id = self.get_and_increment_id(&[USER_REPLICA_ID_ALLOC_KEY])?;
894902
Ok(ReplicaId::User(id))
895903
}
896904

897905
pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
898-
let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
906+
let id = self.get_and_increment_id(&[SYSTEM_REPLICA_ID_ALLOC_KEY])?;
899907
Ok(ReplicaId::System(id))
900908
}
901909

902910
pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
903-
self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
911+
self.get_and_increment_id(&[AUDIT_LOG_ID_ALLOC_KEY])
904912
}
905913

906914
pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
907-
self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
915+
self.get_and_increment_id(&[STORAGE_USAGE_ID_ALLOC_KEY])
908916
}
909917

910918
/// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any

src/catalog/tests/read-write.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ async fn test_allocate_id(state_builder: TestCatalogStateBuilder) {
9898

9999
let start_id = state.get_next_id(id_type).await.unwrap();
100100
let commit_ts = state.current_upper().await;
101-
let ids = state.allocate_id(id_type, 3, commit_ts).await.unwrap();
101+
let ids = state.allocate_id(&[id_type], 3, commit_ts).await.unwrap();
102102
assert_eq!(ids, (start_id..(start_id + 3)).collect::<Vec<_>>());
103103

104104
let snapshot_id_allocs: Vec<_> = state

0 commit comments

Comments
 (0)