From 17bfc12dabb448388130590dc99d468d36ab9cb8 Mon Sep 17 00:00:00 2001
From: Danil Timbal
Date: Sat, 11 May 2024 22:02:49 +0300
Subject: [PATCH 1/2] add prefect and dags/parser.py

---
# Review notes. This free-text area sits between the "---" separator and the
# first diff header, so it is not part of the commit message or the patch body.
#
# What this patch does:
# - Adds prefect ("^2.18.3") to pyproject.toml.
# - Creates an empty src/dags/__init__.py and a new src/dags/parser.py.
# - scrape_channels() (a Prefect task) opens one TelegramClient session and
#   awaits scrape_telegram_messages(client, channel, limit=10) for each entry
#   of CHANNELS_TO_SCRAPE, in order, returning the list of per-channel results.
# - daily_channel_scrape() (a Prefect flow) awaits scrape_channels() and
#   print()s every result. Run as a script, the module serves that flow with
#   the cron expression "0 0 * * *".
#
# NOTE(review): line breaks and indentation in this copy were restored from a
#   whitespace-collapsed paste. Verify context lines (and the indentation of
#   "return results") against the target tree before applying.
# NOTE(review): the scrape_channels docstring opens with a stray quote
#   (`""" "Scraping ...`), which becomes part of the docstring text.
# NOTE(review): results are emitted with print() while the rest of the module
#   uses the loguru logger - confirm this is intended.
# NOTE(review): the session path "src/artifacts/sessions/post_finder.session"
#   is relative - presumably resolved against the working directory; confirm
#   the flow is always started from the repository root.
 pyproject.toml       |  1 +
 src/dags/__init__.py |  0
 src/dags/parser.py   | 38 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 39 insertions(+)
 create mode 100644 src/dags/__init__.py
 create mode 100644 src/dags/parser.py

diff --git a/pyproject.toml b/pyproject.toml
index a00681d..4bffa45 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,6 +28,7 @@ pytest = "^7.4.4"
 pytest-asyncio = "^0.23.3"
 langchain-openai = "^0.0.2.post1"
 ruamel-yaml = "^0.18.6"
+prefect = "^2.18.3"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^7.4.3"
diff --git a/src/dags/__init__.py b/src/dags/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/dags/parser.py b/src/dags/parser.py
new file mode 100644
index 0000000..ea5a6ef
--- /dev/null
+++ b/src/dags/parser.py
@@ -0,0 +1,38 @@
+from loguru import logger
+from prefect import flow, task
+from telethon import TelegramClient
+
+from src.utils.scrapper import scrape_telegram_messages
+from src.config import API_ID, API_HASH
+
+CHANNELS_TO_SCRAPE = ["cryptovalerii", "KarpovCourses"]
+
+
+@task
+async def scrape_channels():
+    """ "Scraping messages from telegram-channel"""
+    async with TelegramClient(
+        "src/artifacts/sessions/post_finder.session", API_ID, API_HASH
+    ) as client:
+        results = []
+        for channel in CHANNELS_TO_SCRAPE:
+            logger.info(f"Starting scrape for {channel}")
+            result = await scrape_telegram_messages(client, channel, limit=10)
+            results.append(result)
+            logger.info(f"Completed scrape for {channel}")
+        return results
+
+
+@flow
+async def daily_channel_scrape():
+    """Initiates a daily task to scrape messages from Telegram channels"""
+    logger.info("Starting the scraping process...")
+    results = await scrape_channels()
+    for result in results:
+        print(result)
+
+
+if __name__ == "__main__":
+    daily_channel_scrape.serve(
+        name="Daily Channel Scrape", tags=["scraping", "daily"], cron="0 0 * * *"
+    )

From 0f5257487974a7d2c4cefc54a1c60346071b4bc6 Mon Sep 17 00:00:00 2001
From: Danil Timbal
Date: Mon, 13 May 2024 21:02:44 +0300
Subject: [PATCH 2/2] Add makefile, prefect.sh. Modified dockerfile, docker-compose

