diff --git a/de-projects/24_databricks_dqx_demo/.python-version b/de-projects/24_databricks_dqx_demo/.python-version
new file mode 100644
index 0000000..e4fba21
--- /dev/null
+++ b/de-projects/24_databricks_dqx_demo/.python-version
@@ -0,0 +1 @@
+3.12
diff --git a/de-projects/24_databricks_dqx_demo/README.md b/de-projects/24_databricks_dqx_demo/README.md
new file mode 100644
index 0000000..ad03817
--- /dev/null
+++ b/de-projects/24_databricks_dqx_demo/README.md
@@ -0,0 +1,36 @@
+### DQX Demo Project
+
+To run in a Databricks notebook, use the `dqx_demo_notebook.ipynb` file.
+
+To run locally in VS Code, use the `dqx_demo_vscode.ipynb` file.
+
+
+#### Configuring a remote connection to Databricks in VS Code
+
+1. Install the uv package manager, e.g.:
+```bash
+curl -LsSf https://astral.sh/uv/install.sh | sh
+```
+
+2. Install dependencies
+```
+uv sync
+```
+
+3. Create an access token in your Databricks workspace:
+Go to Profile > Settings > User > Developer > Access tokens > Manage > Generate new token
+
+4. Create a Databricks config file at `~/.databrickscfg` in your home directory:
+```
+[DEFAULT]
+host = https://abcdefasdf.databricks.com
+token = your_generated_token
+serverless_compute_id = auto
+```
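+
+5. Verify the connection
+A quick sanity check, assuming the dependencies from `pyproject.toml` (including `databricks-connect`) are installed; it mirrors the first cell of `dqx_demo_vscode.ipynb`:
+```bash
+uv run python -c "from databricks.connect import DatabricksSession; DatabricksSession.builder.getOrCreate().sql('select current_date').show()"
+```
+If the configuration is correct, this prints the current date from your workspace's serverless compute.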
diff --git a/de-projects/24_databricks_dqx_demo/dqx_demo_notebook.ipynb b/de-projects/24_databricks_dqx_demo/dqx_demo_notebook.ipynb
new file mode 100644
index 0000000..d1e27f3
--- /dev/null
+++ b/de-projects/24_databricks_dqx_demo/dqx_demo_notebook.ipynb
@@ -0,0 +1,527 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "a446cd1b-5355-43ab-a1b0-acfea3e28cce",
+     "showTitle": false,
+     "tableResultSettingsMap": {},
+     "title": ""
+    }
+   },
+   "source": [
+    "#### About DQX\n",
+    "\n",
+    "**DQX** = a Python-based data quality framework from Databricks Labs for validating PySpark DataFrames\n",
+    "\n",
+    "**Links**:\n",
+    "- Official docs - https://databrickslabs.github.io/dqx/\n",
+    "- GitHub repo - https://github.com/databrickslabs/dqx\n",
+    "- Review article - https://www.waitingforcode.com/databricks/data-quality-databricks-dqx/read\n",
+    "\n",
+    "**Some features**:\n",
+    "- Can evaluate a dataframe and split it into valid_df and invalid_df\n",
+    "- Lets you profile data and automatically generate candidate data quality rules (which can be reviewed, refined and finalized as needed)\n",
+    "- Checks can be defined as code (Python) or config (YAML)\n",
+    "- Comes with a simple Databricks dashboard for identifying and tracking data quality issues\n",
+    "- Lets you easily define custom checks, including with SQL-based syntax (expression: \"ended_at > started_at\") - see the custom check sketch below\n",
+    "- Supports Spark batch and streaming, including DLT (Delta Live Tables)\n",
+    "\n",
+    "**Cons**:\n",
+    "- Designed to check a single dataframe at a time, so if you need to validate data across several dataframes, you first have to join them into one.\n",
+    "- Doesn't offer a standard referential integrity check out of the box.\n",
+    "- The dashboard looks primitive and is of limited use.\n",
+    "\n",
+    "**Use cases**:\n",
+    "- pre-ingestion quality checks before data lands in curated layers\n",
+    "- post-factum checks on already-loaded data\n",
+    "\n",
+    "**Alternatives**: dbt, Soda, Great Expectations"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "740cd63a-afd6-4709-ba9a-10d3ae09249f",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Install DQX library"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Installing DQX in the notebook\n",
+    "\n",
+    "#%pip install databricks-labs-dqx\n",
+    "\n",
+    "#dbutils.library.restartPython()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "59f84760-60e9-4da7-a29e-f67a6affcda5",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Create sample dataframe with data quality issues"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "from pyspark.sql import DataFrame\n",
+    "from pyspark.sql.types import *\n",
+    "from pyspark.sql.functions import explode, col\n",
+    "from databricks.labs.dqx.profiler.profiler import DQProfiler\n",
+    "from databricks.labs.dqx.profiler.generator import DQGenerator\n",
+    "from databricks.labs.dqx.engine import DQEngine\n",
+    "from databricks.sdk import WorkspaceClient\n",
+    "from databricks.labs.dqx.rule import DQRowRule\n",
+    "from databricks.labs.dqx import check_funcs\n",
+    "import yaml\n",
+    "\n",
+    "\n",
+    "dq_engine = DQEngine(WorkspaceClient())\n",
+    "\n",
+    "# Creating a sample dataframe with data quality issues\n",
+    "df = spark.createDataFrame(\n",
+    "    data = [\n",
+    "        #id #name #email #age #signup_date #gender\n",
+    "        (1, \"Alice\", \"alice@example.com\", 30, \"2022-01-15\", \"Female\"),\n",
+    "        (2, \"Bob\", \"bob@example.com\", 25, \"2022-14-01\", \"Male\"),\n",
+    "        (3, \"Charlie\", \"charlie@example.com\", None, \"2022-03-01\", \"Femail\"),\n",
+    "        (4, \"Joanna\", \"joice@example.com\", 30, \"2022-01-15\", \"Female\"),\n",
+    "        (5, \"Eve\", None, 200, \"2022-02-30\", \"Female\"),\n",
+    "        (None, \"Frank\", \"frank@example.com\", 28, \"2022-05-20\", \"F\"),\n",
+    "    ],\n",
+    "    schema = StructType([\n",
+    "        StructField(\"id\", IntegerType(), True),\n",
+    "        StructField(\"name\", StringType(), True),\n",
+    "        StructField(\"email\", StringType(), True),\n",
+    "        StructField(\"age\", IntegerType(), True),\n",
+    "        StructField(\"signup_date\", StringType(), True),\n",
+    "        StructField(\"gender\", StringType(), True),\n",
+    "    ])\n",
+    ")\n",
+    "\n",
+    "display(df)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "0fbac5b0-3940-40c1-bfe7-35a4f9bd40cb",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Generate data quality profiles"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Profiling the dataframe to generate data quality profiles\n",
+    "ws = WorkspaceClient()\n",
+    "\n",
+    "# Profile the input data\n",
+    "profiler = DQProfiler(ws)\n",
+    "\n",
+    "# Change the default sample fraction from 30% to 100% for demo purposes\n",
+    "summary_stats, profiles = profiler.profile(df, options={\"sample_fraction\": 1.0})\n",
+    "\n",
+    "print(\"Print summary_stats\")\n",
+    "print(yaml.safe_dump(summary_stats))\n",
+    "\n",
+    "print(\"Print profiles\")\n",
+    "for profile in profiles:\n",
+    "    print(profile)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "2be495e8-ee9e-4a24-b6bd-08f499ad4274",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Generate DQ check candidates (draft)"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Generating data quality checks based on the profiles\n",
+    "\n",
+    "generator = DQGenerator(ws)\n",
+    "\n",
+    "generated_checks = generator.generate_dq_rules(profiles)\n",
+    "\n",
+    "print(\"Print generated data quality check candidates\")\n",
+    "print(yaml.safe_dump(generated_checks))"
+   ]
+  },
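+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The generated checks are only candidates. A minimal sketch of one way to\n",
+    "# take them through review: dump the draft to a YAML file, edit it by hand,\n",
+    "# then load the reviewed YAML with yaml.safe_load and apply it via\n",
+    "# apply_checks_by_metadata_and_split (both used below). Plain Python + PyYAML;\n",
+    "# the file name is arbitrary.\n",
+    "with open(\"generated_checks_draft.yml\", \"w\") as f:\n",
+    "    yaml.safe_dump(generated_checks, f)"
+   ]
+  },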
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "3edc3da3-4452-4f7d-ac3e-ceff304b4840",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Define checks with YAML"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Option 1: Define checks in YAML\n",
+    "\n",
+    "yaml_checks = yaml.safe_load(\"\"\"\n",
+    "- check:\n",
+    "    function: is_not_null\n",
+    "    arguments:\n",
+    "      column: id\n",
+    "  criticality: error\n",
+    "  name: id_is_null\n",
+    "- check:\n",
+    "    function: is_in_range\n",
+    "    arguments:\n",
+    "      column: age\n",
+    "      max_limit: 100\n",
+    "      min_limit: 10\n",
+    "  criticality: error\n",
+    "  name: age_isnt_in_range\n",
+    "- check:\n",
+    "    function: is_valid_date\n",
+    "    arguments:\n",
+    "      column: signup_date\n",
+    "  criticality: error\n",
+    "  name: wrong_date_format\n",
+    "- check:\n",
+    "    function: is_in_list\n",
+    "    arguments:\n",
+    "      allowed:\n",
+    "      - Female\n",
+    "      - Male\n",
+    "      column: gender\n",
+    "  criticality: error\n",
+    "  name: gender_is_not_in_the_list\n",
+    "\"\"\")\n",
+    "\n",
+    "# Execute checks\n",
+    "valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(df, yaml_checks)\n",
+    "display(quarantined_df)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "e11833ef-e084-4f7b-8aee-4c688f42784f",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Define checks with Python"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Option 2: Define checks in Python\n",
+    "\n",
+    "python_checks = [\n",
+    "    DQRowRule(\n",
+    "        name=\"id_is_null\",\n",
+    "        criticality=\"error\",\n",
+    "        check_func=check_funcs.is_not_null_and_not_empty,\n",
+    "        column=\"id\",\n",
+    "    ),\n",
+    "    DQRowRule(\n",
+    "        name=\"age_isnt_in_range\",\n",
+    "        criticality=\"error\",\n",
+    "        check_func=check_funcs.is_in_range,\n",
+    "        column=\"age\",\n",
+    "        check_func_kwargs={\"min_limit\": 10, \"max_limit\": 100},\n",
+    "    ),\n",
+    "    DQRowRule(\n",
+    "        name=\"wrong_date_format\",\n",
+    "        criticality=\"error\",\n",
+    "        check_func=check_funcs.is_valid_date,\n",
+    "        column=\"signup_date\",\n",
+    "    ),\n",
+    "    DQRowRule(\n",
+    "        name=\"gender_is_not_in_the_list\",\n",
+    "        criticality=\"error\",\n",
+    "        check_func=check_funcs.is_in_list,\n",
+    "        column=\"gender\",\n",
+    "        check_func_kwargs={\"allowed\": [\"Female\", \"Male\"]},\n",
+    "    ),\n",
+    "]\n",
+    "\n",
+    "# Execute checks\n",
+    "valid_df, quarantined_df = dq_engine.apply_checks_and_split(df, python_checks)\n",
+    "display(quarantined_df)"
+   ]
+  },
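+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Custom check sketch (referenced from the About cell above). This assumes\n",
+    "# make_condition is exposed in databricks.labs.dqx.check_funcs, as in recent\n",
+    "# DQX versions, and that the condition passed to it should be true when the\n",
+    "# row FAILS. The function and rule names below are our own, not DQX API.\n",
+    "import pyspark.sql.functions as F\n",
+    "\n",
+    "\n",
+    "def email_contains_at_sign(column: str):\n",
+    "    \"\"\"Flags rows where the column value does not contain '@'.\"\"\"\n",
+    "    return check_funcs.make_condition(\n",
+    "        ~F.col(column).contains(\"@\"),\n",
+    "        f\"Column {column} does not look like an email\",\n",
+    "        f\"{column}_no_at_sign\",\n",
+    "    )\n",
+    "\n",
+    "\n",
+    "custom_checks = [\n",
+    "    DQRowRule(\n",
+    "        name=\"email_without_at_sign\",\n",
+    "        criticality=\"error\",\n",
+    "        check_func=email_contains_at_sign,\n",
+    "        column=\"email\",\n",
+    "    ),\n",
+    "]\n",
+    "\n",
+    "# Execute the custom check\n",
+    "valid_custom_df, quarantined_custom_df = dq_engine.apply_checks_and_split(df, custom_checks)\n",
+    "display(quarantined_custom_df)"
+   ]
+  },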
[\"Female\", \"Male\"]},\n", + " )\n", + "]\n", + "\n", + "# Execute checks\n", + "valid_df, quarantined_df = dq_engine.apply_checks_and_split(df, python_checks)\n", + "display(quarantined_df)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "3713cb81-997f-45a5-a6d1-d6223339a8fa", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Execute data quality checks - another way" + } + }, + "outputs": [], + "source": [ + "# Option 2: apply quality rules and flag invalid records as additional columns (`_warning` and `_error`)\n", + "\n", + "#valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(df, yaml_checks) # for yaml defined checks\n", + "\n", + "valid_and_quarantined_df = dq_engine.apply_checks(df, python_checks) # for python defined checks\n", + "\n", + "# Methods to get valid and invalid dataframes\n", + "display(dq_engine.get_valid(valid_and_quarantined_df))\n", + "display(dq_engine.get_invalid(valid_and_quarantined_df))\n", + "\n", + "display(valid_and_quarantined_df)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "c7a43c15-b236-4a88-8ce8-c968f083cd70", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Function to get validation summary" + } + }, + "outputs": [], + "source": [ + "def _get_validation_summary_log(dqx_df_errors: DataFrame):\n", + " \"\"\" Takes a DQX errors DataFrame, counts failures per error name, and returns a summary string \"\"\"\n", + " rows = (\n", + " dqx_df_errors\n", + " .select(explode(\"_errors\").alias(\"err\"))\n", + " .groupby(col(\"err.name\").alias(\"failed_check\"))\n", + " .count()\n", + " .collect()\n", + " )\n", + "\n", + " lines = [\n", + " f'Found {r[\"count\"]} records with failed check \"{r[\"failed_check\"]}\"'\n", + " for r in rows\n", + " ]\n", + "\n", + " validation_summary_log = \"\\n\".join(lines)\n", + " return validation_summary_log" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "acd624a5-144a-47bd-a922-b0ef167e0ef6", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "View validation summary stats" + } + }, + "outputs": [], + "source": [ + "print(_get_validation_summary_log(quarantined_df))" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "377648b0-6bca-42d6-909a-b2d25e198ead", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Configure logging" + } + }, + "outputs": [], + "source": [ + "import logging\n", + "\n", + "def configure_logging():\n", + " logging.basicConfig(\n", + " level=logging.INFO,\n", + " format=\"%(asctime)s - %(levelname)s - %(message)s\",\n", + " handlers=[logging.StreamHandler()],\n", + " force=True\n", + " )\n", + "\n", + "configure_logging()\n", + "\n", + "logging.info(\"test log message\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + 
"inputWidgets": {}, + "nuid": "cca8e2aa-6305-42ef-b44f-6cb14e38afc8", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Make it a validation function" + } + }, + "outputs": [], + "source": [ + "# Define custom exception for data quality issues\n", + "class DataQualityException(Exception):\n", + " \"\"\"Exception raised when data quality checks fail.\"\"\"\n", + " pass\n", + "\n", + "\n", + "def validate_df(df: DataFrame):\n", + " \"\"\" Validates dataframe against defined data quality checks and throws DataQualityException if any issues are found \"\"\"\n", + " logging.info(\"Started validating dataframe\")\n", + "\n", + " # Defined data quality checks\n", + " dq_checks = [\n", + " DQRowRule(\n", + " name=\"id_is_null\",\n", + " criticality=\"error\",\n", + " check_func=check_funcs.is_not_null_and_not_empty,\n", + " column=\"id\",\n", + " ),\n", + " DQRowRule(\n", + " name=\"age_isnt_in_range\",\n", + " criticality=\"error\",\n", + " check_func=check_funcs.is_in_range,\n", + " column=\"age\",\n", + " check_func_kwargs={\"min_limit\": 10, \"max_limit\": 100},\n", + " ),\n", + " DQRowRule(\n", + " name=\"wrong_date_format\",\n", + " criticality=\"error\",\n", + " check_func=check_funcs.is_valid_date,\n", + " column=\"signup_date\"\n", + " ),\n", + " DQRowRule(\n", + " criticality=\"error\",\n", + " check_func=check_funcs.is_in_list,\n", + " column=\"gender\",\n", + " check_func_kwargs={\"allowed\": [\"Female\", \"Male\"]},\n", + " )\n", + " ]\n", + "\n", + " # Apply data quality checks\n", + " _, df_errors = dq_engine.apply_checks_and_split(df, dq_checks)\n", + "\n", + " error_count = df_errors.count()\n", + " if error_count > 0:\n", + " raise DataQualityException(\n", + " f\"Data quality failure:\\n{_get_validation_summary_log(df_errors)}\"\n", + " )\n", + " \n", + " logging.info(\"Dataframe validated successfully\")\n", + "\n", + "\n", + "\n", + "# Validate dataframe\n", + "try: \n", + " validate_df(df)\n", + "except DataQualityException as e:\n", + " print(str(e))\n" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "2" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 4 + }, + "notebookName": "DQX Demo", + "widgets": {} + }, + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/de-projects/24_databricks_dqx_demo/dqx_demo_vscode.ipynb b/de-projects/24_databricks_dqx_demo/dqx_demo_vscode.ipynb new file mode 100644 index 0000000..1a073a3 --- /dev/null +++ b/de-projects/24_databricks_dqx_demo/dqx_demo_vscode.ipynb @@ -0,0 +1,525 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "a446cd1b-5355-43ab-a1b0-acfea3e28cce", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "#### About DQX\n", + "\n", + "**DQX** = Python-based Data Quality framework designed for validating the quality of PySpark DataFrames developed by Databricks Labs\n", + "\n", + "**Links**:\n", + "- Official docs - https://databrickslabs.github.io/dqx/\n", + "- Github repo - https://github.com/databrickslabs/dqx\n", + "- Article with review - 
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "59f84760-60e9-4da7-a29e-f67a6affcda5",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Create sample dataframe with data quality issues"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "from pyspark.sql import DataFrame\n",
+    "from pyspark.sql.types import *\n",
+    "from pyspark.sql.functions import explode, col\n",
+    "from databricks.labs.dqx.profiler.profiler import DQProfiler\n",
+    "from databricks.labs.dqx.profiler.generator import DQGenerator\n",
+    "from databricks.labs.dqx.engine import DQEngine\n",
+    "from databricks.sdk import WorkspaceClient\n",
+    "from databricks.labs.dqx.rule import DQRowRule\n",
+    "from databricks.labs.dqx import check_funcs\n",
+    "import yaml\n",
+    "\n",
+    "\n",
+    "dq_engine = DQEngine(WorkspaceClient())\n",
+    "\n",
+    "# Creating a sample dataframe with data quality issues\n",
+    "df = spark.createDataFrame(\n",
+    "    data = [\n",
+    "        #id #name #email #age #signup_date #gender\n",
+    "        (1, \"Alice\", \"alice@example.com\", 30, \"2022-01-15\", \"Female\"),\n",
+    "        (2, \"Bob\", \"bob@example.com\", 25, \"2022-14-01\", \"Male\"),\n",
+    "        (3, \"Charlie\", \"charlie@example.com\", None, \"2022-03-01\", \"Femail\"),\n",
+    "        (4, \"Joanna\", \"joice@example.com\", 30, \"2022-01-15\", \"Female\"),\n",
+    "        (5, \"Eve\", None, 200, \"2022-02-30\", \"Female\"),\n",
+    "        (None, \"Frank\", \"frank@example.com\", 28, \"2022-05-20\", \"F\"),\n",
+    "    ],\n",
+    "    schema = StructType([\n",
+    "        StructField(\"id\", IntegerType(), True),\n",
+    "        StructField(\"name\", StringType(), True),\n",
+    "        StructField(\"email\", StringType(), True),\n",
+    "        StructField(\"age\", IntegerType(), True),\n",
+    "        StructField(\"signup_date\", StringType(), True),\n",
+    "        StructField(\"gender\", StringType(), True),\n",
+    "    ])\n",
+    ")\n",
+    "\n",
+    "df.show(truncate=False)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "0fbac5b0-3940-40c1-bfe7-35a4f9bd40cb",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Generate data quality profiles"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Profiling the dataframe to generate data quality profiles\n",
+    "ws = WorkspaceClient()\n",
+    "\n",
+    "# Profile the input data\n",
+    "profiler = DQProfiler(ws)\n",
+    "\n",
+    "# Change the default sample fraction from 30% to 100% for demo purposes\n",
+    "summary_stats, profiles = profiler.profile(df, options={\"sample_fraction\": 1.0})\n",
+    "\n",
+    "print(\"Print summary_stats\")\n",
+    "print(yaml.safe_dump(summary_stats))\n",
+    "\n",
+    "print(\"Print profiles\")\n",
+    "for profile in profiles:\n",
+    "    print(profile)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "2be495e8-ee9e-4a24-b6bd-08f499ad4274",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Generate DQ check candidates (draft)"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Generating data quality checks based on the profiles\n",
+    "\n",
+    "generator = DQGenerator(ws)\n",
+    "\n",
+    "generated_checks = generator.generate_dq_rules(profiles)\n",
+    "\n",
+    "print(\"Print generated data quality check candidates\")\n",
+    "print(yaml.safe_dump(generated_checks))"
+   ]
+  },
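+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The generated checks are only candidates. A minimal sketch of one way to\n",
+    "# take them through review: dump the draft to a YAML file, edit it by hand,\n",
+    "# then load the reviewed YAML with yaml.safe_load and apply it via\n",
+    "# apply_checks_by_metadata_and_split (both used below). Plain Python + PyYAML;\n",
+    "# the file name is arbitrary.\n",
+    "with open(\"generated_checks_draft.yml\", \"w\") as f:\n",
+    "    yaml.safe_dump(generated_checks, f)"
+   ]
+  },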
+ "df.show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "0fbac5b0-3940-40c1-bfe7-35a4f9bd40cb", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Generate data quality profiles" + } + }, + "outputs": [], + "source": [ + "# Profiling data in data frame to generate data quality profiles\n", + "ws = WorkspaceClient()\n", + "\n", + "# Profile the input data\n", + "profiler = DQProfiler(ws)\n", + "\n", + "# Change the default sample fraction from 30% to 100% for demo purpose\n", + "summary_stats, profiles = profiler.profile(df, options={\"sample_fraction\": 1.0})\n", + "\n", + "print(\"Print summary_stats\")\n", + "print(yaml.safe_dump(summary_stats))\n", + "\n", + "print(\"Print profiles\")\n", + "for profile in profiles:\n", + " print(profile)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "2be495e8-ee9e-4a24-b6bd-08f499ad4274", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Generate DQ check candidates (draft)" + } + }, + "outputs": [], + "source": [ + "# Generating data quality checks based on the profiles\n", + "\n", + "generator = DQGenerator(ws)\n", + "\n", + "generated_checks = generator.generate_dq_rules(profiles)\n", + "\n", + "print(\"Print generate data quality check candidates\")\n", + "print(yaml.safe_dump(generated_checks))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "3edc3da3-4452-4f7d-ac3e-ceff304b4840", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Define checks with YAML" + } + }, + "outputs": [], + "source": [ + "# Define checks in YAML\n", + "\n", + "yaml_checks = yaml.safe_load(\"\"\"\n", + "- check:\n", + " function: is_not_null\n", + " arguments:\n", + " column: id\n", + " criticality: error\n", + " name: id_is_null\n", + "- check:\n", + " function: is_in_range\n", + " arguments:\n", + " column: age\n", + " max_limit: 100\n", + " min_limit: 10\n", + " criticality: error\n", + " name: age_isnt_in_range\n", + "- check:\n", + " function: is_valid_date\n", + " arguments:\n", + " column: signup_date\n", + " criticality: error\n", + " name: wrong_date_format\n", + "- check:\n", + " function: is_in_list\n", + " arguments:\n", + " allowed:\n", + " - Female\n", + " - Male\n", + " column: gender\n", + " criticality: error\n", + " name: gender_is_not_in_the_list\n", + "\"\"\")\n", + "\n", + "# Execute checks\n", + "valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(df, yaml_checks)\n", + "quarantined_df.show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "e11833ef-e084-4f7b-8aee-4c688f42784f", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Define checks with Python" + } + }, + "outputs": [], + "source": [ + "# Option 2: Defining checks in Python\n", + "\n", + "python_checks = [\n", + " DQRowRule(\n", + " name=\"id_is_null\",\n", + " criticality=\"error\",\n", + " 
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "3713cb81-997f-45a5-a6d1-d6223339a8fa",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Execute data quality checks - another way"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Alternative: apply quality rules and flag invalid records in additional columns (`_errors` and `_warnings`)\n",
+    "\n",
+    "#valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(df, yaml_checks) # for YAML-defined checks\n",
+    "\n",
+    "valid_and_quarantined_df = dq_engine.apply_checks(df, python_checks) # for Python-defined checks\n",
+    "\n",
+    "# Methods to get valid and invalid dataframes\n",
+    "# display(dq_engine.get_valid(valid_and_quarantined_df))\n",
+    "# display(dq_engine.get_invalid(valid_and_quarantined_df))\n",
+    "\n",
+    "valid_and_quarantined_df.show(truncate=False)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "c7a43c15-b236-4a88-8ce8-c968f083cd70",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Function to get validation summary"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "def _get_validation_summary_log(dqx_df_errors: DataFrame):\n",
+    "    \"\"\" Takes a DQX errors DataFrame, counts failures per error name, and returns a summary string \"\"\"\n",
+    "    rows = (\n",
+    "        dqx_df_errors\n",
+    "        .select(explode(\"_errors\").alias(\"err\"))\n",
+    "        .groupby(col(\"err.name\").alias(\"failed_check\"))\n",
+    "        .count()\n",
+    "        .collect()\n",
+    "    )\n",
+    "\n",
+    "    lines = [\n",
+    "        f'Found {r[\"count\"]} records with failed check \"{r[\"failed_check\"]}\"'\n",
+    "        for r in rows\n",
+    "    ]\n",
+    "\n",
+    "    validation_summary_log = \"\\n\".join(lines)\n",
+    "    return validation_summary_log"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "acd624a5-144a-47bd-a922-b0ef167e0ef6",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "View validation summary stats"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "print(_get_validation_summary_log(quarantined_df))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "377648b0-6bca-42d6-909a-b2d25e198ead",
+     "showTitle": true,
+     "tableResultSettingsMap": {},
+     "title": "Configure logging"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "import logging\n",
+    "\n",
+    "def configure_logging():\n",
+    "    logging.basicConfig(\n",
+    "        level=logging.INFO,\n",
+    "        format=\"%(asctime)s - %(levelname)s - %(message)s\",\n",
+    "        handlers=[logging.StreamHandler()],\n",
+    "        force=True\n",
+    "    )\n",
+    "\n",
+    "configure_logging()\n",
+    "\n",
+    "logging.info(\"test log message\")"
+   ]
+  },
"377648b0-6bca-42d6-909a-b2d25e198ead", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Configure logging" + } + }, + "outputs": [], + "source": [ + "import logging\n", + "\n", + "def configure_logging():\n", + " logging.basicConfig(\n", + " level=logging.INFO,\n", + " format=\"%(asctime)s - %(levelname)s - %(message)s\",\n", + " handlers=[logging.StreamHandler()],\n", + " force=True\n", + " )\n", + "\n", + "configure_logging()\n", + "\n", + "logging.info(\"test log message\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "cca8e2aa-6305-42ef-b44f-6cb14e38afc8", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Make it a validation function" + } + }, + "outputs": [], + "source": [ + "# Define custom exception for data quality issues\n", + "class DataQualityException(Exception):\n", + " \"\"\"Exception raised when data quality checks fail.\"\"\"\n", + " pass\n", + "\n", + "\n", + "def validate_df(df: DataFrame):\n", + " \"\"\" Validates dataframe against defined data quality checks and throws DataQualityException if any issues are found \"\"\"\n", + " logging.info(\"Started validating dataframe\")\n", + "\n", + " # Defined data quality checks\n", + " dq_checks = [\n", + " DQRowRule(\n", + " name=\"id_is_null\",\n", + " criticality=\"error\",\n", + " check_func=check_funcs.is_not_null_and_not_empty,\n", + " column=\"id\",\n", + " ),\n", + " DQRowRule(\n", + " name=\"age_isnt_in_range\",\n", + " criticality=\"error\",\n", + " check_func=check_funcs.is_in_range,\n", + " column=\"age\",\n", + " check_func_kwargs={\"min_limit\": 10, \"max_limit\": 100},\n", + " ),\n", + " DQRowRule(\n", + " name=\"wrong_date_format\",\n", + " criticality=\"error\",\n", + " check_func=check_funcs.is_valid_date,\n", + " column=\"signup_date\"\n", + " ),\n", + " DQRowRule(\n", + " criticality=\"error\",\n", + " check_func=check_funcs.is_in_list,\n", + " column=\"gender\",\n", + " check_func_kwargs={\"allowed\": [\"Female\", \"Male\"]},\n", + " )\n", + " ]\n", + "\n", + " # Apply data quality checks\n", + " _, df_errors = dq_engine.apply_checks_and_split(df, dq_checks)\n", + "\n", + " error_count = df_errors.count()\n", + " if error_count > 0:\n", + " raise DataQualityException(\n", + " f\"Data quality failure:\\n{_get_validation_summary_log(df_errors)}\"\n", + " )\n", + " \n", + " logging.info(\"Dataframe validated successfully\")\n", + "\n", + "\n", + "\n", + "# Validate dataframe\n", + "try: \n", + " validate_df(df)\n", + "except DataQualityException as e:\n", + " print(str(e))\n" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "2" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 4 + }, + "notebookName": "DQX Demo", + "widgets": {} + }, + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/de-projects/24_databricks_dqx_demo/pyproject.toml 
diff --git a/de-projects/24_databricks_dqx_demo/pyproject.toml b/de-projects/24_databricks_dqx_demo/pyproject.toml
new file mode 100644
index 0000000..8ff3d7b
--- /dev/null
+++ b/de-projects/24_databricks_dqx_demo/pyproject.toml
@@ -0,0 +1,11 @@
+[project]
+name = "dqx-demo"
+version = "0.1.0"
+description = "DQX Demo Project"
+readme = "README.md"
+requires-python = ">=3.12"
+dependencies = [
+    "databricks-connect==16.4.1",
+    "databricks-labs-dqx>=0.6.0",
+    "ipykernel>=6.29.5",
+]