From 5a3bcac9cb4a379ac6d995dceb2a9c75805576f6 Mon Sep 17 00:00:00 2001 From: Jacob Ferriero Date: Wed, 8 Jul 2020 19:43:52 -0700 Subject: [PATCH] add 22 DAGs --- composer/config/running_dags.txt | 22 ++ composer/dags/example_dags/__init__.py | 18 ++ composer/dags/example_dags/bash_operator.py | 72 +++++ composer/dags/example_dags/branch_operator.py | 68 +++++ .../branch_python_dop_operator_3.py | 63 +++++ composer/dags/example_dags/complex.py | 245 ++++++++++++++++++ composer/dags/example_dags/http_operator.py | 103 ++++++++ .../kubernetes_executor_config.py | 100 +++++++ composer/dags/example_dags/latest_only.py | 40 +++ .../example_dags/latest_only_with_trigger.py | 50 ++++ .../dags/example_dags/nested_branch_dag.py | 54 ++++ .../passing_params_via_test_command.py | 70 +++++ composer/dags/example_dags/pig_operator.py | 42 +++ composer/dags/example_dags/python_operator.py | 75 ++++++ .../example_dags/short_circuit_operator.py | 50 ++++ composer/dags/example_dags/skip_dag.py | 58 +++++ composer/dags/example_dags/subdag_operator.py | 68 +++++ .../dags/example_dags/subdags/__init__.py | 18 ++ composer/dags/example_dags/subdags/subdag.py | 40 +++ .../example_dags/trigger_controller_dag.py | 74 ++++++ .../dags/example_dags/trigger_target_dag.py | 73 ++++++ composer/dags/example_dags/xcom.py | 84 ++++++ 22 files changed, 1487 insertions(+) create mode 100644 composer/dags/example_dags/__init__.py create mode 100644 composer/dags/example_dags/bash_operator.py create mode 100644 composer/dags/example_dags/branch_operator.py create mode 100644 composer/dags/example_dags/branch_python_dop_operator_3.py create mode 100644 composer/dags/example_dags/complex.py create mode 100644 composer/dags/example_dags/http_operator.py create mode 100644 composer/dags/example_dags/kubernetes_executor_config.py create mode 100644 composer/dags/example_dags/latest_only.py create mode 100644 composer/dags/example_dags/latest_only_with_trigger.py create mode 100644 
composer/dags/example_dags/nested_branch_dag.py create mode 100644 composer/dags/example_dags/passing_params_via_test_command.py create mode 100644 composer/dags/example_dags/pig_operator.py create mode 100644 composer/dags/example_dags/python_operator.py create mode 100644 composer/dags/example_dags/short_circuit_operator.py create mode 100644 composer/dags/example_dags/skip_dag.py create mode 100644 composer/dags/example_dags/subdag_operator.py create mode 100644 composer/dags/example_dags/subdags/__init__.py create mode 100644 composer/dags/example_dags/subdags/subdag.py create mode 100644 composer/dags/example_dags/trigger_controller_dag.py create mode 100644 composer/dags/example_dags/trigger_target_dag.py create mode 100644 composer/dags/example_dags/xcom.py diff --git a/composer/config/running_dags.txt b/composer/config/running_dags.txt index e69de29..0493868 100644 --- a/composer/config/running_dags.txt +++ b/composer/config/running_dags.txt @@ -0,0 +1,22 @@ +wordcount_dag +tutorial +ephemeral_dataproc_spark_dag +bash_operator +branch_operator +branch_python_dop_operator_3 +complex +http_operator +kubernetes_executor_config +latest_only +latest_only_with_trigger +nested_branch_dag +passing_params_via_test_command +pig_operator +python_operator +short_circuit_operator +skip_dag +subdag_operator +trigger_controller_dag +trigger_target_dag +tutorial +xcom diff --git a/composer/dags/example_dags/__init__.py b/composer/dags/example_dags/__init__.py new file mode 100644 index 0000000..114d189 --- /dev/null +++ b/composer/dags/example_dags/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/composer/dags/example_dags/bash_operator.py b/composer/dags/example_dags/bash_operator.py new file mode 100644 index 0000000..937937e --- /dev/null +++ b/composer/dags/example_dags/bash_operator.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from builtins import range +from datetime import timedelta + +from airflow.models import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.operators.dummy_operator import DummyOperator +from airflow.utils.dates import days_ago + +args = { + 'owner': 'jferriero@google.com', + 'start_date': days_ago(2), +} + +dag = DAG(dag_id='bash_operator', + default_args=args, + schedule_interval='0 0 * * *', + dagrun_timeout=timedelta(minutes=60), + ) + +run_this_last = DummyOperator( + task_id='run_this_last', + dag=dag, +) + +# [START howto_operator_bash] +run_this = BashOperator( + task_id='run_after_loop', + bash_command='echo 1', + dag=dag, +) +# [END howto_operator_bash] + +run_this >> run_this_last + +for i in range(3): + task = BashOperator( + task_id='runme_' + str(i), + bash_command='echo "{{ task_instance_key_str }}" && sleep 1', + dag=dag, + ) + task >> run_this + +# [START howto_operator_bash_template] +also_run_this = BashOperator( + task_id='also_run_this', + bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', + dag=dag, +) +# [END howto_operator_bash_template] +also_run_this >> run_this_last + +if __name__ == "__main__": + dag.cli() diff --git a/composer/dags/example_dags/branch_operator.py b/composer/dags/example_dags/branch_operator.py new file mode 100644 index 0000000..7b98116 --- /dev/null +++ b/composer/dags/example_dags/branch_operator.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import random + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import BranchPythonOperator +from airflow.utils.dates import days_ago + +args = { + 'owner': 'jferriero@google.com', + 'start_date': days_ago(2), +} + +dag = DAG(dag_id='branch_operator', + default_args=args, + schedule_interval="@daily", + ) + +run_this_first = DummyOperator( + task_id='run_this_first', + dag=dag, +) + +options = ['branch_a', 'branch_b', 'branch_c', 'branch_d'] + +branching = BranchPythonOperator( + task_id='branching', + python_callable=lambda: random.choice(options), + dag=dag, +) +run_this_first >> branching + +join = DummyOperator( + task_id='join', + trigger_rule='all_success', + dag=dag, +) + +for option in options: + t = DummyOperator( + task_id=option, + dag=dag, + ) + + dummy_follow = DummyOperator( + task_id='follow_' + option, + dag=dag, + ) + + branching >> t >> dummy_follow >> join diff --git a/composer/dags/example_dags/branch_python_dop_operator_3.py b/composer/dags/example_dags/branch_python_dop_operator_3.py new file mode 100644 index 0000000..f779dc1 --- /dev/null +++ b/composer/dags/example_dags/branch_python_dop_operator_3.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example DAG demonstrating the usage of BranchPythonOperator with +depends_on_past=True, where tasks may be run +or skipped on alternating runs. +""" + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import BranchPythonOperator +from airflow.utils.dates import days_ago + +args = { + 'owner': 'jferriero@google.com', + 'start_date': days_ago(2), + 'depends_on_past': True, +} + +# BranchPython operator that depends on past +# and where tasks may run or be skipped on +# alternating runs +dag = DAG(dag_id='branch_python_dop_operator_3', + schedule_interval='*/1 * * * *', + default_args=args, + ) + + +def should_run(**kwargs): + print('------------- exec dttm = {} and minute = {}'.format( + kwargs['execution_date'], kwargs['execution_date'].minute)) + if kwargs['execution_date'].minute % 2 == 0: + return "dummy_task_1" + else: + return "dummy_task_2" + + +cond = BranchPythonOperator( + task_id='condition', + provide_context=True, + python_callable=should_run, + dag=dag, +) + +dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag) +dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag) +cond >> [dummy_task_1, dummy_task_2] diff --git a/composer/dags/example_dags/complex.py b/composer/dags/example_dags/complex.py new file mode 100644 index 0000000..41c6ed3 --- /dev/null +++ 
b/composer/dags/example_dags/complex.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG that shows the complex DAG structure. +""" +import sys + +from airflow import models +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago +from airflow.utils.helpers import chain + +default_args = { + "start_date": days_ago(1), + "owner": "jferriero@google.com", +} + +with models.DAG("complex", + default_args=default_args, + schedule_interval=None) as dag: + + # Create + create_entry_group = BashOperator(task_id="create_entry_group", + bash_command="echo create_entry_group") + + create_entry_group_result = BashOperator( + task_id="create_entry_group_result", + bash_command="echo create_entry_group_result") + + create_entry_group_result2 = BashOperator( + task_id="create_entry_group_result2", + bash_command="echo create_entry_group_result2") + + create_entry_gcs = BashOperator(task_id="create_entry_gcs", + bash_command="echo create_entry_gcs") + + create_entry_gcs_result = BashOperator( + task_id="create_entry_gcs_result", + bash_command="echo create_entry_gcs_result") 
+ + create_entry_gcs_result2 = BashOperator( + task_id="create_entry_gcs_result2", + bash_command="echo create_entry_gcs_result2") + + create_tag = BashOperator(task_id="create_tag", + bash_command="echo create_tag") + + create_tag_result = BashOperator(task_id="create_tag_result", + bash_command="echo create_tag_result") + + create_tag_result2 = BashOperator(task_id="create_tag_result2", + bash_command="echo create_tag_result2") + + create_tag_template = BashOperator(task_id="create_tag_template", + bash_command="echo create_tag_template") + + create_tag_template_result = BashOperator( + task_id="create_tag_template_result", + bash_command="echo create_tag_template_result") + + create_tag_template_result2 = BashOperator( + task_id="create_tag_template_result2", + bash_command="echo create_tag_template_result2") + + create_tag_template_field = BashOperator( + task_id="create_tag_template_field", + bash_command="echo create_tag_template_field") + + create_tag_template_field_result = BashOperator( + task_id="create_tag_template_field_result", + bash_command="echo create_tag_template_field_result") + + create_tag_template_field_result2 = BashOperator( + task_id="create_tag_template_field_result2", + bash_command="echo create_tag_template_field_result2") + + # Delete + delete_entry = BashOperator(task_id="delete_entry", + bash_command="echo delete_entry") + create_entry_gcs >> delete_entry + + delete_entry_group = BashOperator(task_id="delete_entry_group", + bash_command="echo delete_entry_group") + create_entry_group >> delete_entry_group + + delete_tag = BashOperator(task_id="delete_tag", + bash_command="echo delete_tag") + create_tag >> delete_tag + + delete_tag_template_field = BashOperator( + task_id="delete_tag_template_field", + bash_command="echo delete_tag_template_field") + + delete_tag_template = BashOperator(task_id="delete_tag_template", + bash_command="echo delete_tag_template") + + # Get + get_entry_group = BashOperator(task_id="get_entry_group", +
bash_command="echo get_entry_group") + + get_entry_group_result = BashOperator( + task_id="get_entry_group_result", + bash_command="echo get_entry_group_result") + + get_entry = BashOperator(task_id="get_entry", bash_command="echo get_entry") + + get_entry_result = BashOperator(task_id="get_entry_result", + bash_command="echo get_entry_result") + + get_tag_template = BashOperator(task_id="get_tag_template", + bash_command="echo get_tag_template") + + get_tag_template_result = BashOperator( + task_id="get_tag_template_result", + bash_command="echo get_tag_template_result") + + # List + list_tags = BashOperator(task_id="list_tags", bash_command="echo list_tags") + + list_tags_result = BashOperator(task_id="list_tags_result", + bash_command="echo list_tags_result") + + # Lookup + lookup_entry = BashOperator(task_id="lookup_entry", + bash_command="echo lookup_entry") + + lookup_entry_result = BashOperator(task_id="lookup_entry_result", + bash_command="echo lookup_entry_result") + + # Rename + rename_tag_template_field = BashOperator( + task_id="rename_tag_template_field", + bash_command="echo rename_tag_template_field") + + # Search + search_catalog = PythonOperator( + task_id="search_catalog", + python_callable=lambda _: sys.stdout.write("search_catalog\n")) + + search_catalog_result = BashOperator( + task_id="search_catalog_result", + bash_command="echo search_catalog_result") + + # Update + update_entry = BashOperator(task_id="update_entry", + bash_command="echo update_entry") + + update_tag = BashOperator(task_id="update_tag", + bash_command="echo update_tag") + + update_tag_template = BashOperator(task_id="update_tag_template", + bash_command="echo update_tag_template") + + update_tag_template_field = BashOperator( + task_id="update_tag_template_field", + bash_command="echo update_tag_template_field") + + # Create + create_tasks = [ + create_entry_group, + create_entry_gcs, + create_tag_template, + create_tag_template_field, + create_tag, + ] + 
chain(*create_tasks) + + create_entry_group >> delete_entry_group + create_entry_group >> create_entry_group_result + create_entry_group >> create_entry_group_result2 + + create_entry_gcs >> delete_entry + create_entry_gcs >> create_entry_gcs_result + create_entry_gcs >> create_entry_gcs_result2 + + create_tag_template >> delete_tag_template_field + create_tag_template >> create_tag_template_result + create_tag_template >> create_tag_template_result2 + + create_tag_template_field >> delete_tag_template_field + create_tag_template_field >> create_tag_template_field_result + create_tag_template_field >> create_tag_template_field_result2 + + create_tag >> delete_tag + create_tag >> create_tag_result + create_tag >> create_tag_result2 + + # Delete + delete_tasks = [ + delete_tag, + delete_tag_template_field, + delete_tag_template, + delete_entry_group, + delete_entry, + ] + chain(*delete_tasks) + + # Get + create_tag_template >> get_tag_template >> delete_tag_template + get_tag_template >> get_tag_template_result + + create_entry_gcs >> get_entry >> delete_entry + get_entry >> get_entry_result + + create_entry_group >> get_entry_group >> delete_entry_group + get_entry_group >> get_entry_group_result + + # List + create_tag >> list_tags >> delete_tag + list_tags >> list_tags_result + + # Lookup + create_entry_gcs >> lookup_entry >> delete_entry + lookup_entry >> lookup_entry_result + + # Rename + create_tag_template_field >> rename_tag_template_field >> delete_tag_template_field + + # Search + search_catalog.set_upstream(create_tasks) + search_catalog.set_downstream(delete_tasks) + search_catalog >> search_catalog_result + + # Update + create_entry_gcs >> update_entry >> delete_entry + create_tag >> update_tag >> delete_tag + create_tag_template >> update_tag_template >> delete_tag_template + create_tag_template_field >> update_tag_template_field >> rename_tag_template_field diff --git a/composer/dags/example_dags/http_operator.py 
b/composer/dags/example_dags/http_operator.py new file mode 100644 index 0000000..7907a7d --- /dev/null +++ b/composer/dags/example_dags/http_operator.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +### Example HTTP operator and sensor +""" +import json +from datetime import timedelta + +from airflow import DAG +from airflow.operators.http_operator import SimpleHttpOperator +from airflow.sensors.http_sensor import HttpSensor +from airflow.utils.dates import days_ago + +default_args = { + 'owner': 'jferriero@google.com', + 'depends_on_past': False, + 'start_date': days_ago(2), + 'email': ['airflow@example.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +dag = DAG('http_operator', default_args=default_args, ) + +dag.doc_md = __doc__ + +# t1, t2 and t3 are examples of tasks created by instantiating operators +t1 = SimpleHttpOperator( + task_id='post_op', + endpoint='post', + data=json.dumps({"priority": 5}), + headers={"Content-Type": "application/json"}, + response_check=lambda response: response.json()['json']['priority'] == 5, + dag=dag, +) + +t5 = SimpleHttpOperator( + task_id='post_op_formenc', + 
endpoint='post', + data="name=Joe", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + dag=dag, +) + +t2 = SimpleHttpOperator( + task_id='get_op', + method='GET', + endpoint='get', + data={ + "param1": "value1", + "param2": "value2" + }, + headers={}, + dag=dag, +) + +t3 = SimpleHttpOperator( + task_id='put_op', + method='PUT', + endpoint='put', + data=json.dumps({"priority": 5}), + headers={"Content-Type": "application/json"}, + dag=dag, +) + +t4 = SimpleHttpOperator( + task_id='del_op', + method='DELETE', + endpoint='delete', + data="some=data", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + dag=dag, +) + +sensor = HttpSensor( + task_id='http_sensor_check', + http_conn_id='http_default', + endpoint='', + request_params={}, + response_check=lambda response: "httpbin" in response.text, + poke_interval=5, + dag=dag, +) + +sensor >> t1 >> t2 >> t3 >> t4 >> t5 diff --git a/composer/dags/example_dags/kubernetes_executor_config.py b/composer/dags/example_dags/kubernetes_executor_config.py new file mode 100644 index 0000000..ae6464b --- /dev/null +++ b/composer/dags/example_dags/kubernetes_executor_config.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+""" +This is an example dag for using a Kubernetes Executor Configuration. +""" +from __future__ import print_function + +import os + +from airflow.contrib.example_dags.libs.helper import print_stuff +from airflow.models import DAG +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago + +default_args = {'owner': 'jferriero@google.com', 'start_date': days_ago(2)} + +with DAG(dag_id='kubernetes_executor_config', + default_args=default_args, + schedule_interval=None) as dag: + + def test_volume_mount(): + """ + Tests whether the volume has been mounted. + """ + with open('/foo/volume_mount_test.txt', 'w') as foo: + foo.write('Hello') + + return_code = os.system("cat /foo/volume_mount_test.txt") + assert return_code == 0 + + # You can use annotations on your kubernetes pods! + start_task = PythonOperator(task_id="start_task", + python_callable=print_stuff, + executor_config={ + "KubernetesExecutor": { + "annotations": { + "test": "annotation" + } + } + }) + + # You can mount volume or secret to the worker pod + second_task = PythonOperator( + task_id="four_task", + python_callable=test_volume_mount, + executor_config={ + "KubernetesExecutor": { + "volumes": [{ + "name": "example-kubernetes-test-volume", + "hostPath": { + "path": "/tmp/" + }, + }, ], + "volume_mounts": [{ + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, ] + } + }) + + # Test that we can add labels to pods + third_task = PythonOperator(task_id="non_root_task", + python_callable=print_stuff, + executor_config={ + "KubernetesExecutor": { + "labels": { + "release": "stable" + } + } + }) + + other_ns_task = PythonOperator(task_id="other_namespace_task", + python_callable=print_stuff, + executor_config={ + "KubernetesExecutor": { + "namespace": "test-namespace", + "labels": { + "release": "stable" + } + } + }) + + start_task >> second_task >> third_task + start_task >> other_ns_task diff --git 
a/composer/dags/example_dags/latest_only.py b/composer/dags/example_dags/latest_only.py new file mode 100644 index 0000000..fd96c50 --- /dev/null +++ b/composer/dags/example_dags/latest_only.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+""" +Example of the LatestOnlyOperator +""" +import datetime as dt + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.latest_only_operator import LatestOnlyOperator +from airflow.utils.dates import days_ago + +args = {'owner': 'jferriero@google.com'} + +dag = DAG(dag_id='latest_only', + schedule_interval=dt.timedelta(hours=4), + start_date=days_ago(2), + default_args=args, + ) + +latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) +task1 = DummyOperator(task_id='task1', dag=dag) + +latest_only >> task1 diff --git a/composer/dags/example_dags/latest_only_with_trigger.py b/composer/dags/example_dags/latest_only_with_trigger.py new file mode 100644 index 0000000..d4aac92 --- /dev/null +++ b/composer/dags/example_dags/latest_only_with_trigger.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+""" +Example LatestOnlyOperator and TriggerRule interactions +""" + +# [START example] +import datetime as dt + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.latest_only_operator import LatestOnlyOperator +from airflow.utils.dates import days_ago +from airflow.utils.trigger_rule import TriggerRule + +args = {'owner': 'jferriero@google.com'} + +dag = DAG(dag_id='latest_only_with_trigger', + schedule_interval=dt.timedelta(hours=4), + start_date=days_ago(2), + default_args=args, + ) + +latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) +task1 = DummyOperator(task_id='task1', dag=dag) +task2 = DummyOperator(task_id='task2', dag=dag) +task3 = DummyOperator(task_id='task3', dag=dag) +task4 = DummyOperator(task_id='task4', + dag=dag, + trigger_rule=TriggerRule.ALL_DONE) + +latest_only >> task1 >> [task3, task4] +task2 >> [task3, task4] +# [END example] diff --git a/composer/dags/example_dags/nested_branch_dag.py b/composer/dags/example_dags/nested_branch_dag.py new file mode 100644 index 0000000..350227d --- /dev/null +++ b/composer/dags/example_dags/nested_branch_dag.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+""" +Example DAG demonstrating a workflow with nested branching. The join tasks use +the ``all_success`` +trigger rule, so each join runs only when all of its direct upstream tasks +succeed, and is skipped +whenever an upstream branch path is skipped. +""" + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import BranchPythonOperator +from airflow.utils.dates import days_ago + +args = {'owner': 'jferriero@google.com'} + +with DAG(dag_id="nested_branch_dag", + start_date=days_ago(2), + schedule_interval="@daily", + default_args=args, + ) as dag: + branch_1 = BranchPythonOperator(task_id="branch_1", + python_callable=lambda: "true_1") + join_1 = DummyOperator(task_id="join_1", + trigger_rule="all_success") + true_1 = DummyOperator(task_id="true_1") + false_1 = DummyOperator(task_id="false_1") + branch_2 = BranchPythonOperator(task_id="branch_2", + python_callable=lambda: "true_2") + join_2 = DummyOperator(task_id="join_2", + trigger_rule="all_success") + true_2 = DummyOperator(task_id="true_2") + false_2 = DummyOperator(task_id="false_2") + false_3 = DummyOperator(task_id="false_3") + + branch_1 >> true_1 >> join_1 + branch_1 >> false_1 >> branch_2 >> \ + [true_2, false_2] >> join_2 >> false_3 >> join_1 diff --git a/composer/dags/example_dags/passing_params_via_test_command.py b/composer/dags/example_dags/passing_params_via_test_command.py new file mode 100644 index 0000000..1bf4208 --- /dev/null +++ b/composer/dags/example_dags/passing_params_via_test_command.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import timedelta + +from airflow import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago + +dag = DAG("passing_params_via_test_command", + default_args={ + "owner": "jferriero@google.com", + "start_date": days_ago(1), + }, + schedule_interval='*/1 * * * *', + dagrun_timeout=timedelta(minutes=4), + ) + + +def my_py_command(ds, **kwargs): + # Print out the "foo" param passed in via + # `airflow test example_passing_params_via_test_command run_this + # -tp '{"foo":"bar"}'` + if kwargs["test_mode"]: + print(" 'foo' was passed in via test={} command : kwargs[params][foo] \ + = {}".format(kwargs["test_mode"], kwargs["params"]["foo"])) + # Print out the value of "miff", passed in below via the Python Operator + print(" 'miff' was passed in via task params = {}".format( + kwargs["params"]["miff"])) + return 1 + + +my_templated_command = """ + echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} " + echo " 'miff was passed in via BashOperator with value {{ params.miff }} " +""" + +run_this = PythonOperator( + task_id='run_this', + provide_context=True, + python_callable=my_py_command, + params={"miff": "agg"}, + dag=dag, +) + +also_run_this = BashOperator( + task_id='also_run_this', + bash_command=my_templated_command, + params={"miff": "agg"}, + dag=dag, +) + +run_this >> also_run_this diff --git a/composer/dags/example_dags/pig_operator.py b/composer/dags/example_dags/pig_operator.py new file mode 
"""Example DAG demonstrating the usage of the PigOperator."""

