Commit 3e7d99a

Airflow: Implement suggestions by CodeRabbit, part 5
1 parent 028d2d2 commit 3e7d99a

6 files changed, +25 -27 lines

docs/integrate/airflow/data-retention-hot-cold.md

Lines changed: 1 addition & 1 deletion
@@ -139,7 +139,7 @@ Assume a basic Astronomer/Airflow setup is in place, as described in the {ref}`f
 
 The CrateDB cluster will then automatically initiate the relocation of the affected partition to a node that fulfills the requirement (`cratedb03` in our case).
 
-The full implementation is available as [data_retention_reallocate_dag.py](https://github.com/crate/crate-airflow-tutorial/blob/main/dags/data_retention_reallocate_dag.py) on GitHub.
+The full implementation is available as [data_retention_reallocate_dag.py](https://github.com/crate/cratedb-airflow-tutorial/blob/main/dags/data_retention_reallocate_dag.py) on GitHub.
 
 To validate our implementation, we trigger the DAG once manually via the Airflow UI at `http://localhost:8081/`. Once executed, log messages of the `reallocate_partitions` task confirm the reallocation was triggered for the partition with the sample data set up earlier:
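For orientation, the reallocation that this DAG triggers boils down to changing the allocation requirement of a single partition. The following is only a hypothetical sketch of such a task, assuming nodes are tagged with a `storage` attribute and an Airflow connection named `cratedb_connection` exists; the table, partition column, and value are placeholders:

```python
# Hypothetical sketch: move one partition to nodes tagged with storage=cold.
# Table name, partition column/value, and the connection ID are placeholders.
import pendulum
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id="data_retention_reallocate_sketch",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
):
    SQLExecuteQueryOperator(
        task_id="reallocate_partitions",
        conn_id="cratedb_connection",
        sql="""
            ALTER TABLE doc.raw_metrics PARTITION (part = '2023-01-01')
            SET ("routing.allocation.require.storage" = 'cold')
        """,
    )
```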
docs/integrate/airflow/data-retention-policy.md

Lines changed: 4 additions & 3 deletions
@@ -42,7 +42,8 @@ INSERT INTO retention_policies (table_schema, table_name, partition_column, rete
 ```
 
 ## Implementation in Apache Airflow
-To automate the process of deleting expired data we use [Apache Airflow](https://airflow.apache.org/). Our workflow implementation does the following: _once a day, fetch policies from the database, and delete all data for which the retention period expired._
+
+Use [Apache Airflow](https://airflow.apache.org/) to automate deletions. Once a day, fetch policies from the database and delete data whose retention period expired.
 
 ### Retrieving Retention Policies
 The first step consists of a task that queries partitions affected by retention policies. We do this by joining `retention_policies` and `information_schema.table_partitions` tables and selecting values with expired retention periods. In CrateDB, `information_schema.table_partitions` [{ref}`documentation <crate-reference:is_table_partitions>`] contains information about all partitioned tables including the name of the table, schema, partition column, and the values of the partition.
@@ -79,7 +80,7 @@ The first step is to create the function `get_policies` that takes as a paramete
 ### Cross-Communication Between Tasks
 Before we continue into the implementation of the next task in Apache Airflow, we would like to give a brief overview of how the data is communicated between different tasks in a DAG. For this purpose, Airflow introduces the [XCom](https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html) system. Simply speaking `XCom` can be seen as a small object with storage that allows tasks to `push` data into that storage that can be later used by a different task in the DAG.
 
-The key thing here is that it allows the exchange of a **small** amount of data between tasks. From Airflow 2.0, the return value of a Python method used as a task will be automatically stored in `XCom`. For our example, this means that the `get_policies` return value is available from the next task after the `get_policies` operator executes. To access the data from another task, a reference to the previous task can be passed to the next task when defining dependencies between tasks.
+XCom exchanges a small amount of data between tasks. Since Airflow 2.0, a Python task’s return value is stored in XCom. In our case, `get_policies` returns the partitions; the next task reads them via a reference to `get_policies` when defining dependencies.
 
 ### Applying Retention Policies
 Now that we retrieved the policies and Airflow automatically saved them via `XCom`, we need to create another task that will go through each element in the list and delete expired data.
@@ -147,7 +148,7 @@ data_retention_delete()
 
 On the `SQLExecuteQueryOperator`, a certain set of attributes are passed via `partial` instead of `expand`. These are static values that are the same for each `DELETE` statement, like the connection and task ID.
 
-The full DAG implementation of the data retention policy can be found in our [GitHub repository](https://github.com/crate/crate-airflow-tutorial/blob/main/dags/data_retention_delete_dag.py). To run the workflow, we rely on Astronomer infrastructure with the same setup as shown in the {ref}`getting started <airflow-getting-started>` section.
+The full DAG implementation of the data retention policy can be found in our [GitHub repository](https://github.com/crate/cratedb-airflow-tutorial/blob/main/dags/data_retention_delete_dag.py). To run the workflow, we rely on Astronomer infrastructure with the same setup as shown in the {ref}`getting started <airflow-getting-started>` section.
 
 ## Summary
 This tutorial gives a guide on how to delete data with expired retention policies. The first part shows how to design policies in CrateDB and then, how to use Apache Airflow to automate data deletion. The DAG implementation is fairly simple: the first task performs the extraction of relevant policies, while the second task makes sure that affected partitions are deleted. In the following tutorial, we will focus on another real-world example that can be automated with Apache Airflow and CrateDB.
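The hunks above describe a two-task pattern: `get_policies` returns the affected partitions via XCom, and `SQLExecuteQueryOperator` fans out one `DELETE` per partition through `partial`/`expand`. As a rough, hypothetical sketch of that pattern (not the repository's DAG; the connection ID, table, and the hard-coded policy row are placeholders):

```python
# Minimal sketch of the two-task pattern described above (not the repository's
# exact DAG). Connection ID, table, and the hard-coded policy row are placeholders.
import pendulum
from airflow.decorators import dag, task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


@task
def get_policies():
    # The real DAG queries retention_policies joined with
    # information_schema.table_partitions; here we return a static sample row.
    return [
        {"schema": "doc", "table": "raw_metrics", "column": "ts", "value": 1672531200000},
    ]


@task
def map_policy(policy):
    # Build one DELETE statement per expired partition.
    return (
        f'DELETE FROM "{policy["schema"]}"."{policy["table"]}" '
        f'WHERE "{policy["column"]}" = {policy["value"]}'
    )


@dag(start_date=pendulum.datetime(2023, 1, 1), schedule="@daily", catchup=False)
def data_retention_delete_sketch():
    sql_statements = map_policy.expand(policy=get_policies())
    SQLExecuteQueryOperator.partial(
        task_id="delete_partition",
        conn_id="cratedb_connection",  # assumed Airflow connection ID
    ).expand(sql=sql_statements)


data_retention_delete_sketch()
```

Static arguments such as `task_id` and `conn_id` go through `partial`, while the per-partition `sql` goes through `expand`, matching the split described in the diff above.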

docs/integrate/airflow/export-s3.md

Lines changed: 2 additions & 2 deletions
@@ -49,7 +49,7 @@ TABLES = [
 ```
 The DAG itself is specified as a Python file `astro-project/dags`. It loads the above-defined `TABLES` list and iterates over it. For each entry, a corresponding `SQLExecuteQueryOperator` is instantiated, which will perform the actual export during execution. If the `TABLES` list contains more than one element, Airflow will be able to process the corresponding exports in parallel, as there are no dependencies between them.
 
-The resulting DAG code is as follows (see the [GitHub repository](https://github.com/crate/crate-airflow-tutorial) for the complete project):
+The resulting DAG code is as follows (see the [GitHub repository](https://github.com/crate/cratedb-airflow-tutorial) for the complete project):
 ```python
 import os
 import pendulum
@@ -115,4 +115,4 @@ To find more details about running DAGs, go to `Browse/DAG runs` which opens a n
 After a successful DAG execution, the data will be stored on the remote filesystem.
 
 ## Summary
-This article covered a simple use case: periodic data export to a remote filesystem. In the following articles, we will cover more complex use cases composed of several tasks based on real-world scenarios. If you want to try our examples with Apache Airflow and Astronomer, you are free to check out the code on the public [GitHub repository](https://github.com/crate/crate-airflow-tutorial).
+This article covered a simple use case: periodic data export to a remote filesystem. In the following articles, we will cover more complex use cases composed of several tasks based on real-world scenarios. If you want to try our examples with Apache Airflow and Astronomer, you are free to check out the code on the public [GitHub repository](https://github.com/crate/cratedb-airflow-tutorial).
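The first hunk above describes the export pattern: iterate over the `TABLES` list and instantiate one `SQLExecuteQueryOperator` per table. The complete DAG lives in the linked repository; below is only a hypothetical sketch of that loop, with the connection ID, table names, and S3 target invented for illustration:

```python
# Hypothetical sketch of the per-table export loop; credentials, scheduling,
# and the COPY TO target are placeholders, not the repository's DAG.
import pendulum
from airflow.decorators import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

TABLES = ["doc.metrics", "doc.events"]  # placeholder table list


@dag(start_date=pendulum.datetime(2023, 1, 1), schedule="@daily", catchup=False)
def table_export_sketch():
    for table in TABLES:
        # One independent export task per table, so Airflow can run them in parallel.
        SQLExecuteQueryOperator(
            task_id=f"export_{table.replace('.', '_')}",
            conn_id="cratedb_connection",  # assumed Airflow connection ID
            sql=f"COPY {table} TO DIRECTORY 's3://my-bucket/exports/{table}'",
        )


table_export_sketch()
```

Because the export tasks have no dependencies on each other, Airflow can schedule them in parallel, as noted in the diff above.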

docs/integrate/airflow/getting-started.md

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ The astronomer project consists of four Docker containers:
 - Triggerer (running an event loop for deferrable tasks)
 
 The PostgreSQL server listens on port 5432. The web server listens on port 8080
-and is available at <http://localhost:8080/> with `admin`/`admin`.
+and is available at `http://localhost:8080/` with `admin`/`admin`.
 
 If these ports are already in use, change them in `.astro/config.yaml`. For
 example, set the webserver to 8081 and PostgreSQL to 5435:

docs/integrate/airflow/import-parquet.md

Lines changed: 13 additions & 13 deletions
@@ -14,7 +14,7 @@ For an alternative Parquet ingestion approach, see {ref}`arrow-import-parquet`.
 
 Before you start, have Airflow and CrateDB running. The SQL shown below also
 resides in the setup folder of the
-[GitHub repository](https://github.com/crate/crate-airflow-tutorial).
+[GitHub repository](https://github.com/crate/cratedb-airflow-tutorial).
 
 Create two tables in CrateDB: a temporary staging table
 (`nyc_taxi.load_trips_staging`) and the final table (`nyc_taxi.trips`).
@@ -78,8 +78,8 @@ CREATE TABLE IF NOT EXISTS "nyc_taxi"."trips" (
 )
 PARTITIONED BY ("pickup_year");
 ```
-To better understand how Airflow works and its applications, you can check other
-tutorials related to that topic {ref}`here <airflow-tutorials>`.
+To explore more Airflow use cases, see the related tutorials
+{ref}`here <airflow-tutorials>`.
 
 With the tools set up and tables created, proceed to the DAG.
 
@@ -93,17 +93,17 @@ The Airflow DAG used in this tutorial contains 7 tasks:
 https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-03.parquet
 ```
 The file path above corresponds to the data from March 2022. So, to retrieve a specific file, the task gets the date and formats it to compose the name of the specific file. Important to mention that the data is released with 2 months of delay, so it had to be taken into consideration.
-* **process_parquet:** afterward, use the name to download the file to local storage and convert it from Parquet to CSV using `parquet-tools` (Apache Parquet CLI; see [Apache Arrow]).
+* **process_parquet:** Use the formatted name to download the file to local storage and convert it from Parquet to CSV with `parquet-tools` (Apache Parquet CLI; see [Apache Arrow]).
 
 * `curl -o "<LOCAL-PARQUET-FILE-PATH>" "<REMOTE-PARQUET-FILE>"`
 * `parquet-tools csv <LOCAL-PARQUET-FILE-PATH> > <CSV-FILE-PATH>`
 
 Both commands run within one `BashOperator`.
-* **copy_csv_to_s3:** Once the newly transformed file is available, it gets uploaded to an S3 Bucket to then, be used in the {ref}`crate-reference:sql-copy-from` SQL statement.
-* **copy_csv_staging:** copy the CSV file stored in S3 to the staging table described previously.
-* **copy_staging_to_trips:** finally, copy the data from the staging table to the trips table, casting the columns that are not in the right type yet.
-* **delete_staging:** after it is all processed, clean up the staging table by deleting all rows, and preparing for the next file.
-* **delete_local_parquet_csv:** delete the files (Parquet and CSV) from the storage.
+* **copy_csv_to_s3:** Upload the transformed file to an S3 bucket and reference it in the {ref}`crate-reference:sql-copy-from` statement.
+* **copy_csv_staging:** Copy the CSV file stored in S3 to the staging table described previously.
+* **copy_staging_to_trips:** Copy data from the staging table to the trips table, casting columns to their final types.
+* **delete_staging:** After processing, delete all rows from the staging table to prepare for the next file.
+* **delete_local_parquet_csv:** Delete the local Parquet and CSV files.
 
 The DAG was configured based on the characteristics of the data in use. In this case, there are two crucial pieces of information about the data provider:
 
@@ -113,19 +113,19 @@ The DAG was configured based on the characteristics of the data in use. In this
 The NYC TLC publishes trip data monthly with a two‑month delay. Set the DAG to
 run monthly with a start date of March 2009. The first run (logical date March
 2009) downloads the file for January 2009 (logical date minus two months),
-2010) which is the first available dataset.
+which is the first available dataset.
 
 You may find the full code for the DAG described above available in our
-[GitHub repository](https://github.com/crate/crate-airflow-tutorial/blob/main/dags/nyc_taxi_dag.py).
+[GitHub repository](https://github.com/crate/cratedb-airflow-tutorial/blob/main/dags/nyc_taxi_dag.py).
 
 ## Wrap up
 
 The workflow represented in this tutorial is a simple way to import Parquet files
 to CrateDB by transforming them into a CSV file. As previously mentioned, there
 are other approaches out there, we encourage you to try them out.
 
-If you want to continue to explore how CrateDB can be used with Airflow, you can
-check other tutorials related to that topic {ref}`here <airflow-tutorials>`.
+To continue exploring CrateDB with Airflow, browse the related tutorials
+{ref}`here <airflow-tutorials>`.
 
 
 [Apache Arrow]: https://github.com/apache/arrow
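The `process_parquet` step described above chains `curl` and `parquet-tools` inside a single `BashOperator`. Below is a hypothetical sketch of just that step; the fixed month and local paths are placeholders, and the real DAG derives the file name from the logical date and adds the remaining tasks:

```python
# Hypothetical sketch of the download-and-convert step only; the repository's
# DAG computes the file name from the logical date and chains further tasks.
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="nyc_taxi_process_parquet_sketch",
    start_date=pendulum.datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
):
    # Download one Parquet file and convert it to CSV with parquet-tools.
    BashOperator(
        task_id="process_parquet",
        bash_command=(
            "curl -o /tmp/yellow_tripdata_2022-03.parquet "
            "'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-03.parquet' "
            "&& parquet-tools csv /tmp/yellow_tripdata_2022-03.parquet "
            "> /tmp/yellow_tripdata_2022-03.csv"
        ),
    )
```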

docs/integrate/airflow/index.md

Lines changed: 4 additions & 7 deletions
@@ -26,7 +26,7 @@ Airflow has a modular architecture and uses a message queue to orchestrate an
 arbitrary number of workers. Pipelines are defined in Python, allowing for
 dynamic pipeline generation and on-demand, code-driven pipeline invocation.
 
-Pipeline parameterization is using the powerful Jinja templating engine.
+Airflow parameterizes pipelines with the Jinja templating engine.
 To extend the system, you can define your own operators and extend libraries
 to fit the level of abstraction that suits your environment.
 :::
@@ -38,19 +38,16 @@ to fit the level of abstraction that suits your environment.
 [![Astronomer logo](https://logowik.com/content/uploads/images/astronomer2824.jpg){w=180px}](https://www.astronomer.io/)
 ```
 
-[Astro][Astronomer] is the best managed service in the market for teams on any step of their data
-journey. Spend time where it counts.
+[Astro][Astronomer] is a managed Airflow service.
 
 - Astro runs on the cloud of your choice. Astro manages Airflow and gives you all the
   features you need to focus on what really matters – your data. All while connecting
   securely to any service in your network.
-- Create Airflow environments with a click of a button.
+- Create Airflow environments quickly.
 - Protect production DAGs with easy Airflow upgrades and custom high-availability configs.
 - Get visibility into what’s running with analytics views and easy interfaces for logs
   and alerts. Across environments.
-- Take down tech-debt and learn how to drive Airflow best practices from the experts
-  behind the project. Get world-class support, fast-tracked bug fixes, and same-day
-  access to new Airflow versions.
+- Adopt Airflow best practices with support and timely upgrades.
 
 ```{div} .clearfix
 ```
