From 5435ed2554a48df8019efc7f19cf79a5844d111f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20Magalh=C3=A3es?=
Date: Sun, 21 Dec 2025 21:23:07 +0000
Subject: [PATCH] feat: implement Hi-Lo ID generation strategy in
 EntityManager for improved database efficiency

---
 CHANGELOG.md                      |  23 ++++
 data/src/entity_manager/system.py | 200 ++++++++++++++++++++++++++++++
 2 files changed, 223 insertions(+)
 create mode 100644 CHANGELOG.md

diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 000000000..cc5894dac
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,23 @@
+# Changelog
+
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+
+### Added
+
+* Hi-Lo generator strategy (`generator_type="hilo"`) for entity ID generation that reduces database contention by pre-allocating ID pools
+* New `HILO_POOL_SIZE` constant (default: 100) for configuring pool size globally
+* Support for per-field pool size customization via `generator_pool_size` attribute
+* Thread-safe pool management with local locking that only accesses the database during pool allocation
+
+### Changed
+
+*
+
+### Fixed
+
+*
diff --git a/data/src/entity_manager/system.py b/data/src/entity_manager/system.py
index c40a9abea..bef9beeff 100644
--- a/data/src/entity_manager/system.py
+++ b/data/src/entity_manager/system.py
@@ -37,6 +37,7 @@
 import calendar
 import datetime
 import tempfile
+import threading
 
 import colony
 
@@ -62,6 +63,11 @@
 entities where a queries is splitted in multiple queries for the
 yield based loading of entities (spares memory) """
 
+HILO_POOL_SIZE = 100
+""" The default pool size for the Hi-Lo generator strategy,
+meaning the number of IDs to pre-allocate per pool request,
+reducing database round trips by a factor of pool size """
+
 SAVED_STATE_VALUE = 1
 """ The saved state value, set in the entity after the save
 operation to indicate the result of the save operation """
@@ -344,6 +350,14 @@ class EntityManager(object):
     schema created in the underlying data source and are considered
     to exist in the data source """
 
+    _hilo_pools = {}
+    """ Map associating field names with their respective Hi-Lo pool
+    state as a tuple of (current_id, max_id) for the pool range """
+
+    _hilo_lock = None
+    """ Lock for thread-safe access to the Hi-Lo pools, ensures
+    that pool allocation and ID consumption are atomic """
+
     def __init__(
         self, entity_manager_plugin, engine_plugin, id, entities_map, options={}
     ):
