diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ae346da --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +SHELL := /bin/bash + +install: + pip install poetry + poetry install + +auth: + poetry run prefect cloud login --key $(PREFECT_API_KEY) --workspace $(PREFECT_WORKSPACE_ID) + +run: + poetry run sh src/dags/prefect.sh diff --git a/docker/Dockerfile b/docker/Dockerfile index ad7d9d1..1a70d68 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,5 +1,11 @@ FROM python:3.11.6 +ARG PREFECT_API_KEY +ARG PREFECT_WORKSPACE_ID + +ENV PREFECT_API_KEY=${PREFECT_API_KEY} +ENV PREFECT_WORKSPACE_ID=${PREFECT_WORKSPACE_ID} + WORKDIR /app COPY . /app diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index fc5c3ef..1b7c545 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -16,6 +16,7 @@ services: dockerfile: docker/Dockerfile depends_on: - postgres_db + - prefect_flow environment: - ENV_STATE=docker env_file: @@ -32,3 +33,23 @@ services: env_file: - ../.env-docker command: ["/app/docker/bot.sh"] + + prefect_flow: + container_name: prefect_flow + build: + context: ../ + dockerfile: docker/Dockerfile + args: + - PREFECT_API_KEY=${PREFECT_API_KEY} + - PREFECT_WORKSPACE_ID=${PREFECT_WORKSPACE_ID} + depends_on: + - postgres_db + env_file: + - ../.env-docker + entrypoint: /bin/bash + command: + - -c + - | + source /app/.env-docker && + make auth && + make run diff --git a/pyproject.toml b/pyproject.toml index a00681d..4bffa45 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ pytest = "^7.4.4" pytest-asyncio = "^0.23.3" langchain-openai = "^0.0.2.post1" ruamel-yaml = "^0.18.6" +prefect = "^2.18.3" [tool.poetry.group.dev.dependencies] pytest = "^7.4.3" diff --git a/src/dags/__init__.py b/src/dags/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/dags/parser.py b/src/dags/parser.py new file mode 100644 index 0000000..b981ed0 --- /dev/null +++ b/src/dags/parser.py @@ -0,0 +1,38 @@ +from loguru import logger +from prefect import flow, task +from telethon import TelegramClient + +from src.utils.scrapper import scrape_telegram_messages +from src.config import API_ID, API_HASH + +CHANNELS_TO_SCRAPE = ["cryptovalerii", "KarpovCourses"] + + +@task +async def scrape_channels(): + """ "Scraping messages from telegram-channel""" + async with TelegramClient( + "src/artifacts/sessions/post_finder.session", API_ID, API_HASH + ) as client: + results = [] + for channel in CHANNELS_TO_SCRAPE: + logger.info(f"Starting scrape for {channel}") + result = await scrape_telegram_messages(client, channel) + results.append(result) + logger.info(f"Completed scrape for {channel}") + return results + + +@flow +async def daily_scraper(): + """Initiates a daily task to scrape messages from Telegram channels""" + logger.info("Starting the scraping process...") + results = await scrape_channels() + for result in results: + print(result) + + +if __name__ == "__main__": + daily_scraper.serve( + name="Daily Channel Scraper", tags=["scraping", "daily"], cron="0 0 * * *" + ) diff --git a/src/dags/prefect.sh b/src/dags/prefect.sh new file mode 100644 index 0000000..e0132aa --- /dev/null +++ b/src/dags/prefect.sh @@ -0,0 +1 @@ +poetry run python -m src.dags.parser