diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index abf6ebfa..ae34d115 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -28,6 +28,7 @@ jobs: xpack.security.enabled: false xpack.security.transport.ssl.enabled: false ES_JAVA_OPTS: -Xms512m -Xmx1g + action.destructive_requires_name: false ports: - 9200:9200 @@ -44,6 +45,7 @@ jobs: xpack.security.enabled: false xpack.security.transport.ssl.enabled: false ES_JAVA_OPTS: -Xms512m -Xmx1g + action.destructive_requires_name: false ports: - 9400:9400 @@ -60,6 +62,7 @@ jobs: plugins.security.disabled: true plugins.security.ssl.http.enabled: true OPENSEARCH_JAVA_OPTS: -Xms512m -Xmx512m + action.destructive_requires_name: false ports: - 9202:9202 diff --git a/CHANGELOG.md b/CHANGELOG.md index 421e8315..a25987a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,28 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - Added the ability to set a timeout for OpenSearch and Elasticsearch clients by setting the environment variable `ES_TIMEOUT` [#408](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/408) +- Added a comprehensive index management system with dynamic selection and insertion strategies for improved performance and scalability [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405) +- Added `ENABLE_DATETIME_INDEX_FILTERING` environment variable to enable datetime-based index selection using collection IDs. Requires indexes in the format `STAC_ITEMS_INDEX_PREFIX_collection-id_start_year-start_month-start_day-end_year-end_month-end_day`, e.g. `items_sentinel-2-l2a_2025-06-06-2025-09-22`. Default is `false`. [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405) +- Added `DATETIME_INDEX_MAX_SIZE_GB` environment variable to set the maximum size limit in GB for datetime-based indexes. When an index exceeds this size, a new time-partitioned index will be created. Default is `25` GB. Only applies when `ENABLE_DATETIME_INDEX_FILTERING` is enabled. 
[#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405) +- Added a search engine adapter system with support for both Elasticsearch and OpenSearch [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405): + - `SearchEngineAdapter` base class with engine-specific implementations + - `ElasticsearchAdapter` and `OpenSearchAdapter` with tailored index creation methods + - Automatic engine type detection based on client class + - `SearchEngineAdapterFactory` for creating appropriate adapters +- Added datetime-based index selection strategies with caching support [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405): + - `AsyncDatetimeBasedIndexSelector` and `SyncDatetimeBasedIndexSelector` for temporal filtering + - `IndexCacheManager` with configurable TTL-based cache expiration (default 1 hour) + - `AsyncIndexAliasLoader` and `SyncIndexAliasLoader` for alias management + - `UnfilteredIndexSelector` as a fallback that returns all available indexes +- Added index insertion strategies with automatic partitioning [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405): + - Simple insertion strategy (`AsyncSimpleIndexInserter`, `SyncSimpleIndexInserter`) for the traditional single-index-per-collection approach + - Datetime-based insertion strategy (`AsyncDatetimeIndexInserter`, `SyncDatetimeIndexInserter`) with time-based partitioning + - Automatic index size monitoring and splitting when limits are exceeded + - Handling of chronologically early data and bulk operations +- Added index management utilities [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405): + - `IndexSizeManager` for size monitoring and overflow handling + - `DatetimeIndexManager` for datetime-based index operations + - Factory patterns (`IndexInsertionFactory`, `IndexSelectorFactory`) for strategy creation based on configuration ## [v6.0.0] - 2025-06-22 @@ -22,6 +44,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. 
- Updated stac-fastapi parent libraries to v6.0.0 [#291](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/291) + ## [v5.0.0] - 2025-06-11 ### Added diff --git a/Makefile b/Makefile index c23ca951..b84a1cb7 100644 --- a/Makefile +++ b/Makefile @@ -73,10 +73,10 @@ test-opensearch: .PHONY: test test: - -$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing' + -$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing' docker compose down - -$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing' + -$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing' docker compose down .PHONY: run-database-es diff --git a/README.md b/README.md index 4d2cb807..9ba41ed8 100644 --- a/README.md +++ b/README.md @@ -201,30 +201,32 @@ There are two main ways to run the API locally: You can customize additional settings in your `.env` file: -| Variable | Description | Default | Required | -|------------------------------|--------------------------------------------------------------------------------------|--------------------------|---------------------------------------------------------------------------------------------| -| `ES_HOST` | Hostname for external Elasticsearch/OpenSearch. | `localhost` | Optional | -| `ES_PORT` | Port for Elasticsearch/OpenSearch. | `9200` (ES) / `9202` (OS)| Optional | -| `ES_USE_SSL` | Use SSL for connecting to Elasticsearch/OpenSearch. | `false` | Optional | -| `ES_VERIFY_CERTS` | Verify SSL certificates when connecting. | `false` | Optional | +| Variable | Description | Default | Required | +|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------|---------------------------------------------------------------------------------------------| +| `ES_HOST` | Hostname for external Elasticsearch/OpenSearch. | `localhost` | Optional | +| `ES_PORT` | Port for Elasticsearch/OpenSearch. | `9200` (ES) / `9202` (OS) | Optional | +| `ES_USE_SSL` | Use SSL for connecting to Elasticsearch/OpenSearch. | `false` | Optional | +| `ES_VERIFY_CERTS` | Verify SSL certificates when connecting. | `false` | Optional | | `ES_TIMEOUT` | Client timeout for Elasticsearch/OpenSearch. | DB client default | Optional | -| `STAC_FASTAPI_TITLE` | Title of the API in the documentation. | `stac-fastapi-` | Optional | -| `STAC_FASTAPI_DESCRIPTION` | Description of the API in the documentation. | N/A | Optional | -| `STAC_FASTAPI_VERSION` | API version. | `2.1` | Optional | -| `STAC_FASTAPI_LANDING_PAGE_ID` | Landing page ID | `stac-fastapi` | Optional | -| `APP_HOST` | Server bind address. | `0.0.0.0` | Optional | -| `APP_PORT` | Server port. | `8080` | Optional | -| `ENVIRONMENT` | Runtime environment. | `local` | Optional | -| `WEB_CONCURRENCY` | Number of worker processes. 
| `10` | Optional | -| `RELOAD` | Enable auto-reload for development. | `true` | Optional | -| `STAC_FASTAPI_RATE_LIMIT` | API rate limit per client. | `200/minute` | Optional | -| `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional | -| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | -| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional | -| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional | -| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional | -| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional | -| `ENABLE_TRANSACTIONS_EXTENSIONS` | Enables or disables the Transactions and Bulk Transactions API extensions. If set to `false`, the POST `/collections` route and related transaction endpoints (including bulk transaction operations) will be unavailable in the API. This is useful for deployments where mutating the catalog via the API should be prevented. | `true` | Optional | +| `STAC_FASTAPI_TITLE` | Title of the API in the documentation. | `stac-fastapi-` | Optional | +| `STAC_FASTAPI_DESCRIPTION` | Description of the API in the documentation. | N/A | Optional | +| `STAC_FASTAPI_VERSION` | API version. | `2.1` | Optional | +| `STAC_FASTAPI_LANDING_PAGE_ID` | Landing page ID | `stac-fastapi` | Optional | +| `APP_HOST` | Server bind address. | `0.0.0.0` | Optional | +| `APP_PORT` | Server port. | `8080` | Optional | +| `ENVIRONMENT` | Runtime environment. | `local` | Optional | +| `WEB_CONCURRENCY` | Number of worker processes. | `10` | Optional | +| `RELOAD` | Enable auto-reload for development. | `true` | Optional | +| `STAC_FASTAPI_RATE_LIMIT` | API rate limit per client. | `200/minute` | Optional | +| `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional | +| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | +| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional | +| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional | +| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional | +| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional | +| `ENABLE_TRANSACTIONS_EXTENSIONS` | Enables or disables the Transactions and Bulk Transactions API extensions. If set to `false`, the POST `/collections` route and related transaction endpoints (including bulk transaction operations) will be unavailable in the API. This is useful for deployments where mutating the catalog via the API should be prevented. | `true` | Optional | +| `ENABLE_DATETIME_INDEX_FILTERING` | Enable datetime-based index selection using collection IDs. Requires indexes in the format `STAC_ITEMS_INDEX_PREFIX_collection-id_start_year-start_month-start_day-end_year-end_month-end_day`, e.g. `items_sentinel-2-l2a_2025-06-06-2025-09-22`. | `false` | Optional | +| `DATETIME_INDEX_MAX_SIZE_GB` | Maximum size limit in GB for datetime-based indexes. When an index exceeds this size, a new time-partitioned index will be created. Note: This value should account for ~25% overhead due to OS/ES caching of data structures and metadata. Only applies when `ENABLE_DATETIME_INDEX_FILTERING` is enabled. | `25` | Optional | > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, `ES_VERIFY_CERTS` and `ES_TIMEOUT` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. diff --git a/compose.yml b/compose.yml index ba1ac57d..af5c1998 100644 --- a/compose.yml +++ b/compose.yml @@ -72,6 +72,7 @@ services: hostname: elasticsearch environment: ES_JAVA_OPTS: -Xms512m -Xmx1g + action.destructive_requires_name: false volumes: - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml - ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots @@ -86,6 +87,7 @@ services: - discovery.type=single-node - plugins.security.disabled=true - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m + - action.destructive_requires_name=false volumes: - ./opensearch/config/opensearch.yml:/usr/share/opensearch/config/opensearch.yml - ./opensearch/snapshots:/usr/share/opensearch/snapshots diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 8d1f472b..61debf2c 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -37,6 +37,7 @@ BulkTransactionMethod, Items, ) +from stac_fastapi.sfeos_helpers.database import return_date from stac_fastapi.types import stac as stac_types from stac_fastapi.types.conformance import BASE_CONFORMANCE_CLASSES from stac_fastapi.types.core import AsyncBaseCoreClient @@ -324,10 +325,15 @@ async def item_collection( search=search, collection_ids=[collection_id] ) - if datetime: + try: + datetime_search = return_date(datetime) search = self.database.apply_datetime_filter( - search=search, interval=datetime + search=search, datetime_search=datetime_search ) + except (ValueError, TypeError) as e: + # Handle invalid interval formats if return_date fails + logger.error(f"Invalid interval format: {datetime}, error: {e}") + datetime_search = None if bbox: bbox = [float(x) for x in bbox] @@ -342,6 +348,7 @@ async def item_collection( sort=None, token=token, collection_ids=[collection_id], + datetime_search=datetime_search, ) items = [ @@ -500,10 +507,17 @@ async def post_search( search=search, collection_ids=search_request.collections ) - if search_request.datetime: + try: + 
datetime_search = return_date(search_request.datetime) search = self.database.apply_datetime_filter( - search=search, interval=search_request.datetime + search=search, datetime_search=datetime_search + ) + except (ValueError, TypeError) as e: + # Handle invalid interval formats if return_date fails + logger.error( + f"Invalid interval format: {search_request.datetime}, error: {e}" ) + datetime_search = None if search_request.bbox: bbox = search_request.bbox @@ -560,6 +574,7 @@ async def post_search( token=search_request.token, sort=sort, collection_ids=search_request.collections, + datetime_search=datetime_search, ) fields = ( diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 195950f3..7b6abdf6 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -4,7 +4,7 @@ import logging from base64 import urlsafe_b64decode, urlsafe_b64encode from copy import deepcopy -from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Type import attr import elasticsearch.helpers as helpers @@ -36,11 +36,8 @@ get_queryables_mapping_shared, index_alias_by_collection_id, index_by_collection_id, - indices, - mk_actions, mk_item_id, populate_sort_shared, - return_date, validate_refresh, ) from stac_fastapi.sfeos_helpers.database.utils import ( @@ -55,9 +52,14 @@ ITEMS_INDEX_PREFIX, Geometry, ) +from stac_fastapi.sfeos_helpers.search_engine import ( + BaseIndexInserter, + IndexInsertionFactory, + IndexSelectionStrategy, + IndexSelectorFactory, +) from stac_fastapi.types.errors import ConflictError, NotFoundError from stac_fastapi.types.links import resolve_links -from stac_fastapi.types.rfc3339 import DateTimeType from stac_fastapi.types.stac import Collection, Item logger = logging.getLogger(__name__) @@ -135,6 +137,10 @@ class DatabaseLogic(BaseDatabaseLogic): sync_settings: SyncElasticsearchSettings = attr.ib( factory=SyncElasticsearchSettings ) + async_index_selector: IndexSelectionStrategy = attr.ib(init=False) + sync_index_selector: IndexSelectionStrategy = attr.ib(init=False) + async_index_inserter: BaseIndexInserter = attr.ib(init=False) + sync_index_inserter: BaseIndexInserter = attr.ib(init=False) client = attr.ib(init=False) sync_client = attr.ib(init=False) @@ -143,6 +149,18 @@ def __attrs_post_init__(self): """Initialize clients after the class is instantiated.""" self.client = self.async_settings.create_client self.sync_client = self.sync_settings.create_client + self.async_index_inserter = ( + IndexInsertionFactory.create_async_insertion_strategy(self.client) + ) + self.sync_index_inserter = IndexInsertionFactory.create_sync_insertion_strategy( + self.sync_client + ) + self.async_index_selector = IndexSelectorFactory.create_async_selector( + self.client + ) + self.sync_index_selector = IndexSelectorFactory.create_sync_selector( + self.sync_client + ) item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer) collection_serializer: Type[CollectionSerializer] = attr.ib( @@ -256,30 +274,18 @@ def apply_collections_filter(search: Search, collection_ids: List[str]): @staticmethod def apply_datetime_filter( - search: Search, interval: Optional[Union[DateTimeType, str]] + search: Search, datetime_search: Dict[str, Optional[str]] ) -> Search: """Apply a filter to search on datetime, 
start_datetime, and end_datetime fields. Args: search: The search object to filter. - interval: Optional datetime interval to filter by. Can be: - - A single datetime string (e.g., "2023-01-01T12:00:00") - - A datetime range string (e.g., "2023-01-01/2023-12-31") - - A datetime object - - A tuple of (start_datetime, end_datetime) + datetime_search (Dict[str, Optional[str]]): Datetime constraints produced by `return_date`, keyed by "eq" for an exact match or "gte"/"lte" for range bounds. Returns: The filtered search object. """ - if not interval: - return search - - should = [] - try: - datetime_search = return_date(interval) - except (ValueError, TypeError) as e: - # Handle invalid interval formats if return_date fails - logger.error(f"Invalid interval format: {interval}, error: {e}") + if not datetime_search: return search if "eq" in datetime_search: @@ -489,6 +495,7 @@ async def execute_search( token: Optional[str], sort: Optional[Dict[str, Dict[str, str]]], collection_ids: Optional[List[str]], + datetime_search: Dict[str, Optional[str]], ignore_unavailable: bool = True, ) -> Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]: """Execute a search query with limit and other optional parameters. @@ -499,6 +506,7 @@ async def execute_search( token (Optional[str]): The token used to return the next set of results. sort (Optional[Dict[str, Dict[str, str]]]): Specifies how the results should be sorted. collection_ids (Optional[List[str]]): The collection ids to search. + datetime_search (Dict[str, Optional[str]]): Datetime range used for index selection. ignore_unavailable (bool, optional): Whether to ignore unavailable collections. Defaults to True. Returns: @@ -519,7 +527,9 @@ async def execute_search( query = search.query.to_dict() if search.query else None - index_param = indices(collection_ids) + index_param = await self.async_index_selector.select_indexes( + collection_ids, datetime_search + ) max_result_window = MAX_LIMIT @@ -583,6 +593,7 @@ async def aggregate( geometry_geohash_grid_precision: int, geometry_geotile_grid_precision: int, datetime_frequency_interval: str, + datetime_search, ignore_unavailable: Optional[bool] = True, ): """Return aggregations of STAC Items.""" @@ -618,7 +629,10 @@ def _fill_aggregation_parameters(name: str, agg: dict) -> dict: if k in aggregations } - index_param = indices(collection_ids) + index_param = await self.async_index_selector.select_indexes( + collection_ids, datetime_search + ) + search_task = asyncio.create_task( self.client.search( index=index_param, @@ -816,9 +830,12 @@ async def create_item( item=item, base_url=base_url, exist_ok=exist_ok ) + target_index = await self.async_index_inserter.get_target_index( + collection_id, item + ) # Index the item in the database await self.client.index( - index=index_alias_by_collection_id(collection_id), + index=target_index, id=mk_item_id(item_id, collection_id), document=item, refresh=refresh, @@ -983,9 +1000,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any): try: # Perform the delete operation - await self.client.delete( + await self.client.delete_by_query( index=index_alias_by_collection_id(collection_id), - id=mk_item_id(item_id, collection_id), + body={"query": {"term": {"_id": mk_item_id(item_id, collection_id)}}}, refresh=refresh, ) except ESNotFoundError: @@ -1085,8 +1102,10 @@ async def create_collection(self, collection: Collection, **kwargs: Any): refresh=refresh, ) - # Create the item index for the collection - await create_item_index(collection_id) + if self.async_index_inserter.should_create_collection_index(): + await 
self.async_index_inserter.create_simple_index( + self.client, collection_id + ) async def find_collection(self, collection_id: str) -> Collection: """Find and return a collection from the database. @@ -1360,9 +1379,12 @@ async def bulk_async( # Perform the bulk insert raise_on_error = self.async_settings.raise_on_bulk_error + actions = await self.async_index_inserter.prepare_bulk_actions( + collection_id, processed_items + ) success, errors = await helpers.async_bulk( self.client, - mk_actions(collection_id, processed_items), + actions, refresh=refresh, raise_on_error=raise_on_error, ) @@ -1426,9 +1448,12 @@ def bulk_sync( # Perform the bulk insert raise_on_error = self.sync_settings.raise_on_bulk_error + actions = self.sync_index_inserter.prepare_bulk_actions( + collection_id, processed_items + ) success, errors = helpers.bulk( self.sync_client, - mk_actions(collection_id, processed_items), + actions, refresh=refresh, raise_on_error=raise_on_error, ) diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index e4c88d85..17e8efb4 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -4,7 +4,7 @@ import logging from base64 import urlsafe_b64decode, urlsafe_b64encode from copy import deepcopy -from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Type import attr import orjson @@ -34,12 +34,8 @@ delete_item_index_shared, get_queryables_mapping_shared, index_alias_by_collection_id, - index_by_collection_id, - indices, - mk_actions, mk_item_id, populate_sort_shared, - return_date, validate_refresh, ) from stac_fastapi.sfeos_helpers.database.utils import ( @@ -51,15 +47,18 @@ COLLECTIONS_INDEX, DEFAULT_SORT, ES_COLLECTIONS_MAPPINGS, - ES_ITEMS_MAPPINGS, - ES_ITEMS_SETTINGS, ITEM_INDICES, ITEMS_INDEX_PREFIX, Geometry, ) +from stac_fastapi.sfeos_helpers.search_engine import ( + BaseIndexInserter, + IndexInsertionFactory, + IndexSelectionStrategy, + IndexSelectorFactory, +) from stac_fastapi.types.errors import ConflictError, NotFoundError from stac_fastapi.types.links import resolve_links -from stac_fastapi.types.rfc3339 import DateTimeType from stac_fastapi.types.stac import Collection, Item logger = logging.getLogger(__name__) @@ -100,33 +99,6 @@ async def create_collection_index() -> None: await client.close() -async def create_item_index(collection_id: str) -> None: - """ - Create the index for Items. The settings of the index template will be used implicitly. - - Args: - collection_id (str): Collection identifier. - - Returns: - None - - """ - client = AsyncSearchSettings().create_client - - index_name = f"{index_by_collection_id(collection_id)}-000001" - exists = await client.indices.exists(index=index_name) - if not exists: - await client.indices.create( - index=index_name, - body={ - "aliases": {index_alias_by_collection_id(collection_id): {}}, - "mappings": ES_ITEMS_MAPPINGS, - "settings": ES_ITEMS_SETTINGS, - }, - ) - await client.close() - - async def delete_item_index(collection_id: str) -> None: """Delete the index for items in a collection. 
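The next hunks wire the same strategy objects into the OpenSearch `DatabaseLogic` that the Elasticsearch backend received above. A minimal usage sketch — the client, collection id, item dict, and date values are hypothetical, and only the factory methods and strategy calls visible in this diff are assumed:

from stac_fastapi.sfeos_helpers.search_engine import (
    IndexInsertionFactory,
    IndexSelectorFactory,
)

async def sketch(client, item):
    # Strategy choice follows configuration (e.g. ENABLE_DATETIME_INDEX_FILTERING).
    inserter = IndexInsertionFactory.create_async_insertion_strategy(client)
    selector = IndexSelectorFactory.create_async_selector(client)
    # Where one item should be written: the collection alias, or a
    # datetime-partitioned index when datetime filtering is enabled.
    target_index = await inserter.get_target_index("sentinel-2-l2a", item)
    # Which indexes a datetime-constrained search should hit.
    index_param = await selector.select_indexes(
        ["sentinel-2-l2a"],
        {"gte": "2025-06-06T00:00:00Z", "lte": "2025-09-22T00:00:00Z"},
    )
    return target_index, index_param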
@@ -148,6 +120,11 @@ class DatabaseLogic(BaseDatabaseLogic): async_settings: AsyncSearchSettings = attr.ib(factory=AsyncSearchSettings) sync_settings: SyncSearchSettings = attr.ib(factory=SyncSearchSettings) + async_index_selector: IndexSelectionStrategy = attr.ib(init=False) + sync_index_selector: IndexSelectionStrategy = attr.ib(init=False) + async_index_inserter: BaseIndexInserter = attr.ib(init=False) + sync_index_inserter: BaseIndexInserter = attr.ib(init=False) + client = attr.ib(init=False) sync_client = attr.ib(init=False) @@ -155,6 +132,18 @@ def __attrs_post_init__(self): """Initialize clients after the class is instantiated.""" self.client = self.async_settings.create_client self.sync_client = self.sync_settings.create_client + self.async_index_inserter = ( + IndexInsertionFactory.create_async_insertion_strategy(self.client) + ) + self.sync_index_inserter = IndexInsertionFactory.create_sync_insertion_strategy( + self.sync_client + ) + self.async_index_selector = IndexSelectorFactory.create_async_selector( + self.client + ) + self.sync_index_selector = IndexSelectorFactory.create_sync_selector( + self.sync_client + ) item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer) collection_serializer: Type[CollectionSerializer] = attr.ib( @@ -292,30 +281,18 @@ def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str] @staticmethod def apply_datetime_filter( - search: Search, interval: Optional[Union[DateTimeType, str]] + search: Search, datetime_search: Dict[str, Optional[str]] ) -> Search: """Apply a filter to search on datetime, start_datetime, and end_datetime fields. Args: search: The search object to filter. - interval: Optional datetime interval to filter by. Can be: - - A single datetime string (e.g., "2023-01-01T12:00:00") - - A datetime range string (e.g., "2023-01-01/2023-12-31") - - A datetime object - - A tuple of (start_datetime, end_datetime) + datetime_search (Dict[str, Optional[str]]): Datetime constraints produced by `return_date`, keyed by "eq" for an exact match or "gte"/"lte" for range bounds. Returns: The filtered search object. """ - if not interval: - return search - - should = [] - try: - datetime_search = return_date(interval) - except (ValueError, TypeError) as e: - # Handle invalid interval formats if return_date fails - logger.error(f"Invalid interval format: {interval}, error: {e}") + if not datetime_search: return search if "eq" in datetime_search: @@ -507,6 +484,7 @@ async def execute_search( token: Optional[str], sort: Optional[Dict[str, Dict[str, str]]], collection_ids: Optional[List[str]], + datetime_search: Dict[str, Optional[str]], ignore_unavailable: bool = True, ) -> Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]: """Execute a search query with limit and other optional parameters. @@ -517,6 +495,7 @@ async def execute_search( token (Optional[str]): The token used to return the next set of results. sort (Optional[Dict[str, Dict[str, str]]]): Specifies how the results should be sorted. collection_ids (Optional[List[str]]): The collection ids to search. + datetime_search (Dict[str, Optional[str]]): Datetime range used for index selection. ignore_unavailable (bool, optional): Whether to ignore unavailable collections. Defaults to True. 
Returns: @@ -544,7 +523,9 @@ async def execute_search( search_body["sort"] = sort if sort else DEFAULT_SORT - index_param = indices(collection_ids) + index_param = await self.async_index_selector.select_indexes( + collection_ids, datetime_search + ) max_result_window = MAX_LIMIT @@ -606,6 +587,7 @@ async def aggregate( geometry_geohash_grid_precision: int, geometry_geotile_grid_precision: int, datetime_frequency_interval: str, + datetime_search, ignore_unavailable: Optional[bool] = True, ): """Return aggregations of STAC Items.""" @@ -639,7 +621,10 @@ def _fill_aggregation_parameters(name: str, agg: dict) -> dict: if k in aggregations } - index_param = indices(collection_ids) + index_param = await self.async_index_selector.select_indexes( + collection_ids, datetime_search + ) + search_task = asyncio.create_task( self.client.search( index=index_param, @@ -832,8 +817,13 @@ async def create_item( item = await self.async_prep_create_item( item=item, base_url=base_url, exist_ok=exist_ok ) + + target_index = await self.async_index_inserter.get_target_index( + collection_id, item + ) + await self.client.index( - index=index_alias_by_collection_id(collection_id), + index=target_index, id=mk_item_id(item_id, collection_id), body=item, refresh=refresh, @@ -992,9 +982,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any): ) try: - await self.client.delete( + await self.client.delete_by_query( index=index_alias_by_collection_id(collection_id), - id=mk_item_id(item_id, collection_id), + body={"query": {"term": {"_id": mk_item_id(item_id, collection_id)}}}, refresh=refresh, ) except exceptions.NotFoundError: @@ -1085,8 +1075,10 @@ async def create_collection(self, collection: Collection, **kwargs: Any): body=collection, refresh=refresh, ) - - await create_item_index(collection_id) + if self.async_index_inserter.should_create_collection_index(): + await self.async_index_inserter.create_simple_index( + self.client, collection_id + ) async def find_collection(self, collection_id: str) -> Collection: """Find and return a collection from the database. 
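As a concrete illustration, the `datetime_search` argument threaded through `execute_search` above is the dict that core.py builds via `return_date`; a hedged sketch of a call site (the `database` instance, `search` object, and values are illustrative):

# A single datetime collapses to equal bounds, e.g.
# return_date("2023-06-01T12:00:00Z") -> {"gte": "2023-06-01T12:00:00Z", "lte": "2023-06-01T12:00:00Z"}
datetime_search = {"gte": "2025-06-06T00:00:00Z", "lte": "2025-09-22T00:00:00Z"}
items, maybe_count, next_token = await database.execute_search(
    search=search,
    limit=10,
    token=None,
    sort=None,
    collection_ids=["sentinel-2-l2a"],
    datetime_search=datetime_search,
)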
@@ -1295,6 +1287,7 @@ async def delete_collection(self, collection_id: str, **kwargs: Any): await self.client.delete( index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh ) + # Delete the item index for the collection await delete_item_index(collection_id) async def bulk_async( @@ -1348,9 +1341,13 @@ async def bulk_async( return 0, [] raise_on_error = self.async_settings.raise_on_bulk_error + actions = await self.async_index_inserter.prepare_bulk_actions( + collection_id, processed_items + ) + success, errors = await helpers.async_bulk( self.client, - mk_actions(collection_id, processed_items), + actions, refresh=refresh, raise_on_error=raise_on_error, ) @@ -1411,9 +1408,13 @@ def bulk_sync( return 0, [] raise_on_error = self.sync_settings.raise_on_bulk_error + actions = self.sync_index_inserter.prepare_bulk_actions( + collection_id, processed_items + ) + success, errors = helpers.bulk( self.sync_client, - mk_actions(collection_id, processed_items), + actions, refresh=refresh, raise_on_error=raise_on_error, ) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/client.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/client.py index 1f335245..641c81f1 100644 --- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/client.py +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/client.py @@ -21,6 +21,7 @@ Aggregation, AggregationCollection, ) +from stac_fastapi.sfeos_helpers.database import return_date from stac_fastapi.types.rfc3339 import DateTimeType from .format import frequency_agg, metric_agg @@ -312,9 +313,10 @@ async def aggregate( search=search, item_ids=aggregate_request.ids ) + datetime_search = return_date(aggregate_request.datetime) if aggregate_request.datetime: search = self.database.apply_datetime_filter( - search=search, interval=aggregate_request.datetime + search=search, datetime_search=datetime_search ) if aggregate_request.bbox: @@ -414,6 +416,7 @@ async def aggregate( geometry_geohash_grid_precision, geometry_geotile_grid_precision, datetime_frequency_interval, + datetime_search, ) except Exception as error: if not isinstance(error, IndexError): diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py index 31bf28d8..72d0de0d 100644 --- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py @@ -30,11 +30,12 @@ """ # Re-export all functions for backward compatibility -from .datetime import return_date -from .document import mk_actions, mk_item_id +from .datetime import extract_date, extract_first_date_from_index, return_date +from .document import mk_item_id from .index import ( create_index_templates_shared, delete_item_index_shared, + filter_indexes_by_datetime, index_alias_by_collection_id, index_by_collection_id, indices, @@ -53,6 +54,7 @@ "delete_item_index_shared", "index_alias_by_collection_id", "index_by_collection_id", + "filter_indexes_by_datetime", "indices", # Query operations "apply_free_text_filter_shared", @@ -62,10 +64,11 @@ "get_queryables_mapping_shared", # Document operations "mk_item_id", - "mk_actions", # Utility functions "validate_refresh", "get_bool_env", # Datetime utilities "return_date", + "extract_date", + "extract_first_date_from_index", ] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/datetime.py 
b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/datetime.py index 352ed4b5..63abf313 100644 --- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/datetime.py +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/datetime.py @@ -3,7 +3,8 @@ This module provides datetime utility functions specifically designed for Elasticsearch and OpenSearch query formatting. - +import re +from datetime import date from datetime import datetime as datetime_type from typing import Dict, Optional, Union @@ -39,8 +40,14 @@ def return_date( if isinstance(interval, str): if "/" in interval: parts = interval.split("/") - result["gte"] = parts[0] if parts[0] != ".." else None - result["lte"] = parts[1] if len(parts) > 1 and parts[1] != ".." else None + result["gte"] = ( + parts[0] if parts[0] != ".." else datetime_type.min.isoformat() + "Z" + ) + result["lte"] = ( + parts[1] + if len(parts) > 1 and parts[1] != ".." + else datetime_type.max.isoformat() + "Z" + ) else: converted_time = interval if interval != ".." else None result["gte"] = result["lte"] = converted_time @@ -58,3 +65,36 @@ def return_date( result["lte"] = end.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" return result + + +def extract_date(date_str: str) -> date: + """Extract date from ISO format string. + + Args: + date_str: ISO format date string + + Returns: + A date object extracted from the input string. + """ + date_str = date_str.replace("Z", "+00:00") + return datetime_type.fromisoformat(date_str).date() + + +def extract_first_date_from_index(index_name: str) -> date: + """Extract the first date from an index name containing date patterns. + + Searches for date patterns (YYYY-MM-DD) within the index name string + and returns the first found date as a date object. + + Args: + index_name: Index name containing date patterns. + + Returns: + A date object extracted from the first date pattern found in the index name. + + Raises: + ValueError: If no date pattern is found in the index name. + """ + date_pattern = r"\d{4}-\d{2}-\d{2}" + match = re.search(date_pattern, index_name) + if match is None: + raise ValueError(f"No date pattern found in index name: {index_name}") + return datetime_type.strptime(match.group(0), "%Y-%m-%d").date() diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/document.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/document.py index 0ba0e025..3cc2f14c 100644 --- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/document.py +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/document.py @@ -4,11 +4,6 @@ including document ID generation and bulk action creation. """ -from typing import Any, Dict, List - -from stac_fastapi.sfeos_helpers.database.index import index_alias_by_collection_id -from stac_fastapi.types.stac import Item - def mk_item_id(item_id: str, collection_id: str) -> str: """Create the document id for an Item in Elasticsearch. @@ -21,28 +16,3 @@ def mk_item_id(item_id: str, collection_id: str) -> str: str: The document id for the Item, combining the Item id and the Collection id, separated by a `|` character. """ return f"{item_id}|{collection_id}" - - -def mk_actions(collection_id: str, processed_items: List[Item]) -> List[Dict[str, Any]]: - """Create Elasticsearch bulk actions for a list of processed items. - - Args: - collection_id (str): The identifier for the collection the items belong to. - processed_items (List[Item]): The list of processed items to be bulk indexed. 
- - Returns: - List[Dict[str, Union[str, Dict]]]: The list of bulk actions to be executed, - each action being a dictionary with the following keys: - - `_index`: the index to store the document in. - - `_id`: the document's identifier. - - `_source`: the source of the document. - """ - index_alias = index_alias_by_collection_id(collection_id) - return [ - { - "_index": index_alias, - "_id": mk_item_id(item["id"], item["collection"]), - "_source": item, - } - for item in processed_items - ] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/index.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/index.py index 3305f50f..52e43a4a 100644 --- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/index.py +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/index.py @@ -2,10 +2,13 @@ This module provides functions for creating and managing indices in Elasticsearch/OpenSearch. """ - +import re +from datetime import datetime from functools import lru_cache from typing import Any, List, Optional +from dateutil.parser import parse # type: ignore[import] + from stac_fastapi.sfeos_helpers.mappings import ( _ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE, COLLECTIONS_INDEX, @@ -66,6 +69,59 @@ def indices(collection_ids: Optional[List[str]]) -> str: ) +def filter_indexes_by_datetime( + indexes: List[str], gte: Optional[str], lte: Optional[str] +) -> List[str]: + """Filter indexes based on datetime range extracted from index names. + + Args: + indexes: List of index names containing dates + gte: Greater than or equal date filter (ISO format, optional 'Z' suffix) + lte: Less than or equal date filter (ISO format, optional 'Z' suffix) + + Returns: + List of filtered index names + """ + + def parse_datetime(dt_str: str) -> datetime: + """Parse datetime string, handling both with and without 'Z' suffix.""" + return parse(dt_str).replace(tzinfo=None) + + def extract_date_range_from_index(index_name: str) -> tuple: + """Extract start and end dates from index name.""" + date_pattern = r"(\d{4}-\d{2}-\d{2})" + dates = re.findall(date_pattern, index_name) + + if len(dates) == 1: + start_date = datetime.strptime(dates[0], "%Y-%m-%d") + max_date = datetime.max.replace(microsecond=0) + return start_date, max_date + else: + start_date = datetime.strptime(dates[0], "%Y-%m-%d") + end_date = datetime.strptime(dates[1], "%Y-%m-%d") + return start_date, end_date + + def is_index_in_range( + start_date: datetime, end_date: datetime, gte_dt: datetime, lte_dt: datetime + ) -> bool: + """Check if index date range overlaps with filter range.""" + return not ( + end_date.date() < gte_dt.date() or start_date.date() > lte_dt.date() + ) + + gte_dt = parse_datetime(gte) if gte else datetime.min.replace(microsecond=0) + lte_dt = parse_datetime(lte) if lte else datetime.max.replace(microsecond=0) + + filtered_indexes = [] + + for index in indexes: + start_date, end_date = extract_date_range_from_index(index) + if is_index_in_range(start_date, end_date, gte_dt, lte_dt): + filtered_indexes.append(index) + + return filtered_indexes + + async def create_index_templates_shared(settings: Any) -> None: """Create index templates for Elasticsearch/OpenSearch Collection and Item indices. 
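To make the overlap rule in `filter_indexes_by_datetime` above concrete, a small illustration (the index names are hypothetical but follow the documented naming scheme):

indexes = [
    "items_sentinel-2-l2a_2024-01-01-2024-06-30",  # two dates found: closed range
    "items_sentinel-2-l2a_2024-07-01",  # one date found: end treated as datetime.max
]
filter_indexes_by_datetime(indexes, gte="2024-05-01T00:00:00Z", lte="2024-08-01T00:00:00Z")
# -> both names are kept, since each index's date range overlaps [2024-05-01, 2024-08-01]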
@@ -120,11 +176,11 @@ async def delete_item_index_shared(settings: Any, collection_id: str) -> None: client = settings.create_client name = index_alias_by_collection_id(collection_id) - resolved = await client.indices.resolve_index(name=name) + resolved = await client.indices.resolve_index(name=name, ignore=[404]) if "aliases" in resolved and resolved["aliases"]: [alias] = resolved["aliases"] await client.indices.delete_alias(index=alias["indices"], name=alias["name"]) await client.indices.delete(index=alias["indices"]) else: - await client.indices.delete(index=name) + await client.indices.delete(index=name, ignore=[404]) await client.close() diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/__init__.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/__init__.py new file mode 100644 index 00000000..2a8ec33a --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/__init__.py @@ -0,0 +1,44 @@ +"""Search engine index management package.""" + +from .adapters import ( + ElasticsearchAdapter, + OpenSearchAdapter, + SearchEngineAdapter, + SearchEngineAdapterFactory, +) +from .async_inserters import AsyncDatetimeIndexInserter, AsyncSimpleIndexInserter +from .base import BaseAsyncIndexInserter, BaseIndexInserter, BaseSyncIndexInserter +from .factory import IndexInsertionFactory +from .managers import DatetimeIndexManager, IndexSizeManager +from .selection import ( + AsyncDatetimeBasedIndexSelector, + IndexSelectionStrategy, + IndexSelectorFactory, + SyncDatetimeBasedIndexSelector, + UnfilteredIndexSelector, +) +from .sync_inserters import SyncDatetimeIndexInserter, SyncSimpleIndexInserter +from .types import SearchEngineType + +__all__ = [ + "SearchEngineType", + "BaseIndexInserter", + "BaseAsyncIndexInserter", + "BaseSyncIndexInserter", + "SearchEngineAdapter", + "ElasticsearchAdapter", + "OpenSearchAdapter", + "SearchEngineAdapterFactory", + "IndexSizeManager", + "DatetimeIndexManager", + "AsyncDatetimeIndexInserter", + "AsyncSimpleIndexInserter", + "SyncDatetimeIndexInserter", + "SyncSimpleIndexInserter", + "IndexInsertionFactory", + "IndexSelectionStrategy", + "AsyncDatetimeBasedIndexSelector", + "SyncDatetimeBasedIndexSelector", + "UnfilteredIndexSelector", + "IndexSelectorFactory", +] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/adapters.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/adapters.py new file mode 100644 index 00000000..a04c7cc4 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/adapters.py @@ -0,0 +1,375 @@ +"""Search engine adapters for different implementations.""" + +from abc import ABC, abstractmethod +from typing import Any, Dict + +from stac_fastapi.sfeos_helpers.database import ( + index_alias_by_collection_id, + index_by_collection_id, +) +from stac_fastapi.sfeos_helpers.mappings import ( + _ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE, + ES_ITEMS_MAPPINGS, + ES_ITEMS_SETTINGS, + ITEMS_INDEX_PREFIX, +) + +from .types import SearchEngineType + + +class SearchEngineAdapter(ABC): + """Abstract base class for search engine adapters.""" + + @abstractmethod + async def create_simple_index(self, client: Any, collection_id: str) -> str: + """Create a simple index for the given collection. + + Args: + client: Search engine client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. 
+ """ + pass + + @abstractmethod + async def create_datetime_index( + self, client: Any, collection_id: str, start_date: str + ) -> str: + """Create a datetime-based index for the given collection. + + Args: + client: Search engine client instance. + collection_id (str): Collection identifier. + start_date (str): Start date for the index. + + Returns: + str: Created index alias name. + """ + pass + + @abstractmethod + def create_simple_index_sync(self, sync_client: Any, collection_id: str) -> str: + """Create a simple index synchronously. + + Args: + sync_client: Synchronous search engine client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + pass + + @abstractmethod + def create_datetime_index_sync( + self, sync_client: Any, collection_id: str, start_date: str + ) -> str: + """Create a datetime-based index synchronously. + + Args: + sync_client: Synchronous search engine client instance. + collection_id (str): Collection identifier. + start_date (str): Start date for the index. + + Returns: + str: Created index alias name. + """ + pass + + @staticmethod + async def update_index_alias(client: Any, end_date: str, old_alias: str) -> str: + """Update index alias with new end date. + + Args: + client: Search engine client instance. + end_date (str): End date for the alias. + old_alias (str): Current alias name. + + Returns: + str: New alias name. + """ + index = ITEMS_INDEX_PREFIX + old_alias + new_alias = f"{old_alias}-{end_date}" + + await client.indices.update_aliases( + body={ + "actions": [ + {"remove": {"index": index, "alias": old_alias}}, + {"add": {"index": index, "alias": new_alias}}, + ] + } + ) + return new_alias + + @staticmethod + def update_index_alias_sync(client: Any, end_date: str, old_alias: str) -> str: + """Update index alias synchronously. + + Args: + client: Search engine client instance. + end_date (str): End date for the alias. + old_alias (str): Current alias name. + + Returns: + str: New alias name. + """ + index = ITEMS_INDEX_PREFIX + old_alias + new_alias = f"{old_alias}-{end_date}" + + client.indices.update_aliases( + body={ + "actions": [ + {"remove": {"index": index, "alias": old_alias}}, + {"add": {"index": index, "alias": new_alias}}, + ] + } + ) + return new_alias + + @staticmethod + def create_index_name(collection_id: str, start_date: str) -> str: + """Create index name from collection ID and start date. + + Args: + collection_id (str): Collection identifier. + start_date (str): Start date for the index. + + Returns: + str: Formatted index name. + """ + cleaned = collection_id.translate(_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE) + return f"{ITEMS_INDEX_PREFIX}{cleaned.lower()}_{start_date}" + + +class ElasticsearchAdapter(SearchEngineAdapter): + """Elasticsearch-specific adapter implementation.""" + + async def create_simple_index(self, client: Any, collection_id: str) -> str: + """Create a simple index for Elasticsearch. + + Args: + client: Elasticsearch client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + index_name = f"{index_by_collection_id(collection_id)}-000001" + alias_name = index_alias_by_collection_id(collection_id) + + await client.options(ignore_status=400).indices.create( + index=index_name, + body={"aliases": {alias_name: {}}}, + ) + return index_name + + async def create_datetime_index( + self, client: Any, collection_id: str, start_date: str + ) -> str: + """Create a datetime-based index for Elasticsearch. 
+ + Args: + client: Elasticsearch client instance. + collection_id (str): Collection identifier. + start_date (str): Start date for the index. + + Returns: + str: Created index alias name. + """ + index_name = self.create_index_name(collection_id, start_date) + alias_name = index_name.removeprefix(ITEMS_INDEX_PREFIX) + collection_alias = index_alias_by_collection_id(collection_id) + + await client.options(ignore_status=400).indices.create( + index=index_name, + body={"aliases": {collection_alias: {}, alias_name: {}}}, + ) + return alias_name + + def create_simple_index_sync(self, sync_client: Any, collection_id: str) -> str: + """Create a simple index for Elasticsearch synchronously. + + Args: + sync_client: Synchronous Elasticsearch client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + index_name = f"{index_by_collection_id(collection_id)}-000001" + alias_name = index_alias_by_collection_id(collection_id) + + sync_client.options(ignore_status=400).indices.create( + index=index_name, + body={"aliases": {alias_name: {}}}, + ) + return index_name + + def create_datetime_index_sync( + self, sync_client: Any, collection_id: str, start_date: str + ) -> str: + """Create a datetime-based index for Elasticsearch synchronously. + + Args: + sync_client: Synchronous Elasticsearch client instance. + collection_id (str): Collection identifier. + start_date (str): Start date for the index. + + Returns: + str: Created index alias name. + """ + index_name = self.create_index_name(collection_id, start_date) + alias_name = index_name.removeprefix(ITEMS_INDEX_PREFIX) + collection_alias = index_alias_by_collection_id(collection_id) + + sync_client.options(ignore_status=400).indices.create( + index=index_name, + body={"aliases": {collection_alias: {}, alias_name: {}}}, + ) + return alias_name + + +class OpenSearchAdapter(SearchEngineAdapter): + """OpenSearch-specific adapter implementation.""" + + @staticmethod + def _create_index_body(aliases: Dict[str, Dict]) -> Dict[str, Any]: + """Create index body with common settings. + + Args: + aliases (Dict[str, Dict]): Aliases configuration. + + Returns: + Dict[str, Any]: Index body configuration. + """ + return { + "aliases": aliases, + "mappings": ES_ITEMS_MAPPINGS, + "settings": ES_ITEMS_SETTINGS, + } + + async def create_simple_index(self, client: Any, collection_id: str) -> str: + """Create a simple index for OpenSearch. + + Args: + client: OpenSearch client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + index_name = f"{index_by_collection_id(collection_id)}-000001" + alias_name = index_alias_by_collection_id(collection_id) + + await client.indices.create( + index=index_name, + body=self._create_index_body({alias_name: {}}), + params={"ignore": [400]}, + ) + return index_name + + async def create_datetime_index( + self, client: Any, collection_id: str, start_date: str + ) -> str: + """Create a datetime-based index for OpenSearch. + + Args: + client: OpenSearch client instance. + collection_id (str): Collection identifier. + start_date (str): Start date for the index. + + Returns: + str: Created index alias name. 
+ """ + index_name = self.create_index_name(collection_id, start_date) + alias_name = index_name.removeprefix(ITEMS_INDEX_PREFIX) + collection_alias = index_alias_by_collection_id(collection_id) + + await client.indices.create( + index=index_name, + body=self._create_index_body({collection_alias: {}, alias_name: {}}), + params={"ignore": [400]}, + ) + return alias_name + + def create_simple_index_sync(self, sync_client: Any, collection_id: str) -> str: + """Create a simple index for OpenSearch synchronously. + + Args: + sync_client: Synchronous OpenSearch client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + index_name = f"{index_by_collection_id(collection_id)}-000001" + alias_name = index_alias_by_collection_id(collection_id) + + sync_client.indices.create( + index=index_name, + body=self._create_index_body({alias_name: {}}), + params={"ignore": [400]}, + ) + return index_name + + def create_datetime_index_sync( + self, sync_client: Any, collection_id: str, start_date: str + ) -> str: + """Create a datetime-based index for OpenSearch synchronously. + + Args: + sync_client: Synchronous OpenSearch client instance. + collection_id (str): Collection identifier. + start_date (str): Start date for the index. + + Returns: + str: Created index alias name. + """ + index_name = self.create_index_name(collection_id, start_date) + alias_name = index_name.removeprefix(ITEMS_INDEX_PREFIX) + collection_alias = index_alias_by_collection_id(collection_id) + + sync_client.indices.create( + index=index_name, + body=self._create_index_body({collection_alias: {}, alias_name: {}}), + params={"ignore": [400]}, + ) + return alias_name + + +class SearchEngineAdapterFactory: + """Factory for creating search engine adapters.""" + + @staticmethod + def create_adapter(engine_type: SearchEngineType) -> SearchEngineAdapter: + """Create appropriate adapter based on engine type. + + Args: + engine_type (SearchEngineType): Type of search engine. + + Returns: + SearchEngineAdapter: Adapter instance for the specified engine type. + """ + adapters = { + SearchEngineType.ELASTICSEARCH: ElasticsearchAdapter, + SearchEngineType.OPENSEARCH: OpenSearchAdapter, + } + return adapters[engine_type]() + + @staticmethod + def detect_engine_type(client: Any) -> SearchEngineType: + """Detect engine type from client class name. + + Args: + client: Search engine client instance. + + Returns: + SearchEngineType: Detected engine type. 
+ """ + return ( + SearchEngineType.OPENSEARCH + if "opensearch" in str(client.__class__).lower() + else SearchEngineType.ELASTICSEARCH + ) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/async_inserters.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/async_inserters.py new file mode 100644 index 00000000..b1d19b32 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/async_inserters.py @@ -0,0 +1,346 @@ +"""Async index insertion strategies.""" + +from datetime import timedelta +from typing import Any, Dict, List, Optional + +from stac_fastapi.sfeos_helpers.database import ( + extract_date, + extract_first_date_from_index, + index_alias_by_collection_id, + mk_item_id, +) + +from .adapters import SearchEngineAdapter +from .base import BaseAsyncIndexInserter +from .managers import DatetimeIndexManager +from .selection import AsyncDatetimeBasedIndexSelector + + +class AsyncDatetimeIndexInserter(BaseAsyncIndexInserter): + """Async datetime-based index insertion strategy.""" + + def __init__(self, client: Any, search_adapter: SearchEngineAdapter): + """Initialize the async datetime index inserter. + + Args: + client: Async search engine client instance. + search_adapter (SearchEngineAdapter): Search engine adapter instance. + """ + self.client = client + self.search_adapter = search_adapter + self.datetime_manager = DatetimeIndexManager(client, search_adapter) + + def should_create_collection_index(self) -> bool: + """Whether this strategy requires collection index creation. + + Returns: + bool: False, as datetime strategy doesn't create collection indexes. + """ + return False + + async def create_simple_index(self, client: Any, collection_id: str) -> str: + """Create a simple index asynchronously. + + Args: + client: Search engine client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + return await self.search_adapter.create_simple_index(client, collection_id) + + async def get_target_index( + self, collection_id: str, product: Dict[str, Any] + ) -> str: + """Get target index for a single product. + + Args: + collection_id (str): Collection identifier. + product (Dict[str, Any]): Product data containing datetime information. + + Returns: + str: Target index name for the product. + """ + index_selector = AsyncDatetimeBasedIndexSelector(self.client) + return await self._get_target_index_internal( + index_selector, collection_id, product, check_size=True + ) + + async def prepare_bulk_actions( + self, collection_id: str, items: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Prepare bulk actions for multiple items. + + Args: + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + + Returns: + List[Dict[str, Any]]: List of bulk actions ready for execution. + """ + index_selector = AsyncDatetimeBasedIndexSelector(self.client) + + await self._ensure_indexes_exist(index_selector, collection_id, items) + split_info = await self._handle_index_splitting( + index_selector, collection_id, items + ) + return await self._create_bulk_actions( + index_selector, collection_id, items, split_info + ) + + async def _get_target_index_internal( + self, + index_selector, + collection_id: str, + product: Dict[str, Any], + check_size: bool = True, + ) -> str: + """Get target index with size checking internally. + + Args: + index_selector: Index selector instance. + collection_id (str): Collection identifier. 
+ product (Dict[str, Any]): Product data. + check_size (bool): Whether to check index size limits. + + Returns: + str: Target index name. + """ + product_datetime = self.datetime_manager._validate_product_datetime(product) + + datetime_range = {"gte": product_datetime, "lte": product_datetime} + target_index = await index_selector.select_indexes( + [collection_id], datetime_range + ) + all_indexes = await index_selector.get_collection_indexes(collection_id) + + if not all_indexes: + target_index = await self.datetime_manager.handle_new_collection( + collection_id, product_datetime + ) + await index_selector.refresh_cache() + return target_index + + all_indexes.sort() + start_date = extract_date(product_datetime) + end_date = extract_first_date_from_index(all_indexes[0]) + + if start_date < end_date: + alias = await self.datetime_manager.handle_early_date( + collection_id, start_date, end_date + ) + await index_selector.refresh_cache() + return alias + + if target_index != all_indexes[-1]: + return target_index + + if check_size and await self.datetime_manager.size_manager.is_index_oversized( + target_index + ): + target_index = await self.datetime_manager.handle_oversized_index( + collection_id, target_index, product_datetime + ) + await index_selector.refresh_cache() + + return target_index + + async def _ensure_indexes_exist( + self, index_selector, collection_id: str, items: List[Dict[str, Any]] + ): + """Ensure necessary indexes exist for the items. + + Args: + index_selector: Index selector instance. + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + """ + all_indexes = await index_selector.get_collection_indexes(collection_id) + + if not all_indexes: + first_item = items[0] + await self.search_adapter.create_datetime_index( + self.client, + collection_id, + extract_date(first_item["properties"]["datetime"]), + ) + await index_selector.refresh_cache() + + async def _handle_index_splitting( + self, index_selector, collection_id: str, items: List[Dict[str, Any]] + ): + """Handle potential index splitting due to size limits. + + Args: + index_selector: Index selector instance. + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + + Returns: + Optional[Dict]: Split information if splitting occurred, None otherwise. + """ + all_indexes = await index_selector.get_collection_indexes(collection_id) + all_indexes.sort() + latest_index = all_indexes[-1] + + first_item = items[0] + first_item_index = await self._get_target_index_internal( + index_selector, collection_id, first_item, check_size=False + ) + + if first_item_index != latest_index: + return None + + if await self.datetime_manager.size_manager.is_index_oversized( + first_item_index + ): + return await self._create_new_index_for_split( + collection_id, first_item_index, first_item + ) + + return None + + async def _create_new_index_for_split( + self, collection_id: str, latest_index: str, first_item: Dict[str, Any] + ): + """Create new index for splitting oversized index. + + Args: + collection_id (str): Collection identifier. + latest_index (str): Current latest index name. + first_item (Dict[str, Any]): First item being processed. + + Returns: + Optional[Dict]: Split information with new index details. 
+ """ + current_index_end_date = extract_first_date_from_index(latest_index) + first_item_date = extract_date(first_item["properties"]["datetime"]) + + if first_item_date != current_index_end_date: + await self.search_adapter.update_index_alias( + self.client, str(current_index_end_date), latest_index + ) + next_day_start = current_index_end_date + timedelta(days=1) + new_index = await self.search_adapter.create_datetime_index( + self.client, collection_id, str(next_day_start) + ) + return { + "split_date": current_index_end_date, + "new_index": new_index, + } + return None + + async def _create_bulk_actions( + self, + index_selector, + collection_id: str, + items: List[Dict[str, Any]], + split_info: Optional[Dict], + ) -> List[Dict[str, Any]]: + """Create bulk actions for all items. + + Args: + index_selector: Index selector instance. + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + split_info (Optional[Dict]): Split information if applicable. + + Returns: + List[Dict[str, Any]]: List of prepared bulk actions. + """ + actions = [] + + for item in items: + if split_info: + item_date = extract_date(item["properties"]["datetime"]) + if item_date > split_info["split_date"]: + target_index = split_info["new_index"] + else: + target_index = await self._get_target_index_internal( + index_selector, collection_id, item, check_size=False + ) + else: + target_index = await self._get_target_index_internal( + index_selector, collection_id, item, check_size=False + ) + + actions.append( + { + "_index": target_index, + "_id": mk_item_id(item["id"], item["collection"]), + "_source": item, + } + ) + + return actions + + +class AsyncSimpleIndexInserter(BaseAsyncIndexInserter): + """Simple async index insertion strategy.""" + + def __init__(self, search_adapter: SearchEngineAdapter, client: Any): + """Initialize the async simple index inserter. + + Args: + search_adapter (SearchEngineAdapter): Search engine adapter instance. + client: Async search engine client instance. + """ + self.search_adapter = search_adapter + self.client = client + + def should_create_collection_index(self) -> bool: + """Whether this strategy requires collection index creation. + + Returns: + bool: True, as simple strategy creates collection indexes. + """ + return True + + async def create_simple_index(self, client: Any, collection_id: str) -> str: + """Create a simple index asynchronously. + + Args: + client: Search engine client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + return await self.search_adapter.create_simple_index(client, collection_id) + + async def get_target_index( + self, collection_id: str, product: Dict[str, Any] + ) -> str: + """Get target index (always the collection alias). + + Args: + collection_id (str): Collection identifier. + product (Dict[str, Any]): Product data (not used in simple strategy). + + Returns: + str: Collection alias name. + """ + return index_alias_by_collection_id(collection_id) + + async def prepare_bulk_actions( + self, collection_id: str, items: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Prepare bulk actions for simple indexing. + + Args: + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + + Returns: + List[Dict[str, Any]]: List of bulk actions with collection alias as target. 
+ """ + target_index = index_alias_by_collection_id(collection_id) + return [ + { + "_index": target_index, + "_id": mk_item_id(item["id"], item["collection"]), + "_source": item, + } + for item in items + ] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/base.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/base.py new file mode 100644 index 00000000..f75a59bf --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/base.py @@ -0,0 +1,139 @@ +"""Base classes for index inserters.""" + +from abc import ABC, abstractmethod +from typing import Any, Awaitable, Dict, List, Union + + +class BaseIndexInserter(ABC): + """Base class for index insertion strategies.""" + + @abstractmethod + def get_target_index( + self, collection_id: str, product: Dict[str, Any] + ) -> Union[str, Awaitable[str]]: + """Get target index for a product. + + Args: + collection_id (str): Collection identifier. + product (Dict[str, Any]): Product data. + + Returns: + Union[str, Awaitable[str]]: Target index name or awaitable. + """ + pass + + @abstractmethod + def prepare_bulk_actions( + self, collection_id: str, items: List[Dict[str, Any]] + ) -> Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]]: + """Prepare bulk actions for multiple items. + + Args: + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + + Returns: + Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]]: List of bulk actions or awaitable. + """ + pass + + @abstractmethod + def should_create_collection_index(self) -> bool: + """Whether this strategy requires collection index creation. + + Returns: + bool: True if collection index should be created, False otherwise. + """ + pass + + +class BaseAsyncIndexInserter(BaseIndexInserter): + """Base async index inserter with common async methods.""" + + @abstractmethod + async def get_target_index( + self, collection_id: str, product: Dict[str, Any] + ) -> str: + """Get target index for a product asynchronously. + + Args: + collection_id (str): Collection identifier. + product (Dict[str, Any]): Product data. + + Returns: + str: Target index name. + """ + pass + + @abstractmethod + async def prepare_bulk_actions( + self, collection_id: str, items: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Prepare bulk actions for multiple items asynchronously. + + Args: + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + + Returns: + List[Dict[str, Any]]: List of bulk actions. + """ + pass + + @abstractmethod + async def create_simple_index(self, client: Any, collection_id: str) -> str: + """Create a simple index asynchronously. + + Args: + client: Search engine client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + pass + + +class BaseSyncIndexInserter(BaseIndexInserter): + """Base sync index inserter with common sync methods.""" + + @abstractmethod + def get_target_index(self, collection_id: str, product: Dict[str, Any]) -> str: + """Get target index for a product synchronously. + + Args: + collection_id (str): Collection identifier. + product (Dict[str, Any]): Product data. + + Returns: + str: Target index name. + """ + pass + + @abstractmethod + def prepare_bulk_actions( + self, collection_id: str, items: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Prepare bulk actions for multiple items synchronously. 
+ + Args: + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + + Returns: + List[Dict[str, Any]]: List of bulk actions. + """ + pass + + @abstractmethod + def create_simple_index(self, client: Any, collection_id: str) -> str: + """Create a simple index synchronously. + + Args: + client: Search engine client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + pass diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/factory.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/factory.py new file mode 100644 index 00000000..71c9d986 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/factory.py @@ -0,0 +1,58 @@ +"""Factory for creating index insertion strategies.""" + +from typing import Any + +from stac_fastapi.core.utilities import get_bool_env + +from .adapters import SearchEngineAdapterFactory +from .async_inserters import AsyncDatetimeIndexInserter, AsyncSimpleIndexInserter +from .base import BaseIndexInserter +from .sync_inserters import SyncDatetimeIndexInserter, SyncSimpleIndexInserter + + +class IndexInsertionFactory: + """Factory for creating index insertion strategies.""" + + @staticmethod + def create_async_insertion_strategy(client: Any) -> BaseIndexInserter: + """Create async insertion strategy based on configuration. + + Args: + client: Async search engine client instance. + + Returns: + BaseIndexInserter: Configured async insertion strategy. + """ + engine_type = SearchEngineAdapterFactory.detect_engine_type(client) + search_adapter = SearchEngineAdapterFactory.create_adapter(engine_type) + + use_datetime_partitioning = get_bool_env( + "ENABLE_DATETIME_INDEX_FILTERING", default="false" + ) + + if use_datetime_partitioning: + return AsyncDatetimeIndexInserter(client, search_adapter) + else: + return AsyncSimpleIndexInserter(search_adapter, client) + + @staticmethod + def create_sync_insertion_strategy(sync_client: Any) -> BaseIndexInserter: + """Create sync insertion strategy based on configuration. + + Args: + sync_client: Sync search engine client instance. + + Returns: + BaseIndexInserter: Configured sync insertion strategy. + """ + engine_type = SearchEngineAdapterFactory.detect_engine_type(sync_client) + search_adapter = SearchEngineAdapterFactory.create_adapter(engine_type) + + use_datetime_partitioning = get_bool_env( + "ENABLE_DATETIME_INDEX_FILTERING", default="false" + ) + + if use_datetime_partitioning: + return SyncDatetimeIndexInserter(sync_client, search_adapter) + else: + return SyncSimpleIndexInserter(search_adapter, sync_client) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/managers.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/managers.py new file mode 100644 index 00000000..a8e52a3c --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/managers.py @@ -0,0 +1,239 @@ +"""Index management utilities.""" + +import os +from datetime import datetime, timedelta +from typing import Any, Dict + +from fastapi import HTTPException, status + +from stac_fastapi.sfeos_helpers.database import ( + extract_date, + extract_first_date_from_index, +) + +from .adapters import SearchEngineAdapter + + +class IndexSizeManager: + """Manages index size limits and operations.""" + + def __init__(self, client: Any): + """Initialize the index size manager. + + Args: + client: Search engine client instance. 
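+
+        Example:
+            The limit is read from ``DATETIME_INDEX_MAX_SIZE_GB`` (illustrative)::
+
+                os.environ["DATETIME_INDEX_MAX_SIZE_GB"] = "50"
+                manager = IndexSizeManager(client)
+                assert manager.max_size_gb == 50.0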
+ """ + self.client = client + self.max_size_gb = float(os.getenv("DATETIME_INDEX_MAX_SIZE_GB", "25")) + + async def get_index_size_in_gb(self, index_name: str) -> float: + """Get index size in gigabytes asynchronously. + + Args: + index_name (str): Name of the index to check. + + Returns: + float: Size of the index in gigabytes. + """ + data = await self.client.indices.stats(index=index_name) + return data["_all"]["primaries"]["store"]["size_in_bytes"] / 1e9 + + def get_index_size_in_gb_sync(self, index_name: str) -> float: + """Get index size in gigabytes synchronously. + + Args: + index_name (str): Name of the index to check. + + Returns: + float: Size of the index in gigabytes. + """ + data = self.client.indices.stats(index=index_name) + return data["_all"]["primaries"]["store"]["size_in_bytes"] / 1e9 + + async def is_index_oversized(self, index_name: str) -> bool: + """Check if index exceeds size limit asynchronously. + + Args: + index_name (str): Name of the index to check. + + Returns: + bool: True if index exceeds size limit, False otherwise. + """ + size_gb = await self.get_index_size_in_gb(index_name) + return size_gb > self.max_size_gb + + def is_index_oversized_sync(self, index_name: str) -> bool: + """Check if index exceeds size limit synchronously. + + Args: + index_name (str): Name of the index to check. + + Returns: + bool: True if index exceeds size limit, False otherwise. + """ + size_gb = self.get_index_size_in_gb_sync(index_name) + return size_gb > self.max_size_gb + + +class DatetimeIndexManager: + """Manages datetime-based index operations.""" + + def __init__(self, client: Any, search_adapter: SearchEngineAdapter): + """Initialize the datetime index manager. + + Args: + client: Search engine client instance. + search_adapter (SearchEngineAdapter): Search engine adapter instance. + """ + self.client = client + self.search_adapter = search_adapter + self.size_manager = IndexSizeManager(client) + + @staticmethod + def _validate_product_datetime(product: Dict[str, Any]) -> str: + """Validate and extract datetime from product. + + Args: + product (Dict[str, Any]): Product data containing datetime information. + + Returns: + str: Validated product datetime. + + Raises: + HTTPException: If product datetime is missing or invalid. + """ + product_datetime = product["properties"]["datetime"] + if not product_datetime: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Product datetime is required for indexing", + ) + return product_datetime + + async def handle_new_collection( + self, collection_id: str, product_datetime: str + ) -> str: + """Handle index creation for new collection asynchronously. + + Args: + collection_id (str): Collection identifier. + product_datetime (str): Product datetime for index naming. + + Returns: + str: Created index name. + """ + target_index = await self.search_adapter.create_datetime_index( + self.client, collection_id, extract_date(product_datetime) + ) + return target_index + + def handle_new_collection_sync( + self, collection_id: str, product_datetime: str + ) -> str: + """Handle index creation for new collection synchronously. + + Args: + collection_id (str): Collection identifier. + product_datetime (str): Product datetime for index naming. + + Returns: + str: Created index name. 
+ """ + target_index = self.search_adapter.create_datetime_index_sync( + self.client, collection_id, extract_date(product_datetime) + ) + return target_index + + async def handle_early_date( + self, collection_id: str, start_date: datetime, end_date: datetime + ) -> str: + """Handle product with date earlier than existing indexes asynchronously. + + Args: + collection_id (str): Collection identifier. + start_date (datetime): Start date for the new index. + end_date (datetime): End date for alias update. + + Returns: + str: Updated alias name. + """ + target_index = await self.search_adapter.create_datetime_index( + self.client, collection_id, str(start_date) + ) + alias = await self.search_adapter.update_index_alias( + self.client, str(end_date - timedelta(days=1)), target_index + ) + return alias + + def handle_early_date_sync( + self, collection_id: str, start_date: datetime, end_date: datetime + ) -> str: + """Handle product with date earlier than existing indexes synchronously. + + Args: + collection_id (str): Collection identifier. + start_date (datetime): Start date for the new index. + end_date (datetime): End date for alias update. + + Returns: + str: Updated alias name. + """ + target_index = self.search_adapter.create_datetime_index_sync( + self.client, collection_id, str(start_date) + ) + alias = self.search_adapter.update_index_alias_sync( + self.client, str(end_date - timedelta(days=1)), target_index + ) + return alias + + async def handle_oversized_index( + self, collection_id: str, target_index: str, product_datetime: str + ) -> str: + """Handle index that exceeds size limit asynchronously. + + Args: + collection_id (str): Collection identifier. + target_index (str): Current target index name. + product_datetime (str): Product datetime for new index. + + Returns: + str: New or updated index name. + """ + end_date = extract_date(product_datetime) + latest_index_start = extract_first_date_from_index(target_index) + + if end_date != latest_index_start: + await self.search_adapter.update_index_alias( + self.client, str(end_date), target_index + ) + target_index = await self.search_adapter.create_datetime_index( + self.client, collection_id, str(end_date + timedelta(days=1)) + ) + + return target_index + + def handle_oversized_index_sync( + self, collection_id: str, target_index: str, product_datetime: str + ) -> str: + """Handle index that exceeds size limit synchronously. + + Args: + collection_id (str): Collection identifier. + target_index (str): Current target index name. + product_datetime (str): Product datetime for new index. + + Returns: + str: New or updated index name. 
+ """ + end_date = extract_date(product_datetime) + latest_index_start = extract_first_date_from_index(target_index) + + if end_date != latest_index_start: + self.search_adapter.update_index_alias_sync( + self.client, str(end_date), target_index + ) + target_index = self.search_adapter.create_datetime_index_sync( + self.client, collection_id, str(end_date + timedelta(days=1)) + ) + + return target_index diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/__init__.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/__init__.py new file mode 100644 index 00000000..c9f2d13c --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/__init__.py @@ -0,0 +1,25 @@ +"""Index selection strategies package.""" + +from .async_selectors import AsyncDatetimeBasedIndexSelector +from .base import BaseAsyncIndexSelector, BaseSyncIndexSelector, IndexSelectionStrategy +from .cache_manager import ( + AsyncIndexAliasLoader, + IndexCacheManager, + SyncIndexAliasLoader, +) +from .factory import IndexSelectorFactory +from .sync_selectors import SyncDatetimeBasedIndexSelector +from .unfiltered_selector import UnfilteredIndexSelector + +__all__ = [ + "IndexSelectionStrategy", + "BaseAsyncIndexSelector", + "BaseSyncIndexSelector", + "IndexCacheManager", + "AsyncIndexAliasLoader", + "SyncIndexAliasLoader", + "AsyncDatetimeBasedIndexSelector", + "SyncDatetimeBasedIndexSelector", + "UnfilteredIndexSelector", + "IndexSelectorFactory", +] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/async_selectors.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/async_selectors.py new file mode 100644 index 00000000..9b945fb3 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/async_selectors.py @@ -0,0 +1,98 @@ +"""Async index selectors with datetime-based filtering.""" + +from typing import Any, Dict, List, Optional + +from stac_fastapi.sfeos_helpers.database import filter_indexes_by_datetime +from stac_fastapi.sfeos_helpers.mappings import ITEM_INDICES + +from .base import BaseAsyncIndexSelector +from .cache_manager import AsyncIndexAliasLoader, IndexCacheManager + + +class AsyncDatetimeBasedIndexSelector(BaseAsyncIndexSelector): + """Asynchronous index selector that filters indices based on datetime criteria with caching.""" + + _instance = None + + def __new__(cls, client): + """Create singleton instance. + + Args: + client: Async search engine client instance. + + Returns: + AsyncDatetimeBasedIndexSelector: Singleton instance. + """ + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, client: Any): + """Initialize the datetime-based index selector. + + Args: + client: Elasticsearch/OpenSearch client instance used for querying + index aliases and metadata. + """ + if not hasattr(self, "_initialized"): + self.cache_manager = IndexCacheManager() + self.alias_loader = AsyncIndexAliasLoader(client, self.cache_manager) + self._initialized = True + + async def refresh_cache(self) -> Dict[str, List[str]]: + """Force refresh of the aliases cache. + + Returns: + Dict[str, List[str]]: Refreshed dictionary mapping base collection aliases + to lists of their corresponding item index aliases. 
+ """ + return await self.alias_loader.refresh_aliases() + + async def get_collection_indexes(self, collection_id: str) -> List[str]: + """Get all index aliases for a specific collection. + + Args: + collection_id (str): The ID of the collection to retrieve indexes for. + + Returns: + List[str]: List of index aliases associated with the collection. + Returns empty list if collection is not found in cache. + """ + return await self.alias_loader.get_collection_indexes(collection_id) + + async def select_indexes( + self, + collection_ids: Optional[List[str]], + datetime_search: Dict[str, Optional[str]], + ) -> str: + """Select indexes filtered by collection IDs and datetime criteria. + + For each specified collection, retrieves its associated indexes and filters + them based on datetime range. If no collection IDs are provided, returns + all item indices. + + Args: + collection_ids (Optional[List[str]]): List of collection IDs to filter by. + If None or empty, returns all item indices. + datetime_search (Dict[str, Optional[str]]): Dictionary containing datetime + search criteria with 'gte' and 'lte' keys for range filtering. + + Returns: + str: Comma-separated string of selected index names that match the + collection and datetime criteria. Returns empty string if no + indexes match the criteria. + """ + if collection_ids: + selected_indexes = [] + for collection_id in collection_ids: + collection_indexes = await self.get_collection_indexes(collection_id) + filtered_indexes = filter_indexes_by_datetime( + collection_indexes, + datetime_search.get("gte"), + datetime_search.get("lte"), + ) + selected_indexes.extend(filtered_indexes) + + return ",".join(selected_indexes) if selected_indexes else "" + + return ITEM_INDICES diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/base.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/base.py new file mode 100644 index 00000000..ff3c6bae --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/base.py @@ -0,0 +1,70 @@ +"""Base classes for index selection strategies.""" + +from abc import ABC, abstractmethod +from typing import Awaitable, Dict, List, Optional, Union + + +class IndexSelectionStrategy(ABC): + """Abstract base class for index selection strategies.""" + + @abstractmethod + def select_indexes( + self, + collection_ids: Optional[List[str]], + datetime_search: Dict[str, Optional[str]], + ) -> Union[str, Awaitable[str]]: + """Select appropriate indexes based on collection IDs and datetime criteria. + + Args: + collection_ids (Optional[List[str]]): List of collection IDs to filter by. + If None, all collections are considered. + datetime_search (Dict[str, Optional[str]]): Dictionary containing datetime + search criteria with 'gte' and 'lte' keys for range filtering. + + Returns: + Union[str, Awaitable[str]]: Comma-separated string of selected index names + or awaitable that resolves to such string. + """ + pass + + +class BaseAsyncIndexSelector(IndexSelectionStrategy): + """Base class for async index selectors.""" + + @abstractmethod + async def select_indexes( + self, + collection_ids: Optional[List[str]], + datetime_search: Dict[str, Optional[str]], + ) -> str: + """Select appropriate indexes asynchronously. + + Args: + collection_ids (Optional[List[str]]): List of collection IDs to filter by. + datetime_search (Dict[str, Optional[str]]): Datetime search criteria. + + Returns: + str: Comma-separated string of selected index names. 
+ """ + pass + + +class BaseSyncIndexSelector(IndexSelectionStrategy): + """Base class for sync index selectors.""" + + @abstractmethod + def select_indexes( + self, + collection_ids: Optional[List[str]], + datetime_search: Dict[str, Optional[str]], + ) -> str: + """Select appropriate indexes synchronously. + + Args: + collection_ids (Optional[List[str]]): List of collection IDs to filter by. + datetime_search (Dict[str, Optional[str]]): Datetime search criteria. + + Returns: + str: Comma-separated string of selected index names. + """ + pass diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/cache_manager.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/cache_manager.py new file mode 100644 index 00000000..de9dcd75 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/cache_manager.py @@ -0,0 +1,196 @@ +"""Cache management for index selection strategies.""" + +import time +from typing import Any, Dict, List, Optional + +from stac_fastapi.sfeos_helpers.database import index_alias_by_collection_id +from stac_fastapi.sfeos_helpers.mappings import ITEMS_INDEX_PREFIX + + +class IndexCacheManager: + """Manages caching of index aliases with expiration.""" + + def __init__(self, cache_ttl_seconds: int = 3600): + """Initialize the cache manager. + + Args: + cache_ttl_seconds (int): Time-to-live for cache entries in seconds. + """ + self._cache: Optional[Dict[str, List[str]]] = None + self._timestamp: float = 0 + self._ttl = cache_ttl_seconds + + @property + def is_expired(self) -> bool: + """Check if the cache has expired. + + Returns: + bool: True if cache is expired, False otherwise. + """ + return time.time() - self._timestamp > self._ttl + + def get_cache(self) -> Optional[Dict[str, List[str]]]: + """Get the current cache if not expired. + + Returns: + Optional[Dict[str, List[str]]]: Cache data if valid, None if expired. + """ + if self.is_expired: + return None + return self._cache + + def set_cache(self, data: Dict[str, List[str]]) -> None: + """Set cache data and update timestamp. + + Args: + data (Dict[str, List[str]]): Cache data to store. + """ + self._cache = data + self._timestamp = time.time() + + def clear_cache(self) -> None: + """Clear the cache and reset timestamp.""" + self._cache = None + self._timestamp = 0 + + +class AsyncIndexAliasLoader: + """Asynchronous loader for index aliases.""" + + def __init__(self, client: Any, cache_manager: IndexCacheManager): + """Initialize the async alias loader. + + Args: + client: Async search engine client instance. + cache_manager (IndexCacheManager): Cache manager instance. + """ + self.client = client + self.cache_manager = cache_manager + + async def load_aliases(self) -> Dict[str, List[str]]: + """Load index aliases from search engine. + + Returns: + Dict[str, List[str]]: Mapping of base aliases to item aliases. 
+ """ + response = await self.client.indices.get_alias(index=f"{ITEMS_INDEX_PREFIX}*") + result: Dict[str, List[str]] = {} + + for index_info in response.values(): + aliases = index_info.get("aliases", {}) + base_alias = None + items_aliases = [] + + for alias_name in aliases.keys(): + if not alias_name.startswith(ITEMS_INDEX_PREFIX): + items_aliases.append(alias_name) + else: + base_alias = alias_name + + if base_alias and items_aliases: + result.setdefault(base_alias, []).extend(items_aliases) + + self.cache_manager.set_cache(result) + return result + + async def get_aliases(self) -> Dict[str, List[str]]: + """Get aliases from cache or load if expired. + + Returns: + Dict[str, List[str]]: Alias mapping data. + """ + cached = self.cache_manager.get_cache() + if cached is not None: + return cached + return await self.load_aliases() + + async def refresh_aliases(self) -> Dict[str, List[str]]: + """Force refresh aliases from search engine. + + Returns: + Dict[str, List[str]]: Fresh alias mapping data. + """ + return await self.load_aliases() + + async def get_collection_indexes(self, collection_id: str) -> List[str]: + """Get all index aliases for a specific collection. + + Args: + collection_id (str): Collection identifier. + + Returns: + List[str]: List of index aliases for the collection. + """ + aliases = await self.get_aliases() + return aliases.get(index_alias_by_collection_id(collection_id), []) + + +class SyncIndexAliasLoader: + """Synchronous loader for index aliases.""" + + def __init__(self, client: Any, cache_manager: IndexCacheManager): + """Initialize the sync alias loader. + + Args: + client: Sync search engine client instance. + cache_manager (IndexCacheManager): Cache manager instance. + """ + self.client = client + self.cache_manager = cache_manager + + def load_aliases(self) -> Dict[str, List[str]]: + """Load index aliases from search engine synchronously. + + Returns: + Dict[str, List[str]]: Mapping of base aliases to item aliases. + """ + response = self.client.indices.get_alias(index=f"{ITEMS_INDEX_PREFIX}*") + result: Dict[str, List[str]] = {} + + for index_info in response.values(): + aliases = index_info.get("aliases", {}) + base_alias = None + items_aliases = [] + + for alias_name in aliases.keys(): + if not alias_name.startswith(ITEMS_INDEX_PREFIX): + items_aliases.append(alias_name) + else: + base_alias = alias_name + + if base_alias and items_aliases: + result.setdefault(base_alias, []).extend(items_aliases) + + self.cache_manager.set_cache(result) + return result + + def get_aliases(self) -> Dict[str, List[str]]: + """Get aliases from cache or load if expired. + + Returns: + Dict[str, List[str]]: Alias mapping data. + """ + cached = self.cache_manager.get_cache() + if cached is not None: + return cached + return self.load_aliases() + + def refresh_aliases(self) -> Dict[str, List[str]]: + """Force refresh aliases from search engine. + + Returns: + Dict[str, List[str]]: Fresh alias mapping data. + """ + return self.load_aliases() + + def get_collection_indexes(self, collection_id: str) -> List[str]: + """Get all index aliases for a specific collection. + + Args: + collection_id (str): Collection identifier. + + Returns: + List[str]: List of index aliases for the collection. 
+ """ + aliases = self.get_aliases() + return aliases.get(index_alias_by_collection_id(collection_id), []) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/factory.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/factory.py new file mode 100644 index 00000000..11f83605 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/factory.py @@ -0,0 +1,64 @@ +"""Factory for creating index selection strategies.""" + +from typing import Any + +from stac_fastapi.core.utilities import get_bool_env + +from .async_selectors import AsyncDatetimeBasedIndexSelector +from .base import IndexSelectionStrategy +from .sync_selectors import SyncDatetimeBasedIndexSelector +from .unfiltered_selector import UnfilteredIndexSelector + + +class IndexSelectorFactory: + """Factory class for creating index selector instances.""" + + @staticmethod + def create_async_selector(client: Any) -> IndexSelectionStrategy: + """Create an appropriate asynchronous index selector based on environment configuration. + + Checks the ENABLE_DATETIME_INDEX_FILTERING environment variable to determine + whether to use datetime-based filtering or return all available indices. + + Args: + client: Asynchronous Elasticsearch/OpenSearch client instance, used only if datetime + filtering is enabled. + + Returns: + IndexSelectionStrategy: Either an AsyncDatetimeBasedIndexSelector if datetime + filtering is enabled, or an UnfilteredIndexSelector otherwise. + """ + use_datetime_filtering = get_bool_env( + "ENABLE_DATETIME_INDEX_FILTERING", default="false" + ) + + return ( + AsyncDatetimeBasedIndexSelector(client) + if use_datetime_filtering + else UnfilteredIndexSelector() + ) + + @staticmethod + def create_sync_selector(sync_client: Any) -> IndexSelectionStrategy: + """Create an appropriate synchronous index selector based on environment configuration. + + Checks the ENABLE_DATETIME_INDEX_FILTERING environment variable to determine + whether to use datetime-based filtering or return all available indices. + + Args: + sync_client: Synchronous Elasticsearch/OpenSearch client instance, used only if datetime + filtering is enabled. + + Returns: + IndexSelectionStrategy: Either a SyncDatetimeBasedIndexSelector if datetime + filtering is enabled, or an UnfilteredIndexSelector otherwise. 
+ """ + use_datetime_filtering = get_bool_env( + "ENABLE_DATETIME_INDEX_FILTERING", default="false" + ) + + return ( + SyncDatetimeBasedIndexSelector(sync_client) + if use_datetime_filtering + else UnfilteredIndexSelector() + ) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/sync_selectors.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/sync_selectors.py new file mode 100644 index 00000000..6912a587 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/sync_selectors.py @@ -0,0 +1,98 @@ +"""Sync index selectors with datetime-based filtering.""" + +from typing import Any, Dict, List, Optional + +from stac_fastapi.sfeos_helpers.database import filter_indexes_by_datetime +from stac_fastapi.sfeos_helpers.mappings import ITEM_INDICES + +from .base import BaseSyncIndexSelector +from .cache_manager import IndexCacheManager, SyncIndexAliasLoader + + +class SyncDatetimeBasedIndexSelector(BaseSyncIndexSelector): + """Synchronous index selector that filters indices based on datetime criteria with caching.""" + + _instance = None + + def __new__(cls, client): + """Create singleton instance. + + Args: + client: Sync search engine client instance. + + Returns: + SyncDatetimeBasedIndexSelector: Singleton instance. + """ + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, sync_client: Any): + """Initialize the datetime-based index selector. + + Args: + sync_client: Synchronous Elasticsearch/OpenSearch client instance used for querying + index aliases and metadata. + """ + if not hasattr(self, "_initialized"): + self.cache_manager = IndexCacheManager() + self.alias_loader = SyncIndexAliasLoader(sync_client, self.cache_manager) + self._initialized = True + + def refresh_cache(self) -> Dict[str, List[str]]: + """Force refresh of the aliases cache. + + Returns: + Dict[str, List[str]]: Refreshed dictionary mapping base collection aliases + to lists of their corresponding item index aliases. + """ + return self.alias_loader.refresh_aliases() + + def get_collection_indexes(self, collection_id: str) -> List[str]: + """Get all index aliases for a specific collection. + + Args: + collection_id (str): The ID of the collection to retrieve indexes for. + + Returns: + List[str]: List of index aliases associated with the collection. + Returns empty list if collection is not found in cache. + """ + return self.alias_loader.get_collection_indexes(collection_id) + + def select_indexes( + self, + collection_ids: Optional[List[str]], + datetime_search: Dict[str, Optional[str]], + ) -> str: + """Select indexes filtered by collection IDs and datetime criteria. + + For each specified collection, retrieves its associated indexes and filters + them based on datetime range. If no collection IDs are provided, returns + all item indices. + + Args: + collection_ids (Optional[List[str]]): List of collection IDs to filter by. + If None or empty, returns all item indices. + datetime_search (Dict[str, Optional[str]]): Dictionary containing datetime + search criteria with 'gte' and 'lte' keys for range filtering. + + Returns: + str: Comma-separated string of selected index names that match the + collection and datetime criteria. Returns empty string if no + indexes match the criteria. 
+ """ + if collection_ids: + selected_indexes = [] + for collection_id in collection_ids: + collection_indexes = self.get_collection_indexes(collection_id) + filtered_indexes = filter_indexes_by_datetime( + collection_indexes, + datetime_search.get("gte"), + datetime_search.get("lte"), + ) + selected_indexes.extend(filtered_indexes) + + return ",".join(selected_indexes) if selected_indexes else "" + + return ITEM_INDICES diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/unfiltered_selector.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/unfiltered_selector.py new file mode 100644 index 00000000..0691a94d --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/selection/unfiltered_selector.py @@ -0,0 +1,37 @@ +"""Unfiltered index selector implementation.""" + +from typing import Dict, List, Optional + +from stac_fastapi.sfeos_helpers.database import indices + +from .base import IndexSelectionStrategy + + +class UnfilteredIndexSelector(IndexSelectionStrategy): + """Index selector that returns all available indices without filtering.""" + + async def select_indexes( + self, + collection_ids: Optional[List[str]], + datetime_search: Dict[str, Optional[str]], + ) -> str: + """Select all indices for given collections without datetime filtering. + + Args: + collection_ids (Optional[List[str]]): List of collection IDs to filter by. + If None, all collections are considered. + datetime_search (Dict[str, Optional[str]]): Datetime search criteria + (ignored by this implementation). + + Returns: + str: Comma-separated string of all available index names for the collections. + """ + return indices(collection_ids) + + async def refresh_cache(self): + """Refresh cache (no-op for unfiltered selector). + + Note: + Unfiltered selector doesn't use cache, so this is a no-op operation. + """ + pass diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/sync_inserters.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/sync_inserters.py new file mode 100644 index 00000000..db183878 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/sync_inserters.py @@ -0,0 +1,336 @@ +"""Sync index insertion strategies.""" + +from datetime import timedelta +from typing import Any, Dict, List, Optional + +from stac_fastapi.sfeos_helpers.database import ( + extract_date, + extract_first_date_from_index, + index_alias_by_collection_id, + mk_item_id, +) + +from .adapters import SearchEngineAdapter +from .base import BaseSyncIndexInserter +from .managers import DatetimeIndexManager +from .selection import SyncDatetimeBasedIndexSelector + + +class SyncDatetimeIndexInserter(BaseSyncIndexInserter): + """Sync datetime-based index insertion strategy.""" + + def __init__(self, client: Any, search_adapter: SearchEngineAdapter): + """Initialize the sync datetime index inserter. + + Args: + client: Sync search engine client instance. + search_adapter (SearchEngineAdapter): Search engine adapter instance. + """ + self.client = client + self.search_adapter = search_adapter + self.datetime_manager = DatetimeIndexManager(client, search_adapter) + + def should_create_collection_index(self) -> bool: + """Whether this strategy requires collection index creation. + + Returns: + bool: False, as datetime strategy doesn't create collection indexes. 
+ """ + return False + + def create_simple_index(self, client: Any, collection_id: str) -> str: + """Create a simple index synchronously. + + Args: + client: Search engine client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + return self.search_adapter.create_simple_index_sync(client, collection_id) + + def get_target_index(self, collection_id: str, product: Dict[str, Any]) -> str: + """Get target index for a single product. + + Args: + collection_id (str): Collection identifier. + product (Dict[str, Any]): Product data containing datetime information. + + Returns: + str: Target index name for the product. + """ + index_selector = SyncDatetimeBasedIndexSelector(self.client) + return self._get_target_index_internal( + index_selector, collection_id, product, check_size=True + ) + + def prepare_bulk_actions( + self, collection_id: str, items: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Prepare bulk actions for multiple items. + + Args: + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + + Returns: + List[Dict[str, Any]]: List of bulk actions ready for execution. + """ + index_selector = SyncDatetimeBasedIndexSelector(self.client) + + self._ensure_indexes_exist(index_selector, collection_id, items) + split_info = self._handle_index_splitting(index_selector, collection_id, items) + return self._create_bulk_actions( + index_selector, collection_id, items, split_info + ) + + def _get_target_index_internal( + self, + index_selector, + collection_id: str, + product: Dict[str, Any], + check_size: bool = True, + ) -> str: + """Get target index with size checking internally. + + Args: + index_selector: Index selector instance. + collection_id (str): Collection identifier. + product (Dict[str, Any]): Product data. + check_size (bool): Whether to check index size limits. + + Returns: + str: Target index name. + """ + product_datetime = self.datetime_manager._validate_product_datetime(product) + + datetime_range = {"gte": product_datetime, "lte": product_datetime} + target_index = index_selector.select_indexes([collection_id], datetime_range) + all_indexes = index_selector.get_collection_indexes(collection_id) + + if not all_indexes: + target_index = self.datetime_manager.handle_new_collection_sync( + collection_id, product_datetime + ) + index_selector.refresh_cache() + return target_index + + all_indexes.sort() + start_date = extract_date(product_datetime) + end_date = extract_first_date_from_index(all_indexes[0]) + + if start_date < end_date: + alias = self.datetime_manager.handle_early_date_sync( + collection_id, start_date, end_date + ) + index_selector.refresh_cache() + return alias + + if target_index != all_indexes[-1]: + return target_index + + if check_size and self.datetime_manager.size_manager.is_index_oversized_sync( + target_index + ): + target_index = self.datetime_manager.handle_oversized_index_sync( + collection_id, target_index, product_datetime + ) + index_selector.refresh_cache() + + return target_index + + def _ensure_indexes_exist( + self, index_selector, collection_id: str, items: List[Dict[str, Any]] + ): + """Ensure necessary indexes exist for the items. + + Args: + index_selector: Index selector instance. + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. 
+ """ + all_indexes = index_selector.get_collection_indexes(collection_id) + + if not all_indexes: + first_item = items[0] + self.search_adapter.create_datetime_index_sync( + self.client, + collection_id, + extract_date(first_item["properties"]["datetime"]), + ) + index_selector.refresh_cache() + + def _handle_index_splitting( + self, index_selector, collection_id: str, items: List[Dict[str, Any]] + ): + """Handle potential index splitting due to size limits. + + Args: + index_selector: Index selector instance. + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + + Returns: + Optional[Dict]: Split information if splitting occurred, None otherwise. + """ + all_indexes = index_selector.get_collection_indexes(collection_id) + all_indexes.sort() + latest_index = all_indexes[-1] + + first_item = items[0] + first_item_index = self._get_target_index_internal( + index_selector, collection_id, first_item, check_size=False + ) + + if first_item_index != latest_index: + return None + + if self.datetime_manager.size_manager.is_index_oversized_sync(first_item_index): + return self._create_new_index_for_split( + collection_id, first_item_index, first_item + ) + + return None + + def _create_new_index_for_split( + self, collection_id: str, latest_index: str, first_item: Dict[str, Any] + ): + """Create new index for splitting oversized index. + + Args: + collection_id (str): Collection identifier. + latest_index (str): Current latest index name. + first_item (Dict[str, Any]): First item being processed. + + Returns: + Optional[Dict]: Split information with new index details. + """ + current_index_end_date = extract_first_date_from_index(latest_index) + first_item_date = extract_date(first_item["properties"]["datetime"]) + + if first_item_date != current_index_end_date: + self.search_adapter.update_index_alias_sync( + self.client, str(current_index_end_date), latest_index + ) + next_day_start = current_index_end_date + timedelta(days=1) + new_index = self.search_adapter.create_datetime_index_sync( + self.client, collection_id, str(next_day_start) + ) + return { + "split_date": current_index_end_date, + "new_index": new_index, + } + return None + + def _create_bulk_actions( + self, + index_selector, + collection_id: str, + items: List[Dict[str, Any]], + split_info: Optional[Dict], + ) -> List[Dict[str, Any]]: + """Create bulk actions for all items. + + Args: + index_selector: Index selector instance. + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + split_info (Optional[Dict]): Split information if applicable. + + Returns: + List[Dict[str, Any]]: List of prepared bulk actions. + """ + actions = [] + + for item in items: + if split_info: + item_date = extract_date(item["properties"]["datetime"]) + if item_date > split_info["split_date"]: + target_index = split_info["new_index"] + else: + target_index = self._get_target_index_internal( + index_selector, collection_id, item, check_size=False + ) + else: + target_index = self._get_target_index_internal( + index_selector, collection_id, item, check_size=False + ) + + actions.append( + { + "_index": target_index, + "_id": mk_item_id(item["id"], item["collection"]), + "_source": item, + } + ) + + return actions + + +class SyncSimpleIndexInserter(BaseSyncIndexInserter): + """Simple sync index insertion strategy.""" + + def __init__(self, search_adapter: SearchEngineAdapter, client: Any): + """Initialize the sync simple index inserter. 
+ + Args: + search_adapter (SearchEngineAdapter): Search engine adapter instance. + client: Sync search engine client instance. + """ + self.search_adapter = search_adapter + self.client = client + + def should_create_collection_index(self) -> bool: + """Whether this strategy requires collection index creation. + + Returns: + bool: True, as simple strategy creates collection indexes. + """ + return True + + def create_simple_index(self, client: Any, collection_id: str) -> str: + """Create a simple index synchronously. + + Args: + client: Search engine client instance. + collection_id (str): Collection identifier. + + Returns: + str: Created index name. + """ + return self.search_adapter.create_simple_index_sync(client, collection_id) + + def get_target_index(self, collection_id: str, product: Dict[str, Any]) -> str: + """Get target index (always the collection alias). + + Args: + collection_id (str): Collection identifier. + product (Dict[str, Any]): Product data (not used in simple strategy). + + Returns: + str: Collection alias name. + """ + return index_alias_by_collection_id(collection_id) + + def prepare_bulk_actions( + self, collection_id: str, items: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Prepare bulk actions for simple indexing. + + Args: + collection_id (str): Collection identifier. + items (List[Dict[str, Any]]): List of items to process. + + Returns: + List[Dict[str, Any]]: List of bulk actions with collection alias as target. + """ + target_index = index_alias_by_collection_id(collection_id) + return [ + { + "_index": target_index, + "_id": mk_item_id(item["id"], item["collection"]), + "_source": item, + } + for item in items + ] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/types.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/types.py new file mode 100644 index 00000000..3c0411d1 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/types.py @@ -0,0 +1,10 @@ +"""Search engine types and enums.""" + +from enum import Enum + + +class SearchEngineType(Enum): + """Supported search engine types.""" + + ELASTICSEARCH = "elasticsearch" + OPENSEARCH = "opensearch" diff --git a/stac_fastapi/tests/api/test_api.py b/stac_fastapi/tests/api/test_api.py index efc97174..1ca0262f 100644 --- a/stac_fastapi/tests/api/test_api.py +++ b/stac_fastapi/tests/api/test_api.py @@ -1,7 +1,9 @@ +import os import random import uuid from copy import deepcopy from datetime import datetime, timedelta +from unittest.mock import patch import pytest @@ -25,6 +27,7 @@ "GET /collections/{collection_id}", "GET /collections/{collection_id}/queryables", "GET /collections/{collection_id}/items", + "POST /collections/{collection_id}/bulk_items", "GET /collections/{collection_id}/items/{item_id}", "GET /search", "POST /search", @@ -427,6 +430,9 @@ async def test_search_point_does_not_intersect(app_client, ctx): @pytest.mark.asyncio async def test_datetime_response_format(app_client, txn_client, ctx): + if os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): + pytest.skip() + first_item = dict(ctx.item) second_item = deepcopy(first_item) @@ -464,6 +470,9 @@ async def test_datetime_response_format(app_client, txn_client, ctx): @pytest.mark.asyncio async def test_datetime_non_interval(app_client, txn_client, ctx): + if os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): + pytest.skip() + first_item = dict(ctx.item) second_item = deepcopy(first_item) @@ -500,6 +509,9 @@ async def test_datetime_non_interval(app_client, 
 txn_client, ctx):
 
 @pytest.mark.asyncio
 async def test_datetime_interval(app_client, txn_client, ctx):
+    if os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
+        pytest.skip()
+
     first_item = dict(ctx.item)
     second_item = deepcopy(first_item)
 
@@ -536,6 +548,9 @@ async def test_datetime_interval(app_client, txn_client, ctx):
 
 @pytest.mark.asyncio
 async def test_datetime_bad_non_interval(app_client, txn_client, ctx):
+    if os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
+        pytest.skip()
+
     first_item = dict(ctx.item)
     second_item = deepcopy(first_item)
 
@@ -572,6 +587,9 @@ async def test_datetime_bad_non_interval(app_client, txn_client, ctx):
 
 @pytest.mark.asyncio
 async def test_datetime_bad_interval(app_client, txn_client, ctx):
+    if os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
+        pytest.skip()
+
     first_item = dict(ctx.item)
     second_item = deepcopy(first_item)
 
@@ -823,3 +841,231 @@ async def test_big_int_eo_search(
     results = {x["properties"][attr] for x in resp_json["features"]}
     assert len(results) == expected
     assert results == {value}
+
+
+@pytest.mark.asyncio
+async def test_create_item_in_past_date_creates_separate_index(
+    app_client, ctx, load_test_data, txn_client
+):
+    if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
+        pytest.skip()
+
+    item = load_test_data("test_item.json")
+    item["id"] = str(uuid.uuid4())
+    item["properties"]["datetime"] = "2012-02-12T12:30:22Z"
+
+    response = await app_client.post(
+        f"/collections/{item['collection']}/items", json=item
+    )
+
+    assert response.status_code == 201
+
+    indices = await txn_client.database.client.indices.get_alias(index="*")
+    expected_indices = [
+        "items_test-collection_2012-02-12",
+        "items_test-collection_2020-02-12",
+    ]
+
+    for expected_index in expected_indices:
+        assert expected_index in indices.keys()
+
+
+@pytest.mark.asyncio
+async def test_create_item_uses_existing_datetime_index(
+    app_client, ctx, load_test_data, txn_client
+):
+    if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
+        pytest.skip()
+
+    item = load_test_data("test_item.json")
+    item["id"] = str(uuid.uuid4())
+
+    response = await app_client.post(
+        f"/collections/{item['collection']}/items", json=item
+    )
+
+    assert response.status_code == 201
+
+    indices = await txn_client.database.client.indices.get_alias(index="*")
+    assert "items_test-collection_2020-02-12" in indices.keys()
+
+
+@pytest.mark.asyncio
+async def test_create_item_with_different_date_same_index(
+    app_client, load_test_data, txn_client, ctx
+):
+    if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
+        pytest.skip()
+
+    item = load_test_data("test_item.json")
+    item["id"] = str(uuid.uuid4())
+    item["properties"]["datetime"] = "2022-02-12T12:30:22Z"
+
+    response = await app_client.post(
+        f"/collections/{item['collection']}/items", json=item
+    )
+
+    assert response.status_code == 201
+
+    indices = await txn_client.database.client.indices.get_alias(index="*")
+    assert "items_test-collection_2020-02-12" in indices.keys()
+
+
+@pytest.mark.asyncio
+async def test_create_new_index_when_size_limit_exceeded(
+    app_client, load_test_data, txn_client, ctx
+):
+    if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
+        pytest.skip()
+
+    item = load_test_data("test_item.json")
+    item["id"] = str(uuid.uuid4())
+    item["properties"]["datetime"] = "2024-02-12T12:30:22Z"
+
+    with patch(
+        "stac_fastapi.sfeos_helpers.search_engine.managers.IndexSizeManager.get_index_size_in_gb"
+    ) as mock_get_size:
+        mock_get_size.return_value = 26.0
+        response = await app_client.post(
+            f"/collections/{item['collection']}/items", json=item
+        )
+
+    assert 
response.status_code == 201 + + indices = await txn_client.database.client.indices.get_alias(index="*") + expected_indices = [ + "items_test-collection_2020-02-12", + "items_test-collection_2024-02-13", + ] + + for expected_index in expected_indices: + assert expected_index in indices.keys() + + +@pytest.mark.asyncio +async def test_create_item_fails_without_datetime( + app_client, load_test_data, txn_client, ctx +): + if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): + pytest.skip() + + item = load_test_data("test_item.json") + item["id"] = str(uuid.uuid4()) + item["properties"]["datetime"] = None + response = await app_client.post( + f"/collections/{item['collection']}/items", json=item + ) + assert response.status_code == 400 + + +@pytest.mark.asyncio +async def test_bulk_create_items_with_same_date_range( + app_client, load_test_data, txn_client, ctx +): + if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): + pytest.skip() + + base_item = load_test_data("test_item.json") + items_dict = {} + + for i in range(10): + item = deepcopy(base_item) + item["id"] = str(uuid.uuid4()) + item["properties"]["datetime"] = f"2020-02-{12 + i}T12:30:22Z" + items_dict[item["id"]] = item + + payload = {"items": items_dict, "method": "insert"} + + response = await app_client.post( + f"/collections/{base_item['collection']}/bulk_items", json=payload + ) + + assert response.status_code == 200 + + indices = await txn_client.database.client.indices.get_alias(index="*") + assert "items_test-collection_2020-02-12" in indices.keys() + + +@pytest.mark.asyncio +async def test_bulk_create_items_with_different_date_ranges( + app_client, load_test_data, txn_client, ctx +): + if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): + pytest.skip() + + base_item = load_test_data("test_item.json") + items_dict = {} + + for i in range(3): + item = deepcopy(base_item) + item["id"] = str(uuid.uuid4()) + item["properties"]["datetime"] = f"2020-02-{12 + i}T12:30:22Z" + items_dict[item["id"]] = item + + for i in range(2): + item = deepcopy(base_item) + item["id"] = str(uuid.uuid4()) + item["properties"]["datetime"] = f"2010-02-{10 + i}T12:30:22Z" + items_dict[item["id"]] = item + + payload = {"items": items_dict, "method": "insert"} + + response = await app_client.post( + f"/collections/{base_item['collection']}/bulk_items", json=payload + ) + + assert response.status_code == 200 + + indices = await txn_client.database.client.indices.get_alias(index="*") + expected_indices = [ + "items_test-collection_2020-02-12", + "items_test-collection_2010-02-10", + ] + + for expected_index in expected_indices: + assert expected_index in indices.keys() + + +@pytest.mark.asyncio +async def test_bulk_create_items_with_size_limit_exceeded( + app_client, load_test_data, txn_client, ctx +): + if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): + pytest.skip() + + base_item = load_test_data("test_item.json") + items_dict = {} + + for i in range(3): + item = deepcopy(base_item) + item["id"] = str(uuid.uuid4()) + item["properties"]["datetime"] = f"2019-02-{15 + i}T12:30:22Z" + items_dict[item["id"]] = item + + for i in range(2): + item = deepcopy(base_item) + item["id"] = str(uuid.uuid4()) + item["properties"]["datetime"] = f"2010-02-{10 + i}T12:30:22Z" + items_dict[item["id"]] = item + + payload = {"items": items_dict, "method": "insert"} + + with patch( + "stac_fastapi.sfeos_helpers.search_engine.managers.IndexSizeManager.get_index_size_in_gb_sync" + ) as mock_get_size: + mock_get_size.return_value = 26.0 + response = await app_client.post( + 
f"/collections/{base_item['collection']}/bulk_items", json=payload + ) + + assert response.status_code == 200 + + indices = await txn_client.database.client.indices.get_alias(index="*") + expected_indices = [ + "items_test-collection_2010-02-10", + "items_test-collection_2019-02-15", + "items_test-collection_2020-02-12", + ] + + for expected_index in expected_indices: + assert expected_index in indices.keys() diff --git a/stac_fastapi/tests/conftest.py b/stac_fastapi/tests/conftest.py index d8c5fc88..83faab29 100644 --- a/stac_fastapi/tests/conftest.py +++ b/stac_fastapi/tests/conftest.py @@ -26,6 +26,7 @@ from stac_fastapi.core.rate_limit import setup_rate_limit from stac_fastapi.core.utilities import get_bool_env from stac_fastapi.sfeos_helpers.aggregation import EsAsyncBaseAggregationClient +from stac_fastapi.sfeos_helpers.mappings import ITEMS_INDEX_PREFIX if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": from stac_fastapi.opensearch.app import app_config @@ -158,6 +159,9 @@ async def delete_collections_and_items(txn_client: TransactionsClient) -> None: await refresh_indices(txn_client) await txn_client.database.delete_items() await txn_client.database.delete_collections() + await txn_client.database.client.indices.delete(index=f"{ITEMS_INDEX_PREFIX}*") + await txn_client.database.async_index_selector.refresh_cache() + txn_client.database.sync_index_selector.refresh_cache() async def refresh_indices(txn_client: TransactionsClient) -> None: diff --git a/stac_fastapi/tests/database/test_database.py b/stac_fastapi/tests/database/test_database.py index 86611235..67897c15 100644 --- a/stac_fastapi/tests/database/test_database.py +++ b/stac_fastapi/tests/database/test_database.py @@ -1,3 +1,4 @@ +import os import uuid import pytest @@ -27,6 +28,9 @@ async def test_index_mapping_collections(ctx): @pytest.mark.asyncio async def test_index_mapping_items(txn_client, load_test_data): + if os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): + pytest.skip() + collection = load_test_data("test_collection.json") collection["id"] = str(uuid.uuid4()) await txn_client.create_collection( diff --git a/stac_fastapi/tests/resources/test_item.py b/stac_fastapi/tests/resources/test_item.py index 0102bf9b..1bf9abc5 100644 --- a/stac_fastapi/tests/resources/test_item.py +++ b/stac_fastapi/tests/resources/test_item.py @@ -114,8 +114,15 @@ async def test_create_uppercase_collection_with_item( async def test_update_item_already_exists(app_client, ctx, load_test_data): """Test updating an item which already exists (transactions extension)""" item = load_test_data("test_item.json") + item["id"] = str(uuid.uuid4()) assert item["properties"]["gsd"] != 16 item["properties"]["gsd"] = 16 + + response = await app_client.post( + f"/collections/{item['collection']}/items", json=item + ) + assert response.status_code == 201 + await app_client.put( f"/collections/{item['collection']}/items/{item['id']}", json=item ) @@ -998,6 +1005,9 @@ async def _search_and_get_ids( async def test_search_datetime_with_null_datetime( app_client, txn_client, load_test_data ): + if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): + pytest.skip() + """Test datetime filtering when properties.datetime is null or set, ensuring start_datetime and end_datetime are set when datetime is null.""" # Setup: Create test collection test_collection = load_test_data("test_collection.json")