diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index c037b05ecf6..e5163725038 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -264,6 +264,10 @@ ) # This configuration sets `kubernetes_conn_id` in airflow's KubernetesPodOperator. AIRFLOW_KUBERNETES_CONN_ID = from_conf("AIRFLOW_KUBERNETES_CONN_ID") +AIRFLOW_KUBERNETES_KUBECONFIG_FILE = from_conf("AIRFLOW_KUBERNETES_KUBECONFIG_FILE") +AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT = from_conf( + "AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT" +) ### diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 6ff7e4c05cd..a639348ed01 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -101,7 +101,6 @@ # Add Airflow sensor related flow decorators SENSOR_FLOW_DECORATORS = [ ("airflow_external_task_sensor", ".airflow.sensors.ExternalTaskSensorDecorator"), - ("airflow_sql_sensor", ".airflow.sensors.SQLSensorDecorator"), ("airflow_s3_key_sensor", ".airflow.sensors.S3KeySensorDecorator"), ] diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 0e8cbad4da4..945681b3983 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -23,6 +23,8 @@ DATASTORE_SYSROOT_AZURE, CARD_AZUREROOT, AIRFLOW_KUBERNETES_CONN_ID, + AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT, + AIRFLOW_KUBERNETES_KUBECONFIG_FILE, DATASTORE_SYSROOT_GS, CARD_GSROOT, ) @@ -468,10 +470,16 @@ def _to_job(self, node): reattach_on_restart=False, secrets=[], ) + k8s_operator_args["in_cluster"] = True if AIRFLOW_KUBERNETES_CONN_ID is not None: k8s_operator_args["kubernetes_conn_id"] = AIRFLOW_KUBERNETES_CONN_ID - else: - k8s_operator_args["in_cluster"] = True + k8s_operator_args["in_cluster"] = False + if AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT is not None: + k8s_operator_args["cluster_context"] = AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT + k8s_operator_args["in_cluster"] = False + if AIRFLOW_KUBERNETES_KUBECONFIG_FILE is not 
None: + k8s_operator_args["config_file"] = AIRFLOW_KUBERNETES_KUBECONFIG_FILE + k8s_operator_args["in_cluster"] = False if k8s_deco.attributes["secrets"]: if isinstance(k8s_deco.attributes["secrets"], str): diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py index c94b6734818..ba02453189e 100644 --- a/metaflow/plugins/airflow/airflow_utils.py +++ b/metaflow/plugins/airflow/airflow_utils.py @@ -196,7 +196,6 @@ def pathspec(cls, flowname, is_foreach=False): class SensorNames: EXTERNAL_TASK_SENSOR = "ExternalTaskSensor" S3_SENSOR = "S3KeySensor" - SQL_SENSOR = "SQLSensor" @classmethod def get_supported_sensors(cls): @@ -423,11 +422,6 @@ def _get_sensor(name): "`pip install apache-airflow-providers-amazon`" ) return S3KeySensor - elif name == SensorNames.SQL_SENSOR: - from airflow.sensors.sql import SqlSensor - - return SqlSensor - def get_metaflow_kubernetes_operator(): try: diff --git a/metaflow/plugins/airflow/sensors/__init__.py b/metaflow/plugins/airflow/sensors/__init__.py index 02952d0c9a4..d788d704523 100644 --- a/metaflow/plugins/airflow/sensors/__init__.py +++ b/metaflow/plugins/airflow/sensors/__init__.py @@ -1,9 +1,7 @@ from .external_task_sensor import ExternalTaskSensorDecorator from .s3_sensor import S3KeySensorDecorator -from .sql_sensor import SQLSensorDecorator SUPPORTED_SENSORS = [ ExternalTaskSensorDecorator, S3KeySensorDecorator, - SQLSensorDecorator, ] diff --git a/metaflow/plugins/airflow/sensors/external_task_sensor.py b/metaflow/plugins/airflow/sensors/external_task_sensor.py index 649edba706c..c599a05b4a0 100644 --- a/metaflow/plugins/airflow/sensors/external_task_sensor.py +++ b/metaflow/plugins/airflow/sensors/external_task_sensor.py @@ -18,6 +18,46 @@ class ExternalTaskSensorDecorator(AirflowSensorDecorator): + """ + The `@airflow_external_task_sensor` decorator attaches an Airflow 
[ExternalTaskSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) before the start step of the flow. + This decorator only works when a flow is scheduled on Airflow and is compiled using `airflow create`. More than one `@airflow_external_task_sensor` can be added as flow decorators. Adding more than one decorator will ensure that `start` step starts only after all sensors finish. + + Parameters + ---------- + timeout : int + Time, in seconds before the task times out and fails. (Default: 3600) + poke_interval : int + Time in seconds that the job should wait in between each try. (Default: 60) + mode : string + How the sensor operates. Options are: { poke | reschedule }. (Default: "poke") + exponential_backoff : bool + allow progressively longer waits between pokes by using exponential backoff algorithm. (Default: True) + pool : string + the slot pool this task should run in, + slot pools are a way to limit concurrency for certain tasks. (Default:None) + soft_fail : bool + Set to true to mark the task as SKIPPED on failure. (Default: False) + name : string + Name of the sensor on Airflow + description : string + Description of sensor in the Airflow UI + external_dag_id : string + The dag_id that contains the task you want to wait for. + external_task_ids : List[string] + The list of task_ids that you want to wait for. + If None (default value) the sensor waits for the DAG. (Default: None) + allowed_states : List[string] + Iterable of allowed states, (Default: ['success']) + failed_states : List[string] + Iterable of failed or dis-allowed states. (Default: None) + execution_delta : datetime.timedelta + time difference with the previous execution to look at, + the default is the same logical date as the current task or DAG. (Default: None) + check_existence : bool + Set to True to check if the external task exists or check if + the DAG to wait for exists. 
(Default: True) + """ + operator_type = SensorNames.EXTERNAL_TASK_SENSOR # Docs: # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor diff --git a/metaflow/plugins/airflow/sensors/s3_sensor.py b/metaflow/plugins/airflow/sensors/s3_sensor.py index b4f7ae5b6de..6fac6725d20 100644 --- a/metaflow/plugins/airflow/sensors/s3_sensor.py +++ b/metaflow/plugins/airflow/sensors/s3_sensor.py @@ -4,6 +4,46 @@ class S3KeySensorDecorator(AirflowSensorDecorator): + """ + The `@airflow_s3_key_sensor` decorator attaches an Airflow [S3KeySensor](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3/index.html#airflow.providers.amazon.aws.sensors.s3.S3KeySensor) + before the start step of the flow. This decorator only works when a flow is scheduled on Airflow + and is compiled using `airflow create`. More than one `@airflow_s3_key_sensor` can be + added as flow decorators. Adding more than one decorator will ensure that `start` step + starts only after all sensors finish. + + Parameters + ---------- + timeout : int + Time, in seconds before the task times out and fails. (Default: 3600) + poke_interval : int + Time in seconds that the job should wait in between each try. (Default: 60) + mode : string + How the sensor operates. Options are: { poke | reschedule }. (Default: "poke") + exponential_backoff : bool + allow progressively longer waits between pokes by using exponential backoff algorithm. (Default: True) + pool : string + the slot pool this task should run in, + slot pools are a way to limit concurrency for certain tasks. (Default:None) + soft_fail : bool + Set to true to mark the task as SKIPPED on failure. (Default: False) + name : string + Name of the sensor on Airflow + description : string + Description of sensor in the Airflow UI + bucket_key : str | List[str] + The key(s) being waited on. 
Supports full s3:// style url or relative path from root level. + When it's specified as a full s3:// url, please leave `bucket_name` as None + bucket_name : str + Name of the S3 bucket. Only needed when bucket_key is not provided as a full s3:// url. + When specified, all the keys passed to bucket_key refer to this bucket. (Default:None) + wildcard_match : bool + whether the bucket_key should be interpreted as a Unix wildcard pattern. (Default: False) + aws_conn_id : string + a reference to the s3 connection on Airflow. (Default: None) + verify : bool + Whether or not to verify SSL certificates for S3 connection. (Default: None) + """ + name = "airflow_s3_key_sensor" operator_type = SensorNames.S3_SENSOR # Arg specification can be found here : diff --git a/metaflow/plugins/airflow/sensors/sql_sensor.py b/metaflow/plugins/airflow/sensors/sql_sensor.py deleted file mode 100644 index c97c41b283e..00000000000 --- a/metaflow/plugins/airflow/sensors/sql_sensor.py +++ /dev/null @@ -1,31 +0,0 @@ -from .base_sensor import AirflowSensorDecorator -from ..airflow_utils import SensorNames -from ..exception import AirflowException - - -class SQLSensorDecorator(AirflowSensorDecorator): - name = "airflow_sql_sensor" - operator_type = SensorNames.SQL_SENSOR - # Arg specification can be found here : - # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/sql/index.html#airflow.sensors.sql.SqlSensor - defaults = dict( - **AirflowSensorDecorator.defaults, - conn_id=None, - sql=None, - # success = None, # sucess/failure require callables. Wont be supported at start since not serialization friendly. - # failure = None, - parameters=None, - fail_on_empty=True, - ) - - def validate(self): - if self.attributes["conn_id"] is None: - raise AirflowException( - "`%s` argument of `@%s`cannot be `None`." 
% ("conn_id", self.name) - ) - raise _arg_exception("conn_id", self.name, None) - if self.attributes["sql"] is None: - raise AirflowException( - "`%s` argument of `@%s`cannot be `None`." % ("sql", self.name) - ) - super().validate()