From 345e0f4171c8472e57e3649e27ca8d0a1cdfb41f Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Mon, 9 Jan 2023 08:27:33 +0000 Subject: [PATCH 01/14] Support for multi-flow decorators. --- metaflow/decorators.py | 34 ++++++++++++------- metaflow/plugins/airflow/airflow.py | 1 + metaflow/plugins/argo/argo_workflows.py | 1 + .../aws/step_functions/step_functions.py | 1 + .../plugins/conda/conda_step_decorator.py | 2 +- 5 files changed, 26 insertions(+), 13 deletions(-) diff --git a/metaflow/decorators.py b/metaflow/decorators.py index 8723367b79b..b13b927825e 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -108,6 +108,8 @@ class Decorator(object): name = "NONAME" defaults = {} + # `allow_multiple` allows setting many decorators of the same type to a step/flow. + allow_multiple = False def __init__(self, attributes=None, statically_defined=False): self.attributes = self.defaults.copy() @@ -255,9 +257,6 @@ class MyDecorator(StepDecorator): pass them around with every lifecycle call. """ - # `allow_multiple` allows setting many decorators of the same type to a step. - allow_multiple = False - def step_init( self, flow, graph, step_name, decorators, environment, flow_datastore, logger ): @@ -402,13 +401,15 @@ def _base_flow_decorator(decofunc, *args, **kwargs): cls = args[0] if isinstance(cls, type) and issubclass(cls, FlowSpec): # flow decorators add attributes in the class dictionary, - # _flow_decorators. - if decofunc.name in cls._flow_decorators: + # _flow_decorators. _flow_decorators is of type `{key:[decos]}` + if decofunc.name in cls._flow_decorators and not decofunc.allow_multiple: raise DuplicateFlowDecoratorException(decofunc.name) else: - cls._flow_decorators[decofunc.name] = decofunc( - attributes=kwargs, statically_defined=True - ) + deco_instance = decofunc(attributes=kwargs, statically_defined=True) + if decofunc.name not in cls._flow_decorators: + cls._flow_decorators[decofunc.name] = [deco_instance] + else: + cls._flow_decorators[decofunc.name].append(deco_instance) else: raise BadFlowDecoratorException(decofunc.name) return cls @@ -503,11 +504,20 @@ def _attach_decorators_to_step(step, decospecs): def _init_flow_decorators( flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options ): + # Since all flow decorators are stored as `{key:[deco]}` we iterate through each of them. for deco in flow._flow_decorators.values(): - opts = {option: deco_options[option] for option in deco.options} - deco.flow_init( - flow, graph, environment, flow_datastore, metadata, logger, echo, opts - ) + for rd in deco: + opts = {option: deco_options[option] for option in rd.options} + rd.flow_init( + flow, + graph, + environment, + flow_datastore, + metadata, + logger, + echo, + opts, + ) def _init_step_decorators(flow, graph, environment, flow_datastore, logger): diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 5480a79c59e..02a71d856ab 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -140,6 +140,7 @@ def _get_schedule(self): schedule = self.flow._flow_decorators.get("schedule") if not schedule: return None + schedule = schedule[0] if schedule.attributes["cron"]: return schedule.attributes["cron"] elif schedule.attributes["weekly"]: diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 1730b493248..caa61403e70 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -174,6 +174,7 @@ def trigger(cls, name, parameters=None): def _cron(self): schedule = self.flow._flow_decorators.get("schedule") + schedule = schedule[0] if schedule: # Remove the field "Year" if it exists return " ".join(schedule.schedule.split()[:5]), schedule.timezone diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py index 8c9584ed706..370d4adec93 100644 --- a/metaflow/plugins/aws/step_functions/step_functions.py +++ b/metaflow/plugins/aws/step_functions/step_functions.py @@ -328,6 +328,7 @@ def _visit(node, workflow, exit_node=None): def _cron(self): schedule = self.flow._flow_decorators.get("schedule") + schedule = schedule[0] if schedule: if schedule.timezone is not None: raise StepFunctionsException( diff --git a/metaflow/plugins/conda/conda_step_decorator.py b/metaflow/plugins/conda/conda_step_decorator.py index 6896e293c25..818e312a115 100644 --- a/metaflow/plugins/conda/conda_step_decorator.py +++ b/metaflow/plugins/conda/conda_step_decorator.py @@ -63,7 +63,7 @@ class CondaStepDecorator(StepDecorator): def _get_base_attributes(self): if "conda_base" in self.flow._flow_decorators: - return self.flow._flow_decorators["conda_base"].attributes + return self.flow._flow_decorators["conda_base"][0].attributes return self.defaults def _python_version(self): From e46c51b0c63b4a83dcc71f70f688085df3802b6e Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Fri, 27 Jan 2023 19:47:01 +0000 Subject: [PATCH 02/14] Refactor option setting from flow decorators. --- metaflow/decorators.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/metaflow/decorators.py b/metaflow/decorators.py index b13b927825e..b8bdcaaedcb 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -505,10 +505,28 @@ def _init_flow_decorators( flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options ): # Since all flow decorators are stored as `{key:[deco]}` we iterate through each of them. - for deco in flow._flow_decorators.values(): - for rd in deco: - opts = {option: deco_options[option] for option in rd.options} - rd.flow_init( + for decorators in flow._flow_decorators.values(): + # First resolve the `options` for the flow decorator. + # Options are passed from cli. + # For example `@project` can take a `--name` / `--branch` from the cli as options. + deco_flow_init_options = {} + deco = decorators[0] + # If a flow decorator allow multiple of same type then we don't allow multiple options for it. + if deco.allow_multiple: + if len(deco.options) > 0: + raise MetaflowException( + "Flow decorator `@%s` has multiple options, which is not allowed. " + "Please ensure the FlowDecorator `%s` has no options since flow decorators with " + "`allow_mutiple=True` are not allowed to have options" + % (deco.name, deco.__class__.__name__) + ) + else: + # Each "non-multiple" flow decorator is only allowed to have one set of options + deco_flow_init_options = { + option: deco_options[option] for option in deco.options + } + for deco in decorators: + deco.flow_init( flow, graph, environment, @@ -516,7 +534,7 @@ def _init_flow_decorators( metadata, logger, echo, - opts, + deco_flow_init_options, ) From 48197c056004df241dbc83619aba9e0d0d47a21c Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Fri, 27 Jan 2023 19:52:38 +0000 Subject: [PATCH 03/14] refactor logic for simplicity --- metaflow/decorators.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/metaflow/decorators.py b/metaflow/decorators.py index b8bdcaaedcb..b3f88df56d5 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -406,10 +406,7 @@ def _base_flow_decorator(decofunc, *args, **kwargs): raise DuplicateFlowDecoratorException(decofunc.name) else: deco_instance = decofunc(attributes=kwargs, statically_defined=True) - if decofunc.name not in cls._flow_decorators: - cls._flow_decorators[decofunc.name] = [deco_instance] - else: - cls._flow_decorators[decofunc.name].append(deco_instance) + cls._flow_decorators.setdefault(decofunc.name, []).append(deco_instance) else: raise BadFlowDecoratorException(decofunc.name) return cls From 75c690b11dc943f9abbbcf6dde6debed32364747 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Fri, 27 Jan 2023 21:25:37 +0000 Subject: [PATCH 04/14] Fixed bug due to fat-fingered indent --- metaflow/decorators.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/metaflow/decorators.py b/metaflow/decorators.py index b3f88df56d5..ebdd77ebd7e 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -517,11 +517,11 @@ def _init_flow_decorators( "`allow_mutiple=True` are not allowed to have options" % (deco.name, deco.__class__.__name__) ) - else: - # Each "non-multiple" flow decorator is only allowed to have one set of options - deco_flow_init_options = { - option: deco_options[option] for option in deco.options - } + else: + # Each "non-multiple" flow decorator is only allowed to have one set of options + deco_flow_init_options = { + option: deco_options[option] for option in deco.options + } for deco in decorators: deco.flow_init( flow, From 1ab2bcbcace2f4c2263e38ebd83b876d60d95767 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 24 Jan 2023 00:24:33 +0000 Subject: [PATCH 05/14] Airflow Foreach Base + Sensors --- metaflow/plugins/__init__.py | 9 ++ metaflow/plugins/airflow/airflow.py | 18 ++++ metaflow/plugins/airflow/airflow_cli.py | 10 +- metaflow/plugins/airflow/airflow_utils.py | 63 +++++++++++++ metaflow/plugins/airflow/sensors/__init__.py | 9 ++ .../plugins/airflow/sensors/base_sensor.py | 74 +++++++++++++++ .../airflow/sensors/external_task_sensor.py | 94 +++++++++++++++++++ metaflow/plugins/airflow/sensors/s3_sensor.py | 26 +++++ .../plugins/airflow/sensors/sql_sensor.py | 31 ++++++ 9 files changed, 326 insertions(+), 8 deletions(-) create mode 100644 metaflow/plugins/airflow/sensors/__init__.py create mode 100644 metaflow/plugins/airflow/sensors/base_sensor.py create mode 100644 metaflow/plugins/airflow/sensors/external_task_sensor.py create mode 100644 metaflow/plugins/airflow/sensors/s3_sensor.py create mode 100644 metaflow/plugins/airflow/sensors/sql_sensor.py diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index b59ab65591c..6ff7e4c05cd 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -98,6 +98,15 @@ # Add AWS client providers here AWS_CLIENT_PROVIDERS_DESC = [("boto3", ".aws.aws_client.Boto3ClientProvider")] +# Add Airflow sensor related flow decorators +SENSOR_FLOW_DECORATORS = [ + ("airflow_external_task_sensor", ".airflow.sensors.ExternalTaskSensorDecorator"), + ("airflow_sql_sensor", ".airflow.sensors.SQLSensorDecorator"), + ("airflow_s3_key_sensor", ".airflow.sensors.S3KeySensorDecorator"), +] + +FLOW_DECORATORS_DESC += SENSOR_FLOW_DECORATORS + process_plugins(globals()) diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 02a71d856ab..9e8d0a0d398 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -35,6 +35,7 @@ from . import airflow_utils from .exception import AirflowException +from .sensors import SUPPORTED_SENSORS from .airflow_utils import ( TASK_ID_XCOM_KEY, AirflowTask, @@ -88,6 +89,7 @@ def __init__( self.username = username self.max_workers = max_workers self.description = description + self._depends_on_upstream_sensors = False self._file_path = file_path _, self.graph_structure = self.graph.output_steps() self.worker_pool = worker_pool @@ -585,6 +587,17 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries): cmds.append(" ".join(entrypoint + top_level + step)) return cmds + def _collect_flow_sensors(self): + decos_lists = [ + self.flow._flow_decorators.get(s.name) + for s in SUPPORTED_SENSORS + if self.flow._flow_decorators.get(s.name) is not None + ] + af_tasks = [deco.create_task() for decos in decos_lists for deco in decos] + if len(af_tasks) > 0: + self._depends_on_upstream_sensors = True + return af_tasks + def _contains_foreach(self): for node in self.graph: if node.type == "foreach": @@ -639,6 +652,7 @@ def _visit(node, workflow, exit_node=None): if self.workflow_timeout is not None and self.schedule is not None: airflow_dag_args["dagrun_timeout"] = dict(seconds=self.workflow_timeout) + appending_sensors = self._collect_flow_sensors() workflow = Workflow( dag_id=self.name, default_args=self._create_defaults(), @@ -659,6 +673,10 @@ def _visit(node, workflow, exit_node=None): workflow = _visit(self.graph["start"], workflow) workflow.set_parameters(self.parameters) + if len(appending_sensors) > 0: + for s in appending_sensors: + workflow.add_state(s) + workflow.graph_structure.insert(0, [[s.name] for s in appending_sensors]) return self._to_airflow_dag_file(workflow.to_dict()) def _to_airflow_dag_file(self, json_dag): diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index 5ac676978c2..1f48a1fa481 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -322,7 +322,6 @@ def make_flow( def _validate_foreach_constraints(graph): - # Todo :Invoke this function when we integrate `foreach`s def traverse_graph(node, state): if node.type == "foreach" and node.is_inside_foreach: raise NotSupportedException( @@ -338,7 +337,7 @@ def traverse_graph(node, state): if node.type == "linear" and node.is_inside_foreach: state["foreach_stack"].append(node.name) - if len(state["foreach_stack"]) > 2: + if "foreach_stack" in state and len(state["foreach_stack"]) > 2: raise NotSupportedException( "The foreach step *%s* created by step *%s* needs to have an immediate join step. " "Step *%s* is invalid since it is a linear step with a foreach. " @@ -378,18 +377,13 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout): "A default value is required for parameters when deploying flows on Airflow." ) # check for other compute related decorators. + _validate_foreach_constraints(graph) for node in graph: if node.parallel_foreach: raise AirflowException( "Deploying flows with @parallel decorator(s) " "to Airflow is not supported currently." ) - - if node.type == "foreach": - raise NotSupportedException( - "Step *%s* is a foreach step and Foreach steps are not currently supported with Airflow." - % node.name - ) if any([d.name == "batch" for d in node.decorators]): raise NotSupportedException( "Step *%s* is marked for execution on AWS Batch with Airflow which isn't currently supported." diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py index 26e544e8d61..c94b6734818 100644 --- a/metaflow/plugins/airflow/airflow_utils.py +++ b/metaflow/plugins/airflow/airflow_utils.py @@ -44,6 +44,10 @@ class IncompatibleKubernetesProviderVersionException(Exception): ) % (sys.executable, KUBERNETES_PROVIDER_FOREACH_VERSION) +class AirflowSensorNotFound(Exception): + headline = "Sensor package not found" + + def create_absolute_version_number(version): abs_version = None # For all digits @@ -189,6 +193,16 @@ def pathspec(cls, flowname, is_foreach=False): ) +class SensorNames: + EXTERNAL_TASK_SENSOR = "ExternalTaskSensor" + S3_SENSOR = "S3KeySensor" + SQL_SENSOR = "SQLSensor" + + @classmethod + def get_supported_sensors(cls): + return list(cls.__dict__.values()) + + def run_id_creator(val): # join `[dag-id,run-id]` of airflow dag. return hashlib.md5("-".join([str(x) for x in val]).encode("utf-8")).hexdigest()[ @@ -375,6 +389,46 @@ def _kubernetes_pod_operator_args(operator_args): return args +def _parse_sensor_args(name, kwargs): + if name == SensorNames.EXTERNAL_TASK_SENSOR: + if "execution_delta" in kwargs: + if type(kwargs["execution_delta"]) == dict: + kwargs["execution_delta"] = timedelta(**kwargs["execution_delta"]) + else: + del kwargs["execution_delta"] + return kwargs + + +def _get_sensor(name): + # from airflow import XComArg + # XComArg() + if name == SensorNames.EXTERNAL_TASK_SENSOR: + # ExternalTaskSensors uses an execution_date of a dag to + # determine the appropriate DAG. + # This is set to the exact date the current dag gets executed on. + # For example if "DagA" (Upstream DAG) got scheduled at + # 12 Jan 4:00 PM PDT then "DagB"(current DAG)'s task sensor will try to + # look for a "DagA" that got executed at 12 Jan 4:00 PM PDT **exactly**. + # They also support a `execution_timeout` argument to + from airflow.sensors.external_task_sensor import ExternalTaskSensor + + return ExternalTaskSensor + elif name == SensorNames.S3_SENSOR: + try: + from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor + except ImportError: + raise AirflowSensorNotFound( + "This DAG requires a `S3KeySensor`. " + "Install the Airflow AWS provider using : " + "`pip install apache-airflow-providers-amazon`" + ) + return S3KeySensor + elif name == SensorNames.SQL_SENSOR: + from airflow.sensors.sql import SqlSensor + + return SqlSensor + + def get_metaflow_kubernetes_operator(): try: from airflow.contrib.operators.kubernetes_pod_operator import ( @@ -493,6 +547,13 @@ def set_operator_args(self, **kwargs): self._operator_args = kwargs return self + def _make_sensor(self): + TaskSensor = _get_sensor(self._operator_type) + return TaskSensor( + task_id=self.name, + **_parse_sensor_args(self._operator_type, self._operator_args) + ) + def to_dict(self): return { "name": self.name, @@ -541,6 +602,8 @@ def to_task(self): return self._kubernetes_task() else: return self._kubernetes_mapper_task() + elif self._operator_type in SensorNames.get_supported_sensors(): + return self._make_sensor() class Workflow(object): diff --git a/metaflow/plugins/airflow/sensors/__init__.py b/metaflow/plugins/airflow/sensors/__init__.py new file mode 100644 index 00000000000..02952d0c9a4 --- /dev/null +++ b/metaflow/plugins/airflow/sensors/__init__.py @@ -0,0 +1,9 @@ +from .external_task_sensor import ExternalTaskSensorDecorator +from .s3_sensor import S3KeySensorDecorator +from .sql_sensor import SQLSensorDecorator + +SUPPORTED_SENSORS = [ + ExternalTaskSensorDecorator, + S3KeySensorDecorator, + SQLSensorDecorator, +] diff --git a/metaflow/plugins/airflow/sensors/base_sensor.py b/metaflow/plugins/airflow/sensors/base_sensor.py new file mode 100644 index 00000000000..9412072cd23 --- /dev/null +++ b/metaflow/plugins/airflow/sensors/base_sensor.py @@ -0,0 +1,74 @@ +import uuid +from metaflow.decorators import FlowDecorator +from ..exception import AirflowException +from ..airflow_utils import AirflowTask, id_creator, TASK_ID_HASH_LEN + + +class AirflowSensorDecorator(FlowDecorator): + """ + Base class for all Airflow sensor decorators. + """ + + allow_multiple = True + + defaults = dict( + timeout=3600, + poke_interval=60, + mode="reschedule", + exponential_backoff=True, + pool=None, + soft_fail=False, + name=None, + description=None, + ) + + operator_type = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._airflow_task_name = None + self._id = str(uuid.uuid4()) + + def serialize_operator_args(self): + """ + Subclasses will parse the decorator arguments to + Airflow task serializable arguments. + """ + task_args = dict(**self.attributes) + del task_args["name"] + if task_args["description"] is not None: + task_args["doc"] = task_args["description"] + del task_args["description"] + task_args["do_xcom_push"] = True + return task_args + + def create_task(self): + task_args = self.serialize_operator_args() + return AirflowTask( + self._airflow_task_name, + operator_type=self.operator_type, + ).set_operator_args(**{k: v for k, v in task_args.items() if v is not None}) + + def validate(self): + """ + Validate if the arguments for the sensor are correct. + """ + # If there is no name set then auto-generate the name. This is done because there can be more than + # one `AirflowSensorDecorator` of the same type. + if self.attributes["name"] is None: + deco_index = [ + d._id + for d in self._flow_decorators + if issubclass(d.__class__, AirflowSensorDecorator) + ].index(self._id) + self._airflow_task_name = "%s-%s" % ( + self.operator_type, + id_creator([self.operator_type, str(deco_index)], TASK_ID_HASH_LEN), + ) + else: + self._airflow_task_name = self.attributes["name"] + + def flow_init( + self, flow, graph, environment, flow_datastore, metadata, logger, echo, options + ): + self.validate() diff --git a/metaflow/plugins/airflow/sensors/external_task_sensor.py b/metaflow/plugins/airflow/sensors/external_task_sensor.py new file mode 100644 index 00000000000..649edba706c --- /dev/null +++ b/metaflow/plugins/airflow/sensors/external_task_sensor.py @@ -0,0 +1,94 @@ +from .base_sensor import AirflowSensorDecorator +from ..airflow_utils import SensorNames +from ..exception import AirflowException +from datetime import timedelta + + +AIRFLOW_STATES = dict( + QUEUED="queued", + RUNNING="running", + SUCCESS="success", + SHUTDOWN="shutdown", # External request to shut down, + FAILED="failed", + UP_FOR_RETRY="up_for_retry", + UP_FOR_RESCHEDULE="up_for_reschedule", + UPSTREAM_FAILED="upstream_failed", + SKIPPED="skipped", +) + + +class ExternalTaskSensorDecorator(AirflowSensorDecorator): + operator_type = SensorNames.EXTERNAL_TASK_SENSOR + # Docs: + # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor + name = "airflow_external_task_sensor" + defaults = dict( + **AirflowSensorDecorator.defaults, + external_dag_id=None, + external_task_ids=None, + allowed_states=[AIRFLOW_STATES["SUCCESS"]], + failed_states=None, + execution_delta=None, + check_existence=True, + # We cannot add `execution_date_fn` as it requires a python callable. + # Passing around a python callable is non-trivial since we are passing a + # callable from metaflow-code to airflow python script. Since we cannot + # transfer dependencies of the callable, we cannot gaurentee that the callable + # behave exactly as the user expects + ) + + def serialize_operator_args(self): + task_args = super().serialize_operator_args() + if task_args["execution_delta"] is not None: + task_args["execution_delta"] = dict( + seconds=task_args["execution_delta"].total_seconds() + ) + return task_args + + def validate(self): + if self.attributes["external_dag_id"] is None: + raise AirflowException( + "`%s` argument of `@%s`cannot be `None`." + % ("external_dag_id", self.name) + ) + + if type(self.attributes["allowed_states"]) == str: + if self.attributes["allowed_states"] not in list(AIRFLOW_STATES.values()): + raise AirflowException( + "`%s` is an invalid input of `%s` for `@%s`. Accepted values are %s" + % ( + str(self.attributes["allowed_states"]), + "allowed_states", + self.name, + ", ".join(list(AIRFLOW_STATES.values())), + ) + ) + elif type(self.attributes["allowed_states"]) == list: + enum_not_matched = [ + x + for x in self.attributes["allowed_states"] + if x not in list(AIRFLOW_STATES.values()) + ] + if len(enum_not_matched) > 0: + raise AirflowException( + "`%s` is an invalid input of `%s` for `@%s`. Accepted values are %s" + % ( + str(" OR ".join(["'%s'" % i for i in enum_not_matched])), + "allowed_states", + self.name, + ", ".join(list(AIRFLOW_STATES.values())), + ) + ) + else: + self.attributes["allowed_states"] = [AIRFLOW_STATES["SUCCESS"]] + + if self.attributes["execution_delta"] is not None: + if not isinstance(self.attributes["execution_delta"], timedelta): + raise AirflowException( + "`%s` is an invalid input type of `execution_delta` for `@%s`. Accepted type is `datetime.timedelta`" + % ( + str(type(self.attributes["execution_delta"])), + self.name, + ) + ) + super().validate() diff --git a/metaflow/plugins/airflow/sensors/s3_sensor.py b/metaflow/plugins/airflow/sensors/s3_sensor.py new file mode 100644 index 00000000000..b4f7ae5b6de --- /dev/null +++ b/metaflow/plugins/airflow/sensors/s3_sensor.py @@ -0,0 +1,26 @@ +from .base_sensor import AirflowSensorDecorator +from ..airflow_utils import SensorNames +from ..exception import AirflowException + + +class S3KeySensorDecorator(AirflowSensorDecorator): + name = "airflow_s3_key_sensor" + operator_type = SensorNames.S3_SENSOR + # Arg specification can be found here : + # https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3/index.html#airflow.providers.amazon.aws.sensors.s3.S3KeySensor + defaults = dict( + **AirflowSensorDecorator.defaults, + bucket_key=None, # Required + bucket_name=None, + wildcard_match=False, + aws_conn_id=None, + verify=None, # `verify (Optional[Union[str, bool]])` Whether or not to verify SSL certificates for S3 connection. + # `verify` is a airflow variable. + ) + + def validate(self): + if self.attributes["bucket_key"] is None: + raise AirflowException( + "`bucket_key` for `@%s`cannot be empty." % (self.name) + ) + super().validate() diff --git a/metaflow/plugins/airflow/sensors/sql_sensor.py b/metaflow/plugins/airflow/sensors/sql_sensor.py new file mode 100644 index 00000000000..c97c41b283e --- /dev/null +++ b/metaflow/plugins/airflow/sensors/sql_sensor.py @@ -0,0 +1,31 @@ +from .base_sensor import AirflowSensorDecorator +from ..airflow_utils import SensorNames +from ..exception import AirflowException + + +class SQLSensorDecorator(AirflowSensorDecorator): + name = "airflow_sql_sensor" + operator_type = SensorNames.SQL_SENSOR + # Arg specification can be found here : + # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/sql/index.html#airflow.sensors.sql.SqlSensor + defaults = dict( + **AirflowSensorDecorator.defaults, + conn_id=None, + sql=None, + # success = None, # sucess/failure require callables. Wont be supported at start since not serialization friendly. + # failure = None, + parameters=None, + fail_on_empty=True, + ) + + def validate(self): + if self.attributes["conn_id"] is None: + raise AirflowException( + "`%s` argument of `@%s`cannot be `None`." % ("conn_id", self.name) + ) + raise _arg_exception("conn_id", self.name, None) + if self.attributes["sql"] is None: + raise AirflowException( + "`%s` argument of `@%s`cannot be `None`." % ("sql", self.name) + ) + super().validate() From a1edca6d7e625afd0d362fb604cb656080a5c4ef Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 24 Jan 2023 00:26:27 +0000 Subject: [PATCH 06/14] gcp support with AF --- metaflow/plugins/airflow/airflow.py | 7 ++++++- metaflow/plugins/airflow/airflow_cli.py | 11 ++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 9e8d0a0d398..0e8cbad4da4 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -23,6 +23,8 @@ DATASTORE_SYSROOT_AZURE, CARD_AZUREROOT, AIRFLOW_KUBERNETES_CONN_ID, + DATASTORE_SYSROOT_GS, + CARD_GSROOT, ) from metaflow.parameters import DelayedEvaluationParameter, deploy_time_eval from metaflow.plugins.kubernetes.kubernetes import Kubernetes @@ -361,7 +363,7 @@ def _to_job(self, node): "METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS), "METAFLOW_DATASTORE_SYSROOT_S3": DATASTORE_SYSROOT_S3, "METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT, - "METAFLOW_DEFAULT_DATASTORE": "s3", + "METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE, "METAFLOW_DEFAULT_METADATA": "service", "METAFLOW_KUBERNETES_WORKLOAD": str( 1 @@ -376,6 +378,9 @@ def _to_job(self, node): "METAFLOW_AIRFLOW_JOB_ID": AIRFLOW_MACROS.AIRFLOW_JOB_ID, "METAFLOW_PRODUCTION_TOKEN": self.production_token, "METAFLOW_ATTEMPT_NUMBER": AIRFLOW_MACROS.ATTEMPT, + # GCP stuff + "METAFLOW_DATASTORE_SYSROOT_GS": DATASTORE_SYSROOT_GS, + "METAFLOW_CARD_GSROOT": CARD_GSROOT, } env[ "METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT" diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index 1f48a1fa481..e9ea4521e28 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -389,10 +389,15 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout): "Step *%s* is marked for execution on AWS Batch with Airflow which isn't currently supported." % node.name ) - - if flow_datastore.TYPE not in ("azure", "s3"): + SUPPORTED_DATASTORES = ("azure", "s3", "gs") + if flow_datastore.TYPE not in SUPPORTED_DATASTORES: raise AirflowException( - 'Datastore of type "s3" or "azure" required with `airflow create`' + "Datastore type `%s` is not supported with `airflow create`. " + "Please choose from datastore of type %s when calling `airflow create`" + % ( + str(flow_datastore.TYPE), + "or ".join(["`%s`" % x for x in SUPPORTED_DATASTORES]), + ) ) From 2c1c2fe028bee9d3eba8fe016e3d3b39af5db8ff Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 24 Jan 2023 00:28:13 +0000 Subject: [PATCH 07/14] Added additional variables for mwaa support. --- metaflow/metaflow_config.py | 4 ++++ metaflow/plugins/airflow/airflow.py | 19 ++++++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index c037b05ecf6..e5163725038 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -264,6 +264,10 @@ ) # This configuration sets `kubernetes_conn_id` in airflow's KubernetesPodOperator. AIRFLOW_KUBERNETES_CONN_ID = from_conf("AIRFLOW_KUBERNETES_CONN_ID") +AIRFLOW_KUBERNETES_KUBECONFIG_FILE = from_conf("AIRFLOW_KUBERNETES_KUBECONFIG_FILE") +AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT = from_conf( + "AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT" +) ### diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 0e8cbad4da4..e0b8c0790ee 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -23,6 +23,8 @@ DATASTORE_SYSROOT_AZURE, CARD_AZUREROOT, AIRFLOW_KUBERNETES_CONN_ID, + AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT, + AIRFLOW_KUBERNETES_KUBECONFIG_FILE, DATASTORE_SYSROOT_GS, CARD_GSROOT, ) @@ -470,7 +472,22 @@ def _to_job(self, node): ) if AIRFLOW_KUBERNETES_CONN_ID is not None: k8s_operator_args["kubernetes_conn_id"] = AIRFLOW_KUBERNETES_CONN_ID - else: + k8s_operator_args["in_cluster"] = False + if AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT is not None: + k8s_operator_args["cluster_context"] = AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT + k8s_operator_args["in_cluster"] = False + if AIRFLOW_KUBERNETES_KUBECONFIG_FILE is not None: + k8s_operator_args["config_file"] = AIRFLOW_KUBERNETES_KUBECONFIG_FILE + k8s_operator_args["in_cluster"] = False + + if all( + x is None + for x in [ + AIRFLOW_KUBERNETES_KUBECONFIG_FILE, + AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT, + AIRFLOW_KUBERNETES_CONN_ID, + ] + ): k8s_operator_args["in_cluster"] = True if k8s_deco.attributes["secrets"]: From 104321475b9e8d63a8e7409f75e317eb2c237ce7 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 24 Jan 2023 00:51:19 +0000 Subject: [PATCH 08/14] refactor for simplicity. --- metaflow/plugins/airflow/airflow.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index e0b8c0790ee..945681b3983 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -470,6 +470,7 @@ def _to_job(self, node): reattach_on_restart=False, secrets=[], ) + k8s_operator_args["in_cluster"] = True if AIRFLOW_KUBERNETES_CONN_ID is not None: k8s_operator_args["kubernetes_conn_id"] = AIRFLOW_KUBERNETES_CONN_ID k8s_operator_args["in_cluster"] = False @@ -480,16 +481,6 @@ def _to_job(self, node): k8s_operator_args["config_file"] = AIRFLOW_KUBERNETES_KUBECONFIG_FILE k8s_operator_args["in_cluster"] = False - if all( - x is None - for x in [ - AIRFLOW_KUBERNETES_KUBECONFIG_FILE, - AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT, - AIRFLOW_KUBERNETES_CONN_ID, - ] - ): - k8s_operator_args["in_cluster"] = True - if k8s_deco.attributes["secrets"]: if isinstance(k8s_deco.attributes["secrets"], str): k8s_operator_args["secrets"] = k8s_deco.attributes["secrets"].split(",") From cb274964e640ba8da17d5e155f7476a58cbe3b05 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Mon, 30 Jan 2023 23:24:59 +0000 Subject: [PATCH 09/14] Added docstrings for sensors: - Fixed SQL Sensor import. --- metaflow/plugins/airflow/airflow_utils.py | 12 +++++- .../airflow/sensors/external_task_sensor.py | 40 +++++++++++++++++++ metaflow/plugins/airflow/sensors/s3_sensor.py | 40 +++++++++++++++++++ .../plugins/airflow/sensors/sql_sensor.py | 37 +++++++++++++++++ 4 files changed, 128 insertions(+), 1 deletion(-) diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py index c94b6734818..51774486c49 100644 --- a/metaflow/plugins/airflow/airflow_utils.py +++ b/metaflow/plugins/airflow/airflow_utils.py @@ -424,7 +424,17 @@ def _get_sensor(name): ) return S3KeySensor elif name == SensorNames.SQL_SENSOR: - from airflow.sensors.sql import SqlSensor + try: + from airflow.sensors.sql import SqlSensor + except ImportError as e: + try: + from airflow.providers.common.sql.sensors.sql import SqlSensor + except ImportError as e: + raise AirflowSensorNotFound( + "This DAG requires a `SqlSensor`. " + "Install the Airflow SQL provider using : " + "`pip install apache-airflow-providers-common-sql`" + ) return SqlSensor diff --git a/metaflow/plugins/airflow/sensors/external_task_sensor.py b/metaflow/plugins/airflow/sensors/external_task_sensor.py index 649edba706c..c599a05b4a0 100644 --- a/metaflow/plugins/airflow/sensors/external_task_sensor.py +++ b/metaflow/plugins/airflow/sensors/external_task_sensor.py @@ -18,6 +18,46 @@ class ExternalTaskSensorDecorator(AirflowSensorDecorator): + """ + The `@airflow_external_task_sensor` decorator attaches a Airflow [ExternalTaskSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) before the start step of the flow. + This decorator only works when a flow is scheduled on Airflow and is compiled using `airflow create`. More than one `@airflow_external_task_sensor` can be added as a flow decorators. Adding more than one decorator will ensure that `start` step starts only after all sensors finish. + + Parameters + ---------- + timeout : int + Time, in seconds before the task times out and fails. (Default: 3600) + poke_interval : int + Time in seconds that the job should wait in between each try. (Default: 60) + mode : string + How the sensor operates. Options are: { poke | reschedule }. (Default: "poke") + exponential_backoff : bool + allow progressive longer waits between pokes by using exponential backoff algorithm. (Default: True) + pool : string + the slot pool this task should run in, + slot pools are a way to limit concurrency for certain tasks. (Default:None) + soft_fail : bool + Set to true to mark the task as SKIPPED on failure. (Default: False) + name : string + Name of the sensor on Airflow + description : string + Description of sensor in the Airflow UI + external_dag_id : string + The dag_id that contains the task you want to wait for. + external_task_ids : List[string] + The list of task_ids that you want to wait for. + If None (default value) the sensor waits for the DAG. (Default: None) + allowed_states : List[string] + Iterable of allowed states, (Default: ['success']) + failed_states : List[string] + Iterable of failed or dis-allowed states. (Default: None) + execution_delta : datetime.timedelta + time difference with the previous execution to look at, + the default is the same logical date as the current task or DAG. (Default: None) + check_existence: bool + Set to True to check if the external task exists or check if + the DAG to wait for exists. (Default: True) + """ + operator_type = SensorNames.EXTERNAL_TASK_SENSOR # Docs: # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor diff --git a/metaflow/plugins/airflow/sensors/s3_sensor.py b/metaflow/plugins/airflow/sensors/s3_sensor.py index b4f7ae5b6de..6fac6725d20 100644 --- a/metaflow/plugins/airflow/sensors/s3_sensor.py +++ b/metaflow/plugins/airflow/sensors/s3_sensor.py @@ -4,6 +4,46 @@ class S3KeySensorDecorator(AirflowSensorDecorator): + """ + The `@airflow_s3_key_sensor` decorator attaches a Airflow [S3KeySensor](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3/index.html#airflow.providers.amazon.aws.sensors.s3.S3KeySensor) + before the start step of the flow. This decorator only works when a flow is scheduled on Airflow + and is compiled using `airflow create`. More than one `@airflow_s3_key_sensor` can be + added as a flow decorators. Adding more than one decorator will ensure that `start` step + starts only after all sensors finish. + + Parameters + ---------- + timeout : int + Time, in seconds before the task times out and fails. (Default: 3600) + poke_interval : int + Time in seconds that the job should wait in between each try. (Default: 60) + mode : string + How the sensor operates. Options are: { poke | reschedule }. (Default: "poke") + exponential_backoff : bool + allow progressive longer waits between pokes by using exponential backoff algorithm. (Default: True) + pool : string + the slot pool this task should run in, + slot pools are a way to limit concurrency for certain tasks. (Default:None) + soft_fail : bool + Set to true to mark the task as SKIPPED on failure. (Default: False) + name : string + Name of the sensor on Airflow + description : string + Description of sensor in the Airflow UI + bucket_key : str | List[str] + The key(s) being waited on. Supports full s3:// style url or relative path from root level. + When it's specified as a full s3:// url, please leave `bucket_name` as None + bucket_name : str + Name of the S3 bucket. Only needed when bucket_key is not provided as a full s3:// url. + When specified, all the keys passed to bucket_key refers to this bucket. (Default:None) + wildcard_match : bool + whether the bucket_key should be interpreted as a Unix wildcard pattern. (Default: False) + aws_conn_id : string + a reference to the s3 connection on Airflow. (Default: None) + verify : bool + Whether or not to verify SSL certificates for S3 connection. (Default: None) + """ + name = "airflow_s3_key_sensor" operator_type = SensorNames.S3_SENSOR # Arg specification can be found here : diff --git a/metaflow/plugins/airflow/sensors/sql_sensor.py b/metaflow/plugins/airflow/sensors/sql_sensor.py index c97c41b283e..ad2379514bb 100644 --- a/metaflow/plugins/airflow/sensors/sql_sensor.py +++ b/metaflow/plugins/airflow/sensors/sql_sensor.py @@ -4,6 +4,43 @@ class SQLSensorDecorator(AirflowSensorDecorator): + """ + The `@airflow_sql_sensor` decorator attaches a Airflow [SqlSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/sql/index.html#airflow.sensors.sql.SqlSensor) + before the start step of the flow. This decorator only works when a flow is scheduled on Airflow + and is compiled using `airflow create`. More than one `@airflow_sql_sensor` can be + added as a flow decorators. Adding more than one decorator will ensure that `start` step + starts only after all sensors finish. + + Parameters + ---------- + timeout : int + Time, in seconds before the task times out and fails. (Default: 3600) + poke_interval : int + Time in seconds that the job should wait in between each try. (Default: 60) + mode : string + How the sensor operates. Options are: { poke | reschedule }. (Default: "poke") + exponential_backoff : bool + allow progressive longer waits between pokes by using exponential backoff algorithm. (Default: True) + pool : string + the slot pool this task should run in, + slot pools are a way to limit concurrency for certain tasks. (Default:None) + soft_fail : bool + Set to true to mark the task as SKIPPED on failure. (Default: False) + name : string + Name of the sensor on Airflow + description : string + Description of sensor in the Airflow UI + sql : str + The sql to run. To pass, it needs to return at least one cell that + contains a non-zero / empty string value. + parameters : List[str] | Dict + The parameters to render the SQL query with (optional). (Default: None) + fail_on_empty : bool + Explicitly fail on no rows returned. (Default: True) + conn_id : string + a reference to the SQL connection on Airflow. + """ + name = "airflow_sql_sensor" operator_type = SensorNames.SQL_SENSOR # Arg specification can be found here : From c102499d26ce0fe82a1c387c01a0c026850b8820 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 31 Jan 2023 00:51:09 +0000 Subject: [PATCH 10/14] Remove SQL Sensor from the code. --- metaflow/plugins/__init__.py | 1 - metaflow/plugins/airflow/airflow_utils.py | 16 ----- metaflow/plugins/airflow/sensors/__init__.py | 2 - .../plugins/airflow/sensors/sql_sensor.py | 68 ------------------- 4 files changed, 87 deletions(-) delete mode 100644 metaflow/plugins/airflow/sensors/sql_sensor.py diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 6ff7e4c05cd..a639348ed01 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -101,7 +101,6 @@ # Add Airflow sensor related flow decorators SENSOR_FLOW_DECORATORS = [ ("airflow_external_task_sensor", ".airflow.sensors.ExternalTaskSensorDecorator"), - ("airflow_sql_sensor", ".airflow.sensors.SQLSensorDecorator"), ("airflow_s3_key_sensor", ".airflow.sensors.S3KeySensorDecorator"), ] diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py index 51774486c49..ba02453189e 100644 --- a/metaflow/plugins/airflow/airflow_utils.py +++ b/metaflow/plugins/airflow/airflow_utils.py @@ -196,7 +196,6 @@ def pathspec(cls, flowname, is_foreach=False): class SensorNames: EXTERNAL_TASK_SENSOR = "ExternalTaskSensor" S3_SENSOR = "S3KeySensor" - SQL_SENSOR = "SQLSensor" @classmethod def get_supported_sensors(cls): @@ -423,21 +422,6 @@ def _get_sensor(name): "`pip install apache-airflow-providers-amazon`" ) return S3KeySensor - elif name == SensorNames.SQL_SENSOR: - try: - from airflow.sensors.sql import SqlSensor - except ImportError as e: - try: - from airflow.providers.common.sql.sensors.sql import SqlSensor - except ImportError as e: - raise AirflowSensorNotFound( - "This DAG requires a `SqlSensor`. " - "Install the Airflow SQL provider using : " - "`pip install apache-airflow-providers-common-sql`" - ) - - return SqlSensor - def get_metaflow_kubernetes_operator(): try: diff --git a/metaflow/plugins/airflow/sensors/__init__.py b/metaflow/plugins/airflow/sensors/__init__.py index 02952d0c9a4..d788d704523 100644 --- a/metaflow/plugins/airflow/sensors/__init__.py +++ b/metaflow/plugins/airflow/sensors/__init__.py @@ -1,9 +1,7 @@ from .external_task_sensor import ExternalTaskSensorDecorator from .s3_sensor import S3KeySensorDecorator -from .sql_sensor import SQLSensorDecorator SUPPORTED_SENSORS = [ ExternalTaskSensorDecorator, S3KeySensorDecorator, - SQLSensorDecorator, ] diff --git a/metaflow/plugins/airflow/sensors/sql_sensor.py b/metaflow/plugins/airflow/sensors/sql_sensor.py deleted file mode 100644 index ad2379514bb..00000000000 --- a/metaflow/plugins/airflow/sensors/sql_sensor.py +++ /dev/null @@ -1,68 +0,0 @@ -from .base_sensor import AirflowSensorDecorator -from ..airflow_utils import SensorNames -from ..exception import AirflowException - - -class SQLSensorDecorator(AirflowSensorDecorator): - """ - The `@airflow_sql_sensor` decorator attaches a Airflow [SqlSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/sql/index.html#airflow.sensors.sql.SqlSensor) - before the start step of the flow. This decorator only works when a flow is scheduled on Airflow - and is compiled using `airflow create`. More than one `@airflow_sql_sensor` can be - added as a flow decorators. Adding more than one decorator will ensure that `start` step - starts only after all sensors finish. - - Parameters - ---------- - timeout : int - Time, in seconds before the task times out and fails. (Default: 3600) - poke_interval : int - Time in seconds that the job should wait in between each try. (Default: 60) - mode : string - How the sensor operates. Options are: { poke | reschedule }. (Default: "poke") - exponential_backoff : bool - allow progressive longer waits between pokes by using exponential backoff algorithm. (Default: True) - pool : string - the slot pool this task should run in, - slot pools are a way to limit concurrency for certain tasks. (Default:None) - soft_fail : bool - Set to true to mark the task as SKIPPED on failure. (Default: False) - name : string - Name of the sensor on Airflow - description : string - Description of sensor in the Airflow UI - sql : str - The sql to run. To pass, it needs to return at least one cell that - contains a non-zero / empty string value. - parameters : List[str] | Dict - The parameters to render the SQL query with (optional). (Default: None) - fail_on_empty : bool - Explicitly fail on no rows returned. (Default: True) - conn_id : string - a reference to the SQL connection on Airflow. - """ - - name = "airflow_sql_sensor" - operator_type = SensorNames.SQL_SENSOR - # Arg specification can be found here : - # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/sql/index.html#airflow.sensors.sql.SqlSensor - defaults = dict( - **AirflowSensorDecorator.defaults, - conn_id=None, - sql=None, - # success = None, # sucess/failure require callables. Wont be supported at start since not serialization friendly. - # failure = None, - parameters=None, - fail_on_empty=True, - ) - - def validate(self): - if self.attributes["conn_id"] is None: - raise AirflowException( - "`%s` argument of `@%s`cannot be `None`." % ("conn_id", self.name) - ) - raise _arg_exception("conn_id", self.name, None) - if self.attributes["sql"] is None: - raise AirflowException( - "`%s` argument of `@%s`cannot be `None`." % ("sql", self.name) - ) - super().validate() From 2acdb55fd1076ebecd85adb09f5781e255a9c265 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Mon, 30 Jan 2023 19:46:20 +0000 Subject: [PATCH 11/14] exception when Airflow with timezones. --- metaflow/plugins/airflow/airflow_cli.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index e9ea4521e28..3245e09bfb4 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -400,6 +400,15 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout): ) ) + schedule = flow._flow_decorators.get("schedule") + if not schedule: + return + + if schedule.timezone is not None: + raise AirflowException( + "`airflow create` does not support scheduling with `timezone`." + ) + def resolve_dag_name(name): project = current.get("project_name") From 8586fe1039ff462fc6cb82d04b124addc2c21f7b Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Mon, 30 Jan 2023 19:55:14 +0000 Subject: [PATCH 12/14] Fix bug with multi decorator --- metaflow/plugins/airflow/airflow_cli.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index 3245e09bfb4..c938f5e450c 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -403,7 +403,8 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout): schedule = flow._flow_decorators.get("schedule") if not schedule: return - + + schedule = schedule[0] if schedule.timezone is not None: raise AirflowException( "`airflow create` does not support scheduling with `timezone`." From 628129a24c5135734ea985266ba4dea43f712719 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 31 Jan 2023 00:57:05 +0000 Subject: [PATCH 13/14] Fix linter changes. --- metaflow/plugins/airflow/airflow_cli.py | 2 +- metaflow/plugins/airflow/airflow_utils.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index c938f5e450c..6259abd2b76 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -403,7 +403,7 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout): schedule = flow._flow_decorators.get("schedule") if not schedule: return - + schedule = schedule[0] if schedule.timezone is not None: raise AirflowException( diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py index ba02453189e..24b4b80e576 100644 --- a/metaflow/plugins/airflow/airflow_utils.py +++ b/metaflow/plugins/airflow/airflow_utils.py @@ -423,6 +423,7 @@ def _get_sensor(name): ) return S3KeySensor + def get_metaflow_kubernetes_operator(): try: from airflow.contrib.operators.kubernetes_pod_operator import ( From 975a1c025631383b7ecad14d7d1b8021824e2e3b Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Tue, 31 Jan 2023 02:04:45 +0000 Subject: [PATCH 14/14] Rebase bug fix. --- metaflow/plugins/argo/argo_workflows.py | 2 +- metaflow/plugins/aws/step_functions/step_functions.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index caa61403e70..ff5c3e2a582 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -174,9 +174,9 @@ def trigger(cls, name, parameters=None): def _cron(self): schedule = self.flow._flow_decorators.get("schedule") - schedule = schedule[0] if schedule: # Remove the field "Year" if it exists + schedule = schedule[0] return " ".join(schedule.schedule.split()[:5]), schedule.timezone return None diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py index 370d4adec93..319afa4192c 100644 --- a/metaflow/plugins/aws/step_functions/step_functions.py +++ b/metaflow/plugins/aws/step_functions/step_functions.py @@ -328,8 +328,8 @@ def _visit(node, workflow, exit_node=None): def _cron(self): schedule = self.flow._flow_decorators.get("schedule") - schedule = schedule[0] if schedule: + schedule = schedule[0] if schedule.timezone is not None: raise StepFunctionsException( "Step Functions does not support scheduling with a timezone."