Skip to content

Commit e680774

Browse files
authored
Merge pull request #1 from vladson/hms_update_table_refactoring
Hms update table refactoring
2 parents 1da8b12 + 7ab5e6d commit e680774

File tree

3 files changed

+238
-57
lines changed

3 files changed

+238
-57
lines changed

crates/catalog/hms/src/catalog.rs

Lines changed: 162 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub const HMS_HIVE_LOCKS_DISABLED: &str = "hive_locks_disabled";
5757
const HMS_EXPECTED_PARAMETER_KEY: &str = "expected_parameter_key";
5858
const HMS_EXPECTED_PARAMETER_VALUE: &str = "expected_parameter_value";
5959

60-
/// Builder for [`RestCatalog`].
60+
/// Builder for [`HmsCatalog`].
6161
#[derive(Debug)]
6262
pub struct HmsCatalogBuilder(HmsCatalogConfig);
6363

@@ -174,6 +174,43 @@ impl Debug for HmsCatalog {
174174
}
175175
}
176176

177+
/// RAII guard for HMS table locks. Automatically releases the lock when dropped.
178+
struct HmsLockGuard {
179+
client: ThriftHiveMetastoreClient,
180+
lockid: i64,
181+
}
182+
183+
impl HmsLockGuard {
184+
async fn acquire(
185+
client: &ThriftHiveMetastoreClient,
186+
db_name: &str,
187+
tbl_name: &str,
188+
) -> Result<Self> {
189+
let lock = client
190+
.lock(create_lock_request(db_name, tbl_name))
191+
.await
192+
.map(from_thrift_exception)
193+
.map_err(from_thrift_error)??;
194+
195+
Ok(Self {
196+
client: client.clone(),
197+
lockid: lock.lockid,
198+
})
199+
}
200+
}
201+
202+
impl Drop for HmsLockGuard {
203+
fn drop(&mut self) {
204+
let client = self.client.clone();
205+
let lockid = self.lockid;
206+
tokio::spawn(async move {
207+
let _ = client
208+
.unlock(hive_metastore::UnlockRequest { lockid })
209+
.await;
210+
});
211+
}
212+
}
213+
177214
impl HmsCatalog {
178215
/// Create a new hms catalog.
179216
fn new(config: HmsCatalogConfig) -> Result<Self> {
@@ -215,6 +252,64 @@ impl HmsCatalog {
215252
pub fn file_io(&self) -> FileIO {
216253
self.file_io.clone()
217254
}
255+
256+
/// Applies a commit to a table and prepares the update for HMS.
257+
/// # Returns
258+
/// A tuple of (staged_table, new_hive_table) ready for HMS alter_table operation
259+
async fn apply_and_prepare_update(
260+
&self,
261+
commit: TableCommit,
262+
db_name: &str,
263+
tbl_name: &str,
264+
hive_table: &hive_metastore::Table,
265+
) -> Result<(Table, hive_metastore::Table)> {
266+
let metadata_location = get_metadata_location(&hive_table.parameters)?;
267+
268+
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
269+
270+
let cur_table = Table::builder()
271+
.file_io(self.file_io())
272+
.metadata_location(metadata_location)
273+
.metadata(metadata)
274+
.identifier(TableIdent::new(
275+
NamespaceIdent::new(db_name.to_string()),
276+
tbl_name.to_string(),
277+
))
278+
.build()?;
279+
280+
let staged_table = commit.apply(cur_table)?;
281+
staged_table
282+
.metadata()
283+
.write_to(
284+
staged_table.file_io(),
285+
staged_table.metadata_location_result()?,
286+
)
287+
.await?;
288+
289+
let new_hive_table = update_hive_table_from_table(hive_table, &staged_table)?;
290+
291+
Ok((staged_table, new_hive_table))
292+
}
293+
294+
/// Builds an EnvironmentContext for optimistic locking with HMS.
295+
///
296+
/// The context includes the expected metadata_location, which HMS will use
297+
/// to validate that the table hasn't been modified concurrently.
298+
fn build_environment_context(metadata_location: &str) -> hive_metastore::EnvironmentContext {
299+
let mut env_context_properties = pilota::AHashMap::new();
300+
env_context_properties.insert(
301+
HMS_EXPECTED_PARAMETER_KEY.into(),
302+
"metadata_location".into(),
303+
);
304+
env_context_properties.insert(
305+
HMS_EXPECTED_PARAMETER_VALUE.into(),
306+
pilota::FastStr::from_string(metadata_location.to_string()),
307+
);
308+
309+
hive_metastore::EnvironmentContext {
310+
properties: Some(env_context_properties),
311+
}
312+
}
218313
}
219314

220315
#[async_trait]
@@ -610,25 +705,74 @@ impl Catalog for HmsCatalog {
610705
))
611706
}
612707

