Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,6 @@ impl Catalog {
txn.set_system_config_synced_once()?;
}
// Add any new builtin objects and remove old ones.
let new_builtin_collections =
add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
system_cluster: config.builtin_system_cluster_config,
catalog_server_cluster: config.builtin_catalog_server_cluster_config,
Expand All @@ -217,11 +215,14 @@ impl Catalog {
&mut txn,
&builtin_bootstrap_cluster_config_map,
)?;
add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
add_new_remove_old_builtin_cluster_replicas_migration(
&mut txn,
&builtin_bootstrap_cluster_config_map,
)?;

let new_builtin_collections =
add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
add_new_remove_old_builtin_roles_migration(&mut txn)?;
remove_invalid_config_param_role_defaults_migration(&mut txn)?;
remove_pending_cluster_replicas_migration(&mut txn)?;
Expand Down
10 changes: 5 additions & 5 deletions src/adapter/tests/testdata/sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
add-table
foo
----
u1
u2

resolve
SELECT 1 FROM foo
----
SELECT 1 FROM [u1 AS materialize.public.foo]
SELECT 1 FROM [u2 AS materialize.public.foo]

resolve
SELECT 1 FROM bar
Expand All @@ -25,7 +25,7 @@ error: unknown catalog item 'bar'
resolve
SELECT 1 FROM materialize.public.foo
----
SELECT 1 FROM [u1 AS materialize.public.foo]
SELECT 1 FROM [u2 AS materialize.public.foo]

resolve
WITH foo AS (SELECT 1)
Expand All @@ -43,13 +43,13 @@ resolve
WITH bar AS (SELECT 1)
SELECT 1 FROM foo CROSS JOIN bar
----
WITH bar AS (SELECT 1) SELECT 1 FROM [u1 AS materialize.public.foo] CROSS JOIN bar
WITH bar AS (SELECT 1) SELECT 1 FROM [u2 AS materialize.public.foo] CROSS JOIN bar

resolve
WITH bar AS (SELECT 1), baz AS (SELECT 2)
SELECT 1 FROM foo CROSS JOIN bar CROSS JOIN baz
----
WITH bar AS (SELECT 1), baz AS (SELECT 2) SELECT 1 FROM [u1 AS materialize.public.foo] CROSS JOIN bar CROSS JOIN baz
WITH bar AS (SELECT 1), baz AS (SELECT 2) SELECT 1 FROM [u2 AS materialize.public.foo] CROSS JOIN bar CROSS JOIN baz

