Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@
)
# This configuration sets `kubernetes_conn_id` in airflow's KubernetesPodOperator.
AIRFLOW_KUBERNETES_CONN_ID = from_conf("AIRFLOW_KUBERNETES_CONN_ID")
AIRFLOW_KUBERNETES_KUBECONFIG_FILE = from_conf("AIRFLOW_KUBERNETES_KUBECONFIG_FILE")
AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT = from_conf(
"AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT"
)


###
Expand Down
1 change: 0 additions & 1 deletion metaflow/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
# Add Airflow sensor related flow decorators
SENSOR_FLOW_DECORATORS = [
("airflow_external_task_sensor", ".airflow.sensors.ExternalTaskSensorDecorator"),
("airflow_sql_sensor", ".airflow.sensors.SQLSensorDecorator"),
("airflow_s3_key_sensor", ".airflow.sensors.S3KeySensorDecorator"),
]

Expand Down
12 changes: 10 additions & 2 deletions metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
DATASTORE_SYSROOT_AZURE,
CARD_AZUREROOT,
AIRFLOW_KUBERNETES_CONN_ID,
AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT,
AIRFLOW_KUBERNETES_KUBECONFIG_FILE,
DATASTORE_SYSROOT_GS,
CARD_GSROOT,
)
Expand Down Expand Up @@ -468,10 +470,16 @@ def _to_job(self, node):
reattach_on_restart=False,
secrets=[],
)
k8s_operator_args["in_cluster"] = True
if AIRFLOW_KUBERNETES_CONN_ID is not None:
k8s_operator_args["kubernetes_conn_id"] = AIRFLOW_KUBERNETES_CONN_ID
else:
k8s_operator_args["in_cluster"] = True
k8s_operator_args["in_cluster"] = False
if AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT is not None:
k8s_operator_args["cluster_context"] = AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT
k8s_operator_args["in_cluster"] = False
if AIRFLOW_KUBERNETES_KUBECONFIG_FILE is not None:
k8s_operator_args["config_file"] = AIRFLOW_KUBERNETES_KUBECONFIG_FILE
k8s_operator_args["in_cluster"] = False

if k8s_deco.attributes["secrets"]:
if isinstance(k8s_deco.attributes["secrets"], str):
Expand Down
6 changes: 0 additions & 6 deletions metaflow/plugins/airflow/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ def pathspec(cls, flowname, is_foreach=False):
class SensorNames:
EXTERNAL_TASK_SENSOR = "ExternalTaskSensor"
S3_SENSOR = "S3KeySensor"
SQL_SENSOR = "SQLSensor"

@classmethod
def get_supported_sensors(cls):
Expand Down Expand Up @@ -423,11 +422,6 @@ def _get_sensor(name):
"`pip install apache-airflow-providers-amazon`"
)
return S3KeySensor
elif name == SensorNames.SQL_SENSOR:
from airflow.sensors.sql import SqlSensor

return SqlSensor


def get_metaflow_kubernetes_operator():
try:
Expand Down
2 changes: 0 additions & 2 deletions metaflow/plugins/airflow/sensors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from .external_task_sensor import ExternalTaskSensorDecorator
from .s3_sensor import S3KeySensorDecorator
from .sql_sensor import SQLSensorDecorator

SUPPORTED_SENSORS = [
ExternalTaskSensorDecorator,
S3KeySensorDecorator,
SQLSensorDecorator,
]
40 changes: 40 additions & 0 deletions metaflow/plugins/airflow/sensors/external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,46 @@


class ExternalTaskSensorDecorator(AirflowSensorDecorator):
"""
The `@airflow_external_task_sensor` decorator attaches an Airflow [ExternalTaskSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) before the start step of the flow.
This decorator only works when a flow is scheduled on Airflow and is compiled using `airflow create`. More than one `@airflow_external_task_sensor` can be added as flow decorators. Adding more than one decorator will ensure that the `start` step starts only after all sensors finish.

Parameters
----------
timeout : int
Time, in seconds before the task times out and fails. (Default: 3600)
poke_interval : int
Time in seconds that the job should wait in between each try. (Default: 60)
mode : string
How the sensor operates. Options are: { poke | reschedule }. (Default: "poke")
exponential_backoff : bool
        Allow progressively longer waits between pokes by using an exponential backoff algorithm. (Default: True)
pool : string
        the slot pool this task should run in;
        slot pools are a way to limit concurrency for certain tasks. (Default: None)
soft_fail : bool
Set to true to mark the task as SKIPPED on failure. (Default: False)
name : string
Name of the sensor on Airflow
description : string
Description of sensor in the Airflow UI
external_dag_id : string
The dag_id that contains the task you want to wait for.
external_task_ids : List[string]
The list of task_ids that you want to wait for.
If None (default value) the sensor waits for the DAG. (Default: None)
allowed_states : List[string]
Iterable of allowed states, (Default: ['success'])
failed_states : List[string]
Iterable of failed or dis-allowed states. (Default: None)
execution_delta : datetime.timedelta
time difference with the previous execution to look at,
the default is the same logical date as the current task or DAG. (Default: None)
check_existence: bool
Set to True to check if the external task exists or check if
the DAG to wait for exists. (Default: True)
"""

Comment on lines +21 to +60
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@savingoyal : Added Docstrings for all 3 decorators. Please review when you get the chance.

operator_type = SensorNames.EXTERNAL_TASK_SENSOR
# Docs:
# https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor
Expand Down
40 changes: 40 additions & 0 deletions metaflow/plugins/airflow/sensors/s3_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,46 @@


class S3KeySensorDecorator(AirflowSensorDecorator):
"""
    The `@airflow_s3_key_sensor` decorator attaches an Airflow [S3KeySensor](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3/index.html#airflow.providers.amazon.aws.sensors.s3.S3KeySensor)
before the start step of the flow. This decorator only works when a flow is scheduled on Airflow
    and is compiled using `airflow create`. More than one `@airflow_s3_key_sensor` can be
    added as flow decorators. Adding more than one decorator will ensure that the `start` step
    starts only after all sensors finish.

Parameters
----------
timeout : int
Time, in seconds before the task times out and fails. (Default: 3600)
poke_interval : int
Time in seconds that the job should wait in between each try. (Default: 60)
mode : string
How the sensor operates. Options are: { poke | reschedule }. (Default: "poke")
exponential_backoff : bool
        Allow progressively longer waits between pokes by using an exponential backoff algorithm. (Default: True)
pool : string
        the slot pool this task should run in;
        slot pools are a way to limit concurrency for certain tasks. (Default: None)
soft_fail : bool
Set to true to mark the task as SKIPPED on failure. (Default: False)
name : string
Name of the sensor on Airflow
description : string
Description of sensor in the Airflow UI
bucket_key : str | List[str]
The key(s) being waited on. Supports full s3:// style url or relative path from root level.
When it's specified as a full s3:// url, please leave `bucket_name` as None
bucket_name : str
        Name of the S3 bucket. Only needed when bucket_key is not provided as a full s3:// url.
        When specified, all the keys passed to bucket_key refer to this bucket. (Default: None)
wildcard_match : bool
        Whether the bucket_key should be interpreted as a Unix wildcard pattern. (Default: False)
aws_conn_id : string
a reference to the s3 connection on Airflow. (Default: None)
verify : bool
Whether or not to verify SSL certificates for S3 connection. (Default: None)
"""

name = "airflow_s3_key_sensor"
operator_type = SensorNames.S3_SENSOR
# Arg specification can be found here :
Expand Down
31 changes: 0 additions & 31 deletions metaflow/plugins/airflow/sensors/sql_sensor.py

This file was deleted.