Skip to content

Commit b5c3d53

Browse files
committed
HMS commit implementation
Partial implementation of HMS traditional commit stabilising fix issues with hms lock flow refactoring for optimistic locks address format
1 parent 9fa3776 commit b5c3d53

File tree

5 files changed

+396
-7
lines changed

5 files changed

+396
-7
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/catalog/hms/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ linkedbytes = { workspace = true }
5252
metainfo = { workspace = true }
5353
motore-macros = { workspace = true }
5454
volo = { workspace = true }
55+
whoami = "1.6.1"
5556

5657
[dev-dependencies]
5758
ctor = { workspace = true }

crates/catalog/hms/src/catalog.rs

Lines changed: 192 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,14 @@ pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";
5050
/// HMS Catalog warehouse location
5151
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
5252

53-
/// Builder for [`RestCatalog`].
53+
///HMS Hive Locks Disabled
54+
pub const HMS_HIVE_LOCKS_DISABLED: &str = "hive_locks_disabled";
55+
56+
/// HMS Environment Context
57+
const HMS_EXPECTED_PARAMETER_KEY: &str = "expected_parameter_key";
58+
const HMS_EXPECTED_PARAMETER_VALUE: &str = "expected_parameter_value";
59+
60+
/// Builder for [`HmsCatalog`].
5461
#[derive(Debug)]
5562
pub struct HmsCatalogBuilder(HmsCatalogConfig);
5663

@@ -167,6 +174,43 @@ impl Debug for HmsCatalog {
167174
}
168175
}
169176

177+
/// RAII guard for HMS table locks. Automatically releases the lock when dropped.
178+
struct HmsLockGuard {
179+
client: ThriftHiveMetastoreClient,
180+
lockid: i64,
181+
}
182+
183+
impl HmsLockGuard {
184+
async fn acquire(
185+
client: &ThriftHiveMetastoreClient,
186+
db_name: &str,
187+
tbl_name: &str,
188+
) -> Result<Self> {
189+
let lock = client
190+
.lock(create_lock_request(db_name, tbl_name))
191+
.await
192+
.map(from_thrift_exception)
193+
.map_err(from_thrift_error)??;
194+
195+
Ok(Self {
196+
client: client.clone(),
197+
lockid: lock.lockid,
198+
})
199+
}
200+
}
201+
202+
impl Drop for HmsLockGuard {
203+
fn drop(&mut self) {
204+
let client = self.client.clone();
205+
let lockid = self.lockid;
206+
tokio::spawn(async move {
207+
let _ = client
208+
.unlock(hive_metastore::UnlockRequest { lockid })
209+
.await;
210+
});
211+
}
212+
}
213+
170214
impl HmsCatalog {
171215
/// Create a new hms catalog.
172216
fn new(config: HmsCatalogConfig) -> Result<Self> {
@@ -208,6 +252,64 @@ impl HmsCatalog {
208252
pub fn file_io(&self) -> FileIO {
209253
self.file_io.clone()
210254
}
255+
256+
/// Applies a commit to a table and prepares the update for HMS.
257+
/// # Returns
258+
/// A tuple of (staged_table, new_hive_table) ready for HMS alter_table operation
259+
async fn apply_and_prepare_update(
260+
&self,
261+
commit: TableCommit,
262+
db_name: &str,
263+
tbl_name: &str,
264+
hive_table: &hive_metastore::Table,
265+
) -> Result<(Table, hive_metastore::Table)> {
266+
let metadata_location = get_metadata_location(&hive_table.parameters)?;
267+
268+
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
269+
270+
let cur_table = Table::builder()
271+
.file_io(self.file_io())
272+
.metadata_location(metadata_location)
273+
.metadata(metadata)
274+
.identifier(TableIdent::new(
275+
NamespaceIdent::new(db_name.to_string()),
276+
tbl_name.to_string(),
277+
))
278+
.build()?;
279+
280+
let staged_table = commit.apply(cur_table)?;
281+
staged_table
282+
.metadata()
283+
.write_to(
284+
staged_table.file_io(),
285+
staged_table.metadata_location_result()?,
286+
)
287+
.await?;
288+
289+
let new_hive_table = update_hive_table_from_table(hive_table, &staged_table)?;
290+
291+
Ok((staged_table, new_hive_table))
292+
}
293+
294+
/// Builds an EnvironmentContext for optimistic locking with HMS.
295+
///
296+
/// The context includes the expected metadata_location, which HMS will use
297+
/// to validate that the table hasn't been modified concurrently.
298+
fn build_environment_context(metadata_location: &str) -> hive_metastore::EnvironmentContext {
299+
let mut env_context_properties = pilota::AHashMap::new();
300+
env_context_properties.insert(
301+
HMS_EXPECTED_PARAMETER_KEY.into(),
302+
"metadata_location".into(),
303+
);
304+
env_context_properties.insert(
305+
HMS_EXPECTED_PARAMETER_VALUE.into(),
306+
pilota::FastStr::from_string(metadata_location.to_string()),
307+
);
308+
309+
hive_metastore::EnvironmentContext {
310+
properties: Some(env_context_properties),
311+
}
312+
}
211313
}
212314

213315
#[async_trait]
@@ -603,10 +705,94 @@ impl Catalog for HmsCatalog {
603705
))
604706
}
605707

