Skip to content

Commit 7ba552d

Browse files
author
Grzegorz Pustulka
committed
part_1 - sync versions removed
1 parent f7bcff6 commit 7ba552d

File tree

21 files changed

+258
-1003
lines changed

21 files changed

+258
-1003
lines changed

compose.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ services:
33
container_name: stac-fastapi-es
44
image: stac-utils/stac-fastapi-es
55
restart: always
6+
platform: linux/amd64
67
build:
78
context: .
89
dockerfile: dockerfiles/Dockerfile.dev.es
@@ -21,6 +22,7 @@ services:
2122
- ES_USE_SSL=false
2223
- ES_VERIFY_CERTS=false
2324
- BACKEND=elasticsearch
25+
- ENABLE_DATETIME_INDEX_FILTERING=true
2426
ports:
2527
- "8080:8080"
2628
volumes:
@@ -36,6 +38,7 @@ services:
3638
container_name: stac-fastapi-os
3739
image: stac-utils/stac-fastapi-os
3840
restart: always
41+
platform: linux/amd64
3942
build:
4043
context: .
4144
dockerfile: dockerfiles/Dockerfile.dev.os
@@ -70,6 +73,7 @@ services:
7073
container_name: es-container
7174
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0}
7275
hostname: elasticsearch
76+
platform: linux/amd64
7377
environment:
7478
ES_JAVA_OPTS: -Xms512m -Xmx1g
7579
action.destructive_requires_name: false
@@ -83,6 +87,7 @@ services:
8387
container_name: os-container
8488
image: opensearchproject/opensearch:${OPENSEARCH_VERSION:-2.11.1}
8589
hostname: opensearch
90+
platform: linux/amd64
8691
environment:
8792
- discovery.type=single-node
8893
- plugins.security.disabled=true

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
PartialItem,
2828
PatchOperation,
2929
)
30-
from stac_fastapi.sfeos_helpers import filter
3130
from stac_fastapi.sfeos_helpers.database import (
3231
apply_free_text_filter_shared,
3332
apply_intersects_filter_shared,
@@ -36,6 +35,7 @@
3635
get_queryables_mapping_shared,
3736
index_alias_by_collection_id,
3837
index_by_collection_id,
38+
mk_actions,
3939
mk_item_id,
4040
populate_sort_shared,
4141
validate_refresh,
@@ -44,6 +44,7 @@
4444
merge_to_operations,
4545
operations_to_script,
4646
)
47+
from stac_fastapi.sfeos_helpers.filter import cql2 as filter
4748
from stac_fastapi.sfeos_helpers.mappings import (
4849
AGGREGATION_MAPPING,
4950
COLLECTIONS_INDEX,
@@ -54,8 +55,8 @@
5455
)
5556
from stac_fastapi.sfeos_helpers.search_engine import (
5657
BaseIndexInserter,
58+
BaseIndexSelector,
5759
IndexInsertionFactory,
58-
IndexSelectionStrategy,
5960
IndexSelectorFactory,
6061
)
6162
from stac_fastapi.types.errors import ConflictError, NotFoundError
@@ -137,10 +138,8 @@ class DatabaseLogic(BaseDatabaseLogic):
137138
sync_settings: SyncElasticsearchSettings = attr.ib(
138139
factory=SyncElasticsearchSettings
139140
)
140-
async_index_selector: IndexSelectionStrategy = attr.ib(init=False)
141-
sync_index_selector: IndexSelectionStrategy = attr.ib(init=False)
141+
async_index_selector: BaseIndexSelector = attr.ib(init=False)
142142
async_index_inserter: BaseIndexInserter = attr.ib(init=False)
143-
sync_index_inserter: BaseIndexInserter = attr.ib(init=False)
144143

