13
13
14
14
from pytimeparse import parse
15
15
16
- from dbt_airflow_factory .builder import DbtAirflowTasksBuilder
17
16
from dbt_airflow_factory .builder_factory import DbtAirflowTasksBuilderFactory
18
17
from dbt_airflow_factory .config_utils import read_config
19
18
from dbt_airflow_factory .notifications .handler import NotificationHandlersFactory
19
+ from dbt_airflow_factory .tasks_builder .builder import DbtAirflowTasksBuilder
20
20
21
21
22
22
class AirflowDagFactory :
@@ -54,16 +54,12 @@ def __init__(
54
54
execution_env_config_file_name : str = "execution_env.yml" ,
55
55
airflow_config_file_name : str = "airflow.yml" ,
56
56
):
57
+ self ._notifications_handlers_builder = NotificationHandlersFactory ()
58
+ self .airflow_config = self ._read_config (dag_path , env , airflow_config_file_name )
57
59
self ._builder = DbtAirflowTasksBuilderFactory (
58
- dag_path ,
59
- env ,
60
- dbt_config_file_name ,
61
- execution_env_config_file_name ,
60
+ dag_path , env , self .airflow_config , dbt_config_file_name , execution_env_config_file_name
62
61
).create ()
63
- self ._notifications_handlers_builder = NotificationHandlersFactory ()
64
62
self .dag_path = dag_path
65
- self .env = env
66
- self .airflow_config_file_name = airflow_config_file_name
67
63
68
64
def create (self ) -> DAG :
69
65
"""
@@ -72,51 +68,45 @@ def create(self) -> DAG:
72
68
:return: Generated DAG.
73
69
:rtype: airflow.models.dag.DAG
74
70
"""
75
- config = self .read_config ()
76
- with DAG (default_args = config ["default_args" ], ** config ["dag" ]) as dag :
77
- self .create_tasks (config )
71
+ with DAG (
72
+ default_args = self .airflow_config ["default_args" ], ** self .airflow_config ["dag" ]
73
+ ) as dag :
74
+ self .create_tasks ()
78
75
return dag
79
76
80
- def create_tasks (self , config : dict ) -> None :
77
+ def create_tasks (self ) -> None :
81
78
"""
82
79
Parse ``manifest.json`` and create tasks based on the data contained there.
83
-
84
- :param config: Dictionary representing ``airflow.yml``.
85
- :type config: dict
86
80
"""
87
- start = self ._create_starting_task (config )
81
+ start = self ._create_starting_task ()
88
82
end = DummyOperator (task_id = "end" )
89
- tasks = self ._builder .parse_manifest_into_tasks (
90
- self ._manifest_file_path (config ),
91
- config .get ("use_task_group" , False ),
92
- config .get ("show_ephemeral_models" , True ),
93
- )
83
+ tasks = self ._builder .parse_manifest_into_tasks (self ._manifest_file_path ())
94
84
for starting_task in tasks .get_starting_tasks ():
95
85
start >> starting_task .get_start_task ()
96
86
for ending_task in tasks .get_ending_tasks ():
97
87
ending_task .get_end_task () >> end
98
88
99
- def _create_starting_task (self , config : dict ) -> BaseOperator :
100
- if config .get ("seed_task" , True ):
89
+ def _create_starting_task (self ) -> BaseOperator :
90
+ if self . airflow_config .get ("seed_task" , True ):
101
91
return self ._builder .create_seed_task ()
102
92
else :
103
93
return DummyOperator (task_id = "start" )
104
94
105
- def _manifest_file_path (self , config : dict ) -> str :
106
- file_dir = config .get ("manifest_dir_path" , self .dag_path )
107
- return os .path .join (file_dir , config .get ("manifest_file_name" , "manifest.json" ))
95
+ def _manifest_file_path (self ) -> str :
96
+ file_dir = self .airflow_config .get ("manifest_dir_path" , self .dag_path )
97
+ return os .path .join (
98
+ file_dir , self .airflow_config .get ("manifest_file_name" , "manifest.json" )
99
+ )
108
100
109
- def read_config (self ) -> dict :
101
+ def _read_config (self , dag_path : str , env : str , airflow_config_file_name : str ) -> dict :
110
102
"""
111
103
Read ``airflow.yml`` from ``config`` directory into a dictionary.
112
104
113
105
:return: Dictionary representing ``airflow.yml``.
114
106
:rtype: dict
115
107
:raises KeyError: No ``default_args`` key in ``airflow.yml``.
116
108
"""
117
- config = read_config (
118
- self .dag_path , self .env , self .airflow_config_file_name , replace_jinja = True
119
- )
109
+ config = read_config (dag_path , env , airflow_config_file_name , replace_jinja = True )
120
110
if "retry_delay" in config ["default_args" ]:
121
111
config ["default_args" ]["retry_delay" ] = parse (config ["default_args" ]["retry_delay" ])
122
112
if "failure_handlers" in config :
0 commit comments