from airflow.models import DAG
from airflow.operators.pig_operator import PigOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'jferriero@google.com',
    'start_date': days_ago(2),
}

# schedule_interval=None: this example only runs when triggered manually.
dag = DAG(dag_id='pig_operator',
          default_args=args,
          schedule_interval=None,
          )

# Runs a trivial Pig script ("ls /;") in local mode ("-x local"), so no
# Hadoop cluster is required.
run_this = PigOperator(
    task_id="run_example_pig_script",
    pig="ls /;",
    pig_opts="-x local",
    dag=dag,
)
# NOTE: the original file ended with a bare `run_this` expression statement,
# which evaluates the name and discards the result — a no-op. Removed.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import print_function + +import time +from builtins import range +from pprint import pprint + +from airflow.utils.dates import days_ago + +from airflow.models import DAG +from airflow.operators.python_operator import PythonOperator + +args = { + 'owner': 'jferriero@google.com', + 'start_date': days_ago(2), +} + +dag = DAG(dag_id='python_operator', + default_args=args, + schedule_interval=None, + ) + + +# [START howto_operator_python] +def print_context(ds, **kwargs): + pprint(kwargs) + print(ds) + return 'Whatever you return gets printed in the logs' + + +run_this = PythonOperator( + task_id='print_the_context', + provide_context=True, + python_callable=print_context, + dag=dag, +) + +# [END howto_operator_python] + + +# [START howto_operator_python_kwargs] +def my_sleeping_function(random_base): + """This is a function that will run within the DAG execution""" + time.sleep(random_base) + + +# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively +for i in range(5): + task = PythonOperator( + task_id='sleep_for_' + str(i), + python_callable=my_sleeping_function, + op_kwargs={'random_base': float(i) / 10}, + dag=dag, + ) + + run_this >> task +# [END howto_operator_python_kwargs] diff --git a/composer/dags/example_dags/short_circuit_operator.py b/composer/dags/example_dags/short_circuit_operator.py new file mode 100644 index 0000000..57a179d --- /dev/null +++ 
"""Example DAG demonstrating the ShortCircuitOperator: tasks downstream of a
short-circuit whose callable returns False are skipped."""

