Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@


def main() -> None:
# SETUP
initialize_dependencies()

# OPEN STREET MAP
run_pipeline()


Expand Down
3 changes: 2 additions & 1 deletion src/application/contracts/stac_service_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ def create_release_catalog(self, root_catalog: Catalog, release: str) -> Catalog
raise NotImplementedError

@abstractmethod
def save_catalog(self, catalog: Catalog) -> None:
def save_catalog(self, catalog: Catalog, release: str) -> None:
"""
Saves catalog to its self href location
:param catalog:
:param release:
:return:
"""
raise NotImplementedError
16 changes: 12 additions & 4 deletions src/infra/infrastructure/services/stac_io_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,34 @@ def __init__(self, blob_storage_service: IBlobStorageService):
super().__init__()
self.__blob_storage_service = blob_storage_service


def write_text(self, dest: HREF, txt: str, *args: Any, **kwargs: Any) -> None:
data = txt.encode(encoding="utf-8")
path = self.strip_path_stem(dest)

if path != "catalog.json" and self.__blob_storage_service.is_blob_in_storage_container(
container_name=StorageContainer.STAC,
blob_name=path
):
is_file_uploaded = (
path != "catalog.json" and
self.__blob_storage_service.is_blob_in_storage_container(
container_name=StorageContainer.STAC,
blob_name=path
)
)

if is_file_uploaded:
return

self.__blob_storage_service.upload_file(container_name=StorageContainer.STAC, blob_name=path, data=data)

def read_text(self, source: HREF, *args: Any, **kwargs: Any) -> str:
path = self.strip_path_stem(source)
path = path.lstrip(".")

data = self.__blob_storage_service.download_file(container_name=StorageContainer.STAC, blob_name=path)

if data is None:
raise FileNotFoundError(f"File not found in blob storage: {path}")

self.skip_file_download = False
return data.decode(encoding="utf-8")

def strip_path_stem(self, path: str) -> str:
Expand Down
17 changes: 12 additions & 5 deletions src/infra/infrastructure/services/stac_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,15 @@ def create_release_catalog(self, root_catalog: Catalog, release: str) -> Catalog

return release_catalog

def save_catalog(self, catalog: Catalog) -> None:
catalog.normalize_and_save(
root_href=Config.STAC_STORAGE_CONTAINER,
catalog_type=CatalogType.ABSOLUTE_PUBLISHED
)
def save_catalog(self, catalog: Catalog, release: str) -> None:
latest_release = catalog.get_child(id=f"release/{release}", recursive=True)

catalog_copy = catalog.clone()
catalog_copy.clear_children()
catalog_copy.add_child(latest_release)
catalog_copy.normalize_hrefs(root_href=Config.STAC_STORAGE_CONTAINER)

normalized_latest_release = catalog_copy.get_child(id=f"release/{release}", recursive=True)
catalog.remove_child(f"release/{release}")
catalog.add_child(normalized_latest_release)
catalog.save(catalog_type=CatalogType.ABSOLUTE_PUBLISHED)
15 changes: 10 additions & 5 deletions src/presentation/entrypoints/release_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from src.application.common import logger
from src.application.contracts import (
IReleaseService, IStacService, IOpenStreetMapFileService, ICountyService, IOpenStreetMapService, IFKBService,
IVectorService, IBlobStorageService, IFilePathService, IConflationService
IVectorService, IBlobStorageService, IFilePathService, IConflationService, IStacIOService
)
from src.domain.enums import EPSGCode, Theme, DataSource, StorageContainer
from src.infra.infrastructure import Containers
Expand Down Expand Up @@ -93,13 +93,13 @@ def run_pipeline() -> None:

add_assets_to_item(conflated_region_item, conflated_blob_paths)

save_catalog(catalog=root_catalog)
save_catalog(catalog=root_catalog, release=latest_release)


@inject
def create_release(
release_service: IReleaseService = Provide[Containers.release_service],
stac_service: IStacService = Provide[Containers.stac_service],
stac_service: IStacService = Provide[Containers.stac_service]
) -> tuple[str, Catalog, Catalog]:
root_catalog = stac_service.get_catalog_root()
current_release = release_service.create_release()
Expand Down Expand Up @@ -246,5 +246,10 @@ def add_assets_to_item(


@inject
def save_catalog(catalog: Catalog, stac_service: IStacService = Provide[Containers.stac_service]) -> None:
stac_service.save_catalog(catalog)
def save_catalog(
catalog: Catalog,
release: str,
stac_service: IStacService = Provide[Containers.stac_service],
stac_io_service: IStacIOService = Provide[Containers.stac_io_service]
) -> None:
stac_service.save_catalog(catalog, release)