145144
client = attr.ib(init=False)
146145
sync_client = attr.ib(init=False)
@@ -149,18 +148,10 @@ def __attrs_post_init__(self):
149148
"""Initialize clients after the class is instantiated."""
150149
self.client = self.async_settings.create_client
151150
self.sync_client = self.sync_settings.create_client
152-
self.async_index_inserter = (
153-
IndexInsertionFactory.create_async_insertion_strategy(self.client)
154-
)
155-
self.sync_index_inserter = IndexInsertionFactory.create_sync_insertion_strategy(
156-
self.sync_client
157-
)
158-
self.async_index_selector = IndexSelectorFactory.create_async_selector(
151+
self.async_index_inserter = IndexInsertionFactory.create_insertion_strategy(
159152
self.client
160153
)
161-
self.sync_index_selector = IndexSelectorFactory.create_sync_selector(
162-
self.sync_client
163-
)
154+
self.async_index_selector = IndexSelectorFactory.create_selector(self.client)
164155

165156
item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer)
166157
collection_serializer: Type[CollectionSerializer] = attr.ib(
@@ -1448,12 +1439,9 @@ def bulk_sync(
14481439

14491440
# Perform the bulk insert
14501441
raise_on_error = self.sync_settings.raise_on_bulk_error
1451-
actions = self.sync_index_inserter.prepare_bulk_actions(
1452-
collection_id, processed_items
1453-
)
14541442
success, errors = helpers.bulk(
14551443
self.sync_client,
1456-
actions,
1444+
mk_actions(collection_id, processed_items),
14571445
refresh=refresh,
14581446
raise_on_error=raise_on_error,
14591447
)

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@
2626
AsyncOpensearchSettings as AsyncSearchSettings,
2727
)
2828
from stac_fastapi.opensearch.config import OpensearchSettings as SyncSearchSettings
29-
from stac_fastapi.sfeos_helpers import filter
3029
from stac_fastapi.sfeos_helpers.database import (
3130
apply_free_text_filter_shared,
3231
apply_intersects_filter_shared,
3332
create_index_templates_shared,
3433
delete_item_index_shared,
3534
get_queryables_mapping_shared,
3635
index_alias_by_collection_id,
36+
mk_actions,
3737
mk_item_id,
3838
populate_sort_shared,
3939
validate_refresh,
@@ -42,6 +42,7 @@
4242
merge_to_operations,
4343
operations_to_script,
4444
)
45+
from stac_fastapi.sfeos_helpers.filter import cql2 as filter
4546
from stac_fastapi.sfeos_helpers.mappings import (
4647
AGGREGATION_MAPPING,
4748
COLLECTIONS_INDEX,
@@ -53,8 +54,8 @@
5354
)
5455
from stac_fastapi.sfeos_helpers.search_engine import (
5556
BaseIndexInserter,
57+
BaseIndexSelector,
5658
IndexInsertionFactory,
57-
IndexSelectionStrategy,
5859
IndexSelectorFactory,
5960
)
6061
from stac_fastapi.types.errors import ConflictError, NotFoundError
@@ -120,10 +121,8 @@ class DatabaseLogic(BaseDatabaseLogic):
120121
async_settings: AsyncSearchSettings = attr.ib(factory=AsyncSearchSettings)
121122
sync_settings: SyncSearchSettings = attr.ib(factory=SyncSearchSettings)
122123

123-
async_index_selector: IndexSelectionStrategy = attr.ib(init=False)
124-
sync_index_selector: IndexSelectionStrategy = attr.ib(init=False)
124+
async_index_selector: BaseIndexSelector = attr.ib(init=False)
125125
async_index_inserter: BaseIndexInserter = attr.ib(init=False)
126-
sync_index_inserter: BaseIndexInserter = attr.ib(init=False)
127126

128127
client = attr.ib(init=False)
129128
sync_client = attr.ib(init=False)
@@ -132,18 +131,10 @@ def __attrs_post_init__(self):
132131
"""Initialize clients after the class is instantiated."""
133132
self.client = self.async_settings.create_client
134133
self.sync_client = self.sync_settings.create_client
135-
self.async_index_inserter = (
136-
IndexInsertionFactory.create_async_insertion_strategy(self.client)
137-
)
138-
self.sync_index_inserter = IndexInsertionFactory.create_sync_insertion_strategy(
139-
self.sync_client
140-
)
141-
self.async_index_selector = IndexSelectorFactory.create_async_selector(
134+
self.async_index_inserter = IndexInsertionFactory.create_insertion_strategy(
142135
self.client
143136
)
144-
self.sync_index_selector = IndexSelectorFactory.create_sync_selector(
145-
self.sync_client
146-
)
137+
self.async_index_selector = IndexSelectorFactory.create_selector(self.client)
147138

148139
item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer)
149140
collection_serializer: Type[CollectionSerializer] = attr.ib(
@@ -1323,6 +1314,7 @@ async def bulk_async(
13231314
- "false": Does not refresh the index immediately (default behavior).
13241315
- "wait_for": Waits for the next refresh cycle to make the changes visible.
13251316
"""
1317+
breakpoint()
13261318
# Ensure kwargs is a dictionary
13271319
kwargs = kwargs or {}
13281320

@@ -1407,14 +1399,15 @@ def bulk_sync(
14071399
logger.warning(f"No items to insert for collection {collection_id}")
14081400
return 0, []
14091401

1410-
raise_on_error = self.sync_settings.raise_on_bulk_error
1411-
actions = self.sync_index_inserter.prepare_bulk_actions(
1412-
collection_id, processed_items
1413-
)
1402+
# Handle empty processed_items
1403+
if not processed_items:
1404+
logger.warning(f"No items to insert for collection {collection_id}")
1405+
return 0, []
14141406

1407+
raise_on_error = self.sync_settings.raise_on_bulk_error
14151408
success, errors = helpers.bulk(
14161409
self.sync_client,
1417-
actions,
1410+
mk_actions(collection_id, processed_items),
14181411
refresh=refresh,
14191412
raise_on_error=raise_on_error,
14201413
)

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
# Re-export all functions for backward compatibility
3333
from .datetime import extract_date, extract_first_date_from_index, return_date
34-
from .document import mk_item_id
34+
from .document import mk_actions, mk_item_id
3535
from .index import (
3636
create_index_templates_shared,
3737
delete_item_index_shared,
@@ -64,6 +64,7 @@
6464
"get_queryables_mapping_shared",
6565
# Document operations
6666
"mk_item_id",
67+
"mk_actions",
6768
# Utility functions
6869
"validate_refresh",
6970
"get_bool_env",

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/document.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44
including document ID generation and bulk action creation.
55
"""
66

7+
from typing import Any, Dict, List
8+
9+
from stac_fastapi.sfeos_helpers.database.index import index_alias_by_collection_id
10+
from stac_fastapi.types.stac import Item
11+
712

813
def mk_item_id(item_id: str, collection_id: str) -> str:
914
"""Create the document id for an Item in Elasticsearch.
@@ -16,3 +21,28 @@ def mk_item_id(item_id: str, collection_id: str) -> str:
1621
str: The document id for the Item, combining the Item id and the Collection id, separated by a `|` character.
1722
"""
1823
return f"{item_id}|{collection_id}"
24+
25+
26+
def mk_actions(collection_id: str, processed_items: List[Item]) -> List[Dict[str, Any]]:
27+
"""Create Elasticsearch bulk actions for a list of processed items.
28+
29+
Args:
30+
collection_id (str): The identifier for the collection the items belong to.
31+
processed_items (List[Item]): The list of processed items to be bulk indexed.
32+
33+
Returns:
34+
List[Dict[str, Union[str, Dict]]]: The list of bulk actions to be executed,
35+
each action being a dictionary with the following keys:
36+
- `_index`: the index to store the document in.
37+
- `_id`: the document's identifier.
38+
- `_source`: the source of the document.
39+
"""
40+
index_alias = index_alias_by_collection_id(collection_id)
41+
return [
42+
{
43+
"_index": index_alias,
44+
"_id": mk_item_id(item["id"], item["collection"]),
45+
"_source": item,
46+
}
47+
for item in processed_items
48+
]
Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,27 @@
11
"""Search engine index management package."""
22

3-
from .async_inserters import AsyncDatetimeIndexInserter, AsyncSimpleIndexInserter
4-
from .base import BaseAsyncIndexInserter, BaseSyncIndexInserter
3+
from .base import BaseIndexInserter
54
from .factory import IndexInsertionFactory
65
from .index_operations import IndexOperations
6+
from .inserters import DatetimeIndexInserter, SimpleIndexInserter
77
from .managers import DatetimeIndexManager, IndexSizeManager
88
from .selection import (
9-
AsyncDatetimeBasedIndexSelector,
10-
IndexSelectionStrategy,
9+
BaseIndexSelector,
10+
DatetimeBasedIndexSelector,
1111
IndexSelectorFactory,
12-
SyncDatetimeBasedIndexSelector,
1312
UnfilteredIndexSelector,
1413
)
15-
from .sync_inserters import SyncDatetimeIndexInserter, SyncSimpleIndexInserter
1614

1715
__all__ = [
18-
"BaseAsyncIndexInserter",
19-
"BaseSyncIndexInserter",
16+
"BaseIndexInserter",
17+
"BaseIndexSelector",
18+
"IndexOperations",
2019
"IndexSizeManager",
2120
"DatetimeIndexManager",
22-
"AsyncDatetimeIndexInserter",
23-
"AsyncSimpleIndexInserter",
24-
"SyncDatetimeIndexInserter",
25-
"SyncSimpleIndexInserter",
26-
"IndexOperations",
21+
"DatetimeIndexInserter",
22+
"SimpleIndexInserter",
2723
"IndexInsertionFactory",
28-
"IndexSelectionStrategy",
29-
"AsyncDatetimeBasedIndexSelector",
30-
"SyncDatetimeBasedIndexSelector",
24+
"DatetimeBasedIndexSelector",
3125
"UnfilteredIndexSelector",
3226
"IndexSelectorFactory",
3327
]

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/base.py

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from typing import Any, Dict, List
55

66

7-
class BaseAsyncIndexInserter(ABC):
7+
class BaseIndexInserter(ABC):
88
"""Base async index inserter with common async methods."""
99

1010
@abstractmethod
@@ -49,48 +49,3 @@ async def create_simple_index(self, client: Any, collection_id: str) -> str:
4949
str: Created index name.
5050
"""
5151
pass
52-
53-
54-
class BaseSyncIndexInserter(ABC):
55-
"""Base sync index inserter with common sync methods."""
56-
57-
@abstractmethod
58-
def get_target_index(self, collection_id: str, product: Dict[str, Any]) -> str:
59-
"""Get target index for a product synchronously.
60-
61-
Args:
62-
collection_id (str): Collection identifier.
63-
product (Dict[str, Any]): Product data.
64-
65-
Returns:
66-
str: Target index name.
67-
"""
68-
pass
69-
70-
@abstractmethod
71-
def prepare_bulk_actions(
72-
self, collection_id: str, items: List[Dict[str, Any]]
73-
) -> List[Dict[str, Any]]:
74-
"""Prepare bulk actions for multiple items synchronously.
75-
76-
Args:
77-
collection_id (str): Collection identifier.
78-
items (List[Dict[str, Any]]): List of items to process.
79-
80-
Returns:
81-
List[Dict[str, Any]]: List of bulk actions.
82-
"""
83-
pass
84-
85-
@abstractmethod
86-
def create_simple_index(self, client: Any, collection_id: str) -> str:
87-
"""Create a simple index synchronously.
88-
89-
Args:
90-
client: Search engine client instance.
91-
collection_id (str): Collection identifier.
92-
93-
Returns:
94-
str: Created index name.
95-
"""
96-
pass

0 commit comments

Comments
 (0)