---
title: "Streamlining Data Workflows with Prefect"
date: 2024-05-01 14:40:25 -0500
categories:
- prefect
- data_engineering
- python
- scikit-learn
author: steven
---

_Streamlining Data Workflows with Prefect: A Powerful Alternative to Airflow_

![](https://raw.githubusercontent.com/git-steven/git-steven.github.io/master/assets/images/prefect-md.png)

## Introduction
In the world of data engineering, efficient and reliable data workflows are crucial for managing complex data pipelines. [Prefect](https://www.prefect.io/), a modern workflow management system, has emerged as a powerful alternative to Apache Airflow, offering a more flexible and intuitive approach to building and executing data workflows.

_What is Prefect?_

[Prefect](https://www.prefect.io/) is an open-source Python library designed to simplify the process of building, scheduling, and monitoring data workflows. It provides a clean and expressive API that allows data engineers to define complex workflows using Python code, making it easy to create, test, and maintain data pipelines.

## Prefect vs. Airflow: Similarities and Advantages
[Prefect](https://www.prefect.io/) shares some similarities with Airflow, as both tools aim to orchestrate and manage data workflows. However, Prefect offers several advantages over Airflow:

1. Python-native: Prefect is built around the concept of using pure Python code to define workflows, making it more intuitive and accessible to Python developers.
2. Task-based approach: Prefect introduces the concept of tasks, which are the building blocks of a workflow. Tasks encapsulate a single unit of work and can be easily composed to create complex workflows.
3. Dynamic flow control: Prefect allows for dynamic flow control, enabling tasks to be added, removed, or modified during runtime based on the results of previous tasks.
4. Concurrency and parallelism: Prefect supports concurrent execution of tasks, allowing for efficient utilization of resources and faster execution of workflows.
5. Advanced error handling: Prefect provides a robust error handling mechanism, allowing for automatic retries, failure notifications, and the ability to define custom error handling strategies (see the sketch after this list).
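
To make points 4 and 5 concrete, here is a minimal, hypothetical sketch of retries and concurrent task submission in Prefect 2.x. It is not taken from the article's repository; the task and flow names are illustrative only.

```python
from prefect import flow, task

# Automatic retries: Prefect re-runs the task up to 3 times,
# waiting 10 seconds between attempts, before marking it failed.
@task(retries=3, retry_delay_seconds=10)
def fetch_page(url: str) -> int:
    # Placeholder work; a real task might issue an HTTP request here.
    return len(url)

@flow
def crawl(urls: list[str]) -> list[int]:
    # .submit() hands each task to the flow's task runner so the calls
    # can run concurrently; .result() waits for each outcome.
    futures = [fetch_page.submit(u) for u in urls]
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(crawl(["https://example.com", "https://example.org"]))
```
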
## Dependencies
Before diving into the example code, let's ensure we have the necessary dependencies installed. Here's a list of the required libraries:

- prefect
- prefect-sqlalchemy
- pandas
- numpy
- scikit-learn
- typer

You can install these dependencies using pip:
```bash
pip install prefect prefect-sqlalchemy pandas numpy scikit-learn typer
```

Or, you can follow along using the full working project on [github](https://github.com/terracoil/terracoil-prefect).
It uses [poetry](https://python-poetry.org/) as a package manager, so you'll need to install that.

## Understanding Tasks and Flows in Prefect
In [Prefect](https://www.prefect.io/), a "task" is a Python function decorated with the `@task` decorator. Tasks encapsulate a single unit of work and can take inputs, perform computations, and produce outputs. Tasks are the fundamental building blocks of a Prefect workflow.

A flow, on the other hand, is a collection of tasks arranged in a specific order to accomplish a larger goal. Flows define the dependencies between tasks and specify the order in which they should be executed. Flows are created using the `@flow` decorator in Prefect.
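
As a minimal illustration (with made-up task names, not the article's ETL code), passing one task's return value to another is all it takes for Prefect to infer the dependency ordering inside a flow:

```python
from prefect import flow, task

@task
def add_one(x: int) -> int:
    return x + 1

@task
def double(x: int) -> int:
    return x * 2

@flow
def arithmetic(start: int = 1) -> int:
    # double() depends on add_one() simply because it consumes its output;
    # Prefect derives the execution order from this data dependency.
    y = add_one(start)
    return double(y)

if __name__ == "__main__":
    print(arithmetic(3))  # prints 8
```
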
## Example Code
Let's take a closer look at the provided example code and understand how it leverages Prefect for an ETL pipeline.

### Extract
In the `extract_data` task, we use the `connection_context_manager` to establish a connection to the source database. We then execute a SQL query to extract all data from the `source_data` table and return it as a pandas DataFrame.

```python
@task
def extract_data() -> DataFrame:
    """
    Extract all data from source_data table into a dataframe
    :return: new DataFrame with all data
    """
    logger = get_run_logger()
    logger.info("Extracting data...")

    with connection_context_manager() as connector:
        connection = connector.get_connection(begin=False)
        query = "SELECT * FROM source_data"
        df = pd.read_sql(query, connection)
        return df
```

### Transform
The `transform_data` task takes the extracted DataFrame as input and performs various data transformations. It applies data cleaning by removing any missing values using `df.dropna(inplace=True)`. It then performs data normalization using `MinMaxScaler`, standardization using `StandardScaler`, and Gaussian transformation using `QuantileTransformer` from the scikit-learn library.

```python
@task
def transform_data(df: DataFrame) -> DataFrame:
    """
    Transform source data from given DataFrame. Performs cleaning,
    normalization, standardization, and Gaussian transformation.
    :return: transformed data with 3 columns per feature:
        - normalized feature
        - standardized feature
        - Gaussian-transformed feature
    """
    # ...

    # Data normalization
    scaler = MinMaxScaler()
    normalized_data = scaler.fit_transform(df)
    df_normalized = pd.DataFrame(normalized_data, columns=NORMALIZED_COLUMNS)
    df_normalized.drop(columns=['id'], inplace=True)

    # Standardization
    standardized_data = StandardScaler().fit_transform(df)
    df_standardized = pd.DataFrame(standardized_data, columns=STANDARDIZED_COLUMNS)
    df_standardized.drop(columns=['id'], inplace=True)

    # Gaussian transformation
    gaussian_data = QuantileTransformer(output_distribution='normal').fit_transform(df)
    df_gaussian = pd.DataFrame(gaussian_data, columns=GAUSSIAN_COLUMNS)
    df_gaussian.drop(columns=['id'], inplace=True)

    # ...
```

The transformed data is stored in separate DataFrames (`df_normalized`, `df_standardized`, `df_gaussian`) with appropriate column names. These DataFrames are then merged into a single DataFrame `df_xform` before being returned.

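The merge step itself is elided in the excerpt above (`# ...`). One straightforward way to do it, shown here as a standalone sketch with toy data rather than the repository's exact code, is a column-wise `pd.concat`:

```python
import pandas as pd

# Toy stand-ins for the three transformed DataFrames built in transform_data
df_normalized = pd.DataFrame({"feature_norm": [0.0, 0.5, 1.0]})
df_standardized = pd.DataFrame({"feature_std": [-1.22, 0.0, 1.22]})
df_gaussian = pd.DataFrame({"feature_gauss": [-0.67, 0.0, 0.67]})

# Column-wise merge: each row keeps its normalized, standardized,
# and Gaussian-transformed features side by side.
df_xform = pd.concat([df_normalized, df_standardized, df_gaussian], axis=1)
print(df_xform)
```
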
### Load
The `load_data` task simply stores the transformed data into the `destination_data` table.
```python
@task
def load_data(df_xform: DataFrame) -> None:
    """
    Load transformed data into destination_data table

    :param df_xform: The transformed data to load
    """
    # ...
    with connection_context_manager() as connector:
        connection = connector.get_connection(begin=False)
        df_xform.to_sql('destination_data', connection, if_exists='append', index=False, chunksize=50000)
```

### ETL Pipeline Flow

The `etl_pipeline` flow defines the overall ETL process. It calls the `extract_data` task to retrieve the source data, passes it to the `transform_data` task for transformations, and finally calls the `load_data` task to load the transformed data into the destination table.

```python
@flow
def etl_pipeline():
    """
    ETL pipeline flow. Extracts data from source, transforms it, then
    loads it to destination. Transformation step applies data cleaning,
    providing normalized features with MinMaxScaler, standardized features,
    and Gaussian-transformed features using QuantileTransformer.
    """
    df = extract_data()
    df_xform = transform_data(df)
    load_data(df_xform)
```
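
Because a Prefect flow is just a decorated Python function, running the pipeline locally can be as simple as calling it. This is a sketch placed at the bottom of the module that defines `etl_pipeline`; the repository lists typer as a dependency, so its actual CLI entry point may look different.

```python
if __name__ == "__main__":
    # Executes the flow in the current process; Prefect still records the
    # flow run and the state of each task.
    etl_pipeline()
```
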

## Details

### MinMaxScaler
MinMaxScaler is a scaling technique that transforms features to a specific range, typically between 0 and 1. It is useful when features have different scales, and you want to bring them to a common scale for comparison or visualization. MinMaxScaler is also beneficial when working with algorithms sensitive to feature scales, such as neural networks or support vector machines. However, it is sensitive to outliers, which can significantly impact the scaling of the features.
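
To see what this means numerically, here is a standalone sketch (separate from the pipeline code): each value is mapped via `(x - min) / (max - min)`, so the smallest value becomes 0 and the largest becomes 1.

```python
import numpy as np
from sklearn.preprocessing import MinMaxScaler

X = np.array([[1.0], [5.0], [10.0]])    # one feature: min=1, max=10
print(MinMaxScaler().fit_transform(X))  # [[0.], [0.444...], [1.]]
```
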

### StandardScaler
StandardScaler is a scaling technique that standardizes features by removing the mean and scaling to unit variance. It is useful when features have different units or scales, and you want to bring them to a common scale with zero mean and unit variance. StandardScaler is particularly helpful when working with algorithms that assume normally distributed input features, such as linear regression or logistic regression. It gives equal importance to all features, regardless of their original scale. StandardScaler is less sensitive to outliers compared to MinMaxScaler, but extreme outliers can still affect the mean and standard deviation calculations.
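
Again as a standalone illustration: each value is transformed via `(x - mean) / std`, so the resulting feature has mean 0 and standard deviation 1.

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

X = np.array([[2.0], [4.0], [6.0]])       # mean = 4, population std ≈ 1.633
print(StandardScaler().fit_transform(X))  # [[-1.2247...], [0.], [1.2247...]]
```
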

### Gaussian distribution
_Leveraging Gaussian Transformation with QuantileTransformer_

In the example code, the `transform_data` task utilizes the `QuantileTransformer` from scikit-learn to perform Gaussian transformation on the input data. The Gaussian transformation aims to transform the data to follow a normal (Gaussian) distribution.

By setting `output_distribution='normal'` in the `QuantileTransformer`, the transformed data will have a distribution that approximates a Gaussian distribution. This can be beneficial when working with algorithms that assume normally distributed input data or when you want to reduce the impact of outliers.
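
As a standalone sketch of the idea: the transformer maps each value through the feature's empirical quantiles and then through the inverse normal CDF, so even a heavily skewed feature comes out roughly bell-shaped. (`n_quantiles` is set explicitly here only because the toy sample is small.)

```python
import numpy as np
from sklearn.preprocessing import QuantileTransformer

rng = np.random.default_rng(0)
skewed = rng.exponential(scale=2.0, size=(500, 1))  # heavily right-skewed feature

qt = QuantileTransformer(output_distribution='normal', n_quantiles=500, random_state=0)
gaussianized = qt.fit_transform(skewed)

# The transformed column is approximately standard normal: mean ~0, std ~1,
# with the long right tail pulled in.
print(gaussianized.mean().round(2), gaussianized.std().round(2))
```
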

## Conclusion
Prefect offers a powerful and flexible alternative to Airflow for building and managing data workflows. With its Python-native approach, task-based composition, dynamic flow control, and advanced error handling capabilities, Prefect simplifies the process of creating and maintaining complex data pipelines.

By leveraging Prefect's tasks and flows, data engineers can easily define and orchestrate data workflows, incorporating essential data preprocessing techniques like normalization, standardization, and Gaussian transformation using scikit-learn's `QuantileTransformer`. These techniques enhance the quality and compatibility of data, enabling more effective and efficient downstream data processing and analysis tasks.

As data workflows continue to grow in complexity, tools like Prefect empower data engineers to streamline their workflows, improve data quality, and focus on delivering valuable insights from their data pipelines.