diff --git a/.github/workflows/30_deploy_changes_to_production.yml b/.github/workflows/30_deploy_changes_to_production.yml index 49bd5585d..7faab2c7b 100644 --- a/.github/workflows/30_deploy_changes_to_production.yml +++ b/.github/workflows/30_deploy_changes_to_production.yml @@ -48,6 +48,13 @@ jobs: DBT_ARTIFACTS_BUCKET: "convexa-local" + # Needed for dbt-api + DATACOVES__API_ENDPOINT: ${{ vars.DATACOVES__API_ENDPOINT }} + DATACOVES__API_TOKEN: ${{ secrets.DATACOVES__API_TOKEN }} + DATACOVES__ACCOUNT_ID: ${{ vars.DATACOVES__ACCOUNT_ID }} + DATACOVES__PROJECT_SLUG: ${{ vars.DATACOVES__PROJECT_SLUG }} + DATACOVES__ENVIRONMENT_SLUG: ${{ vars.DATACOVES__ENVIRONMENT_SLUG }} + # This is used by datacoves to drop the staging database for blue/green # deployments, most likely you don't want to set this, we use it for demos DATACOVES__DROP_DB_ON_FAIL: ${{ vars.DATACOVES__DROP_DB_ON_FAIL }} @@ -112,6 +119,9 @@ jobs: - name: Upload dbt artifacts run: "dbt run-operation upload_artifacts" + - name: Push dbt artifacts to dbt-api on Datacoves + run: "../automate/dbt/push_dbt_artifacts.py" + - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v4 with: diff --git a/automate/dbt/push_dbt_artifacts.py b/automate/dbt/push_dbt_artifacts.py new file mode 100755 index 000000000..1fc02fccd --- /dev/null +++ b/automate/dbt/push_dbt_artifacts.py @@ -0,0 +1,175 @@ +#!/usr/bin/env -S uv run +# /// script +# dependencies = [ +# "requests", +# "python-dotenv", +# "rich", +# ] +# /// + +import requests +import os +from dotenv import load_dotenv +import json +from rich import print_json +from rich.console import Console +from rich.table import Table + + +load_dotenv() +base_url = os.getenv("DATACOVES__API_ENDPOINT") +token = os.getenv("DATACOVES__API_TOKEN") +account_id = os.getenv("DATACOVES__ACCOUNT_ID") +project_slug = os.getenv("DATACOVES__PROJECT_SLUG") +environment_slug = os.getenv("DATACOVES__ENVIRONMENT_SLUG") +dbt_home = os.getenv("DATACOVES__DBT_HOME") + + +####################################### +# Utility for api interactions +####################################### +def print_responce(r): + print("STATUS:", r.status_code) + + response_text = r.text + + try: + parsed_json = json.loads(response_text) + print_json(data=parsed_json) + except json.JSONDecodeError: + print("RESPONSE:", response_text) + + print("-----------------------") + +def print_table(items, keys_to_show, title="Items"): + """Print a table showing only specified keys from a list of dictionaries""" + console = Console() + table = Table(title=title) + + # Define different colors for each column + colors = ["blue", "bright_green", "yellow", "green", "cyan", "magenta", "red", "bright_cyan", "bright_magenta", "bright_yellow"] + + # Add columns for each key we want to show with different colors + for index, key in enumerate(keys_to_show): + color = colors[index % len(colors)] # Cycle through colors if more columns than colors + table.add_column(key.replace('_', ' ').title(), style=color) + + # Add rows for each item in the list + for item in items: + row_values = [] + for key in keys_to_show: + value = item.get(key, "N/A") + row_values.append(str(value)) + table.add_row(*row_values) + + console.print(table) + +def get_endpoint(endpoint: str) -> str: + return f"{base_url}/{endpoint}" + +def get_headers() -> dict: + return { + "Accept": "application/json", + "Authorization": f"Bearer {token}" + } + +####################################### +# Get information +####################################### + +def health_check(): + 
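+    """Quick connectivity check: hit the dbt-api healthcheck endpoint and print the status and response body."""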
print("Checking Health of api") + + r = requests.get( + url=get_endpoint(endpoint="/api/v3/healthcheck"), + headers=get_headers(), + ) + + print_responce(r) + +####################################### +# Working with files +####################################### + +def list_project_files(account_id: int, project_slug: str): + print(f"Listing files for project: {project_slug}") + + r = requests.get( + # url=get_endpoint(endpoint=f"/api/v3/datacoves/account/{account_id}/projects/{project_slug}/files"), + + url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/files"), + + headers=get_headers(), + ) + + return r.json().get("data", {}) + +def upload_env_file(account_id: int, project_slug: str, env_slug: str, + filename: str, is_manifest: bool = False, + dag_id: str = None, run_id: str = None, use_multipart: bool = False): + + print(f"Uploading file {filename} to project: {project_slug} in environment: {env_slug}") + + file = {"file": (filename, open(f"{dbt_home}/target/{filename}", "rb"))} + + data = { + 'filename': filename, + 'is_manifest': str(is_manifest).lower() + } + + r = requests.post( + url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/environments/{env_slug}/files"), + headers=get_headers(), + files=file, + data=data + ) + + print_responce(r) + +def promote_env_file(account_id: int, project_slug: str, env_slug: str, + filename: str): + + print(f"Promoting file {filename} in environment: {env_slug} to project level ({project_slug})") + + r = requests.post( + url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/environments/{env_slug}/files/{filename}/promote"), + headers=get_headers() + ) + + print_responce(r) + +def delete_project_file(account_id: int, project_slug: str, filename: str): + + print(f"Deleting file {filename} from project: {project_slug}") + + r = requests.delete( + url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/files/{filename}"), + headers=get_headers() + ) + + print_responce(r) + +if __name__ == "__main__": + # Get infomration + + health_check() + + cols = ["environment_slug",'filename', 'metadata', 'inserted_at'] + + # UPLOAD FILES + + filenames = ["graph.gpickle", "graph_summary.json", "partial_parse.msgpack"] + for filename in filenames: + upload_env_file(account_id, project_slug, environment_slug, filename) + + for filename in filenames: + promote_env_file(account_id, project_slug, environment_slug, filename) + + upload_env_file(account_id, project_slug, environment_slug, "manifest.json", is_manifest=True ) + promote_env_file(account_id, project_slug, environment_slug, "manifest.json" ) + + # delete_project_file(account_id, project_slug, "manifest.json") + + # SHOW FILE DETAILS + files = list_project_files(account_id, project_slug) + print_table(files, cols) diff --git a/orchestrate/dags/daily_loan_run.py b/orchestrate/dags/daily_loan_run.py index f4d0cfe5c..9362f9b9c 100644 --- a/orchestrate/dags/daily_loan_run.py +++ b/orchestrate/dags/daily_loan_run.py @@ -63,14 +63,13 @@ def extract_and_load_fivetran(): tooltip="dlt Extract and Load" ) def extract_and_load_dlt(): - @task.datacoves_bash + @task.datacoves_bash( + env = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}), + append_env=True + ) def load_loans_data(): - from orchestrate.utils import datacoves_utils - - env_vars = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}) - env_exports = datacoves_utils.generate_env_exports(env_vars) - return 
f"{env_exports}; cd load/dlt && ./loans_data.py" + return "cd load/dlt && ./loans_data.py" load_loans_data() diff --git a/orchestrate/dags/other_examples/airflow_db_cleanup.py b/orchestrate/dags/other_examples/airflow_db_cleanup.py new file mode 100644 index 000000000..091697f47 --- /dev/null +++ b/orchestrate/dags/other_examples/airflow_db_cleanup.py @@ -0,0 +1,202 @@ +""" +## Airflow Database Cleanup DAG + +This DAG automates the cleanup of Airflow's metadata database using the `airflow db clean` command. +To learn more check out https://www.astronomer.io/docs/learn/2.x/cleanup-dag-tutorial + +### Why is this needed? +Airflow retains all historical task data indefinitely. Over time, tables like `dag_run`, `job`, +`log`, `task_instance`, and `xcom` can grow significantly, causing: +- Delayed task scheduling and DAG parsing +- Sluggish UI performance +- Increased storage costs + +### How it works +1. **db_cleanup**: Executes `airflow db clean` to remove old records +2. **drop_archive_tables**: Removes temporary archive tables created during cleanup + +### Parameters +- **clean_before_timestamp**: Delete records older than this date (default: 180 days ago) +- **tables**: Which metadata tables to clean (default: all eligible tables) +- **dry_run**: Preview SQL commands without executing (default: True for safety) + +### Usage +1. Trigger the DAG manually from the Airflow UI +2. Configure parameters as needed +3. For first run, use `dry_run=True` to preview changes +4. Review logs, then run with `dry_run=False` to execute cleanup +""" + +from datetime import datetime, timedelta + +from airflow.decorators import dag, task +from airflow.models.param import Param + +from orchestrate.utils import datacoves_utils + +# Tables eligible for cleanup via `airflow db clean` +CLEANABLE_TABLES = [ + "callback_request", + "dag", + "dag_run", + "dataset_event", + "import_error", + "job", + "log", + "session", + "sla_miss", + "task_fail", + "task_instance", + "task_instance_history", + "task_reschedule", + "trigger", + "xcom" +] + +def get_default_clean_before_date() -> str: + """Returns date 180 days ago as default cleanup cutoff.""" + return (datetime.now() - timedelta(days=180)).strftime("%Y-%m-%d") + +@dag( + doc_md=__doc__, + catchup=False, + default_args=datacoves_utils.set_default_args( + owner="Noel Gomez", + owner_email="noel@example.com" + ), + # Manual trigger only - no schedule + schedule=None, + render_template_as_native_obj=True, + params={ + "clean_before_timestamp": Param( + default=get_default_clean_before_date(), + type="string", + format="date", + description="Delete records older than this date (YYYY-MM-DD)", + ), + "tables": Param( + default=CLEANABLE_TABLES, + type="array", + description="Metadata tables to clean (leave default for all eligible tables)", + ), + "dry_run": Param( + default=True, + type="boolean", + description="Preview SQL commands without executing (recommended for first run)", + ), + }, + description="Clean up Airflow metadata database to improve performance", + tags=["maintenance"], +) +def airflow_db_cleanup(): + + @task() + def db_cleanup( + clean_before_timestamp: str, + tables: list, + dry_run: bool, + ) -> dict: + """ + Execute the airflow db clean command. + + This removes old records from metadata tables while preserving + the most recent data needed for operations. 
+ """ + import subprocess + + # Build the command + cmd = [ + "airflow", + "db", + "clean", + "--clean-before-timestamp", + clean_before_timestamp, + "--skip-archive", + "--yes", + ] + + # Add tables to clean + for table in tables: + cmd.extend(["--tables", table]) + + # Add dry run flag if enabled + if dry_run: + cmd.append("--dry-run") + + print(f"Executing command: {' '.join(cmd)}") + + # Execute the command + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=False, + ) + + print(result.stdout) + if result.stderr: + print(result.stderr) + + if result.returncode != 0: + raise Exception( + f"airflow db clean failed with return code {result.returncode}" + ) + + return { + "status": "success", + "dry_run": dry_run, + "clean_before": clean_before_timestamp, + "tables_cleaned": tables, + } + + @task(trigger_rule="all_done") + def drop_archive_tables(cleanup_result: dict) -> str: + """ + Drop any archive tables created during cleanup. + + Archive tables are temporary tables that may be left behind if + the cleanup process times out or fails. This task ensures they + are removed. + + Uses trigger_rule='all_done' to run even if db_cleanup fails. + """ + import subprocess + + cmd = [ + "airflow", + "db", + "drop-archived", + "--yes", + ] + + print(f"Executing command: {' '.join(cmd)}") + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=False, + ) + + print(result.stdout) + if result.stderr: + print(result.stderr) + + if result.returncode != 0: + raise Exception( + f"airflow db drop-archived failed with return code {result.returncode}" + ) + + return "Archive tables dropped successfully" + + # Execute tasks + cleanup_result = db_cleanup( + clean_before_timestamp="{{ params.clean_before_timestamp }}", + tables="{{ params.tables }}", + dry_run="{{ params.dry_run }}", + ) + + drop_archive_tables(cleanup_result) + + +airflow_db_cleanup() diff --git a/orchestrate/dags/other_examples/load_dlt.py b/orchestrate/dags/other_examples/load_dlt.py index 67a02f2fd..360ce4143 100644 --- a/orchestrate/dags/other_examples/load_dlt.py +++ b/orchestrate/dags/other_examples/load_dlt.py @@ -22,14 +22,12 @@ ) def load_with_dlt(): - @task.datacoves_bash + @task.datacoves_bash( + env = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}), + append_env=True + ) def load_us_population(): - from orchestrate.utils import datacoves_utils - - env_vars = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}) - env_exports = datacoves_utils.generate_env_exports(env_vars) - - return f"{env_exports}; cd load/dlt && ./us_population.py" + return "cd load/dlt && ./us_population.py" load_us_population() diff --git a/orchestrate/dags/other_examples/load_earthquake_data.py b/orchestrate/dags/other_examples/load_earthquake_data.py index eb0ed8191..03b61e260 100644 --- a/orchestrate/dags/other_examples/load_earthquake_data.py +++ b/orchestrate/dags/other_examples/load_earthquake_data.py @@ -36,32 +36,25 @@ def get_last_success_date(**context): return str(success_date - datetime.timedelta(days=3)) # Load earthquake data from USGS - @task.datacoves_bash + @task.datacoves_bash( + env=datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}), + append_env=True + ) def load_usgs_data(**context): - from orchestrate.utils import datacoves_utils - - # Get the start date directly from the upstream task + # Get the start date from the upstream task task_instance = context['task_instance'] - start_date = task_instance.xcom_pull(task_ids = 
'get_last_success_date') - - # Set up environment variables - env_vars = datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}) - env_vars['DATACOVES__START_DATE'] = start_date - - env_exports = datacoves_utils.generate_env_exports(env_vars) + start_date = task_instance.xcom_pull(task_ids='get_last_success_date') - return f"{env_exports}; cd load/dlt && ./usgs_earthquake.py --start-date $DATACOVES__START_DATE" + # Pass the start date directly to the command + return f"cd load/dlt && ./usgs_earthquake.py --start-date {start_date}" # Load Country Polygon Data - @task.datacoves_bash + @task.datacoves_bash( + env=datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}), + append_env=True + ) def load_country_geography(): - from orchestrate.utils import datacoves_utils - - env_vars = datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}) - - env_exports = datacoves_utils.generate_env_exports(env_vars) - - return f"{env_exports}; cd load/dlt && ./country_geo.py" + return "cd load/dlt && ./country_geo.py" # Run the dbt transformations @task.datacoves_dbt( diff --git a/orchestrate/test_dags/validate_dags.py b/orchestrate/test_dags/validate_dags.py index e8398dc28..9ee44ee12 100644 --- a/orchestrate/test_dags/validate_dags.py +++ b/orchestrate/test_dags/validate_dags.py @@ -9,14 +9,15 @@ from airflow.models import DagBag APPROVED_TAGS = {'extract_and_load', - 'transform', - 'python_script', - 'ms_teams_notification', - 'slack_notification', - 'marketing_automation', - 'update_catalog', - 'parameters', - 'sample'} + 'maintenance', + 'marketing_automation', + 'ms_teams_notification', + 'parameters', + 'python_script', + 'sample', + 'slack_notification', + 'transform', + 'update_catalog'} ALLOWED_OPERATORS = [ "_PythonDecoratedOperator", # this allows the @task decorator diff --git a/training_and_demos/billing_api/.env.sample b/training_and_demos/billing_api/.env.sample new file mode 100644 index 000000000..bb1893415 --- /dev/null +++ b/training_and_demos/billing_api/.env.sample @@ -0,0 +1,3 @@ +DATACOVES__API_URL = "https://..." +DATACOVES__API_TOKEN = "...." +DATACOVES__ENVIRONMENT = "abc123" diff --git a/training_and_demos/billing_api/billing_api.py b/training_and_demos/billing_api/billing_api.py new file mode 100644 index 000000000..2bf22390a --- /dev/null +++ b/training_and_demos/billing_api/billing_api.py @@ -0,0 +1,106 @@ +import requests +import os +from dotenv import load_dotenv + +load_dotenv() +API_URL = os.getenv("DATACOVES__API_URL") +API_KEY = os.getenv("DATACOVES__API_TOKEN") +ENV_SLUG = os.getenv("DATACOVES__ENVIRONMENT") + +def get_worker_usage(start_date=None, end_date=None, service=None, env_slug=None): + """ + Retrieve worker usage minutes for an environment or all environments. + + Args: + env_slug (str, optional): Environment slug. 
If None, returns data for all environments + start_date (str, optional): Start date in YYYY-MM-DD format + end_date (str, optional): End date in YYYY-MM-DD format + service (str, optional): Filter by service type (airflow, airbyte, unknown) + + Returns: + list: Usage records + """ + if env_slug: + url = f"{API_URL}/{env_slug}/" + else: + url = f"{API_URL}/" + + params = {} + if start_date: + params['start_date'] = start_date + if end_date: + params['end_date'] = end_date + if service: + params['service'] = service + + response = requests.get( + url=url, + headers={"Authorization": f"Token {API_KEY}"}, + params=params + ) + + print(f"Fetching: {response.url}") + + if not response.ok: + print(response.text) + return + + response.raise_for_status() + return response.json() + +def calculate_total_minutes_by_service(usage_data): + """ + Calculate total minutes used per service. + + Args: + usage_data (list): Usage records from API + + Returns: + dict: Total minutes per service + """ + totals = {} + + for record in usage_data: + service = record['service'] + minutes = record['amount_minutes'] + totals[service] = totals.get(service, 0) + minutes + return totals + +def print_usage_info(usage): + + totals = calculate_total_minutes_by_service(usage) + + for service, minutes in totals.items(): + hours = minutes / 60 + print(f" {service}: {minutes} minutes ({hours:.2f} hours)") + + +# Example: Get usage for last 30 days +print("#### Usage for last 30 days ####") +usage = get_worker_usage() +if usage: + print_usage_info(usage) +else: + print("No Usage Data Returned for the last 30 days") + +# Example: Specific date range +start_date = "2025-07-01" +end_date = "2025-12-27" +print(f"\n#### Usage for {start_date} through {end_date} ####") + +usage = get_worker_usage(start_date, end_date) + +if usage: + print_usage_info(usage) +else: + print(f"No Usage Data Returned for {start_date} to {end_date}") + +# Example: specific environment +print(f"\n#### Usage for environment {ENV_SLUG} ####") + +usage = get_worker_usage(env_slug=ENV_SLUG) + +if usage: + print_usage_info(usage) +else: + print(f"No Usage Data Returned for environment {ENV_SLUG} from {start_date} to {end_date}") diff --git a/training_and_demos/dbt-api/dbt_api_files.py b/training_and_demos/dbt-api/dbt_api_files.py index 005ce5363..c6755bc00 100755 --- a/training_and_demos/dbt-api/dbt_api_files.py +++ b/training_and_demos/dbt-api/dbt_api_files.py @@ -13,7 +13,6 @@ from rich import print_json from rich.console import Console from rich.table import Table -from pathlib import Path load_dotenv() base_url = os.getenv("DATACOVES__API_ENDPOINT") @@ -381,15 +380,15 @@ def delete_project_file(account_id: int, project_slug: str, filename: str): # environment_files = list_environment_files(account_id, project_slug, environment_slug) # print_table(environment_files, cols) - filenames = ["graph.gpickle", "graph_summary.json", "partial_parse.msgpack"] + filenames = ["graph.gpickle", "graph_summary.json", "partial_parse.msgpack", "semantic_manifest.json"] - # UPLOAD FILES - # for filename in filenames: - # upload_env_file(account_id, project_slug, environment_slug, filename) + # UPLOAD ENV FILES + for filename in filenames: + upload_env_file(account_id, project_slug, environment_slug, filename) - # upload_env_file(account_id, project_slug, environment_slug, "manifest.json", is_manifest=True ) + upload_env_file(account_id, project_slug, environment_slug, "manifest.json", is_manifest=True ) - # DELETE FILES + # DELETE ENV FILES # for filename in filenames: # 
delete_env_file(account_id, project_slug, environment_slug, filename) @@ -399,22 +398,30 @@ def delete_project_file(account_id: int, project_slug: str, filename: str): # for filename in filenames: # show_env_file_details(account_id, project_slug, environment_slug, filename) - # DOWNLOAD Files + # DOWNLOAD ENV Files # for filename in filenames: # download_env_file(account_id, project_slug, environment_slug, filename) # download_env_manifest(account_id, project_slug, environment_slug, trimmed = True) + # PROMOTE ENV FILES + for filename in filenames: + promote_env_file(account_id, project_slug, environment_slug, filename) + + promote_env_file(account_id, project_slug, environment_slug, "manifest.json" ) + + # DOWNLOAD PROJECT FILES # for filename in filenames: - # promote_env_file(account_id, project_slug, environment_slug, filename) # download_project_file(account_id, project_slug, filename) + # download_project_manifest(account_id, project_slug, trimmed = True) + # DELETE FILES # for filename in filenames: # delete_project_file(account_id, project_slug, filename) - # promote_env_file(account_id, project_slug, environment_slug, "manifest.json" ) - # download_project_manifest(account_id, project_slug, trimmed = True) + # delete_project_file(account_id, project_slug, "manifest.json" ) + files = list_project_files(account_id, project_slug) print_table(files, cols) diff --git a/training_and_demos/dbt-api/old-dbt_api_files-v2.py b/training_and_demos/dbt-api/old-dbt_api_files-v2.py new file mode 100644 index 000000000..425f0acd5 --- /dev/null +++ b/training_and_demos/dbt-api/old-dbt_api_files-v2.py @@ -0,0 +1,265 @@ +import requests +import os +from dotenv import load_dotenv +import json +from rich import print_json + +def str_to_bool(s): + return s.lower() in ('true', '1', 'yes', 'y') + +load_dotenv() +base_url = os.getenv("DATACOVES__API_ENDPOINT") +token = os.getenv("DATACOVES__API_TOKEN") +project_slug = os.getenv("DATACOVES__PROJECT_SLUG") +environment_slug = os.getenv("DATACOVES__ENVIRONMENT_SLUG") + +####################################### +# Utility for api interactions +####################################### + +def print_format(r): + print("STATUS:", r.status_code) + + response_text = r.text + + try: + parsed_json = json.loads(response_text) + print_json(data=parsed_json) + except json.JSONDecodeError: + print("RESPONSE:", response_text) + + print("-----------------------") + +def get_endpoint(endpoint: str) -> str: + return f"{base_url}/{endpoint}" + +def get_headers() -> dict: + return { + "Accept": "application/json", + "Authorization": f"Bearer {token}" + } + +####################################### +# Get information +####################################### + +def get_account(id: int): + print(f"Getting info for account: {id}") + + r = requests.get( + url=get_endpoint(endpoint=f"api/v2/accounts/{id}"), + headers=get_headers(), + ) + + print_format(r) + +def get_environments(id: int): + print(f"Getting environments for account: {id}") + + r = requests.get( + url=get_endpoint(endpoint=f"api/v2/accounts/{id}/environments"), + headers=get_headers(), + ) + + print_format(r) + +def get_projects(id: int): + print(f"Getting projects for account: {id}") + + r = requests.get( + url=get_endpoint(endpoint=f"api/v2/accounts/{id}/projects"), + headers=get_headers(), + ) + + print_format(r) + +def health_check(): + print("Checking Health of api") + + r = requests.get( + url=get_endpoint(endpoint="/api/v2/healthcheck"), + headers=get_headers(), + ) + + print_format(r) + +# FIXME +def get_files(): 
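+    """Not implemented yet (see FIXME above); the request that would list environment files is still commented out."""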
+ print(f"Getting files for environment: {environment_slug}") + + # r = requests.get( + # url=get_endpoint(endpoint=f"/api/v2/datacoves/environments/{environment_slug}/files"), + # headers=get_headers(), + # ) + + # print_format(r) + + +####################################### +# Working with files +####################################### + +def delete_file(tag: str = ''): + + print(f"Deleting file by tag: {tag}") + + r = requests.delete( + url=get_endpoint(endpoint=f"/api/v2/datacoves/environments/{environment_slug}/files"), + headers=get_headers(), + data={ + "tag": tag + } + ) + + print_format(r) + + +def upload_single_file(tag: str): + + print(f"Uploading a single file with tag {tag}") + files = {"file": open("file1.txt","rb")} + r = requests.post( + url=get_endpoint(endpoint=f"/api/v2/datacoves/environments/{environment_slug}/files"), + headers=get_headers(), + files=files, + data={"tag": tag} + ) + + print_format(r) + + +def upload_multiple_files(file_names: list, tag: str = 'latest'): + print(f"Uploading files: {file_names}") + + file_root_path = "/config/workspace/transform/target/" + files = {} + for index, file in enumerate(file_names): + files[f"files[{index}][tag]"] = (None, tag) + files[f"files[{index}][file]"] = (file, open(f"{file_root_path}{file}", "rb")) + + r = requests.post( + url=get_endpoint(endpoint=f"/api/v2/datacoves/environments/{environment_slug}/files"), + headers=get_headers(), + files=files, + ) + + print_format(r) + + +def get_file_by_name(file_name: str): + print(f"Getting file {file_name}") + params = f"filename={file_name}" + + r = requests.get( + url=get_endpoint(endpoint=f"/api/v2/datacoves/environments/{environment_slug}/files?{params}"), + headers=get_headers(), + ) + + if r.ok: + try: + content = r.json().get("data", {}).get("contents", "") + if type(content) is dict: + content = json.dumps(content, indent=4) + except requests.exceptions.JSONDecodeError: + content = r.text + with open(file_name, "w") as f: + f.write(content) + print(f"Downloaded {file_name}") + else: + print(r.text) + + +def get_file_by_tag(tag: str): + print("Getting file by tag: {tag}...") + params = f"tag={tag}" + + r = requests.get( + url=get_endpoint(endpoint=f"api/internal/environments/{environment_slug}/files?{params}"), + headers=get_headers(), + ) + + if r.ok: + try: + file_name = r.json().get("data", {}).get("filename", "") + content = r.json().get("data", {}).get("contents", "") + if type(content) is dict: + content = json.dumps(content, indent=4) + except requests.exceptions.JSONDecodeError: + content = r.text + with open(file_name, "w") as f: + f.write(content) + print(f"Downloaded {file_name}") + else: + print(r.text) + + +def upload_manifest(): + print("Uploading manifest") + files = {"file": open("/config/workspace/transform/target/manifest.json", "rb")} + + r = requests.post( + url=get_endpoint(endpoint="/api/v2/datacoves/manifests"), + headers=get_headers(), + files=files, + data={ + "environment_slug": environment_slug + } + ) + + print_format(r) + + +def get_latest_manifest(project_slug: str): + print(f"Getting manifest for project: {project_slug}...") + + r = requests.get( + url=get_endpoint(endpoint=f"/api/v2/datacoves/projects/{project_slug}/latest-manifest"), + headers=get_headers(), + ) + + file_name = "manifest.json" + + if r.ok: + try: + content = r.json() + if type(content) is dict: + content = json.dumps(content, indent=4) + except requests.exceptions.JSONDecodeError: + content = r.text + with open(file_name, "w") as f: + f.write(content) + 
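+        # manifest.json is written to the current working directory before reporting success.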
print(f"Downloaded {file_name}")
+    else:
+        print(r.text)
+
+if __name__ == "__main__":
+    # Get information
+
+    # get_account(1)
+    # get_environments(1)
+    # get_projects(1)
+    # health_check()
+
+    # FIXME
+    # get_files()
+
+    # Working with files
+    # upload_single_file(tag="my-file")
+    # delete_file(tag="my-file")
+
+    # file_names = ["graph.gpickle", "graph_summary.json", "partial_parse.msgpack"]
+    # upload_multiple_files(file_names)
+    # for file in file_names:
+    #     delete_file(tag="latest")
+
+    # file_names = ["graph.gpickle", "graph_summary.json", "partial_parse.msgpack"]
+    # for file in file_names:
+    #     get_file_by_name(file)
+
+    # Will only get one file even if multiple have the same tag
+    # get_file_by_tag("latest")
+
+    # Upload transform/target/manifest.json, then immediately download it to transform/manifest.json
+    health_check()
+    upload_manifest()
+    get_latest_manifest(project_slug)
diff --git a/training_and_demos/dbt-api/skip_partial_parse.md b/training_and_demos/dbt-api/skip_partial_parse.md
new file mode 100644
index 000000000..5c4bba6ed
--- /dev/null
+++ b/training_and_demos/dbt-api/skip_partial_parse.md
@@ -0,0 +1,14 @@
+We can generate "static" dbt artifacts when deploying to production.
+
+We can then push them to the dbt-api on Datacoves so that dbt running in Airflow tasks can use partial parsing instead of a full parse.
+
+For dbt in Airflow to take advantage of partial parsing, the following connection values must be the same as the ones used when the manifest and partial parse files were generated.
+
+```yaml
+  database: MY_DATABASE
+  role: MY_ROLE
+  schema: MY_SCHEMA
+  user: AIRFLOW_USER
+```
+
+If any of these values differ, dbt will detect the difference and do a full parse of the project.
diff --git a/transform/dbt_project.yml b/transform/dbt_project.yml
index f4c333d87..e85ee30b0 100644
--- a/transform/dbt_project.yml
+++ b/transform/dbt_project.yml
@@ -3,7 +3,7 @@
 # name or the intended use of these models
 name: 'balboa'
 # this is the project version, has nothing to do with dbt version
-version: '56.0.0'
+version: '57.0.0'
 config-version: 2
 require-dbt-version: ">=1.8.0"
diff --git a/transform/package-lock.yml b/transform/package-lock.yml
index 979f8fd7d..2874e60ce 100644
--- a/transform/package-lock.yml
+++ b/transform/package-lock.yml
@@ -1,7 +1,7 @@
 packages:
   - name: dbt_utils
     package: dbt-labs/dbt_utils
-    version: 1.3.2
+    version: 1.3.3
   - name: dbt_expectations
     package: metaplane/dbt_expectations
     version: 0.10.10
@@ -10,8 +10,8 @@ packages:
     version: 0.2.7
   - name: dbt_external_tables
     package: dbt-labs/dbt_external_tables
-    version: 0.11.1
+    version: 0.12.0
   - name: dbt_date
     package: godatadriven/dbt_date
     version: 0.17.0
-sha1_hash: 7aa15b7d43881cbd57e7fdfaf4088f62aefe68d6
+sha1_hash: 9c82ea718e2f998b9fb89aa546bc3a100620f6b8
diff --git a/transform/packages.yml b/transform/packages.yml
index 9c6a79064..e461ad1f2 100644
--- a/transform/packages.yml
+++ b/transform/packages.yml
@@ -1,13 +1,13 @@
 packages:
   - package: dbt-labs/dbt_utils
-    version: 1.3.2
+    version: 1.3.3
  - package: metaplane/dbt_expectations
    version: 0.10.10
  - package: entechlog/dbt_snow_mask
    version: 0.2.7
    # for the latest version tag
  - package: dbt-labs/dbt_external_tables
-    version: 0.11.1
+    version: 0.12.0
    # for the latest version tag
 # - package: data-mie/dbt_profiler
 #   version: 0.8.4
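A minimal follow-up sketch, not part of this change: after `push_dbt_artifacts.py` promotes the artifacts, the same project-level `files` endpoint it already lists from can confirm that everything landed. The snippet below assumes the `DATACOVES__*` variables and the `api/v3/accounts/{account_id}/projects/{project_slug}/files` endpoint shown in the new script; the check itself is hypothetical.

```python
# Post-deploy sanity check (sketch): list project-level files via the endpoint
# push_dbt_artifacts.py uses and fail if any promoted artifact is missing.
import os

import requests

base_url = os.environ["DATACOVES__API_ENDPOINT"]
token = os.environ["DATACOVES__API_TOKEN"]
account_id = os.environ["DATACOVES__ACCOUNT_ID"]
project_slug = os.environ["DATACOVES__PROJECT_SLUG"]

expected = {"manifest.json", "graph.gpickle", "graph_summary.json", "partial_parse.msgpack"}

r = requests.get(
    f"{base_url}/api/v3/accounts/{account_id}/projects/{project_slug}/files",
    headers={"Accept": "application/json", "Authorization": f"Bearer {token}"},
)
r.raise_for_status()

found = {item.get("filename") for item in r.json().get("data", [])}
missing = expected - found
if missing:
    raise SystemExit(f"Artifacts missing at project level: {sorted(missing)}")
print("All expected dbt artifacts are available at the project level.")
```

If useful, this could run as one more workflow step right after "Push dbt artifacts to dbt-api on Datacoves", so a failed upload or promotion fails the deploy instead of surfacing later in Airflow.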