de-projects/22_airflow_intro/dags/simple_etl_dag_v1.py (new file, 55 additions)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
import logging


def extract():
    logging.info("Extracting data...")
    return "raw_data"


def transform(ti):
    # Pull the return value of extract_task from XCom.
    raw_data = ti.xcom_pull(task_ids='extract_task')
    transformed_data = raw_data.upper()
    logging.info(f"Transforming data: {transformed_data}")
    return transformed_data


def load(ti):
    transformed_data = ti.xcom_pull(task_ids='transform_task')
    logging.info(f"Loading data: {transformed_data}")


# Define the DAG
with DAG(
    dag_id='simple_etl_dag_v1',
    description='A simple ETL DAG',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
) as dag:

    # Define tasks
    extract_task = PythonOperator(
        task_id='extract_task',
        python_callable=extract
    )

    transform_task = PythonOperator(
        task_id='transform_task',
        python_callable=transform
    )

    load_task = PythonOperator(
        task_id='load_task',
        python_callable=load
    )

    monitoring_task = BashOperator(
        task_id='monitoring_task',
        bash_command="echo $AIRFLOW__CORE__EXECUTOR"
    )

    # Set task dependencies
    extract_task >> transform_task >> load_task >> monitoring_task
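
A quick way to sanity-check a DAG file like this outside the scheduler is Airflow's DagBag loader. A minimal sketch, assuming a local Airflow 2.x install and this project's dags/ folder:

from airflow.models import DagBag

# Parse the folder without example DAGs and fail loudly on import errors.
dagbag = DagBag(dag_folder="de-projects/22_airflow_intro/dags", include_examples=False)
assert dagbag.import_errors == {}, dagbag.import_errors
assert "simple_etl_dag_v1" in dagbag.dags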
de-projects/22_airflow_intro/dags/simple_etl_dag_v2.py (new file, 37 additions)
from airflow.decorators import task, dag
from datetime import datetime
import logging


@dag(
    dag_id='simple_etl_dag_v2',
    description='A simple ETL DAG V2 with decorators',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False
)
def etl():
    @task
    def extract():
        logging.info("Extracting data...")
        return "dummy data"

    @task
    def transform(raw_data: str):
        transformed_data = raw_data.upper()
        logging.info(f"Transforming data: {transformed_data}")
        return transformed_data

    @task
    def load(transformed_data: str):
        logging.info(f"Loading data: {transformed_data}")

    @task.bash
    def monitor():
        return "echo $AIRFLOW__CORE__EXECUTOR"

    # TaskFlow infers extract -> transform -> load from the data flow;
    # the explicit >> makes monitor run after load.
    raw = extract()
    transformed = transform(raw)
    load(transformed) >> monitor()


dag = etl()
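
Since Airflow 2.5, a DAG object can also be exercised end-to-end in a single process with DAG.test(), which is handy for debugging TaskFlow DAGs like this one. A minimal sketch, assuming a configured local metadata database (e.g. after airflow db migrate):

if __name__ == "__main__":
    # Runs extract -> transform -> load -> monitor serially, no scheduler needed.
    dag.test()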
de-projects/22_airflow_intro/dags/simple_etl_dags_v3.py (new file, 40 additions)
from airflow.decorators import task, dag
from datetime import datetime
import logging


def make_etl_dag(table_name: str):
    @dag(
        dag_id=f'simple_etl_dag_{table_name}',
        description=f'A simple ETL DAG for the {table_name} table',
        start_date=datetime(2025, 1, 1),
        schedule_interval='@daily',
        catchup=False
    )
    def etl():
        @task(task_id=f"extract_{table_name}")
        def extract():
            logging.info("Extracting data...")
            return table_name

        @task(task_id=f"transform_{table_name}")
        def transform(raw_data: str):
            transformed_data = raw_data.upper()
            logging.info(f"Transforming data: {transformed_data}")
            return transformed_data

        @task(task_id=f"load_{table_name}")
        def load(transformed_data: str):
            logging.info(f"Loading data: {transformed_data}")

        raw = extract()
        transformed = transform(raw)
        load(transformed)

    return etl()


TABLES = ['accounts', 'contacts', 'sales']

# One DAG per table; the @dag decorator registers each at parse time.
for table in TABLES:
    make_etl_dag(table)
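
The bare make_etl_dag(table) calls work because the @dag decorator auto-registers each generated DAG at parse time (Airflow 2.4+). On older versions, or to make discovery explicit, the classic factory pattern binds each DAG into module globals; a sketch that would replace the loop above:

for table in TABLES:
    # Explicit registration so the DagBag finds each generated DAG.
    globals()[f"simple_etl_dag_{table}"] = make_etl_dag(table)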
de-projects/22_airflow_intro/docker-compose.yaml (new file, 221 additions)
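# The x-airflow-common block below defines YAML anchors (&airflow-common,
# &airflow-common-env, &airflow-common-depends-on); each Airflow service
# merges them via `<<: *airflow-common` and overrides only its own command,
# healthcheck, and extra environment keys.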
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.4}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "  See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "  https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:
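
Once the stack is up (docker compose up airflow-init to run migrations and create the default airflow/airflow user, then docker compose up -d), the webserver exposes a health endpoint on the port mapped above. A minimal probe, assuming the defaults in this file:

import json
import urllib.request

# Port 8080 is published by airflow-webserver above; the payload should
# report "healthy" for both the metadatabase and the scheduler.
with urllib.request.urlopen("http://localhost:8080/health") as resp:
    print(json.load(resp))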