606-
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
607-
Err(Error::new(
608-
ErrorKind::FeatureUnsupported,
609-
"Updating a table is not supported yet",
610-
))
708+
/// Updates an existing table by applying a commit operation.
709+
///
710+
/// This method supports two update strategies depending on the catalog configuration:
711+
///
712+
/// **Optimistic Locking** (when `hive_locks_disabled` is set):
713+
/// - Retrieves the current table state from HMS without acquiring locks
714+
/// - Constructs an `EnvironmentContext` with the expected metadata location
715+
/// - Uses `alter_table_with_environment_context` to perform an atomic
716+
/// compare-and-swap operation.
717+
/// - HMS will reject the update if the metadata location has changed,
718+
/// indicating a concurrent modification
719+
///
720+
/// **Traditional Locking** (default):
721+
/// - Acquires an exclusive HMS lock on the table before making changes
722+
/// - Retrieves the current table state
723+
/// - Applies the commit and writes new metadata
724+
/// - Updates the table in HMS using `alter_table`
725+
/// - Releases the lock after the operation completes
726+
///
727+
/// # Returns
728+
/// A `Result` wrapping the updated `Table` object with new metadata.
729+
///
730+
/// # Errors
731+
/// This function may return an error in several scenarios:
732+
/// - Failure to validate the namespace or table identifier
733+
/// - Inability to acquire a lock (traditional locking mode)
734+
/// - Failure to retrieve the table from HMS
735+
/// - Errors reading or writing table metadata
736+
/// - HMS rejects the update due to concurrent modification (optimistic locking)
737+
/// - Errors from the underlying Thrift communication with HMS
738+
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
739+
let ident = commit.identifier().clone();
740+
let db_name = validate_namespace(ident.namespace())?;
741+
let tbl_name = ident.name.clone();
742+
743+
if self.config.props.contains_key(HMS_HIVE_LOCKS_DISABLED) {
744+
// Optimistic locking path: read first, then validate with EnvironmentContext
745+
let hive_table = self
746+
.client
747+
.0
748+
.get_table(db_name.clone().into(), tbl_name.clone().into())
749+
.await
750+
.map(from_thrift_exception)
751+
.map_err(from_thrift_error)??;
752+
753+
let metadata_location = get_metadata_location(&hive_table.parameters)?;
754+
let env_context = Self::build_environment_context(&metadata_location);
755+
756+
let (staged_table, new_hive_table) = self
757+
.apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table)
758+
.await?;
759+
760+
self.client
761+
.0
762+
.alter_table_with_environment_context(
763+
db_name.into(),
764+
tbl_name.into(),
765+
new_hive_table,
766+
env_context,
767+
)
768+
.await
769+
.map_err(from_thrift_error)?;
770+
771+
Ok(staged_table)
772+
} else {
773+
// Traditional locking path: acquire lock first, then read
774+
let _guard = HmsLockGuard::acquire(&self.client.0, &db_name, &tbl_name).await?;
775+
776+
let hive_table = self
777+
.client
778+
.0
779+
.get_table(db_name.clone().into(), tbl_name.clone().into())
780+
.await
781+
.map(from_thrift_exception)
782+
.map_err(from_thrift_error)??;
783+
784+
let (staged_table, new_hive_table) = self
785+
.apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table)
786+
.await?;
787+
788+
self.client
789+
.0
790+
.alter_table(db_name.into(), tbl_name.into(), new_hive_table)
791+
.await
792+
.map_err(from_thrift_error)?;
793+
794+
Ok(staged_table)
795+
// Lock automatically released here via Drop
796+
}
611797
}
612798
}