708+
/// Updates an existing table by applying a commit operation.
709+
///
710+
/// This method supports two update strategies depending on the catalog configuration:
711+
///
712+
/// **Optimistic Locking** (when `hive_locks_disabled` is set):
713+
/// - Retrieves the current table state from HMS without acquiring locks
714+
/// - Constructs an `EnvironmentContext` with the expected metadata location
715+
/// - Uses `alter_table_with_environment_context` to perform an atomic
716+
/// compare-and-swap operation.
717+
/// - HMS will reject the update if the metadata location has changed,
718+
/// indicating a concurrent modification
719+
///
720+
/// **Traditional Locking** (default):
721+
/// - Acquires an exclusive HMS lock on the table before making changes
722+
/// - Retrieves the current table state
723+
/// - Applies the commit and writes new metadata
724+
/// - Updates the table in HMS using `alter_table`
725+
/// - Releases the lock after the operation completes
726+
///
727+
/// # Returns
728+
/// A `Result` wrapping the updated `Table` object with new metadata.
729+
///
730+
/// # Errors
731+
/// This function may return an error in several scenarios:
732+
/// - Failure to validate the namespace or table identifier
733+
/// - Inability to acquire a lock (traditional locking mode)
734+
/// - Failure to retrieve the table from HMS
735+
/// - Errors reading or writing table metadata
736+
/// - HMS rejects the update due to concurrent modification (optimistic locking)
737+
/// - Errors from the underlying Thrift communication with HMS
613738
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
614739
let ident = commit.identifier().clone();
615740
let db_name = validate_namespace(ident.namespace())?;
616741
let tbl_name = ident.name.clone();
617742

