Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
DATASTORE_SYSROOT_AZURE,
CARD_AZUREROOT,
AIRFLOW_KUBERNETES_CONN_ID,
DATASTORE_SYSROOT_GS,
CARD_GSROOT,
)
from metaflow.parameters import DelayedEvaluationParameter, deploy_time_eval
from metaflow.plugins.kubernetes.kubernetes import Kubernetes
Expand Down Expand Up @@ -361,7 +363,7 @@ def _to_job(self, node):
"METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
"METAFLOW_DATASTORE_SYSROOT_S3": DATASTORE_SYSROOT_S3,
"METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT,
"METAFLOW_DEFAULT_DATASTORE": "s3",
"METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
"METAFLOW_DEFAULT_METADATA": "service",
"METAFLOW_KUBERNETES_WORKLOAD": str(
1
Expand All @@ -376,6 +378,9 @@ def _to_job(self, node):
"METAFLOW_AIRFLOW_JOB_ID": AIRFLOW_MACROS.AIRFLOW_JOB_ID,
"METAFLOW_PRODUCTION_TOKEN": self.production_token,
"METAFLOW_ATTEMPT_NUMBER": AIRFLOW_MACROS.ATTEMPT,
# GCP stuff
"METAFLOW_DATASTORE_SYSROOT_GS": DATASTORE_SYSROOT_GS,
"METAFLOW_CARD_GSROOT": CARD_GSROOT,
}
env[
"METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT"
Expand Down
11 changes: 8 additions & 3 deletions metaflow/plugins/airflow/airflow_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,15 @@ def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout):
"Step *%s* is marked for execution on AWS Batch with Airflow which isn't currently supported."
% node.name
)

if flow_datastore.TYPE not in ("azure", "s3"):
SUPPORTED_DATASTORES = ("azure", "s3", "gs")
if flow_datastore.TYPE not in SUPPORTED_DATASTORES:
raise AirflowException(
'Datastore of type "s3" or "azure" required with `airflow create`'
"Datastore type `%s` is not supported with `airflow create`. "
"Please choose from datastore of type %s when calling `airflow create`"
% (
str(flow_datastore.TYPE),
"or ".join(["`%s`" % x for x in SUPPORTED_DATASTORES]),
)
)


Expand Down