import airflow.utils.helpers
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
# BUG FIX: the original referenced airflow.utils.dates.days_ago while only
# importing airflow.utils.helpers, relying on a package-import side effect to
# make the `dates` submodule reachable. Import days_ago explicitly instead.
from airflow.utils.dates import days_ago

args = {
    'owner': 'jferriero@google.com',
    'start_date': days_ago(2),
}

dag = DAG(dag_id='short_circuit_operator',
          default_args=args,
          )

# Condition is True: its downstream dummy tasks run normally.
cond_true = ShortCircuitOperator(
    task_id='condition_is_True',
    python_callable=lambda: True,
    dag=dag,
)

# Condition is False: its downstream dummy tasks are skipped.
cond_false = ShortCircuitOperator(
    task_id='condition_is_False',
    python_callable=lambda: False,
    dag=dag,
)

ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]]
ds_false = [DummyOperator(task_id='false_' + str(i), dag=dag) for i in [1, 2]]

# chain() wires each list into a linear sequence behind its condition task.
airflow.utils.helpers.chain(cond_true, *ds_true)
airflow.utils.helpers.chain(cond_false, *ds_false)
"""Example DAG demonstrating the DummyOperator and a custom DummySkipOperator
which skips by default."""

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'jferriero@google.com',
    'start_date': days_ago(2),
}


