1+ import os
2+ import sys
3+ from pathlib import Path
4+ import subprocess
5+ import json
6+ from sys import exit
7+
8+
9+ class RunAction :
10+ def __init__ (self , metadata ):
11+ self .stk = sys .argv [0 ]
12+ self .action_base_path = os .path .dirname (os .path .abspath (__file__ ))
13+ self .inputs = {k : v for k , v in metadata .inputs .items () if v is not None }
14+
15+ def __call__ (self , action_name : str , ** inputs ):
16+ action_path = f'{ self .action_base_path } /{ action_name } '
17+ cmd = [self .stk , 'run' , 'action' , action_path , '--inputs-json' , json .dumps ({** self .inputs , ** inputs }),'--non-interactive' ]
18+ result = subprocess .run (cmd )
19+ if (result .returncode == 1 ):
20+ print (f"Fail to execute { result .args } ..." )
21+ exit (1 )
22+
23+
24+ def deploy_workflow (run_action : RunAction ):
25+ run_action ("runtime-create-manifest-action" )
26+ run_action ("runtime-manager-action" )
27+
28+ with open ('manager-output.log' , 'r' ) as file :
29+ data = json .loads (file .read ().replace ("\' " , "\" " ))
30+
31+ task_runners = dict (
32+ IAC_SELF_HOSTED = lambda ** i : run_action ("runtime-iac-action" , ** i ),
33+ DEPLOY_SELF_HOSTED = lambda ** i : run_action ("runtime-deploy-action" , ** i ),
34+ )
35+ for t in data ['tasks' ]:
36+ runner = task_runners .get (t ["taskType" ])
37+ runner and runner (run_task_id = t ["runTaskId" ])
38+
39+
40+ def cancel_deploy_run (run_action : RunAction ):
41+ run_action ("runtime-cancel-run-action" )
42+
43+
44+ def run (metadata ):
45+ workflows = dict (deploy = deploy_workflow , cancel_deploy = cancel_deploy_run )
46+ run_action = RunAction (metadata )
47+ workflow_runner = workflows .get (metadata .inputs ["workflow_type" ])
48+ workflow_runner and workflow_runner (run_action = run_action )
49+
0 commit comments