resolve
WITH outermost(x) AS (
Expand Down
35 changes: 24 additions & 11 deletions src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
pub const USER_ITEM_ALLOC_KEY: &str = "user";
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
Expand All @@ -74,6 +70,25 @@ pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";

// Note: these ID types are generally merged with the main system and user item keys,
// but are kept separate here for backwards-compatibility reasons.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";

pub const SYSTEM_ALLOC_KEYS: &[&str] = &[
SYSTEM_ITEM_ALLOC_KEY,
SYSTEM_CLUSTER_ID_ALLOC_KEY,
SYSTEM_REPLICA_ID_ALLOC_KEY,
];

pub const USER_ALLOC_KEYS: &[&str] = &[
USER_ITEM_ALLOC_KEY,
USER_CLUSTER_ID_ALLOC_KEY,
USER_REPLICA_ID_ALLOC_KEY,
];

#[derive(Clone, Debug)]
pub struct BootstrapArgs {
pub cluster_replica_size_map: ClusterReplicaSizeMap,
Expand Down Expand Up @@ -319,15 +334,15 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
#[mz_ore::instrument(level = "debug")]
async fn allocate_id(
&mut self,
id_type: &str,
id_type: &[&str],
amount: u64,
commit_ts: Timestamp,
) -> Result<Vec<u64>, CatalogError> {
if amount == 0 {
return Ok(Vec::new());
}
let mut txn = self.transaction().await?;
let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
let ids = txn.get_and_increment_id_by(id_type, amount)?;
txn.commit_internal(commit_ts).await?;
Ok(ids)
}
Expand All @@ -340,9 +355,7 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
amount: u64,
commit_ts: Timestamp,
) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
let ids = self
.allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
.await?;
let ids = self.allocate_id(USER_ALLOC_KEYS, amount, commit_ts).await?;
let ids = ids
.iter()
.map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
Expand All @@ -357,7 +370,7 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
&mut self,
commit_ts: Timestamp,
) -> Result<(CatalogItemId, GlobalId), CatalogError> {
let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
let id = self.allocate_id(USER_ALLOC_KEYS, 1, commit_ts).await?;
let id = id.into_element();
Ok((CatalogItemId::User(id), GlobalId::User(id)))
}
Expand All @@ -370,7 +383,7 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
commit_ts: Timestamp,
) -> Result<ClusterId, CatalogError> {
let id = self
.allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
.allocate_id(USER_ALLOC_KEYS, 1, commit_ts)
.await?
.into_element();
Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/durable/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ pub(crate) async fn initialize(
// We created a network policy with a prefined ID user(1) and OID. We need
// to increment the id alloc key. It should be safe to assume that there's
// no user(1), as a sanity check, we'll assert this is the case.
let id = tx.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
let id = tx.get_and_increment_id(&[USER_NETWORK_POLICY_ID_ALLOC_KEY])?;
assert!(DEFAULT_USER_NETWORK_POLICY_ID == NetworkPolicyId::User(id));

audit_events.extend([(
Expand Down Expand Up @@ -720,7 +720,7 @@ pub(crate) async fn initialize(
// Allocate an ID for each audit log event.
let mut audit_events_with_id = Vec::with_capacity(audit_events.len());
for (ty, obj, details) in audit_events {
let id = tx.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
let id = tx.get_and_increment_id(&[AUDIT_LOG_ID_ALLOC_KEY])?;
audit_events_with_id.push((id, ty, obj, details));
}

Expand Down
77 changes: 42 additions & 35 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,8 @@ use crate::durable::{
AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_ALLOC_KEYS, SYSTEM_ITEM_ALLOC_KEY,
Snapshot, SystemConfiguration, USER_ALLOC_KEYS, USER_NETWORK_POLICY_ID_ALLOC_KEY,
USER_ROLE_ID_ALLOC_KEY,
};
use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
Expand Down Expand Up @@ -230,7 +229,7 @@ impl<'a> Transaction<'a> {
privileges: Vec<MzAclItem>,
temporary_oids: &HashSet<u32>,
) -> Result<(DatabaseId, u32), CatalogError> {
let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
let id = self.get_and_increment_id(&[DATABASE_ID_ALLOC_KEY])?;
let id = DatabaseId::User(id);
let oid = self.allocate_oid(temporary_oids)?;
self.insert_database(id, database_name, owner_id, privileges, oid)?;
Expand Down Expand Up @@ -268,7 +267,7 @@ impl<'a> Transaction<'a> {
privileges: Vec<MzAclItem>,
temporary_oids: &HashSet<u32>,
) -> Result<(SchemaId, u32), CatalogError> {
let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
let id = self.get_and_increment_id(&[SCHEMA_ID_ALLOC_KEY])?;
let id = SchemaId::User(id);
let oid = self.allocate_oid(temporary_oids)?;
self.insert_schema(
Expand Down Expand Up @@ -341,7 +340,7 @@ impl<'a> Transaction<'a> {
vars: RoleVars,
temporary_oids: &HashSet<u32>,
) -> Result<(RoleId, u32), CatalogError> {
let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
let id = self.get_and_increment_id(&[USER_ROLE_ID_ALLOC_KEY])?;
let id = RoleId::User(id);
let oid = self.allocate_oid(temporary_oids)?;
self.insert_role(id, name, attributes, membership, vars, oid)?;
Expand Down Expand Up @@ -436,7 +435,7 @@ impl<'a> Transaction<'a> {
config: ClusterConfig,
temporary_oids: &HashSet<u32>,
) -> Result<(), CatalogError> {
let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
let cluster_id = self.get_and_increment_id(SYSTEM_ALLOC_KEYS)?;
let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
self.insert_cluster(
cluster_id,
Expand Down Expand Up @@ -615,7 +614,7 @@ impl<'a> Transaction<'a> {
temporary_oids: &HashSet<u32>,
) -> Result<NetworkPolicyId, CatalogError> {
let oid = self.allocate_oid(temporary_oids)?;
let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
let id = self.get_and_increment_id(&[USER_NETWORK_POLICY_ID_ALLOC_KEY])?;
let id = NetworkPolicyId::User(id);
self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
}
Expand Down Expand Up @@ -733,40 +732,48 @@ impl<'a> Transaction<'a> {
}
}

pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
Ok(self.get_and_increment_id_by(key, 1)?.into_element())
pub fn get_and_increment_id(&mut self, keys: &[&str]) -> Result<u64, CatalogError> {
Ok(self.get_and_increment_id_by(keys, 1)?.into_element())
}

pub fn get_and_increment_id_by(
&mut self,
key: String,
keys: &[&str],
amount: u64,
) -> Result<Vec<u64>, CatalogError> {
let items = self.id_allocator.items();

assert!(
key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
keys != &[SYSTEM_ITEM_ALLOC_KEY] || !self.durable_catalog.is_bootstrap_complete(),
"system item IDs cannot be allocated outside of bootstrap"
);

let current_id = self
.id_allocator
.items()
.get(&IdAllocKey { name: key.clone() })
.unwrap_or_else(|| panic!("{key} id allocator missing"))
.next_id;
let mut current_id = 1;
for key in keys {
let next_id = items
.get(&IdAllocKey {
name: key.to_string(),
})
.unwrap_or_else(|| panic!("{key} id allocator missing"))
.next_id;

current_id = current_id.max(next_id)
}

let next_id = current_id
.checked_add(amount)
.ok_or(SqlCatalogError::IdExhaustion)?;
let prev = self.id_allocator.set(
IdAllocKey { name: key },
Some(IdAllocValue { next_id }),
self.op_id,
)?;
assert_eq!(
prev,
Some(IdAllocValue {
next_id: current_id
})
);
for key in keys {
let prev = self.id_allocator.set(
IdAllocKey {
name: key.to_string(),
},
Some(IdAllocValue { next_id }),
self.op_id,
)?;
assert!(prev.is_some_and(|value| value.next_id <= current_id),);
}

Ok((current_id..next_id).collect())
}

Expand All @@ -779,7 +786,7 @@ impl<'a> Transaction<'a> {
"we can only allocate system item IDs during bootstrap"
);
Ok(self
.get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
.get_and_increment_id_by(SYSTEM_ALLOC_KEYS, amount)?
.into_iter()
// TODO(alter_table): Use separate ID allocators.
.map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
Expand Down Expand Up @@ -882,29 +889,29 @@ impl<'a> Transaction<'a> {
amount: u64,
) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
Ok(self
.get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
.get_and_increment_id_by(USER_ALLOC_KEYS, amount)?
.into_iter()
// TODO(alter_table): Use separate ID allocators.
.map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
.collect())
}

pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
let id = self.get_and_increment_id(USER_ALLOC_KEYS)?;
Ok(ReplicaId::User(id))
}

pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
let id = self.get_and_increment_id(SYSTEM_ALLOC_KEYS)?;
Ok(ReplicaId::System(id))
}

pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
self.get_and_increment_id(&[AUDIT_LOG_ID_ALLOC_KEY])
}

pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
self.get_and_increment_id(&[STORAGE_USAGE_ID_ALLOC_KEY])
}

/// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/tests/read-write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async fn test_allocate_id(state_builder: TestCatalogStateBuilder) {

let start_id = state.get_next_id(id_type).await.unwrap();
let commit_ts = state.current_upper().await;
let ids = state.allocate_id(id_type, 3, commit_ts).await.unwrap();
let ids = state.allocate_id(&[id_type], 3, commit_ts).await.unwrap();
assert_eq!(ids, (start_id..(start_id + 3)).collect::<Vec<_>>());

let snapshot_id_allocs: Vec<_> = state
Expand Down
2 changes: 1 addition & 1 deletion src/environmentd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ fn test_persistence() {
.into_iter()
.map(|row| row.get(0))
.collect::<Vec<String>>(),
vec!["u1", "u2", "u3", "u4", "u5", "u6", "u7"]
vec!["u2", "u3", "u4", "u5", "u6", "u7", "u8"]
);
}

Expand Down
4 changes: 2 additions & 2 deletions src/environmentd/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1244,13 +1244,13 @@ largest not in advance of upper:<TIMESTAMP>
timeline: Some(EpochMilliseconds)
session wall time:<TIMESTAMP>

source materialize.public.t1 (u1, storage):
source materialize.public.t1 (u2, storage):
read frontier:[<TIMESTAMP>]
write frontier:[<TIMESTAMP>]

binding constraints:
lower:
(StorageInput([User(1)])): [<TIMESTAMP>]
(StorageInput([User(2)])): [<TIMESTAMP>]
(IsolationLevel(StrictSerializable)): [<TIMESTAMP>]\n";

let row = client
Expand Down
Loading