From 2c1c2fe028bee9d3eba8fe016e3d3b39af5db8ff Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 24 Jan 2023 00:28:13 +0000 Subject: [PATCH 1/4] Added additional variables for mwaa support. --- metaflow/metaflow_config.py | 4 ++++ metaflow/plugins/airflow/airflow.py | 19 ++++++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index c037b05ecf6..e5163725038 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -264,6 +264,10 @@ ) # This configuration sets `kubernetes_conn_id` in airflow's KubernetesPodOperator. AIRFLOW_KUBERNETES_CONN_ID = from_conf("AIRFLOW_KUBERNETES_CONN_ID") +AIRFLOW_KUBERNETES_KUBECONFIG_FILE = from_conf("AIRFLOW_KUBERNETES_KUBECONFIG_FILE") +AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT = from_conf( + "AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT" +) ### diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 0e8cbad4da4..e0b8c0790ee 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -23,6 +23,8 @@ DATASTORE_SYSROOT_AZURE, CARD_AZUREROOT, AIRFLOW_KUBERNETES_CONN_ID, + AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT, + AIRFLOW_KUBERNETES_KUBECONFIG_FILE, DATASTORE_SYSROOT_GS, CARD_GSROOT, ) @@ -470,7 +472,22 @@ def _to_job(self, node): ) if AIRFLOW_KUBERNETES_CONN_ID is not None: k8s_operator_args["kubernetes_conn_id"] = AIRFLOW_KUBERNETES_CONN_ID - else: + k8s_operator_args["in_cluster"] = False + if AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT is not None: + k8s_operator_args["cluster_context"] = AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT + k8s_operator_args["in_cluster"] = False + if AIRFLOW_KUBERNETES_KUBECONFIG_FILE is not None: + k8s_operator_args["config_file"] = AIRFLOW_KUBERNETES_KUBECONFIG_FILE + k8s_operator_args["in_cluster"] = False + + if all( + x is None + for x in [ + AIRFLOW_KUBERNETES_KUBECONFIG_FILE, + AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT, + AIRFLOW_KUBERNETES_CONN_ID, + ] + 
): k8s_operator_args["in_cluster"] = True if k8s_deco.attributes["secrets"]: From 104321475b9e8d63a8e7409f75e317eb2c237ce7 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 24 Jan 2023 00:51:19 +0000 Subject: [PATCH 2/4] refactor for simplicity. --- metaflow/plugins/airflow/airflow.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index e0b8c0790ee..945681b3983 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -470,6 +470,7 @@ def _to_job(self, node): reattach_on_restart=False, secrets=[], ) + k8s_operator_args["in_cluster"] = True if AIRFLOW_KUBERNETES_CONN_ID is not None: k8s_operator_args["kubernetes_conn_id"] = AIRFLOW_KUBERNETES_CONN_ID k8s_operator_args["in_cluster"] = False @@ -480,16 +481,6 @@ def _to_job(self, node): k8s_operator_args["config_file"] = AIRFLOW_KUBERNETES_KUBECONFIG_FILE k8s_operator_args["in_cluster"] = False - if all( - x is None - for x in [ - AIRFLOW_KUBERNETES_KUBECONFIG_FILE, - AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT, - AIRFLOW_KUBERNETES_CONN_ID, - ] - ): - k8s_operator_args["in_cluster"] = True - if k8s_deco.attributes["secrets"]: if isinstance(k8s_deco.attributes["secrets"], str): k8s_operator_args["secrets"] = k8s_deco.attributes["secrets"].split(",") From cb274964e640ba8da17d5e155f7476a58cbe3b05 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Mon, 30 Jan 2023 23:24:59 +0000 Subject: [PATCH 3/4] Added docstrings for sensors: - Fixed SQL Sensor import. 
--- metaflow/plugins/airflow/airflow_utils.py | 12 +++++- .../airflow/sensors/external_task_sensor.py | 40 +++++++++++++++++++ metaflow/plugins/airflow/sensors/s3_sensor.py | 40 +++++++++++++++++++ .../plugins/airflow/sensors/sql_sensor.py | 37 +++++++++++++++++ 4 files changed, 128 insertions(+), 1 deletion(-) diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py index c94b6734818..51774486c49 100644 --- a/metaflow/plugins/airflow/airflow_utils.py +++ b/metaflow/plugins/airflow/airflow_utils.py @@ -424,7 +424,17 @@ def _get_sensor(name): ) return S3KeySensor elif name == SensorNames.SQL_SENSOR: - from airflow.sensors.sql import SqlSensor + try: + from airflow.sensors.sql import SqlSensor + except ImportError as e: + try: + from airflow.providers.common.sql.sensors.sql import SqlSensor + except ImportError as e: + raise AirflowSensorNotFound( + "This DAG requires a `SqlSensor`. " + "Install the Airflow SQL provider using : " + "`pip install apache-airflow-providers-common-sql`" + ) return SqlSensor diff --git a/metaflow/plugins/airflow/sensors/external_task_sensor.py b/metaflow/plugins/airflow/sensors/external_task_sensor.py index 649edba706c..c599a05b4a0 100644 --- a/metaflow/plugins/airflow/sensors/external_task_sensor.py +++ b/metaflow/plugins/airflow/sensors/external_task_sensor.py @@ -18,6 +18,46 @@ class ExternalTaskSensorDecorator(AirflowSensorDecorator): + """ + The `@airflow_external_task_sensor` decorator attaches an Airflow [ExternalTaskSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) before the start step of the flow. + This decorator only works when a flow is scheduled on Airflow and is compiled using `airflow create`. More than one `@airflow_external_task_sensor` can be added as flow decorators. Adding more than one decorator will ensure that `start` step starts only after all sensors finish. 
+ + Parameters + ---------- + timeout : int + Time, in seconds before the task times out and fails. (Default: 3600) + poke_interval : int + Time in seconds that the job should wait in between each try. (Default: 60) + mode : string + How the sensor operates. Options are: { poke | reschedule }. (Default: "poke") + exponential_backoff : bool + allow progressive longer waits between pokes by using exponential backoff algorithm. (Default: True) + pool : string + the slot pool this task should run in, + slot pools are a way to limit concurrency for certain tasks. (Default:None) + soft_fail : bool + Set to true to mark the task as SKIPPED on failure. (Default: False) + name : string + Name of the sensor on Airflow + description : string + Description of sensor in the Airflow UI + external_dag_id : string + The dag_id that contains the task you want to wait for. + external_task_ids : List[string] + The list of task_ids that you want to wait for. + If None (default value) the sensor waits for the DAG. (Default: None) + allowed_states : List[string] + Iterable of allowed states, (Default: ['success']) + failed_states : List[string] + Iterable of failed or dis-allowed states. (Default: None) + execution_delta : datetime.timedelta + time difference with the previous execution to look at, + the default is the same logical date as the current task or DAG. (Default: None) + check_existence: bool + Set to True to check if the external task exists or check if + the DAG to wait for exists. 
+ (Default: True) + """ + operator_type = SensorNames.EXTERNAL_TASK_SENSOR # Docs: # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor diff --git a/metaflow/plugins/airflow/sensors/s3_sensor.py b/metaflow/plugins/airflow/sensors/s3_sensor.py index b4f7ae5b6de..6fac6725d20 100644 --- a/metaflow/plugins/airflow/sensors/s3_sensor.py +++ b/metaflow/plugins/airflow/sensors/s3_sensor.py @@ -4,6 +4,46 @@ class S3KeySensorDecorator(AirflowSensorDecorator): + """ + The `@airflow_s3_key_sensor` decorator attaches an Airflow [S3KeySensor](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3/index.html#airflow.providers.amazon.aws.sensors.s3.S3KeySensor) + before the start step of the flow. This decorator only works when a flow is scheduled on Airflow + and is compiled using `airflow create`. More than one `@airflow_s3_key_sensor` can be + added as flow decorators. Adding more than one decorator will ensure that `start` step + starts only after all sensors finish. + + Parameters + ---------- + timeout : int + Time, in seconds before the task times out and fails. (Default: 3600) + poke_interval : int + Time in seconds that the job should wait in between each try. (Default: 60) + mode : string + How the sensor operates. Options are: { poke | reschedule }. (Default: "poke") + exponential_backoff : bool + allow progressive longer waits between pokes by using exponential backoff algorithm. (Default: True) + pool : string + the slot pool this task should run in, + slot pools are a way to limit concurrency for certain tasks. (Default:None) + soft_fail : bool + Set to true to mark the task as SKIPPED on failure. (Default: False) + name : string + Name of the sensor on Airflow + description : string + Description of sensor in the Airflow UI + bucket_key : str | List[str] + The key(s) being waited on. 
Supports full s3:// style url or relative path from root level. + When it's specified as a full s3:// url, please leave `bucket_name` as None + bucket_name : str + Name of the S3 bucket. Only needed when bucket_key is not provided as a full s3:// url. + When specified, all the keys passed to bucket_key refers to this bucket. (Default:None) + wildcard_match : bool + whether the bucket_key should be interpreted as a Unix wildcard pattern. (Default: False) + aws_conn_id : string + a reference to the s3 connection on Airflow. (Default: None) + verify : bool + Whether or not to verify SSL certificates for S3 connection. (Default: None) + """ + name = "airflow_s3_key_sensor" operator_type = SensorNames.S3_SENSOR # Arg specification can be found here : diff --git a/metaflow/plugins/airflow/sensors/sql_sensor.py b/metaflow/plugins/airflow/sensors/sql_sensor.py index c97c41b283e..ad2379514bb 100644 --- a/metaflow/plugins/airflow/sensors/sql_sensor.py +++ b/metaflow/plugins/airflow/sensors/sql_sensor.py @@ -4,6 +4,43 @@ class SQLSensorDecorator(AirflowSensorDecorator): + """ + The `@airflow_sql_sensor` decorator attaches a Airflow [SqlSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/sql/index.html#airflow.sensors.sql.SqlSensor) + before the start step of the flow. This decorator only works when a flow is scheduled on Airflow + and is compiled using `airflow create`. More than one `@airflow_sql_sensor` can be + added as a flow decorators. Adding more than one decorator will ensure that `start` step + starts only after all sensors finish. + + Parameters + ---------- + timeout : int + Time, in seconds before the task times out and fails. (Default: 3600) + poke_interval : int + Time in seconds that the job should wait in between each try. (Default: 60) + mode : string + How the sensor operates. Options are: { poke | reschedule }. 
(Default: "poke") + exponential_backoff : bool + allow progressive longer waits between pokes by using exponential backoff algorithm. (Default: True) + pool : string + the slot pool this task should run in, + slot pools are a way to limit concurrency for certain tasks. (Default:None) + soft_fail : bool + Set to true to mark the task as SKIPPED on failure. (Default: False) + name : string + Name of the sensor on Airflow + description : string + Description of sensor in the Airflow UI + sql : str + The sql to run. To pass, it needs to return at least one cell that + contains a non-zero / empty string value. + parameters : List[str] | Dict + The parameters to render the SQL query with (optional). (Default: None) + fail_on_empty : bool + Explicitly fail on no rows returned. (Default: True) + conn_id : string + a reference to the SQL connection on Airflow. + """ + name = "airflow_sql_sensor" operator_type = SensorNames.SQL_SENSOR # Arg specification can be found here : From c102499d26ce0fe82a1c387c01a0c026850b8820 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 31 Jan 2023 00:51:09 +0000 Subject: [PATCH 4/4] Remove SQL Sensor from the code. 
--- metaflow/plugins/__init__.py | 1 - metaflow/plugins/airflow/airflow_utils.py | 16 ----- metaflow/plugins/airflow/sensors/__init__.py | 2 - .../plugins/airflow/sensors/sql_sensor.py | 68 ------------------- 4 files changed, 87 deletions(-) delete mode 100644 metaflow/plugins/airflow/sensors/sql_sensor.py diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 6ff7e4c05cd..a639348ed01 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -101,7 +101,6 @@ # Add Airflow sensor related flow decorators SENSOR_FLOW_DECORATORS = [ ("airflow_external_task_sensor", ".airflow.sensors.ExternalTaskSensorDecorator"), - ("airflow_sql_sensor", ".airflow.sensors.SQLSensorDecorator"), ("airflow_s3_key_sensor", ".airflow.sensors.S3KeySensorDecorator"), ] diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py index 51774486c49..ba02453189e 100644 --- a/metaflow/plugins/airflow/airflow_utils.py +++ b/metaflow/plugins/airflow/airflow_utils.py @@ -196,7 +196,6 @@ def pathspec(cls, flowname, is_foreach=False): class SensorNames: EXTERNAL_TASK_SENSOR = "ExternalTaskSensor" S3_SENSOR = "S3KeySensor" - SQL_SENSOR = "SQLSensor" @classmethod def get_supported_sensors(cls): @@ -423,21 +422,6 @@ def _get_sensor(name): "`pip install apache-airflow-providers-amazon`" ) return S3KeySensor - elif name == SensorNames.SQL_SENSOR: - try: - from airflow.sensors.sql import SqlSensor - except ImportError as e: - try: - from airflow.providers.common.sql.sensors.sql import SqlSensor - except ImportError as e: - raise AirflowSensorNotFound( - "This DAG requires a `SqlSensor`. 
" - "Install the Airflow SQL provider using : " - "`pip install apache-airflow-providers-common-sql`" - ) - - return SqlSensor - def get_metaflow_kubernetes_operator(): try: diff --git a/metaflow/plugins/airflow/sensors/__init__.py b/metaflow/plugins/airflow/sensors/__init__.py index 02952d0c9a4..d788d704523 100644 --- a/metaflow/plugins/airflow/sensors/__init__.py +++ b/metaflow/plugins/airflow/sensors/__init__.py @@ -1,9 +1,7 @@ from .external_task_sensor import ExternalTaskSensorDecorator from .s3_sensor import S3KeySensorDecorator -from .sql_sensor import SQLSensorDecorator SUPPORTED_SENSORS = [ ExternalTaskSensorDecorator, S3KeySensorDecorator, - SQLSensorDecorator, ] diff --git a/metaflow/plugins/airflow/sensors/sql_sensor.py b/metaflow/plugins/airflow/sensors/sql_sensor.py deleted file mode 100644 index ad2379514bb..00000000000 --- a/metaflow/plugins/airflow/sensors/sql_sensor.py +++ /dev/null @@ -1,68 +0,0 @@ -from .base_sensor import AirflowSensorDecorator -from ..airflow_utils import SensorNames -from ..exception import AirflowException - - -class SQLSensorDecorator(AirflowSensorDecorator): - """ - The `@airflow_sql_sensor` decorator attaches a Airflow [SqlSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/sql/index.html#airflow.sensors.sql.SqlSensor) - before the start step of the flow. This decorator only works when a flow is scheduled on Airflow - and is compiled using `airflow create`. More than one `@airflow_sql_sensor` can be - added as a flow decorators. Adding more than one decorator will ensure that `start` step - starts only after all sensors finish. - - Parameters - ---------- - timeout : int - Time, in seconds before the task times out and fails. (Default: 3600) - poke_interval : int - Time in seconds that the job should wait in between each try. (Default: 60) - mode : string - How the sensor operates. Options are: { poke | reschedule }. 
(Default: "poke") - exponential_backoff : bool - allow progressive longer waits between pokes by using exponential backoff algorithm. (Default: True) - pool : string - the slot pool this task should run in, - slot pools are a way to limit concurrency for certain tasks. (Default:None) - soft_fail : bool - Set to true to mark the task as SKIPPED on failure. (Default: False) - name : string - Name of the sensor on Airflow - description : string - Description of sensor in the Airflow UI - sql : str - The sql to run. To pass, it needs to return at least one cell that - contains a non-zero / empty string value. - parameters : List[str] | Dict - The parameters to render the SQL query with (optional). (Default: None) - fail_on_empty : bool - Explicitly fail on no rows returned. (Default: True) - conn_id : string - a reference to the SQL connection on Airflow. - """ - - name = "airflow_sql_sensor" - operator_type = SensorNames.SQL_SENSOR - # Arg specification can be found here : - # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/sql/index.html#airflow.sensors.sql.SqlSensor - defaults = dict( - **AirflowSensorDecorator.defaults, - conn_id=None, - sql=None, - # success = None, # sucess/failure require callables. Wont be supported at start since not serialization friendly. - # failure = None, - parameters=None, - fail_on_empty=True, - ) - - def validate(self): - if self.attributes["conn_id"] is None: - raise AirflowException( - "`%s` argument of `@%s`cannot be `None`." % ("conn_id", self.name) - ) - raise _arg_exception("conn_id", self.name, None) - if self.attributes["sql"] is None: - raise AirflowException( - "`%s` argument of `@%s`cannot be `None`." % ("sql", self.name) - ) - super().validate()