618-
// if HMS_HIVE_LOCKS_DISABLED is set
619-
if let Some(tt) = &self.config.props.get(HMS_HIVE_LOCKS_DISABLED) {
620-
// Do alter table with env context
621-
Err(Error::new(ErrorKind::Unexpected, "Optimistic locks are not supported yet"))
622-
} else {
623-
// start with trying to acquire a lock
624-
let lock = &self
743+
if self.config.props.contains_key(HMS_HIVE_LOCKS_DISABLED) {
744+
// Optimistic locking path: read first, then validate with EnvironmentContext
745+
let hive_table = self
625746
.client
626747
.0
627-
.lock(create_lock_request(&db_name, &tbl_name))
748+
.get_table(db_name.clone().into(), tbl_name.clone().into())
628749
.await
629750
.map(from_thrift_exception)
630751
.map_err(from_thrift_error)??;
631752

753+
let metadata_location = get_metadata_location(&hive_table.parameters)?;
754+
let env_context = Self::build_environment_context(&metadata_location);
755+
756+
let (staged_table, new_hive_table) =
757+
self.apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table)
758+
.await?;
759+
760+
self.client
761+
.0
762+
.alter_table_with_environment_context(
763+
db_name.into(),
764+
tbl_name.into(),
765+
new_hive_table,
766+
env_context,
767+
)
768+
.await
769+
.map_err(from_thrift_error)?;
770+
771+
Ok(staged_table)
772+
} else {
773+
// Traditional locking path: acquire lock first, then read
774+
let _guard = HmsLockGuard::acquire(&self.client.0, &db_name, &tbl_name).await?;
775+
632776
let hive_table = self
633777
.client
634778
.0
@@ -637,41 +781,18 @@ impl Catalog for HmsCatalog {
637781
.map(from_thrift_exception)
638782
.map_err(from_thrift_error)??;
639783

640-
let metadata_location = get_metadata_location(&hive_table.parameters)?;
641-
642-
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
784+
let (staged_table, new_hive_table) =
785+
self.apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table)
786+
.await?;
643787

644-
let cur_table = Table::builder()
645-
.file_io(self.file_io())
646-
.metadata_location(metadata_location)
647-
.metadata(metadata)
648-
.identifier(TableIdent::new(
649-
NamespaceIdent::new(db_name.clone()),
650-
tbl_name.clone(),
651-
))
652-
.build()?;
653-
654-
let staged_table = commit.apply(cur_table)?;
655-
staged_table
656-
.metadata()
657-
.write_to(
658-
staged_table.file_io(),
659-
staged_table.metadata().metadata_location(),
660-
)
661-
.await?;
662-
let new_hive_table = update_hive_table_from_table(hive_table, &staged_table)?;
663-
664-
let updated = self.client.0.alter_table(
665-
db_name.clone().into(),
666-
tbl_name.clone().into(),
667-
new_hive_table,
668-
).await
669-
.map(from_thrift_exception)
670-
.map_err(from_thrift_error)??;
788+
self.client
789+
.0
790+
.alter_table(db_name.into(), tbl_name.into(), new_hive_table)
791+
.await
792+
.map_err(from_thrift_error)?;
671793

672-
// unlock the table after alter table
673-
&self.client.0.unlock(lock.lockid).await.map(from_thrift_error)?;
674794
Ok(staged_table)
795+
// Lock automatically released here via Drop
675796
}
676797
}
677798
}

crates/catalog/hms/src/utils.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ pub(crate) fn update_hive_table_from_table(
187187
if k == METADATA_LOCATION || k == TABLE_TYPE || k == EXTERNAL {
188188
continue;
189189
}
190-
params.insert(k.as_str().clone().into(), v.as_str().clone().into());
190+
params.insert(FastStr::from_string(k.to_string()), FastStr::from_string(v.to_string()));
191191
}
192192

193193
params.insert(FastStr::from(EXTERNAL), FastStr::from("TRUE"));
@@ -357,14 +357,14 @@ fn get_current_time() -> Result<i32> {
357357
}
358358

359359
pub(crate) fn create_lock_request(
360-
tbl_name: &String,
361-
db_name: &String,
360+
db_name: &str,
361+
tbl_name: &str,
362362
) -> hive_metastore::LockRequest {
363363
let component = hive_metastore::LockComponent {
364364
r#type: hive_metastore::LockType::EXCLUSIVE,
365365
level: hive_metastore::LockLevel::TABLE,
366-
dbname: db_name.clone().into(),
367-
tablename: Some(tbl_name.clone().into()),
366+
dbname: FastStr::from_string(db_name.to_string()),
367+
tablename: Some(FastStr::from_string(tbl_name.to_string())),
368368
partitionname: None,
369369
operation_type: None,
370370
is_acid: Some(true),
@@ -374,7 +374,7 @@ pub(crate) fn create_lock_request(
374374
component: vec![component],
375375
txnid: None,
376376
user: FastStr::from(whoami::username()),
377-
hostname: FastStr::from(whoami::fallible::hostname()),
377+
hostname: FastStr::from(whoami::fallible::hostname().unwrap()),
378378
agent_info: None,
379379
}
380380
}

0 commit comments

Comments
 (0)