class DummySkipOperator(DummyOperator):
    """A DummyOperator that always marks itself as skipped."""

    ui_color = '#e8b7e4'

    def execute(self, context):
        # Raising AirflowSkipException moves the task to the "skipped" state
        # instead of "failed".
        raise AirflowSkipException


def create_test_pipeline(suffix, trigger_rule, dag):
    """Add a skip/true task pair joined under *trigger_rule*, then a final task.

    The join task is named after its trigger rule so each rule's behaviour is
    visible directly in the graph view.
    """
    skip_operator = DummySkipOperator(
        task_id='skip_operator_{}'.format(suffix), dag=dag)
    always_true = DummyOperator(
        task_id='always_true_{}'.format(suffix), dag=dag)
    join = DummyOperator(task_id=trigger_rule, dag=dag,
                         trigger_rule=trigger_rule)
    final = DummyOperator(task_id='final_{}'.format(suffix), dag=dag)

    skip_operator >> join
    always_true >> join
    join >> final


dag = DAG(dag_id='skip_dag', default_args=args)
# Pipeline 1: join requires all upstreams to succeed (so it gets skipped).
create_test_pipeline('1', 'all_success', dag)
# Pipeline 2: join runs as soon as one upstream succeeds.
create_test_pipeline('2', 'one_success', dag)
"""Example DAG demonstrating the usage of the SubDagOperator."""