@@ -377,6 +391,8 @@ def __init__(
         self.commit_callbacks = {}
         self.rollback_callbacks = {}
         self._exists = {}
+        self._hilo_pools = {}
+        self._hilo_lock = threading.Lock()
 
         self.apply_types()
 
@@ -2862,6 +2878,190 @@ def _generate_uuid_hex(self, entity, name):
         # of the identifier value
         entity.set_value(name, value)
 
+    def _generate_hilo(self, entity, name):
+        # retrieves the (entity) class associated with
+        # the entity to generate the value
+        entity_class = entity.__class__
+
+        # uses the name to retrieve the value (map)
+        # containing the definition of the attribute
+        value = getattr(entity_class, name)
+
+        # retrieves the map containing the various entity names
+        # associated with their respective classes and then
+        # retrieves the entity class that "owns" the value
+        names_map = entity_class.get_names_map()
+        name_class = names_map[name]
+
+        # retrieves the name of the table associated with the
+        # current name and uses it to create the default field
+        # name for the generation table (class name and field name)
+        table_name = name_class.get_name()
+        field_name = "%s_%s" % (table_name, name)
+
+        # tries to retrieve the (generator) field name defaulting
+        # to the name of the default field name, the pool size can
+        # also be customized per field via the generator_pool_size
+        field_name = value.get("generator_field_name", field_name)
+        pool_size = value.get("generator_pool_size", HILO_POOL_SIZE)
+
+        # grabs an id value using the Hi-Lo pool allocation strategy
+        # which reduces database contention by pre-allocating ranges
+        value = self._hilo_grab_id(field_name, pool_size)
+        value = int(value)
+
+        # sets the generated value in the entity, final setting
+        # of the generated value
+        entity.set_value(name, value)
+
+    def _hilo_grab_id(self, field_name, pool_size):
+        """
+        Retrieves the next available ID using the Hi-Lo pool
+        allocation strategy, allocating a new pool from the
+        database when the current pool is exhausted.
+
+        This method significantly reduces database contention
+        by acquiring pool_size IDs per database access instead
+        of one ID per access (as the table generator does).
+
+        :type field_name: String
+        :param field_name: The name of the field for which to
+        retrieve the next ID value.
+        :type pool_size: int
+        :param pool_size: The number of IDs to pre-allocate
+        when the pool is exhausted.
+        :rtype: int
+        :return: The next available ID for the field.
+        """
+
+        # acquires the lock to ensure thread-safe access to the
+        # Hi-Lo pools, this is a local lock that does not involve
+        # any database locking during normal ID consumption
+        with self._hilo_lock:
+            # checks if a pool exists for this field and if so
+            # retrieves the current state of the pool
+            if field_name in self._hilo_pools:
+                current_id, max_id = self._hilo_pools[field_name]
+            else:
+                # no pool exists, force allocation by setting
+                # current above max
+                current_id = 1
+                max_id = 0
+
+            # if the current id exceeds the max id, the pool is
+            # exhausted and a new pool must be allocated from
+            # the database
+            if current_id > max_id:
+                self._hilo_allocate_pool(field_name, pool_size)
+                current_id, max_id = self._hilo_pools[field_name]
+
+            # consume one ID from the pool and update the pool state
+            self._hilo_pools[field_name] = (current_id + 1, max_id)
+            return current_id
+
+    def _hilo_allocate_pool(self, field_name, pool_size):
+        """
+        Allocates a new pool of IDs from the database by atomically
+        incrementing the generator counter by pool_size.
+
+        The pool is stored as (low, high) where low is the first
+        ID to use and high is the last ID in the pool range.
+
+        :type field_name: String
+        :param field_name: The name of the field for which to
+        allocate a new pool.
+        :type pool_size: int
+        :param pool_size: The number of IDs to allocate in
+        the new pool (values below one are raised to one).
+        """
+
+        # ensures the generator table exists before attempting
+        # to allocate a pool
+        self.create_generator()
+
+        # reserves at least one ID, as a non-positive size would reuse IDs
+        pool_size = max(int(pool_size), 1)
+        new_high = self._hilo_atomic_increment(field_name, pool_size)
+
+        # our allocated range is [new_high - pool_size, new_high - 1]
+        # since new_high is the next available value after our range
+        low = new_high - pool_size
+        high = new_high - 1
+
+        # store the pool state as (current_id, max_id)
+        self._hilo_pools[field_name] = (low, high)
+
+    def _hilo_atomic_increment(self, field_name, increment):
+        """
+        Atomically increments the generator counter for the given
+        field by the specified increment amount and returns the
+        new `next_id` value.
+
+        This method uses database locking to ensure atomicity
+        across concurrent processes/threads, but the lock is only
+        held for the duration of the pool allocation, not for each
+        individual ID.
+
+        :type field_name: String
+        :param field_name: The name of the field for which to
+        increment the generator counter.
+        :type increment: int
+        :param increment: The amount by which to increment the
+        counter (typically the pool size).
+        :rtype: int
+        :return: The new `next_id` value after incrementing.
+        """
+
+        # escapes the field name value to avoid possible
+        # security problems (SQL injection)
+        name = self._escape_text(field_name)
+
+        # retrieves the current modification time for
+        # the generator as the current system time
+        _mtime = time.time()
+
+        # locks the generator row through lock_table (presumably a
+        # SELECT ... FOR UPDATE or engine equivalent, to be confirmed)
+        # and then reads the current next_id value for the field
+        self.lock_table(
+            GENERATOR_VALUE, {"field_name": "name", "field_value": "'" + name + "'"}
+        )
+        query = (
+            "select name, next_id from " + GENERATOR_VALUE + " where name = '%s'" % name
+        )
+        cursor = self.execute_query(query, False)
+        try:
+            rows = [value[1] for value in cursor]
+        finally:
+            cursor.close()
+
+        # checks if a row exists for this field name
+        if not rows or rows[0] is None:
+            # first allocation for this field, start at 1 and
+            # reserve IDs up to 1 + increment (so next available
+            # after our pool is 1 + increment)
+            next_id = 1 + increment
+            query = "insert into %s(name, next_id, _mtime) values('%s', %d, %f)" % (
+                GENERATOR_VALUE,
+                name,
+                next_id,
+                _mtime,
+            )
+            self.execute_query(query)
+            return next_id
+        else:
+            # increment the existing counter by the pool size
+            current_next = rows[0]
+            new_next = current_next + increment
+            query = "update %s set next_id = %d, _mtime = %f where name = '%s'" % (
+                GENERATOR_VALUE,
+                new_next,
+                _mtime,
+                name,
+            )
+            self.execute_query(query)
+            return new_next
+
     def _create_generator_query(self):
         # creates the list to hold the various queries
         # to be used to create indexes