From 5a02f04590e4288e2bf256db47f6af890eb16dd0 Mon Sep 17 00:00:00 2001 From: Marija Selakovic Date: Sun, 14 Sep 2025 23:14:45 +0200 Subject: [PATCH 1/3] Prefect: Index page and starter tutorial --- docs/ingest/etl/index.md | 1 + docs/integrate/prefect/index.md | 47 +++++++++++++++++ docs/integrate/prefect/tutorial.md | 84 ++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+) create mode 100644 docs/integrate/prefect/index.md create mode 100644 docs/integrate/prefect/tutorial.md diff --git a/docs/ingest/etl/index.md b/docs/ingest/etl/index.md index fddf5957..fe8b5189 100644 --- a/docs/ingest/etl/index.md +++ b/docs/ingest/etl/index.md @@ -53,6 +53,7 @@ Load data from streaming platforms. - {ref}`kestra` - {ref}`meltano` - {ref}`nifi` +- {ref}`prefect` +++ Use data pipeline programming frameworks and platforms. :::: diff --git a/docs/integrate/prefect/index.md b/docs/integrate/prefect/index.md new file mode 100644 index 00000000..214747fb --- /dev/null +++ b/docs/integrate/prefect/index.md @@ -0,0 +1,47 @@ +(prefect)= +# Prefect + +```{div} .float-right +[![Prefect logo](https://i.logos-download.com/112205/28342-og-60c4845cec1905f22b2878f23a9c29ad.png/Prefect_Technologies_Logo_og.png){w=180px}][Prefect] +``` +```{div} .clearfix +``` + +:::{div} sd-text-muted +Modern Workflow Orchestration. +::: + +:::{rubric} About +::: + +[Prefect] is a workflow orchestration framework for building resilient data +pipelines in Python. + +Give your team the power to build reliable workflows without sacrificing +development speed. Prefect Core combines the freedom of pure Python +development with production-grade resilience, putting you in control of +your data operations. Transform your code into scalable workflows that +deliver consistent results. + +:::{rubric} Learn +::: + +::::{grid} + +:::{grid-item-card} Tutorial: Combining Prefect and CrateDB +:link: prefect-tutorial +:link-type: ref +Building Seamless Data Pipelines Made Easy: Combining Prefect and CrateDB. +::: + +:::: + + +:::{toctree} +:maxdepth: 1 +:hidden: +Tutorial +::: + + +[Prefect]: https://www.prefect.io/opensource diff --git a/docs/integrate/prefect/tutorial.md b/docs/integrate/prefect/tutorial.md new file mode 100644 index 00000000..7ae40763 --- /dev/null +++ b/docs/integrate/prefect/tutorial.md @@ -0,0 +1,84 @@ +(prefect-tutorial)= +# Building Seamless Data Pipelines Made Easy: Combining Prefect and CrateDB + +## Introduction + +[Prefect](https://www.prefect.io/opensource/) is an open-source workflow automation and orchestration tool for data engineering, machine learning, and other data-related tasks. It allows you to define, schedule, and execute complex data workflows in a straightforward manner. + +Prefect workflows are defined using *Python code*. Each step in the workflow is represented as a "task," and tasks can be connected to create a directed acyclic graph (DAG). The workflow defines the sequence of task execution and can include conditional logic and branching. Furthermore, Prefect provides built-in scheduling features that set up cron-like schedules for the flow. You can also parameterize your flow, allowing a run of the same flow with different input values. + +This tutorial will explore how CrateDB and Prefect come together to streamline data ingestion, transformation, and loading (ETL) processes with a few lines of Python code. 
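
To make these concepts concrete before building the real pipeline, here is a minimal, self-contained flow. The task names, the flow name, and the values are illustrative only, not part of the tutorial's pipeline:

```python
from prefect import flow, task

@task
def extract() -> list[int]:
    # Each @task is one step (node) in the flow's DAG.
    return [1, 2, 3]

@task
def total(values: list[int]) -> int:
    return sum(values)

@flow(name="hello-etl")
def hello_etl(factor: int = 1):
    # Flow parameters let you run the same flow with different inputs.
    print(total(extract()) * factor)

if __name__ == "__main__":
    hello_etl(factor=10)
```

Calling `hello_etl(factor=10)` creates one flow run with two task runs, whose states you can inspect in the Prefect UI.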

## Prerequisites

Before we begin, ensure you have the following prerequisites installed on your system:

* **Python 3.x**: Prefect is a Python-based workflow management system, so you'll need Python installed on your machine.
* **CrateDB**: To work with CrateDB, create a new cluster in [CrateDB Cloud](https://console.cratedb.cloud/). You can choose the CRFREE tier cluster that does not require any payment information.
* **Prefect**: Install Prefect using pip by running the following command in your terminal or command prompt: `pip install -U prefect`

## Getting started with Perfect

1. To get started with Prefect, you need to connect to Prefect’s API: the easiest way is to sign up for a free forever Cloud account at [https://app.prefect.cloud/](https://app.prefect.cloud/?deviceId=cfc80edd-a234-4911-a25e-ff0d6bb2c32a&deviceId=cfc80edd-a234-4911-a25e-ff0d6bb2c32a).
2. Once you create a new account, create a new workspace with a name of your choice.
3. Run `prefect cloud login` to [log into Prefect Cloud](https://docs.prefect.io/cloud/users/api-keys) from the local environment.

Now you are ready to build your first data workflows!

## Run your first ETL workflow with CrateDB
We'll dive into the basics of Prefect by creating a simple workflow with tasks that fetch data from a source, perform basic transformations, and load it into CrateDB. For this example, we will use [the yellow taxi trip data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz), which includes pickup time, geo-coordinates, number of passengers, and several other variables. The goal is to create a workflow that does a basic transformation on this data and inserts it into a CrateDB table named `trip_data`:

```python
import pandas as pd
from prefect import flow, task
from crate import client

CSV_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
URI = "crate://admin:password@host:5432"

@task()
def extract_data(url: str):
    df = pd.read_csv(url, compression="gzip")
    return df

@task()
def transform_data(df):
    df = df[df['passenger_count'] != 0]
    return df

@task()
def load_data(table_name, df):
    df.to_sql(table_name,URI,if_exists="replace",index=False)

@flow(name="ETL workflow", log_prints=True)
def main_flow():
    raw_data = extract_data(CSV_URL)
    data = transform_data(raw_data)
    load_data("trip_data", data)

if __name__ == '__main__':
    main_flow()
```

1. We start defining the flow by importing the necessary modules, including `prefect` for working with workflows, `pandas` for data manipulation, and `crate` for interacting with CrateDB.
2. Next, we specify the connection parameters for CrateDB and the URL for a file containing the dataset. You should modify these values according to your CrateDB Cloud setup.
3. We define three tasks using the `@task` decorator: `extract_data(url)`, `transform_data(data)`, and `load_data(table_name, transformed_data)`. Each task represents a unit of work in the workflow:
    1. The `read_data()` task loads the data from the CSV file to a `pandas` data frame.
    2. The `transform_data(data)` task takes the data frame and returns the data frame with entries where the `passenger_count` value is different than 0.
    3. The `load_data(transformed_data)` task connects to the CrateDB and loads data into the `trip_data` table.
4.
We define the workflow, name it “ETL workflow“, and specify the sequence of tasks: `extract_data()`, `transform_data(data)`, and `load_data(table_name, transformed_data)`. +5. Finally, we execute the flow by calling `main_flow()`. This runs the workflow, and each task is executed in the order defined. + +When you run this Python script, the workflow will read the trip data from a `csv` file, transform it, and load it into the CrateDB table. You can see the state of the flow run in the *Flows Runs* tab in Prefect UI: + +![Screenshot 2023-08-01 at 09.50.02|690x328](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/ecd02359cf23b5048e084faa785c7ad795bb5e57.png) + +You can enrich the ETL pipeline with many advanced features available in Prefect such as parameterization, error handling, retries, and more. Finally, after the successful execution of the workflow, you can query the data in the CrateDB: + +![Screenshot 2023-08-01 at 09.49.20|690x340](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/5582fcd2a677f78f8f7c6a1aa4b8e14f25dda2d1.png) + +## Wrap up + +Throughout this tutorial, you made a simple Prefect workflow, defined tasks, and orchestrated data transformations and loading into CrateDB. Both tools offer extensive feature sets that you can use to optimize and scale your data workflows further. + +As you continue exploring, don’t forget to check out the {ref}`reference documentation `. If you have further questions or would like to learn more about updates, features, and integrations, join the [CrateDB community](https://community.cratedb.com/). Happy data wrangling! From 8adc7d3c3deb212efe4d1d49d5bf45ee3e5933f1 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 16 Sep 2025 19:46:57 +0200 Subject: [PATCH 2/3] Prefect: Implement suggestions by CodeRabbit --- docs/integrate/prefect/index.md | 14 ++++----- docs/integrate/prefect/tutorial.md | 47 +++++++++++++++++------------- 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/docs/integrate/prefect/index.md b/docs/integrate/prefect/index.md index 214747fb..c068f2ae 100644 --- a/docs/integrate/prefect/index.md +++ b/docs/integrate/prefect/index.md @@ -8,20 +8,18 @@ ``` :::{div} sd-text-muted -Modern Workflow Orchestration. +Orchestrate modern data workflows in Python. ::: :::{rubric} About ::: -[Prefect] is a workflow orchestration framework for building resilient data -pipelines in Python. +Use [Prefect] to orchestrate resilient data pipelines in Python. -Give your team the power to build reliable workflows without sacrificing -development speed. Prefect Core combines the freedom of pure Python -development with production-grade resilience, putting you in control of -your data operations. Transform your code into scalable workflows that -deliver consistent results. +Build reliable workflows without sacrificing development speed. Prefect +combines the freedom of pure Python with production‑grade resilience, +putting you in control of your data operations. Turn code into scalable +workflows that deliver consistent results. :::{rubric} Learn ::: diff --git a/docs/integrate/prefect/tutorial.md b/docs/integrate/prefect/tutorial.md index 7ae40763..d50bfb93 100644 --- a/docs/integrate/prefect/tutorial.md +++ b/docs/integrate/prefect/tutorial.md @@ -3,11 +3,11 @@ ## Introduction -[Prefect](https://www.prefect.io/opensource/) is an open-source workflow automation and orchestration tool for data engineering, machine learning, and other data-related tasks. 
It allows you to define, schedule, and execute complex data workflows in a straightforward manner.
+[Prefect](https://www.prefect.io/opensource/) is an open-source workflow orchestration tool for data engineering, machine learning, and other data tasks. You define, schedule, and execute complex data workflows with straightforward Python code.

-Prefect workflows are defined using *Python code*. Each step in the workflow is represented as a “task,” and tasks can be connected to create a directed acyclic graph (DAG). The workflow defines the sequence of task execution and can include conditional logic and branching. Furthermore, Prefect provides built-in scheduling features that set up cron-like schedules for the flow. You can also parameterize your flow, allowing a run of the same flow with different input values.
+You define Prefect workflows in Python. Each step is a “task,” and tasks form a directed acyclic graph (DAG). Flows can branch and include conditional logic. Prefect also provides built‑in scheduling and flow parameters so you can run the same flow with different inputs.

-This tutorial will explore how CrateDB and Prefect come together to streamline data ingestion, transformation, and loading (ETL) processes with a few lines of Python code.
+This tutorial shows how to combine CrateDB and Prefect to streamline ETL with a few lines of Python.

## Prerequisites

@@ -15,26 +15,29 @@ Before we begin, ensure you have the following prerequisites installed on your s

* **Python 3.x**: Prefect is a Python-based workflow management system, so you'll need Python installed on your machine.
* **CrateDB**: To work with CrateDB, create a new cluster in [CrateDB Cloud](https://console.cratedb.cloud/). You can choose the CRFREE tier cluster that does not require any payment information.
-* **Prefect**: Install Prefect using pip by running the following command in your terminal or command prompt: `pip install -U prefect`
+* **Prefect**: Install Prefect using pip: `pip install -U prefect`
+* **SQLAlchemy + CrateDB dialect**: `pip install -U sqlalchemy sqlalchemy-cratedb`

-## Getting started with Perfect
+## Getting started with Prefect

-1. To get started with Prefect, you need to connect to Prefect’s API: the easiest way is to sign up for a free forever Cloud account at [https://app.prefect.cloud/](https://app.prefect.cloud/?deviceId=cfc80edd-a234-4911-a25e-ff0d6bb2c32a&deviceId=cfc80edd-a234-4911-a25e-ff0d6bb2c32a).
+1. To get started with Prefect, connect to Prefect’s API by signing up for a free Cloud account at [https://app.prefect.cloud/](https://app.prefect.cloud/).
2. Once you create a new account, create a new workspace with a name of your choice.
3. Run `prefect cloud login` to [log into Prefect Cloud](https://docs.prefect.io/cloud/users/api-keys) from the local environment.

Now you are ready to build your first data workflows!
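+
+Optionally, verify connectivity to CrateDB from Python first. The following is a minimal sketch; it assumes the `sqlalchemy-cratedb` dialect from the prerequisites and uses placeholder credentials that you need to replace with your own:
+
+```python
+import sqlalchemy as sa
+
+# Assumption: substitute the host and credentials of your CrateDB Cloud cluster.
+engine = sa.create_engine("crate://admin:password@host:4200?ssl=true")
+
+with engine.connect() as conn:
+    # sys.cluster is a CrateDB system table with one row of cluster metadata.
+    print(conn.execute(sa.text("SELECT name FROM sys.cluster")).scalar_one())
+```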

## Run your first ETL workflow with CrateDB
-We'll dive into the basics of Prefect by creating a simple workflow with tasks that fetch data from a source, perform basic transformations, and load it into CrateDB. For this example, we will use [the yellow taxi trip data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz), which includes pickup time, geo-coordinates, number of passengers, and several other variables. The goal is to create a workflow that does a basic transformation on this data and inserts it into a CrateDB table named `trip_data`:
+
+This section walks you through a simple workflow that fetches data, applies a basic transformation, and loads it into CrateDB. It uses the [yellow taxi trip dataset](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz), which includes pickup time, geo‑coordinates, passenger count, and other fields. The goal is to write transformed data to a CrateDB table named `trip_data`:

```python
import pandas as pd
from prefect import flow, task
-from crate import client
+from sqlalchemy import create_engine

CSV_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
-URI = "crate://admin:password@host:5432"
+URI = "crate://admin:password@host:4200"
+engine = create_engine(URI)

@task()
def extract_data(url: str):
@@ -48,7 +51,7 @@ def transform_data(df):

@task()
def load_data(table_name, df):
-    df.to_sql(table_name,URI,if_exists="replace",index=False)
+    df.to_sql(table_name, engine, if_exists="replace", index=False)

@flow(name="ETL workflow", log_prints=True)
def main_flow():
@@ -60,25 +63,27 @@ if __name__ == '__main__':
    main_flow()
```

-1. We start defining the flow by importing the necessary modules, including `prefect` for working with workflows, `pandas` for data manipulation, and `crate` for interacting with CrateDB.
-2. Next, we specify the connection parameters for CrateDB and the URL for a file containing the dataset. You should modify these values according to your CrateDB Cloud setup.
-3. We define three tasks using the `@task` decorator: `extract_data(url)`, `transform_data(data)`, and `load_data(table_name, transformed_data)`. Each task represents a unit of work in the workflow:
-    1. The `read_data()` task loads the data from the CSV file to a `pandas` data frame.
-    2. The `transform_data(data)` task takes the data frame and returns the data frame with entries where the `passenger_count` value is different than 0.
-    3. The `load_data(transformed_data)` task connects to the CrateDB and loads data into the `trip_data` table.
-4. We define the workflow, name it “ETL workflow“, and specify the sequence of tasks: `extract_data()`, `transform_data(data)`, and `load_data(table_name, transformed_data)`.
-5. Finally, we execute the flow by calling `main_flow()`. This runs the workflow, and each task is executed in the order defined.
+1. Start by importing the necessary modules: `prefect` for workflows, `pandas` for data manipulation, and SQLAlchemy for the database connection.
+2. Specify the CrateDB connection URI and the dataset URL. Modify these values for your CrateDB Cloud setup.
+3. Define the three tasks `extract_data(url)`, `transform_data(df)`, and `load_data(table_name, df)` with the `@task` decorator:
+
+   1. `extract_data()` reads the CSV into a pandas DataFrame.
+   2. `transform_data(df)` filters out rows where `passenger_count` is 0.
+   3. `load_data(table_name, df)` writes the data to the `trip_data` table in CrateDB.
+
+4. Define the flow, name it “ETL workflow,” and order the tasks: `extract_data()`, `transform_data()`, then `load_data()`.
+5. Execute the flow by calling `main_flow()`. Prefect runs each task in order.

-When you run this Python script, the workflow will read the trip data from a `csv` file, transform it, and load it into the CrateDB table. You can see the state of the flow run in the *Flows Runs* tab in Prefect UI:
+When you run the script, the workflow reads the trip data from a CSV file, transforms it, and loads it into CrateDB. You can see the state of the run in the *Flow Runs* tab in the Prefect UI:

![Screenshot 2023-08-01 at 09.50.02|690x328](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/ecd02359cf23b5048e084faa785c7ad795bb5e57.png)

-You can enrich the ETL pipeline with many advanced features available in Prefect such as parameterization, error handling, retries, and more. Finally, after the successful execution of the workflow, you can query the data in the CrateDB:
+You can enrich the pipeline with Prefect features such as parameters, error handling, and retries.
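+
+For example, a task can opt into retries. This is a minimal sketch using Prefect's `retries` and `retry_delay_seconds` task options; the values are illustrative:
+
+```python
+import pandas as pd
+from prefect import task
+
+# Assumption: illustrative retry settings; tune them for your workload.
+@task(retries=3, retry_delay_seconds=30)
+def extract_data(url: str):
+    # A transient download failure is retried up to three times,
+    # thirty seconds apart, before the task run is marked as failed.
+    return pd.read_csv(url, compression="gzip")
+```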
+
+After a successful run, query the data in CrateDB:

![Screenshot 2023-08-01 at 09.49.20|690x340](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/5582fcd2a677f78f8f7c6a1aa4b8e14f25dda2d1.png)

## Wrap up

-Throughout this tutorial, you made a simple Prefect workflow, defined tasks, and orchestrated data transformations and loading into CrateDB. Both tools offer extensive feature sets that you can use to optimize and scale your data workflows further.
+In this tutorial, you created a simple Prefect workflow, defined tasks, and orchestrated data transformations and loading into CrateDB. Both tools offer extensive features that help you optimize and scale your data workflows.

As you continue exploring, don’t forget to check out the {ref}`reference documentation `. If you have further questions or would like to learn more about updates, features, and integrations, join the [CrateDB community](https://community.cratedb.com/). Happy data wrangling!

From b4bc57c58c31a5e82d0e350b0c221daab66e8b1c Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Tue, 23 Sep 2025 23:03:15 +0200
Subject: [PATCH 3/3] Prefect: s/tutorial/usage/

---
 docs/integrate/prefect/index.md                  | 8 ++++----
 docs/integrate/prefect/{tutorial.md => usage.md} | 8 ++++----
 2 files changed, 8 insertions(+), 8 deletions(-)
 rename docs/integrate/prefect/{tutorial.md => usage.md} (91%)

diff --git a/docs/integrate/prefect/index.md b/docs/integrate/prefect/index.md
index c068f2ae..2f4b036c 100644
--- a/docs/integrate/prefect/index.md
+++ b/docs/integrate/prefect/index.md
@@ -26,10 +26,10 @@

::::{grid}

-:::{grid-item-card} Tutorial: Combining Prefect and CrateDB
-:link: prefect-tutorial
+:::{grid-item-card} Combine Prefect and CrateDB for building seamless data pipelines
+:link: prefect-usage
:link-type: ref
-Building Seamless Data Pipelines Made Easy: Combining Prefect and CrateDB.
+Learn how to build Prefect workflows in Python, with tasks that load data into CrateDB.
:::

::::
@@ -38,7 +38,7 @@
:::{toctree}
:maxdepth: 1
:hidden:
-Tutorial
+Usage
:::

diff --git a/docs/integrate/prefect/tutorial.md b/docs/integrate/prefect/usage.md
similarity index 91%
rename from docs/integrate/prefect/tutorial.md
rename to docs/integrate/prefect/usage.md
index d50bfb93..5c5d7e1d 100644
--- a/docs/integrate/prefect/tutorial.md
+++ b/docs/integrate/prefect/usage.md
@@ -1,5 +1,5 @@
-(prefect-tutorial)=
-# Building Seamless Data Pipelines Made Easy: Combining Prefect and CrateDB
+(prefect-usage)=
+# Combine Prefect and CrateDB for building seamless data pipelines

## Introduction

@@ -7,7 +7,7 @@

You define Prefect workflows in Python. Each step is a “task,” and tasks form a directed acyclic graph (DAG). Flows can branch and include conditional logic. Prefect also provides built‑in scheduling and flow parameters so you can run the same flow with different inputs.

-This tutorial shows how to combine CrateDB and Prefect to streamline ETL with a few lines of Python.
+This usage guide shows how to combine CrateDB and Prefect to streamline ETL with a few lines of Python.

## Prerequisites

@@ -84,6 +84,6 @@

## Wrap up

-In this tutorial, you created a simple Prefect workflow, defined tasks, and orchestrated data transformations and loading into CrateDB. Both tools offer extensive features that help you optimize and scale your data workflows.
+In this usage guide, you created a simple Prefect workflow, defined tasks, and orchestrated data transformations and loading into CrateDB. Both tools offer extensive features that help you optimize and scale your data workflows.

As you continue exploring, don’t forget to check out the {ref}`reference documentation `. If you have further questions or would like to learn more about updates, features, and integrations, join the [CrateDB community](https://community.cratedb.com/). Happy data wrangling!