# [START example_subdag_operator]
# BUG FIX: import the subdag factory from the local ``subdags`` package that
# ships alongside this DAG (composer/dags/example_dags/subdags/subdag.py)
# instead of Airflow's bundled ``airflow.example_dags``, which is not
# guaranteed to be present in the deployment environment.
# NOTE(review): assumes the Composer DAGs folder root is ``composer/dags`` so
# the ``example_dags`` package is importable — verify against deployment.
from example_dags.subdags.subdag import subdag
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'subdag_operator'

args = {
    'owner': 'jferriero@google.com',
    'start_date': days_ago(2),
}

dag = DAG(dag_id=DAG_NAME,
          default_args=args,
          schedule_interval="@once",
          )

start = DummyOperator(
    task_id='start',
    dag=dag,
)

# A SubDag's dag_id must be '<parent>.<task_id>'; the subdag() factory builds
# that id from DAG_NAME and the section name passed here.
section_1 = SubDagOperator(
    task_id='section-1',
    subdag=subdag(DAG_NAME, 'section-1', args),
    dag=dag,
)

some_other_task = DummyOperator(
    task_id='some-other-task',
    dag=dag,
)

section_2 = SubDagOperator(
    task_id='section-2',
    subdag=subdag(DAG_NAME, 'section-2', args),
    dag=dag,
)

