diff --git a/de-projects/22_airflow_intro/config/.gitkeep b/de-projects/22_airflow_intro/config/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/de-projects/22_airflow_intro/dags/simple_etl_dag_v1.py b/de-projects/22_airflow_intro/dags/simple_etl_dag_v1.py new file mode 100644 index 0000000..26f25a8 --- /dev/null +++ b/de-projects/22_airflow_intro/dags/simple_etl_dag_v1.py @@ -0,0 +1,55 @@ +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.operators.bash import BashOperator +from datetime import datetime, timedelta +import logging + + +def extract(): + logging.info("Extracting data...") + return "raw_data" + + +def transform(ti): + raw_data = ti.xcom_pull(task_ids='extract_task') + transformed_data = raw_data.upper() + logging.info(f"Transforming data: {transformed_data}") + return transformed_data + + +def load(ti): + transformed_data = ti.xcom_pull(task_ids='transform_task') + logging.info(f"Loading data: {transformed_data}") + + +# Define the DAG +with DAG( + dag_id='simple_etl_dag_v1', + description='A simple ETL DAG', + start_date=datetime(2025, 1, 1), + schedule_interval='@daily', +) as dag: + + # Define tasks + extract_task = PythonOperator( + task_id='extract_task', + python_callable=extract + ) + + transform_task = PythonOperator( + task_id='transform_task', + python_callable=transform + ) + + load_task = PythonOperator( + task_id='load_task', + python_callable=load + ) + + monitoring_task = BashOperator( + task_id='monitoring_task', + bash_command="echo $AIRFLOW__CORE__EXECUTOR" + ) + + # Set task dependencies + extract_task >> transform_task >> load_task >> monitoring_task diff --git a/de-projects/22_airflow_intro/dags/simple_etl_dag_v2.py b/de-projects/22_airflow_intro/dags/simple_etl_dag_v2.py new file mode 100644 index 0000000..4e45850 --- /dev/null +++ b/de-projects/22_airflow_intro/dags/simple_etl_dag_v2.py @@ -0,0 +1,37 @@ +from airflow.decorators import task, dag +from datetime import datetime +import logging + +@dag( + dag_id=f'simple_etl_dag_v2', + description='A simple ETL DAG V2 with decorators', + start_date=datetime(2025, 1, 1), + schedule_interval='@daily', + catchup=False +) +def etl(): + @task + def extract(): + logging.info("Extracting data...") + return "dummy data" + + @task + def transform(raw_data: str): + transformed_data = raw_data.upper() + logging.info(f"Transforming data: {transformed_data}") + return transformed_data + + @task + def load(transformed_data: str): + logging.info(f"Loading data: {transformed_data}") + + @task.bash + def monitor(): + return "echo $AIRFLOW__CORE__EXECUTOR" + + raw = extract() + transformed = transform(raw) + load(transformed) >> monitor() + + +dag = etl() diff --git a/de-projects/22_airflow_intro/dags/simple_etl_dags_v3.py b/de-projects/22_airflow_intro/dags/simple_etl_dags_v3.py new file mode 100644 index 0000000..16d77b2 --- /dev/null +++ b/de-projects/22_airflow_intro/dags/simple_etl_dags_v3.py @@ -0,0 +1,40 @@ +from airflow.decorators import task, dag +from datetime import datetime +import logging + + +def make_etl_dag(table_name: str): + @dag( + dag_id=f'simple_etl_dag_{table_name}', + description='A simple ETL DAG V2', + start_date=datetime(2025, 1, 1), + schedule_interval='@daily', + catchup=False + ) + def etl(): + @task(task_id=f"extract_{table_name}") + def extract(): + logging.info("Extracting data...") + return table_name + + @task(task_id=f"transform_{table_name}") + def transform(raw_data: str): + transformed_data = raw_data.upper() + 
logging.info(f"Transforming data: {transformed_data}") + return transformed_data + + @task(task_id=f"load_{table_name}") + def load(transformed_data: str): + logging.info(f"Loading data: {transformed_data}") + + raw = extract() + transformed = transform(raw) + load(transformed) + + return etl() + + +TABLES = ['accounts', 'contacts', 'sales'] + +for table in TABLES: + make_etl_dag(table) diff --git a/de-projects/22_airflow_intro/docker-compose.yaml b/de-projects/22_airflow_intro/docker-compose.yaml new file mode 100644 index 0000000..cb112d8 --- /dev/null +++ b/de-projects/22_airflow_intro/docker-compose.yaml @@ -0,0 +1,221 @@ +x-airflow-common: + &airflow-common + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.4} + environment: + &airflow-common-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: '' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__LOAD_EXAMPLES: 'false' + AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + + _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + + volumes: + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config + - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins + user: "${AIRFLOW_UID:-50000}:0" + depends_on: + &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + +services: + postgres: + image: postgres:13 + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + + redis: + image: redis:7.2-bookworm + expose: + - 6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + restart: always + + airflow-webserver: + <<: *airflow-common + command: webserver + ports: + - "8080:8080" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-worker: + <<: *airflow-common + command: celery worker + healthcheck: + test: + - "CMD-SHELL" + - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + environment: + <<: *airflow-common-env + DUMB_INIT_SETSID: "0" + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + 
airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + command: + - -c + - | + if [[ -z "${AIRFLOW_UID}" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" + echo "If you are on Linux, you SHOULD follow the instructions below to set " + echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." + echo "For other operating systems you can get rid of the warning with manually created .env file:" + echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" + echo + fi + one_meg=1048576 + mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) + cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) + disk_available=$$(df / | tail -1 | awk '{print $$4}') + warning_resources="false" + if (( mem_available < 4000 )) ; then + echo + echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" + echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" + echo + warning_resources="true" + fi + if (( cpus_available < 2 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" + echo "At least 2 CPUs recommended. You have $${cpus_available}" + echo + warning_resources="true" + fi + if (( disk_available < one_meg * 10 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" + echo "At least 10 GBs recommended. 
You have $$(numfmt --to iec $$((disk_available * 1024 )))"
+          echo
+          warning_resources="true"
+        fi
+        if [[ $${warning_resources} == "true" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
+          echo "Please follow the instructions to increase amount of resources available:"
+          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
+          echo
+        fi
+        mkdir -p /sources/logs /sources/dags /sources/plugins
+        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
+        exec /entrypoint airflow version
+    # yamllint enable rule:line-length
+    environment:
+      <<: *airflow-common-env
+      _AIRFLOW_DB_MIGRATE: 'true'
+      _AIRFLOW_WWW_USER_CREATE: 'true'
+      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
+      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
+      _PIP_ADDITIONAL_REQUIREMENTS: ''
+    user: "0:0"
+    volumes:
+      - ${AIRFLOW_PROJ_DIR:-.}:/sources
+
+  airflow-cli:
+    <<: *airflow-common
+    profiles:
+      - debug
+    environment:
+      <<: *airflow-common-env
+      CONNECTION_CHECK_MAX_COUNT: "0"
+    command:
+      - bash
+      - -c
+      - airflow
+
+  flower:
+    <<: *airflow-common
+    command: celery flower
+    profiles:
+      - flower
+    ports:
+      - "5555:5555"
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+volumes:
+  postgres-db-volume:
diff --git a/de-projects/22_airflow_intro/logs/.gitkeep b/de-projects/22_airflow_intro/logs/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/de-projects/22_airflow_intro/plugins/.gitkeep b/de-projects/22_airflow_intro/plugins/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/de-projects/22_airflow_intro/readme.md b/de-projects/22_airflow_intro/readme.md
new file mode 100644
index 0000000..c703aac
--- /dev/null
+++ b/de-projects/22_airflow_intro/readme.md
@@ -0,0 +1,69 @@
+# Getting started with Airflow
+
+## Prerequisites
+To work on this project, you need the following installed and available before the webinar:
+
+- Python
+- VS Code
+- Docker Desktop
+
+## Instructions
+
+1. Download the repo and open it in VS Code
+
+2. Navigate to the `/data-projects/de-projects/22_airflow_intro/` project folder
+
+3. Start all services with Docker Compose
+
+```bash
+docker compose up -d
+```
+
+4. Verify that all containers are up and healthy
+
+```bash
+docker compose ps -a
+```
+
+5. Open the Airflow UI at http://localhost:8080/ (default credentials: airflow / airflow)
+
+6. Unpause and trigger all DAGs
+
+
+## Useful commands
+
+View Airflow logs for all services
+
+```bash
+docker compose logs
+```
+
+View Airflow logs for a specific service
+
+```bash
+docker compose logs airflow-webserver
+```
+
+Stream Airflow logs for a specific service in real time
+
+```bash
+docker compose logs airflow-webserver -f
+```
+
+Open a bash shell inside a running Airflow container
+
+```bash
+docker compose exec -it airflow-webserver bash
+```
+
+Shut down all services
+
+```bash
+docker compose down
+```
+
+## Useful materials
+
+ - ["Getting Started with Apache Airflow" article](https://medium.com/@kazarmax/getting-started-with-apache-airflow-b2fba7dbd45a)
+
+ - ["ETL with Airflow" project](https://medium.com/@kazarmax/extracting-and-processing-job-market-data-etl-with-apache-airflow-and-adzuna-api-d7feae752bf9)
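+
+## Bonus: working with the DAGs from the CLI
+
+The commands below are an optional, minimal sketch for exploring the project from the terminal. They assume the default service names from `docker-compose.yaml` and the `simple_etl_dag_v1` DAG id from `dags/simple_etl_dag_v1.py`; adjust the names if you rename anything.
+
+```bash
+# List the DAGs the scheduler has parsed
+docker compose exec airflow-webserver airflow dags list
+
+# Unpause a DAG (all DAGs start paused because AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION is 'true')
+docker compose exec airflow-webserver airflow dags unpause simple_etl_dag_v1
+
+# Trigger a manual run and check its state
+docker compose exec airflow-webserver airflow dags trigger simple_etl_dag_v1
+docker compose exec airflow-webserver airflow dags list-runs -d simple_etl_dag_v1
+```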