From 5d2130121fd370eb3e805a79785ce6cf8b7a513c Mon Sep 17 00:00:00 2001
From: Maksim Kazartsev
Date: Sun, 30 Mar 2025 14:39:21 -0400
Subject: [PATCH] Add DAG to ingest data from the Adzuna API

---
 .../dags/adzuna_etl/adzuna_etl_dag.py         | 45 ++++++++++++++
 .../dags/adzuna_etl/src/utils.py              | 72 +++++++++++++++++++
 .../22_airflow_intro/docker-compose.yaml      |  1 +
 .../22_airflow_intro/raw_data/.gitkeep        |  0
 4 files changed, 118 insertions(+)
 create mode 100644 de-projects/22_airflow_intro/dags/adzuna_etl/adzuna_etl_dag.py
 create mode 100644 de-projects/22_airflow_intro/dags/adzuna_etl/src/utils.py
 create mode 100644 de-projects/22_airflow_intro/raw_data/.gitkeep

diff --git a/de-projects/22_airflow_intro/dags/adzuna_etl/adzuna_etl_dag.py b/de-projects/22_airflow_intro/dags/adzuna_etl/adzuna_etl_dag.py
new file mode 100644
index 0000000..dffa8bd
--- /dev/null
+++ b/de-projects/22_airflow_intro/dags/adzuna_etl/adzuna_etl_dag.py
@@ -0,0 +1,45 @@
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.models import Variable
+from datetime import datetime
+from adzuna_etl.src.utils import get_adzuna_raw_data
+
+
+"""
+Homework
+
+1. Split the code into modules and import them inside the DAG.
+
+2. Second approach (offload the heavy work from Airflow): create a new repo;
+   build a Docker image with the repo code (ingestion / transformation logic);
+   push the image to a Docker registry; then, in the DAG, use a Docker operator
+   to pull the image and start a container, so the tasks run inside it
+   (see the sketch after the patch).
+"""
+
+# Shared folder mounted into all services to store raw and transformed data files
+RAW_DATA_UNPROCESSED_FOLDER = "/opt/airflow/shared-data/raw_data/to_process/"
+
+ADZUNA_APP_ID = Variable.get("ADZUNA_APP_ID")
+ADZUNA_APP_KEY = Variable.get("ADZUNA_APP_KEY")
+
+
+default_args = {
+    "owner": "Maksim",
+    "depends_on_past": False,
+    "start_date": datetime(2025, 3, 30),
+}
+
+
+with DAG(
+    dag_id="adzuna_ingest_raw_data",
+    default_args=default_args,
+    description="Download data from Adzuna API",
+    schedule=None,  # e.g. timedelta(hours=1) to run hourly (import timedelta from datetime first)
+) as dag:
+
+    extract_raw_data_task = PythonOperator(
+        task_id="extract_raw_data_task",
+        python_callable=get_adzuna_raw_data,
+        op_args=[ADZUNA_APP_ID, ADZUNA_APP_KEY, RAW_DATA_UNPROCESSED_FOLDER],
+    )
diff --git a/de-projects/22_airflow_intro/dags/adzuna_etl/src/utils.py b/de-projects/22_airflow_intro/dags/adzuna_etl/src/utils.py
new file mode 100644
index 0000000..7ccdfef
--- /dev/null
+++ b/de-projects/22_airflow_intro/dags/adzuna_etl/src/utils.py
@@ -0,0 +1,72 @@
+from datetime import datetime
+import requests
+import json
+import math
+import os
+
+
+def get_adzuna_raw_data(ADZUNA_APP_ID, ADZUNA_APP_KEY, RAW_DATA_UNPROCESSED_FOLDER):
+
+    # Define the API endpoint and base parameters
+    url = "https://api.adzuna.com/v1/api/jobs/ca/search/"
+    base_params = {
+        'app_id': ADZUNA_APP_ID,
+        'app_key': ADZUNA_APP_KEY,
+        'results_per_page': 50,  # Maximum allowed results per page
+        'what_phrase': "data engineer",
+        'max_days_old': 2,
+        'sort_by': "date"
+    }
+    print("Adzuna function triggered to extract raw JSON data from the Adzuna API.")
+
+    # Initialize a list to store all job postings
+    all_job_postings = []
+
+    # Make the first request to determine the total number of pages
print("Making the first request to determine the total number of pages") + response = requests.get(f"{url}1", params=base_params) + + if response.status_code != 200: + error_message = f"Error fetching page 1: {response.status_code}, {response.text}" + print(error_message) + + data = response.json() # Parse the JSON response + total_results = data.get('count', 0) + results_per_page = base_params['results_per_page'] + + # Calculate the total number of pages + total_pages = math.ceil(total_results / results_per_page) + print(f"Total number of pages = {total_pages}") + + # Store the results from the first page + all_job_postings.extend(data.get('results', [])) + + # Loop through the remaining pages and request data from each + print("Looping through the remaining pages to request data from each") + for page in range(2, total_pages + 1): # Start from page 2 + response = requests.get(f"{url}{page}", params=base_params) + if response.status_code == 200: + page_data = response.json() + all_job_postings.extend(page_data.get('results', [])) + else: + print(f"Error fetching page {page}: {response.status_code}, {response.text}") + + print(f"Total jobs retrieved: {len(all_job_postings)}") + + raw_json_data = json.dumps({"items": all_job_postings}) + raw_json_bytes = raw_json_data.encode('utf-8') + + # Generate a filename with the current timestamp to store raw data + # Create RAW_DATA_UNPROCESSED_FOLDER if it doesn't exist + os.makedirs(RAW_DATA_UNPROCESSED_FOLDER, exist_ok=True) + current_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + file_name = f"adzuna_raw_data_{current_timestamp}.json" + file_path = RAW_DATA_UNPROCESSED_FOLDER + file_name + print(f"File name to store raw data: {file_path}") + + with open(file_path, "wb") as file: + file.write(raw_json_bytes) + print("Done") + + return file_path diff --git a/de-projects/22_airflow_intro/docker-compose.yaml b/de-projects/22_airflow_intro/docker-compose.yaml index cb112d8..f66c0b4 100644 --- a/de-projects/22_airflow_intro/docker-compose.yaml +++ b/de-projects/22_airflow_intro/docker-compose.yaml @@ -20,6 +20,7 @@ x-airflow-common: - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins + - ${AIRFLOW_PROJ_DIR:-.}/raw_data:/opt/airflow/shared-data/raw_data/to_process/ user: "${AIRFLOW_UID:-50000}:0" depends_on: &airflow-common-depends-on diff --git a/de-projects/22_airflow_intro/raw_data/.gitkeep b/de-projects/22_airflow_intro/raw_data/.gitkeep new file mode 100644 index 0000000..e69de29