end = DummyOperator(
    task_id='end',
    dag=dag,
)

start >> section_1 >> some_other_task >> section_2 >> end
# [END example_subdag_operator]
# [START subdag]
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator


def subdag(parent_dag_name, child_dag_name, args):
    """Build and return the sub-DAG '<parent>.<child>' holding five dummy tasks.

    Task ids follow the pattern '<child_dag_name>-task-1' .. '-task-5'.
    """
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )

    for task_num in range(1, 6):
        DummyOperator(
            task_id='%s-task-%s' % (child_dag_name, task_num),
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag
# [END subdag]
+"""This example illustrates the use of the TriggerDagRunOperator. There are 2 +entities at work in this scenario: +1. The Controller DAG - the DAG that conditionally executes the trigger +2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py) + +This example illustrates the following features : +1. A TriggerDagRunOperator that takes: + a. A python callable that decides whether or not to trigger the Target DAG + b. An optional params dict passed to the python callable to help in + evaluating whether or not to trigger the Target DAG + c. The id (name) of the Target DAG + d. The python callable can add contextual info to the DagRun created by + way of adding a Pickleable payload (e.g. dictionary of primitives). This + state is then made available to the TargetDag +2. A Target DAG : c.f. example_trigger_target_dag.py +""" + +import pprint + +from airflow import DAG +from airflow.operators.dagrun_operator import TriggerDagRunOperator +from airflow.utils.dates import days_ago + +pp = pprint.PrettyPrinter(indent=4) + + +def conditionally_trigger(context, dag_run_obj): + """This function decides whether or not to Trigger the remote DAG""" + c_p = context['params']['condition_param'] + print("Controller DAG : conditionally_trigger = {}".format(c_p)) + if context['params']['condition_param']: + dag_run_obj.payload = {'message': context['params']['message']} + pp.pprint(dag_run_obj.payload) + return dag_run_obj + + +# Define the DAG +dag = DAG(dag_id="trigger_controller_dag", + default_args={ + "owner": "jferriero@google.com", + "start_date": days_ago(2) + }, + schedule_interval="@once", + ) + +# Define the single task in this controller example DAG +trigger = TriggerDagRunOperator( + task_id='test_trigger_dagrun', + trigger_dag_id="example_trigger_target_dag", + python_callable=conditionally_trigger, + params={ + 'condition_param': True, + 'message': 'Hello World' + }, + dag=dag, +) diff --git a/composer/dags/example_dags/trigger_target_dag.py 
import pprint

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

pp = pprint.PrettyPrinter(indent=4)

# Target half of the TriggerDagRunOperator example. The controller DAG
# (example_trigger_controller.py) decides whether to trigger this DAG and may
# attach a pickleable payload (e.g. a dict of primitives) to the DagRun it
# creates. That payload surfaces here as `dag_run.conf`, both inside python
# callables and inside templates.

# schedule_interval=None: runs only when a DagRun is created externally
# (by the controller DAG or a manual trigger).
dag = DAG(dag_id="trigger_target_dag",
          default_args={
              "start_date": days_ago(2),
              "owner": "jferriero@google.com"
          },
          schedule_interval=None,
          )


def run_this_func(ds, **kwargs):
    """Log the 'message' value the controller placed into dag_run.conf."""
    print("Remotely received value of {} for key=message".format(
        kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    python_callable=run_this_func,
    provide_context=True,
    dag=dag,
)

# The DagRun object is also reachable from templates: expose the message as
# an environment variable for the bash task.
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: $message"',
    env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
    dag=dag,
)
from __future__ import print_function

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# provide_context=True in default_args so every PythonOperator below receives
# the task-instance context (and therefore xcom_push/xcom_pull via 'ti').
args = {
    'owner': 'jferriero@google.com',
    'start_date': days_ago(2),
    'provide_context': True,
}

dag = DAG('xcom',
          schedule_interval="@once",
          default_args=args,
          )

# The payloads exchanged over XCom.
value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Push an XCom explicitly, under a custom key."""
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Push an XCom implicitly, via the callable's return value."""
    return value_2


def puller(**kwargs):
    """Pull both XComs and verify they round-tripped intact."""
    ti = kwargs['ti']

    # The explicitly-pushed value from the 'push' task.
    v1 = ti.xcom_pull(key=None, task_ids='push')
    assert v1 == value_1

    # The return-value XCom from 'push_by_returning'.
    v2 = ti.xcom_pull(task_ids='push_by_returning')
    assert v2 == value_2

    # Both at once, returned in task_ids order.
    v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
    assert (v1, v2) == (value_1, value_2)


push1 = PythonOperator(task_id='push', python_callable=push, dag=dag)

push2 = PythonOperator(task_id='push_by_returning',
                       python_callable=push_by_returning, dag=dag)

pull = PythonOperator(task_id='puller', python_callable=puller, dag=dag)

[push1, push2] >> pull