Skip to content

Airflow Foreach + Sensors#1225

Closed
valayDave wants to merge 1 commit intovalay/multi-flow-decosfrom
valay/airflow-foreach-base-final
Closed

Airflow Foreach + Sensors#1225
valayDave wants to merge 1 commit intovalay/multi-flow-decosfrom
valay/airflow-foreach-base-final

Conversation

@valayDave
Copy link
Copy Markdown
Collaborator

@valayDave valayDave commented Jan 9, 2023

Stacked On Top of #1224

This PR enabled Airflow Sensor Support and Airflow Foreach Support.

Support for following Airflow sensor decorators:

  • @airflow_external_task_sensor
  • @airflow_s3_key_sensor

@airflow_s3_key_sensor

Sensor Args

The below flow will trigger when Airflow detects a x.json is present in the S3 Bucket path

from metaflow import FlowSpec, step
from metaflow import  airflow_s3_key_sensor

SENSOR_PATH = "s3://my-s3-bucket/sensor-data/x.json"

@airflow_s3_key_sensor(bucket_key=SENSOR_PATH,mode='poke')
class SensorFlow(FlowSpec):
        
        @step
        def start(self):
            self.x = 1
            self.next(self.b)

        @step
        def b(self):
            self.x += 1
            self.next(self.end)
        
        @step
        def end(self):
            self.x += 1
            print("Value of X is ", self.x)
    
if __name__ == "__main__":
    SensorFlow()

@airflow_external_task_sensor

Sensor Args

The below flow will get triggered when SourceDag-0 and SourceDag-1 with the same execution_date finish completion.

from metaflow import FlowSpec, step
from metaflow import  airflow_external_task_sensor, schedule

@airflow_external_task_sensor(name='x',external_dag_id='SourceDag-0',mode='poke')
@airflow_external_task_sensor(name='y',external_dag_id='SourceDag-1',mode='poke')
@schedule(cron='* * * * *')
class SensorFlow(FlowSpec):
        
        @step
        def start(self):
            self.x = 1
            self.next(self.b)

        @step
        def b(self):
            self.x += 1
            self.next(self.end)
        
        @step
        def end(self):
            self.x += 1
            print("Value of X is ", self.x)
    
if __name__ == "__main__":
    SensorFlow()

Copy link
Copy Markdown
Contributor

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not review anything in airflow/. This needs to be rebased on 1212 prior to merging.

@valayDave valayDave force-pushed the valay/multi-flow-decos branch from 82f7640 to f8337ed Compare January 23, 2023 22:38
@valayDave valayDave force-pushed the valay/airflow-foreach-base-final branch 7 times, most recently from 29e860d to 21260c5 Compare January 27, 2023 19:54
@valayDave valayDave force-pushed the valay/airflow-foreach-base-final branch from 21260c5 to 9dbd6c8 Compare January 27, 2023 21:26
Copy link
Copy Markdown
Contributor

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not review airflow specific parts but rest looks fine.

@valayDave valayDave force-pushed the valay/airflow-foreach-base-final branch from 9dbd6c8 to fe25ca0 Compare January 30, 2023 19:22
@valayDave valayDave force-pushed the valay/multi-flow-decos branch from 080513c to 24854af Compare January 30, 2023 19:22
@valayDave valayDave force-pushed the valay/airflow-foreach-base-final branch from fe25ca0 to 1ab2bcb Compare January 31, 2023 00:47
@valayDave valayDave force-pushed the valay/multi-flow-decos branch from 24854af to 75c690b Compare January 31, 2023 00:47
@valayDave valayDave closed this Jan 31, 2023
@savingoyal savingoyal deleted the valay/airflow-foreach-base-final branch May 16, 2023 03:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants