Skip to content

Commit 30559f2

Browse files
github-actions[bot]p-pekalagithub-actionsbartlomiejolmabartlomiej.olma.kitopi
authored
Release 0.29.0 (#65)
* Fix missing init * Dependencies fixes (#55) * Fix missing init * Fix test * Fix precommit Co-authored-by: p-pekala <piotr.pekala@getindata.com> * Moving CC_TEST_REPORTER_ID from secrets to allow contribution (CC_TES… (#59) * Moving CC_TEST_REPORTER_ID from secrets to allow contribution (CC_TEST_REPORTER_ID is just an ID not credential). Co-authored-by: piotrpekala <piotr.pekala@getindata.com> * feat: Allows passing config file to the operator (#57) * feat: Allows passing config file to the operator Co-authored-by: bartlomiej.olma.kitopi <bartlomiej.olma@kitopi.com> * New documentation (#58) * New documentation Co-authored-by: PP <piotr.pekala@getindata.com> * Moving CC_TEST_REPORTER_ID from secrets to allow contribution (CC_TEST_REPORTER_ID is just an ID not credential). * Data 377 (#62) * feat: add config envs if datahub recipe exists * feat: add test to check if env are properly readed * feat: add execution script parameter * feat: add logic behind using execution script * chore: pre-commit * feat: add test for execution script and datahub env * chore: update bash script name * chore: parametrize script path * chore: refactor args builder * chore: refactor tests after changes in args * chore: add defualt value to backward compatible * chore: refactor tests Co-authored-by: Piotr Sierkin <piotr.sierkin@getindata.com> * Data 369 integrate scheduling airbyte from airflow in the same dag as transformations (#64) * feat: [DATA-369] Add ingestion tasks. * Data 412 add save points after each stage of transformations (#61) * feat: [DATA-412] Abstract away task with mapping task from graph to airflow. 
* FIX #65 - Bump version and CHANGELOG for release 0.29.0 Co-authored-by: p-pekala <piotr.pekala@getindata.com> Co-authored-by: github-actions <github-actions@github.com> Co-authored-by: Piotr Pękala <piotr_pekala@wp.pl> Co-authored-by: bartlomiejolma <b.olma92@gmail.com> Co-authored-by: bartlomiej.olma.kitopi <bartlomiej.olma@kitopi.com> Co-authored-by: Piotr Sierkin <psierkin@gmail.com> Co-authored-by: Piotr Sierkin <piotr.sierkin@getindata.com> Co-authored-by: Paweł Kociński <pawel93kocinski@gmail.com>
1 parent 4407252 commit 30559f2

27 files changed

+7679
-27
lines changed

.isort.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ known_first_party = dbt_airflow_factory
44
default_section = THIRDPARTY
55

66
[settings]
7-
known_third_party = airflow,jinja2,networkx,pytimeparse,setuptools,yaml
7+
known_third_party = airflow,jinja2,networkx,pytest,pytimeparse,setuptools,yaml

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## [Unreleased]
44

5+
## [0.29.0] - 2022-09-02
6+
57
## [0.28.0] - 2022-07-19
68

79
## [0.27.0] - 2022-07-01
@@ -122,7 +124,9 @@ This version brings compatibility with `dbt 1.0`.
122124

123125
- Initial implementation of `dbt_airflow_manifest_parser` library.
124126

125-
[Unreleased]: https://github.com/getindata/dbt-airflow-factory/compare/0.28.0...HEAD
127+
[Unreleased]: https://github.com/getindata/dbt-airflow-factory/compare/0.29.0...HEAD
128+
129+
[0.29.0]: https://github.com/getindata/dbt-airflow-factory/compare/0.28.0...0.29.0
126130

127131
[0.28.0]: https://github.com/getindata/dbt-airflow-factory/compare/0.27.0...0.28.0
128132

dbt_airflow_factory/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version = "0.28.0"
1+
version = "0.29.0"

dbt_airflow_factory/airflow_dag_factory.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from airflow import DAG
77
from airflow.models import BaseOperator
88

9+
from dbt_airflow_factory.ingestion import IngestionEngine, IngestionFactory
10+
911
if airflow.__version__.startswith("1."):
1012
from airflow.operators.dummy_operator import DummyOperator
1113
else:
@@ -14,7 +16,7 @@
1416
from pytimeparse import parse
1517

1618
from dbt_airflow_factory.builder_factory import DbtAirflowTasksBuilderFactory
17-
from dbt_airflow_factory.config_utils import read_config
19+
from dbt_airflow_factory.config_utils import read_config, read_env_config
1820
from dbt_airflow_factory.notifications.handler import NotificationHandlersFactory
1921
from dbt_airflow_factory.tasks_builder.builder import DbtAirflowTasksBuilder
2022

@@ -53,6 +55,8 @@ def __init__(
5355
dbt_config_file_name: str = "dbt.yml",
5456
execution_env_config_file_name: str = "execution_env.yml",
5557
airflow_config_file_name: str = "airflow.yml",
58+
airbyte_config_file_name: str = "airbyte.yml",
59+
ingestion_config_file_name: str = "ingestion.yml",
5660
):
5761
self._notifications_handlers_builder = NotificationHandlersFactory()
5862
self.airflow_config = self._read_config(dag_path, env, airflow_config_file_name)
@@ -64,6 +68,18 @@ def __init__(
6468
execution_env_config_file_name,
6569
).create()
6670
self.dag_path = dag_path
71+
airbyte_config = read_env_config(
72+
dag_path=dag_path, env=env, file_name=airbyte_config_file_name
73+
)
74+
self.ingestion_config = read_env_config(
75+
dag_path=dag_path, env=env, file_name=ingestion_config_file_name
76+
)
77+
self.ingestion_tasks_builder_factory = IngestionFactory(
78+
ingestion_config=airbyte_config,
79+
name=IngestionEngine.value_of(
80+
self.ingestion_config.get("engine", IngestionEngine.AIRBYTE.value)
81+
),
82+
)
6783

6884
def create(self) -> DAG:
6985
"""
@@ -82,7 +98,14 @@ def create_tasks(self) -> None:
8298
"""
8399
Parse ``manifest.json`` and create tasks based on the data contained there.
84100
"""
101+
102+
ingestion_enabled = self.ingestion_config.get("enable", False)
103+
85104
start = self._create_starting_task()
105+
if ingestion_enabled and self.ingestion_tasks_builder_factory:
106+
builder = self.ingestion_tasks_builder_factory.create()
107+
ingestion_tasks = builder.build()
108+
ingestion_tasks >> start
86109
end = DummyOperator(task_id="end")
87110
tasks = self._builder.parse_manifest_into_tasks(self._manifest_file_path())
88111
for starting_task in tasks.get_starting_tasks():

dbt_airflow_factory/builder_factory.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
)
1111
from dbt_airflow_factory.operator import DbtRunOperatorBuilder
1212
from dbt_airflow_factory.tasks_builder.builder import DbtAirflowTasksBuilder
13+
from dbt_airflow_factory.tasks_builder.gateway import (
14+
GatewayConfiguration,
15+
TaskGraphConfiguration,
16+
)
1317
from dbt_airflow_factory.tasks_builder.parameters import TasksBuildingParameters
1418

1519

@@ -65,9 +69,16 @@ def create(self) -> DbtAirflowTasksBuilder:
6569
dbt_params = self._create_dbt_config()
6670
execution_env_type = self._read_execution_env_type()
6771
tasks_airflow_config = self._create_tasks_airflow_config()
72+
6873
return DbtAirflowTasksBuilder(
6974
tasks_airflow_config,
7075
self._create_operator_builder(execution_env_type, dbt_params),
76+
gateway_config=TaskGraphConfiguration(
77+
GatewayConfiguration(
78+
separation_schemas=self.airflow_config.get("save_points", []),
79+
gateway_task_name="gateway",
80+
)
81+
),
7182
)
7283

7384
def _create_tasks_airflow_config(self) -> TasksBuildingParameters:

dbt_airflow_factory/ingestion.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from enum import Enum
2+
from typing import List
3+
4+
from airflow.models import BaseOperator
5+
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
6+
7+
8+
class IngestionEngine(Enum):
    """Enumeration of supported ingestion engines."""

    AIRBYTE = "airbyte"

    @classmethod
    def value_of(cls, value: str) -> "IngestionEngine":
        """Return the engine whose value equals ``value``.

        :raises ValueError: when no engine matches ``value``.
        """
        return cls(value)
14+
15+
16+
class IngestionTasksBuilder:
    """Interface for engine-specific builders of ingestion Airflow tasks."""

    def build(self) -> List[BaseOperator]:
        """Build and return the Airflow operators performing ingestion.

        :raises NotImplementedError: subclasses must override this method.
        """
        raise NotImplementedError("Should implement build method")
19+
20+
21+
class AirbyteIngestionTasksBuilder(IngestionTasksBuilder):
    """Builds Airbyte sync-trigger operators from the ingestion config.

    :param config: ingestion configuration; must contain an
        ``airbyte_connection_id`` entry and a ``tasks`` list, where each task
        dict provides ``task_id``, ``connection_id``, ``api_version``,
        ``wait_seconds``, ``timeout`` and an asynchronous flag.
    :type config: dict
    """

    def __init__(self, config: dict):
        self.ingestion_config = config

    def build(self) -> List[BaseOperator]:
        """Create one ``AirbyteTriggerSyncOperator`` per configured task."""
        # Hoist the shared connection id out of the loop.
        airbyte_conn_id = self.ingestion_config["airbyte_connection_id"]
        airflow_tasks = []
        for task in self.ingestion_config["tasks"]:
            airflow_tasks.append(
                AirbyteTriggerSyncOperator(
                    task_id=task["task_id"],
                    airbyte_conn_id=airbyte_conn_id,
                    connection_id=task["connection_id"],
                    # Accept the correctly spelled "asynchronous" key, but fall
                    # back to the historical misspelling "asyncrounous" so
                    # existing configuration files keep working.
                    asynchronous=(
                        task["asynchronous"]
                        if "asynchronous" in task
                        else task["asyncrounous"]
                    ),
                    api_version=task["api_version"],
                    wait_seconds=task["wait_seconds"],
                    timeout=task["timeout"],
                )
            )

        return airflow_tasks
42+
43+
44+
class IngestionFactory:
    """Creates the tasks builder matching the configured ingestion engine.

    :param ingestion_config: configuration passed to the concrete builder.
    :type ingestion_config: dict
    :param name: ingestion engine to build tasks for.
    :type name: IngestionEngine
    """

    def __init__(self, ingestion_config: dict, name: IngestionEngine):
        self.ingestion_config = ingestion_config
        self.name = name

    def create(
        self,
    ) -> IngestionTasksBuilder:
        """Return the builder for the configured engine.

        :raises NotImplementedError: when the engine is not supported.
        """
        if self.name != IngestionEngine.AIRBYTE:
            raise NotImplementedError(f"{self.name} is not supported !")
        return AirbyteIngestionTasksBuilder(self.ingestion_config)

dbt_airflow_factory/tasks_builder/builder.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import airflow
77
from airflow.models.baseoperator import BaseOperator
8+
from airflow.operators.dummy import DummyOperator
89
from airflow.sensors.external_task_sensor import ExternalTaskSensor
910

1011
from dbt_airflow_factory.tasks_builder.node_type import NodeType
@@ -15,24 +16,31 @@
1516

1617
from dbt_airflow_factory.operator import DbtRunOperatorBuilder, EphemeralOperator
1718
from dbt_airflow_factory.tasks import ModelExecutionTask, ModelExecutionTasks
19+
from dbt_airflow_factory.tasks_builder.gateway import TaskGraphConfiguration
1820
from dbt_airflow_factory.tasks_builder.graph import DbtAirflowGraph
1921

2022

2123
class DbtAirflowTasksBuilder:
2224
"""
2325
Parses ``manifest.json`` into Airflow tasks.
2426
27+
:param airflow_config: DBT node operator.
28+
:type airflow_config: TasksBuildingParameters
2529
:param operator_builder: DBT node operator.
2630
:type operator_builder: DbtRunOperatorBuilder
31+
:param gateway_config: DBT node operator.
32+
:type gateway_config: TaskGraphConfiguration
2733
"""
2834

2935
def __init__(
3036
self,
3137
airflow_config: TasksBuildingParameters,
3238
operator_builder: DbtRunOperatorBuilder,
39+
gateway_config: TaskGraphConfiguration,
3340
):
3441
self.operator_builder = operator_builder
3542
self.airflow_config = airflow_config
43+
self.gateway_config = gateway_config
3644

3745
def parse_manifest_into_tasks(self, manifest_path: str) -> ModelExecutionTasks:
3846
"""
@@ -128,6 +136,8 @@ def _create_task_from_graph_node(
128136
)
129137
elif node["node_type"] == NodeType.SOURCE_SENSOR:
130138
return self._create_dag_sensor(node)
139+
elif node["node_type"] == NodeType.MOCK_GATEWAY:
140+
return self._create_dummy_task(node)
131141
else:
132142
return self._create_task_for_model(
133143
node["select"],
@@ -157,7 +167,8 @@ def _make_dbt_tasks(self, manifest_path: str) -> ModelExecutionTasks:
157167
return tasks_with_context
158168

159169
def _create_tasks_graph(self, manifest: dict) -> DbtAirflowGraph:
160-
dbt_airflow_graph = DbtAirflowGraph()
170+
171+
dbt_airflow_graph = DbtAirflowGraph(self.gateway_config)
161172
dbt_airflow_graph.add_execution_tasks(manifest)
162173
if self.airflow_config.enable_dags_dependencies:
163174
dbt_airflow_graph.add_external_dependencies(manifest)
@@ -183,3 +194,7 @@ def _create_dag_sensor(self, node: Dict[str, Any]) -> ModelExecutionTask:
183194
mode="reschedule",
184195
)
185196
)
197+
198+
@staticmethod
199+
def _create_dummy_task(node: Dict[str, Any]) -> ModelExecutionTask:
200+
return ModelExecutionTask(DummyOperator(task_id=node["select"]))
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
from dataclasses import dataclass
2+
from typing import List
3+
4+
from dbt_airflow_factory.tasks_builder.utils import is_model_run_task
5+
6+
7+
@dataclass
class NodeProperties:
    """Identity of a dbt graph node: its name and the schema it lives in."""

    # dbt node identifier as it appears in the manifest's "nodes" mapping.
    node_name: str
    # Database schema the node is materialized in (manifest "schema" field).
    schema_name: str
11+
12+
13+
@dataclass
class GatewayConfiguration:
    """Settings controlling insertion of gateway (save-point) tasks."""

    # Schemas between which gateway tasks are inserted; fewer than two
    # entries effectively disables gateways (see should_gateway_be_added).
    separation_schemas: List[str]
    # Suffix used when composing a gateway task's name
    # (see create_gateway_name).
    gateway_task_name: str
17+
18+
19+
@dataclass
class TaskGraphConfiguration:
    """Configuration of the dbt-graph-to-Airflow-tasks mapping."""

    # Gateway (save-point) insertion settings.
    gateway: GatewayConfiguration
22+
23+
24+
@dataclass
class SeparationLayer:
    """A pair of adjacent schemas separated by a gateway task."""

    # Upstream schema (left-hand side of the gateway).
    left: str
    # Downstream schema (right-hand side of the gateway).
    right: str
28+
29+
30+
def is_gateway_valid_dependency(
    separation_layer: SeparationLayer, dependency_node_properties: NodeProperties, node_schema: str
) -> bool:
    """Decide whether a dependency edge should be kept as-is.

    Returns ``False`` only for a model-run dependency that crosses the
    separation layer (dependency in the left schema, depending node in the
    right schema) — such edges are mediated by the gateway task instead.
    """
    if not is_model_run_task(dependency_node_properties.node_name):
        return True
    crosses_layer = (
        dependency_node_properties.schema_name == separation_layer.left
        and node_schema == separation_layer.right
    )
    return not crosses_layer
39+
40+
41+
def get_gateway_dependencies(manifest: dict, separation_layer: SeparationLayer) -> List:
    """Return left-schema nodes the gateway task must depend on.

    A left-schema node qualifies when some right-schema node depends on it,
    i.e. it feeds data across the separation layer.
    """
    downstream = _get_downstream_dependencies(
        manifest=manifest, separation_layer_right=separation_layer.right
    )

    connected_upstream = _get_upstream_dependencies_connected_to_downstream(
        manifest=manifest,
        separation_layer_left=separation_layer.left,
        downstream_dependencies=downstream,
    )
    dependencies = []
    for node_name, values in manifest["nodes"].items():
        if values["schema"] == separation_layer.left and node_name in connected_upstream:
            dependencies.append(node_name)
    return dependencies
60+
61+
62+
def _get_downstream_dependencies(manifest: dict, separation_layer_right: str) -> List:
63+
downstream_dependencies = [
64+
node_name
65+
for node_name, values in manifest["nodes"].items()
66+
if values["schema"] == separation_layer_right
67+
]
68+
return downstream_dependencies
69+
70+
71+
def _get_upstream_dependencies_connected_to_downstream(
    manifest: dict, separation_layer_left: str, downstream_dependencies: List[str]
) -> List:
    """Collect left-schema model dependencies of the given downstream nodes."""
    collected: List[str] = []

    for downstream_node in downstream_dependencies:
        for dep in manifest["nodes"][downstream_node]["depends_on"]["nodes"]:
            # The helper appends in place when the dependency qualifies.
            _add_upstream_dep_based_on_downstream(
                dep=dep,
                manifest=manifest,
                separation_layer_left=separation_layer_left,
                upstream_dependencies_connected_to_downstream=collected,
            )
    return collected
86+
87+
88+
def _add_upstream_dep_based_on_downstream(
    dep: str,
    manifest: dict,
    separation_layer_left: str,
    upstream_dependencies_connected_to_downstream: List[str],
) -> None:
    """Append ``dep`` when it is a model-run node living in the left schema.

    Mutates ``upstream_dependencies_connected_to_downstream`` in place.
    """
    if not is_model_run_task(dep):
        return
    if manifest["nodes"][dep]["schema"] == separation_layer_left:
        upstream_dependencies_connected_to_downstream.append(dep)
96+
97+
98+
def add_gateway_to_dependencies(
    gateway_name: str, filtered_dependencies: List[str], filtered_records: List[str]
) -> None:
    """Append the gateway name when filtering removed at least one record.

    Mutates ``filtered_dependencies`` in place.
    """
    some_records_were_filtered_out = len(filtered_dependencies) < len(filtered_records)
    if some_records_were_filtered_out:
        filtered_dependencies.append(gateway_name)
103+
104+
105+
def create_gateway_name(separation_layer: SeparationLayer, gateway_task_name: str) -> str:
106+
return f"{separation_layer.left}_{separation_layer.right}_{gateway_task_name}"
107+
108+
109+
def should_gateway_be_added(node_schema: str, separation_schemas: List[str]) -> bool:
    """Tell whether gateway handling applies to a node.

    A gateway is relevant only when at least two separation schemas are
    configured and the node's schema is one of them.
    """
    if len(separation_schemas) < 2:
        return False
    return node_schema in separation_schemas

0 commit comments

Comments
 (0)