diff --git a/docker-compose.yml b/docker-compose.yml index d5929601..9fac4901 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.8' - services: web: build: . @@ -9,7 +7,8 @@ services: - "8000:8000" env_file: - ./ops/environment/.default.env - - ./ops/environment/.local.env + - ./ops/environment/.env + - ./env.example depends_on: # db: # condition: service_healthy diff --git a/src/catalogue/models/elasticsearch.py b/src/catalogue/models/elasticsearch.py index d907061d..7719af89 100644 --- a/src/catalogue/models/elasticsearch.py +++ b/src/catalogue/models/elasticsearch.py @@ -1,10 +1,12 @@ from elasticsearch_dsl import ( AsyncDocument as Document, Text, + connections, ) PRODUCT_INDEX = 'products_index' +CATEGORY_INDEX = 'categories_index' class ProductIndex(Document): @@ -14,3 +16,10 @@ class ProductIndex(Document): class Index: name = PRODUCT_INDEX + +class CategoryIndex(Document): + title = Text() + description = Text() + + class Index: + name = 'categories_index' \ No newline at end of file diff --git a/src/catalogue/models/pydantic.py b/src/catalogue/models/pydantic.py index d27c82e3..acca499b 100644 --- a/src/catalogue/models/pydantic.py +++ b/src/catalogue/models/pydantic.py @@ -5,3 +5,8 @@ class ProductElasticResponse(BaseModel): product_id: int title: str score: float + +class CategoryElasticResponse(BaseModel): + product_id: int + title: str + score: float \ No newline at end of file diff --git a/src/catalogue/repository.py b/src/catalogue/repository.py index fab3dcd5..e44d2529 100644 --- a/src/catalogue/repository.py +++ b/src/catalogue/repository.py @@ -1,7 +1,7 @@ from fastapi import Depends from sqlalchemy.ext.asyncio import AsyncSession -from src.catalogue.models.database import Product +from src.catalogue.models.database import Product, Category from src.common.databases.postgres import get_session from src.common.repository.sqlalchemy import BaseSQLAlchemyRepository @@ -13,3 +13,11 @@ def __init__(self, session: AsyncSession): def get_product_repository(session: AsyncSession = Depends(get_session)) -> ProductRepository: return ProductRepository(session=session) + +class CategoryRepository(BaseSQLAlchemyRepository[Category]): + def __init__(self, session: AsyncSession): + super().__init__(model=Category, session=session) + + +def get_category_repository(session: AsyncSession = Depends(get_session)) -> CategoryRepository: + return CategoryRepository(session=session) diff --git a/src/catalogue/services.py b/src/catalogue/services.py index ac15b552..f6c59889 100644 --- a/src/catalogue/services.py +++ b/src/catalogue/services.py @@ -4,12 +4,14 @@ from fastapi import Depends from src.base_settings import base_settings -from src.catalogue.models.database import Product +from src.catalogue.models.database import Product, Category from src.catalogue.repository import ( ProductRepository, get_product_repository, + CategoryRepository, + get_category_repository, ) -from src.catalogue.utils import ProductElasticManager +from src.catalogue.utils import ProductElasticManager, CategoryElasticManager from src.common.enums import TaskStatus from src.common.service import BaseService from src.general.schemas.task_status import TaskStatusModel @@ -28,7 +30,7 @@ async def update_search_index(self, uuid): products = await self.list() try: - await ProductElasticManager().update_index(products=productsf) + await ProductElasticManager().update_index(products=products) except Exception as exc: await TaskStatusModel(uuid=uuid, status=TaskStatus.ERROR, details=str(exc)).save_to_redis() return None @@ -49,3 +51,31 @@ async def update_search_index(self, uuid): def get_product_service(repo: ProductRepository = Depends(get_product_repository)) -> ProductService: return ProductService(repository=repo) + +class CategoryService(BaseService[Category]): + def __init__(self, repository: CategoryRepository): + super().__init__(repository) + + @staticmethod + async def search(keyword: str): + result = await CategoryElasticManager().search_category(keyword=keyword) + return result + + async def update_search_index(self, uuid): + categories = await self.list() + + try: + await CategoryElasticManager().update_index(categorys=categories) + except Exception as exc: + await TaskStatusModel(uuid=uuid, status=TaskStatus.ERROR, details=str(exc)).save_to_redis() + return None + + await TaskStatusModel( + uuid=uuid, + status=TaskStatus.DONE, + done_at=datetime.utcnow().strftime(base_settings.date_time_format), + ).save_to_redis() + + +def get_category_service(repo: CategoryRepository = Depends(get_category_repository)) -> CategoryService: + return CategoryService(repository=repo) diff --git a/src/catalogue/utils.py b/src/catalogue/utils.py index d6ad6f1a..90116127 100644 --- a/src/catalogue/utils.py +++ b/src/catalogue/utils.py @@ -7,12 +7,14 @@ ) from fastapi import Depends -from src.catalogue.models.database import Product +from src.catalogue.models.database import Product, Category from src.catalogue.models.elasticsearch import ( PRODUCT_INDEX, ProductIndex, + CATEGORY_INDEX, + CategoryIndex, ) -from src.catalogue.models.pydantic import ProductElasticResponse +from src.catalogue.models.pydantic import ProductElasticResponse, CategoryElasticResponse from src.common.databases.elasticsearch import elastic_client @@ -79,3 +81,66 @@ async def update_index(self, products: list[Product]) -> None: if bulk_data: await self.client.bulk(body=bulk_data) + +class CategoryElasticManager: + def __init__(self, client: Annotated[AsyncElasticsearch, Depends(elastic_client)] = elastic_client): + self.client = client + + async def init_indices(self): + category_index = Index( + name=CATEGORY_INDEX, + using=self.client, + ) + + category_index.document(CategoryIndex) + + if not await category_index.exists(): + await category_index.create() + + @staticmethod + def build_category_search_query(keyword): + search = Search( + index='category_index', + ).query( + 'multi_match', + query=keyword, + fields=['title', 'description'], + ) + return search.to_dict() + + async def search_category(self, keyword): + query = self.build_category_search_query(keyword) + response = await self.client.search(body=query) + await self.client.close() + + hits = response.get('hits', {}).get('hits', []) + sorted_hits = sorted(hits, key=lambda x: x.get('_score', 0), reverse=True) + + sorted_response = [ + CategoryElasticResponse( + product_id=hit.get('_id', ''), + title=hit.get('_source', {}).get('title', ''), + score=hit.get('_score', {}), + ) + for hit in sorted_hits + ] + + return sorted_response + + async def update_index(self, categories: list[Category]) -> None: + bulk_data = [] + for category in categories: + action = {'index': {'_index': CATEGORY_INDEX, '_id': category.id}} + data = { + 'title': category.title, + 'description': category.description, + } + bulk_data.append(action) + bulk_data.append(data) + + if len(bulk_data) >= 100: + await self.client.bulk(body=bulk_data) + bulk_data = [] + + if bulk_data: + await self.client.bulk(body=bulk_data) \ No newline at end of file diff --git a/src/catalogue/views/product.py b/src/catalogue/views/product.py index 0118dae2..78de3c10 100644 --- a/src/catalogue/views/product.py +++ b/src/catalogue/views/product.py @@ -40,7 +40,6 @@ async def product_list(product_service: Annotated[get_product_service, Depends() """ return await product_service.list() - @router.get( ProductRoutesPrefixes.detail, responses={ @@ -69,7 +68,6 @@ async def product_detail( return response - @router.get( ProductRoutesPrefixes.search, status_code=status.HTTP_200_OK,