---
# Review notes. This free-text area sits between the "---" separator and the
# first diff header, so it is not part of the commit message or the patch body.
#
# What this patch does:
# - Adds a Makefile with install / auth / run targets and a one-line
#   src/dags/prefect.sh that runs "poetry run python -m src.dags.parser".
# - Adds PREFECT_API_KEY / PREFECT_WORKSPACE_ID as build ARGs and ENV values
#   in docker/Dockerfile.
# - Adds a prefect_flow service to docker-compose.yaml and lists it in the
#   depends_on of an existing service (that service's name is outside the hunk).
# - In src/dags/parser.py: drops limit=10 from the scrape_telegram_messages
#   call, renames the flow function daily_channel_scrape -> daily_scraper and
#   the served name "Daily Channel Scrape" -> "Daily Channel Scraper".
#
# NOTE(review): line breaks and indentation in this copy were restored from a
#   whitespace-collapsed paste. Makefile recipe lines must be tab-indented and
#   the YAML nesting should be checked against the target tree before applying.
# NOTE(review): the Dockerfile copies the PREFECT_* build args into ENV, so
#   the values are stored in the built image. Docker's documentation advises
#   against passing secrets through build args - consider supplying them only
#   at container runtime.
# NOTE(review): with limit=10 removed, the number of messages fetched depends
#   on the default of scrape_telegram_messages, which is not visible here -
#   confirm the new fetch size is intended.
# NOTE(review): "make run" calls "poetry run sh src/dags/prefect.sh" and that
#   script itself calls "poetry run ..." (nested poetry run). prefect.sh has no
#   shebang line.
# NOTE(review): the prefect_flow service loads ../.env-docker via env_file and
#   also sources /app/.env-docker in its command - presumably the same file
#   copied in by "COPY . /app"; confirm both are needed.
# NOTE(review): the Makefile targets install / auth / run are not declared
#   .PHONY.
 Makefile                   | 11 +++++++++++
 docker/Dockerfile          |  6 ++++++
 docker/docker-compose.yaml | 21 +++++++++++++++++++++
 src/dags/parser.py         |  8 ++++----
 src/dags/prefect.sh        |  1 +
 5 files changed, 43 insertions(+), 4 deletions(-)
 create mode 100644 Makefile
 create mode 100644 src/dags/prefect.sh

diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..ae346da
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,11 @@
+SHELL := /bin/bash
+
+install:
+	pip install poetry
+	poetry install
+
+auth:
+	poetry run prefect cloud login --key $(PREFECT_API_KEY) --workspace $(PREFECT_WORKSPACE_ID)
+
+run:
+	poetry run sh src/dags/prefect.sh
diff --git a/docker/Dockerfile b/docker/Dockerfile
index ad7d9d1..1a70d68 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,5 +1,11 @@
 FROM python:3.11.6
 
+ARG PREFECT_API_KEY
+ARG PREFECT_WORKSPACE_ID
+
+ENV PREFECT_API_KEY=${PREFECT_API_KEY}
+ENV PREFECT_WORKSPACE_ID=${PREFECT_WORKSPACE_ID}
+
 WORKDIR /app
 
 COPY . /app
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml
index fc5c3ef..1b7c545 100644
--- a/docker/docker-compose.yaml
+++ b/docker/docker-compose.yaml
@@ -16,6 +16,7 @@ services:
       dockerfile: docker/Dockerfile
     depends_on:
       - postgres_db
+      - prefect_flow
     environment:
       - ENV_STATE=docker
     env_file:
@@ -32,3 +33,23 @@ services:
     env_file:
       - ../.env-docker
     command: ["/app/docker/bot.sh"]
+
+  prefect_flow:
+    container_name: prefect_flow
+    build:
+      context: ../
+      dockerfile: docker/Dockerfile
+      args:
+        - PREFECT_API_KEY=${PREFECT_API_KEY}
+        - PREFECT_WORKSPACE_ID=${PREFECT_WORKSPACE_ID}
+    depends_on:
+      - postgres_db
+    env_file:
+      - ../.env-docker
+    entrypoint: /bin/bash
+    command:
+      - -c
+      - |
+        source /app/.env-docker &&
+        make auth &&
+        make run
diff --git a/src/dags/parser.py b/src/dags/parser.py
index ea5a6ef..b981ed0 100644
--- a/src/dags/parser.py
+++ b/src/dags/parser.py
@@ -17,14 +17,14 @@ async def scrape_channels():
         results = []
         for channel in CHANNELS_TO_SCRAPE:
             logger.info(f"Starting scrape for {channel}")
-            result = await scrape_telegram_messages(client, channel, limit=10)
+            result = await scrape_telegram_messages(client, channel)
             results.append(result)
             logger.info(f"Completed scrape for {channel}")
         return results
 
 
 @flow
-async def daily_channel_scrape():
+async def daily_scraper():
     """Initiates a daily task to scrape messages from Telegram channels"""
     logger.info("Starting the scraping process...")
     results = await scrape_channels()
@@ -33,6 +33,6 @@ async def daily_channel_scrape():
 
 
 if __name__ == "__main__":
-    daily_channel_scrape.serve(
-        name="Daily Channel Scrape", tags=["scraping", "daily"], cron="0 0 * * *"
+    daily_scraper.serve(
+        name="Daily Channel Scraper", tags=["scraping", "daily"], cron="0 0 * * *"
     )
diff --git a/src/dags/prefect.sh b/src/dags/prefect.sh
new file mode 100644
index 0000000..e0132aa
--- /dev/null
+++ b/src/dags/prefect.sh
@@ -0,0 +1 @@
+poetry run python -m src.dags.parser