The diagram below illustrates the architecture of the ACES Workflow Management system, which is based on the Prefect framework.
- Cloud and Edge Kubernetes clusters
- `kubectl` configured to access the clusters
- `helm` installed and configured to access the clusters
MinIO will be used on the cloud cluster to store:
- input and output data of Prefect flows.

MinIO will be used on the edge cluster to store:
- intermediate results of Prefect flow tasks.
NB! MinIO credentials are set to admin/martel2024.
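To verify a MinIO deployment is reachable with these credentials, a minimal check from Python with boto3 can help; the endpoint URL below is a placeholder, not part of this deployment — substitute your cluster's MinIO service or ingress address:

```python
import boto3

# Hypothetical endpoint; substitute your cluster's MinIO service or ingress
s3 = boto3.client(
    "s3",
    endpoint_url="http://minio.example.local:9000",
    aws_access_key_id="admin",
    aws_secret_access_key="martel2024",
)

# Listing buckets confirms the endpoint and credentials work
for bucket in s3.list_buckets()["Buckets"]:
    print(bucket["Name"])
```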
```bash
cd deployment/minio
```
Configure `KUBECONFIG` to point to your cloud or edge cluster accordingly, then run the automated deployment script:
```bash
./deploy-minio.sh
```
Follow the instructions at the end of the script to complete the deployment.
Use `mc` to create the `prefect` bucket and set it to public:
```bash
kubectl apply -f s3-add-bucket-prefect-job.yaml
```
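If you prefer to do this step from Python rather than via the kubectl job, a rough boto3 equivalent is sketched below; the endpoint URL is a placeholder, and the policy shown grants anonymous read only:

```python
import json

import boto3

# Hypothetical endpoint; substitute your MinIO service or ingress address
s3 = boto3.client(
    "s3",
    endpoint_url="http://minio.example.local:9000",
    aws_access_key_id="admin",
    aws_secret_access_key="martel2024",
)

s3.create_bucket(Bucket="prefect")  # fails if the bucket already exists

# Grant anonymous read access to objects in the bucket; add s3:PutObject
# to the Action list if you need full public read-write
public_read = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": ["*"]},
            "Action": ["s3:GetObject"],
            "Resource": ["arn:aws:s3:::prefect/*"],
        }
    ],
}
s3.put_bucket_policy(Bucket="prefect", Policy=json.dumps(public_read))
```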
Alternatively, you can use Nuvla to deploy MinIO for Prefect with local path storage. Deploy the applications in the following order.
- Local Path Storage: https://nuvla.io/ui/apps/aces/edge-cloud-infrastructure/workflow-management/minio-for-prefect/local-path-storage
- MinIO Operator: https://nuvla.io/ui/apps/aces/edge-cloud-infrastructure/workflow-management/minio-for-prefect/minio-operator
- MinIO Tenant: https://nuvla.io/ui/apps/aces/edge-cloud-infrastructure/workflow-management/minio-for-prefect/minio-tenant
- Add the `aces` bucket to the MinIO tenant: https://nuvla.io/ui/apps/aces/edge-cloud-infrastructure/workflow-management/minio-for-prefect/aces-bucket-in-minio/
If you need to clean up after terminating the deployment, you can delete the MinIO-related resources with:
```bash
kubectl delete crd tenants.minio.min.io policybindings.sts.min.io --ignore-not-found
kubectl delete clusterrole minio-operator-role --ignore-not-found
kubectl delete clusterrolebinding minio-operator-role-binding --ignore-not-found
kubectl delete clusterrolebinding minio-operator-binding --ignore-not-found
```
The following explains how to deploy Prefect Server with PostgreSQL.
Deploy using Nuvla:
https://nuvla.io/ui/apps/aces/edge-cloud-infrastructure/workflow-management/prefect
Alternatively, deploy using the CLI script:
```bash
cd deployment/prefect/server
./deploy-prefect-server.sh
```
Validate that the server is running:
```bash
$ kubectl -n prefect get pods
NAME                              READY   STATUS    RESTARTS   AGE
prefect-server-6b7b745577-7z2w4   1/1     Running   0          2d12h
prefect-server-postgresql-0       1/1     Running   0          2d12h
```
To access the Prefect UI and API from outside the cluster, you can use port-forwarding:
```bash
kubectl -n prefect port-forward svc/prefect-server 4200:4200 --address=0.0.0.0
```
To validate that Prefect is available, run:
```bash
export PREFECT_API_URL="http://<hostname|IP>:4200/api"
prefect version
```
You can also test the connection with:
```bash
prefect config view
```
To access the UI, create a tunnel to the Prefect server. From your local machine, run:
```bash
ssh -L 4200:localhost:4200 root@<hostname|IP> -N
```
Then access the Prefect UI at http://localhost:4200.
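You can also verify the API programmatically. A minimal sketch using the server's `/api/health` endpoint, assuming the port-forward or tunnel above is active:

```python
import requests

# The health endpoint returns `true` when the server is up
resp = requests.get("http://localhost:4200/api/health", timeout=5)
resp.raise_for_status()
print("Prefect API healthy:", resp.json())
```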
The following approach to data management for Prefect flows was taken:
- Cloud S3 (“minio-data”, “minio-results”) holds inputs and outputs. Edge flows pull inputs from Cloud S3 and push final results back. This centralizes artifacts and simplifies discovery and governance.
- Edge-local S3 holds intermediate/ephemeral artifacts and parameter exchange; this reduces egress and latency and avoids pushing large intermediates to the cloud unnecessarily.

See the conceptual diagram above, which shows the public S3 and local S3 storage respectively.
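The flows later in this document load this storage as Prefect blocks. A sketch of how such blocks could be registered with `prefect-aws` follows; the endpoint URL is a placeholder, and the bucket/folder layout is illustrative, though the block names match the examples below:

```python
from prefect_aws import MinIOCredentials, S3Bucket

# Placeholder endpoint; substitute your cloud cluster's MinIO address
creds = MinIOCredentials(
    minio_root_user="admin",
    minio_root_password="martel2024",
    aws_client_parameters={"endpoint_url": "http://minio.example.local:9000"},
)
creds.save("minio-credentials", overwrite=True)

# One block per bucket/prefix; flows refer to them by block name
S3Bucket(bucket_name="prefect", bucket_folder="data", credentials=creds).save(
    "minio-data-storage", overwrite=True
)
S3Bucket(bucket_name="prefect", bucket_folder="results", credentials=creds).save(
    "minio-result-storage", overwrite=True
)
```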
Follow the instructions in this README.md.
The Prefect Worker is intended to be deployed on K8s clusters on edge devices. It then connects to the Prefect Server, joins a predefined work pool, and listens for work to execute.
Deploy Prefect Worker on the edge devices using Nuvla:
https://nuvla.io/ui/apps/aces/edge-cloud-infrastructure/workflow-management/prefect-worker
After that, deploy the K8s ServiceAccount needed to run K8s flows.
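Once the worker is up, you can confirm that the work pool it should join exists on the server. A small sketch, assuming the pool is named `aces` as used by the test deployments below:

```python
import asyncio

from prefect.client.orchestration import get_client


async def check_pool() -> None:
    async with get_client() as client:
        # Raises if the pool does not exist on the server
        pool = await client.read_work_pool("aces")
        print(f"work pool: {pool.name} (type: {pool.type})")


asyncio.run(check_pool())
```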
Run the following tests to validate the deployment and configuration of Prefect and the S3 storage.
This will:
- Register the deployment to the `aces` pool
- Create it under the name `hello-k8s-s3`
- Use the flow defined in `hello_k8s.py:hello`
- Upload the flow code to Cloud S3, from where the worker fetches it (see the sketch below)
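For orientation, a hedged sketch of what `deploy_hello_k8s.py` might contain, assuming the Prefect 2.x `Deployment` API and the `minio-data-storage` block from earlier:

```python
from prefect.deployments import Deployment
from prefect_aws import S3Bucket

from hello_k8s import hello  # the flow defined in hello_k8s.py

deployment = Deployment.build_from_flow(
    flow=hello,
    name="hello-k8s-s3",
    work_pool_name="aces",
    # With a storage block set, the flow code is uploaded to Cloud S3
    # when the deployment is built
    storage=S3Bucket.load("minio-data-storage"),
)
deployment.apply()  # register the deployment with the Prefect server
```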
Make sure your Prefect server is configured and reachable by running:
```bash
$ prefect config view
🚀 you are connected to:
http://localhost:4200
PREFECT_PROFILE='ephemeral'
PREFECT_API_URL='http://localhost:4200/api' (from env)
PREFECT_SERVER_ALLOW_EPHEMERAL_MODE='true' (from profile)
```
Then deploy the test flow:
```bash
cd tests/hello-k8s
python deploy_hello_k8s.py
```
List the deployments:
```bash
prefect deployment ls
```
And trigger a run:
```bash
prefect deployment run 'hello/hello-k8s-s3'
```
To check the runs of the flow, use:
```bash
prefect flow-run ls
```
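Runs can also be triggered from Python with Prefect's `run_deployment` helper:

```python
from prefect.deployments import run_deployment

# Blocks until the flow run reaches a terminal state by default
flow_run = run_deployment(name="hello/hello-k8s-s3")
print(flow_run.state)
```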
Deploy the UC1 Load Sensitivity Analysis flow with MinIO integration:
```bash
cd IPTO/UC1/uc1_prefect
python flow_with_minio.py  # Deploy the flow
```
This will create a deployment named `uc1-load-sensitivity-minio` that:
- Loads input data from MinIO (if any)
- Runs the load sensitivity analysis
- Saves all output files (Excel, PNG plots) to MinIO storage
- Stores intermediate results in MinIO
Run the deployment:
```bash
prefect deployment run uc1-load-sensitivity-analysis/uc1-load-sensitivity-minio
```
For users deploying their own workloads, see the comprehensive MinIO Integration Guide, which covers:
- How to configure flows to use MinIO for input/output files
- Different deployment methods (local code vs. MinIO-stored code)
- Best practices for file organization
- Example code for common patterns
- Troubleshooting guide
For example, a typical pattern for loading inputs from and saving outputs to MinIO looks like this:

```python
from pathlib import Path

from prefect import flow, task
from prefect_aws import S3Bucket


@task
def load_data(input_path: str) -> str:
    # Download the input object from the MinIO-backed S3 block to a temp file
    storage = S3Bucket.load("minio-data-storage")
    local_path = Path(storage.download_object_to_path(input_path, "temp_input.csv"))
    data = local_path.read_text()  # Process your data here
    local_path.unlink()  # Clean up the temporary file
    return data


@task
def save_results(data: str, output_path: str):
    storage = S3Bucket.load("minio-data-storage")
    # Save data locally first, then upload
    local_file = Path("local_file.csv")
    local_file.write_text(data)
    storage.upload_from_path(str(local_file), output_path)


@flow(result_storage="prefect/minio-result-storage")
def my_workload():
    data = load_data("input/my_data.csv")
    # Your processing...
    save_results(data, "output/my_results.csv")


if __name__ == "__main__":
    my_workload.deploy(
        name="my-workload",
        work_pool_name="aces",
        job_variables={"env": {"EXTRA_PIP_PACKAGES": "prefect-aws s3fs pandas"}},
    )
```
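Note on `job_variables`: the official Prefect images honor the `EXTRA_PIP_PACKAGES` environment variable and pip-install the listed packages when the flow-run container starts, so `prefect-aws` and `s3fs` become available inside the job pod without building a custom image.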