crates/catalog/hms/src/utils.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::collections::HashMap;
2020
use chrono::Utc;
2121
use hive_metastore::{Database, PrincipalType, SerDeInfo, StorageDescriptor};
2222
use iceberg::spec::Schema;
23+
use iceberg::table::Table;
2324
use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
2425
use pilota::{AHashMap, FastStr};
2526

@@ -155,6 +156,54 @@ pub(crate) fn convert_to_database(
155156
Ok(db)
156157
}
157158

159+
pub(crate) fn update_hive_table_from_table(
160+
hive_tbl: &hive_metastore::Table,
161+
tbl: &Table,
162+
) -> Result<hive_metastore::Table> {
163+
let mut new_tbl = hive_tbl.clone();
164+
let metadata = tbl.metadata();
165+
let schema = metadata.current_schema();
166+
167+
let hive_schema = HiveSchemaBuilder::from_iceberg(schema)?.build();
168+
169+
match new_tbl.sd.as_mut() {
170+
Some(sd) => {
171+
sd.cols = Some(hive_schema);
172+
}
173+
None => {
174+
// Highly unlikely for a real HMS table, but be defensive
175+
new_tbl.sd = Some(StorageDescriptor {
176+
cols: Some(hive_schema),
177+
..Default::default()
178+
});
179+
}
180+
}
181+
182+
let metadata_location = tbl.metadata_location_result()?.to_string();
183+
184+
let mut params: AHashMap<FastStr, FastStr> = new_tbl.parameters.take().unwrap_or_default();
185+
for (k, v) in metadata.properties().iter() {
186+
if k == METADATA_LOCATION || k == TABLE_TYPE || k == EXTERNAL {
187+
continue;
188+
}
189+
params.insert(
190+
FastStr::from_string(k.to_string()),
191+
FastStr::from_string(v.to_string()),
192+
);
193+
}
194+
195+
params.insert(FastStr::from(EXTERNAL), FastStr::from("TRUE"));
196+
params.insert(FastStr::from(TABLE_TYPE), FastStr::from("ICEBERG"));
197+
params.insert(
198+
FastStr::from(METADATA_LOCATION),
199+
FastStr::from(metadata_location),
200+
);
201+
202+
new_tbl.parameters = Some(params);
203+
204+
Ok(new_tbl)
205+
}
206+
158207
pub(crate) fn convert_to_hive_table(
159208
db_name: String,
160209
schema: &Schema,
@@ -309,6 +358,26 @@ fn get_current_time() -> Result<i32> {
309358
})
310359
}
311360

361+
pub(crate) fn create_lock_request(db_name: &str, tbl_name: &str) -> hive_metastore::LockRequest {
362+
let component = hive_metastore::LockComponent {
363+
r#type: hive_metastore::LockType::EXCLUSIVE,
364+
level: hive_metastore::LockLevel::TABLE,
365+
dbname: FastStr::from_string(db_name.to_string()),
366+
tablename: Some(FastStr::from_string(tbl_name.to_string())),
367+
partitionname: None,
368+
operation_type: None,
369+
is_acid: Some(true),
370+
is_dynamic_partition_write: None,
371+
};
372+
hive_metastore::LockRequest {
373+
component: vec![component],
374+
txnid: None,
375+
user: FastStr::from(whoami::username()),
376+
hostname: FastStr::from(whoami::fallible::hostname().unwrap()),
377+
agent_info: None,
378+
}
379+
}
380+
312381
#[cfg(test)]
313382
mod tests {
314383
use iceberg::spec::{NestedField, PrimitiveType, Type};

0 commit comments

Comments
 (0)