Skip to content

Commit 00ae988

Browse files
authored
Merge pull request #102 from controlm/support-run_on_demand
Support run ondemand Automation-API Control-M
2 parents 1f216fa + 81d198c commit 00ae988

File tree

4 files changed

+288
-9
lines changed

4 files changed

+288
-9
lines changed

src/aapi/bases.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
import enum
33
import typing
44
import json
5+
import random
6+
from ctm_python_client.core.comm import Environment
7+
from ctm_python_client.core.monitoring import RunMonitor
58

69
class AAPIJob:
710
pass
@@ -57,4 +60,42 @@ def dump_aapi(self, f, indent=None):
5760

5861
def as_dict(self):
5962
return attrs.asdict(self)
60-
63+
64+
def run_on_demand(self, environment: Environment, inpath: str = f'run_on_demand{random.randint(100,999)}', controlm_server: str = None,
65+
run_as: str = None, host: str = None, application: str = None, sub_application: str = None, skip_login: bool = False,
66+
file_path: str = None, delete_afterwards: bool = True, open_in_browser: str = None) -> RunMonitor:
67+
# Import circular dependency
68+
from ctm_python_client.core.workflow import Workflow, WorkflowDefaults
69+
from aapi import Job, Folder
70+
71+
if isinstance(self, Job) or (hasattr(self, 'job_list') and self.job_list is not None and len(self.job_list) > 0):
72+
try:
73+
on_demand_workflow = Workflow(
74+
environment,
75+
WorkflowDefaults(
76+
controlm_server=controlm_server,
77+
run_as=run_as,
78+
host=host,
79+
application=application,
80+
sub_application=sub_application
81+
)
82+
)
83+
if isinstance(self, Folder):
84+
on_demand_workflow.add(self)
85+
else:
86+
on_demand_workflow.add(self, inpath=inpath)
87+
88+
on_demand_workflow.run_on_demand(
89+
skip_login=skip_login,
90+
file_path=file_path,
91+
delete_afterwards=delete_afterwards,
92+
open_in_browser=open_in_browser
93+
)
94+
except Exception as e:
95+
errors = [err.get('message', '') + ' ' + err.get('item', '')
96+
for err in json.loads(e.body)['errors']]
97+
raise RuntimeError(f"AAPI request failed: {', '.join(errors)}")
98+
finally:
99+
on_demand_workflow.clear_all()
100+
else:
101+
raise Exception('Run is not allowed for json without jobs')

src/clients/ctm_api_client/api/run_api.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3413,6 +3413,112 @@ def run_jobs_with_http_info(self, job_definitions_file, **kwargs): # noqa: E501
34133413
collection_formats=collection_formats,
34143414
)
34153415

3416+
def run_on_demand(self, definitions_file, annotation=None, **kwargs): # noqa: E501
3417+
"""Run a job on demand # noqa: E501
3418+
3419+
Run a job on demand # noqa: E501
3420+
This method makes a synchronous HTTP request by default. To make an
3421+
asynchronous HTTP request, please pass async_req=True
3422+
>>> thread = api.run_on_demand(definitions_file, async_req=True)
3423+
>>> result = thread.get()
3424+
3425+
:param async_req bool
3426+
:param file definitions_file: A file that contains jobs definitions to be run on demand (required)
3427+
:param file deploy_descriptor_file: Deploy Descriptor JSON file.
3428+
:param annotation - user annotation
3429+
:return: RunResult
3430+
If the method is called asynchronously,
3431+
returns the request thread.
3432+
"""
3433+
kwargs['_return_http_data_only'] = True
3434+
if kwargs.get('async_req'):
3435+
return self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501
3436+
else:
3437+
(data) = self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501
3438+
return data
3439+
3440+
3441+
def run_on_demand_with_http_info(self, definitions_file, annotation, **kwargs): # noqa: E501
3442+
"""Run a job on demand # noqa: E501
3443+
3444+
Run a job on demand # noqa: E501
3445+
This method makes a synchronous HTTP request by default. To make an
3446+
asynchronous HTTP request, please pass async_req=True
3447+
>>> thread = api.run_on_demand_with_http_info(definitions_file, async_req=True)
3448+
>>> result = thread.get()
3449+
3450+
:param async_req bool
3451+
:param file definitions_file: A file that contains jobs definitions to be run on demand (required)
3452+
:param file deploy_descriptor_file: Deploy Descriptor JSON file.
3453+
:param annotation - user annotation
3454+
:return: RunResult
3455+
If the method is called asynchronously,
3456+
returns the request thread.
3457+
"""
3458+
3459+
all_params = ['definitions_file', 'deploy_descriptor_file'] # noqa: E501
3460+
all_params.append('async_req')
3461+
all_params.append('_return_http_data_only')
3462+
all_params.append('_preload_content')
3463+
all_params.append('_request_timeout')
3464+
3465+
params = locals()
3466+
for key, val in six.iteritems(params['kwargs']):
3467+
if key not in all_params:
3468+
raise TypeError(
3469+
"Got an unexpected keyword argument '%s'"
3470+
" to method run_on_demand" % key
3471+
)
3472+
params[key] = val
3473+
del params['kwargs']
3474+
# verify the required parameter 'definitions_file' is set
3475+
if self.api_client.client_side_validation and ('definitions_file' not in params or
3476+
params['definitions_file'] is None): # noqa: E501
3477+
raise ValueError("Missing the required parameter `definitions_file` when calling `run_on_demand`") # noqa: E501
3478+
3479+
collection_formats = {}
3480+
3481+
path_params = {}
3482+
3483+
query_params = []
3484+
3485+
header_params = annotation.get_annotation() if annotation else {}
3486+
3487+
form_params = []
3488+
local_var_files = {}
3489+
if 'definitions_file' in params:
3490+
local_var_files['definitionsFile'] = params['definitions_file'] # noqa: E501
3491+
if 'deploy_descriptor_file' in params:
3492+
local_var_files['deployDescriptorFile'] = params['deploy_descriptor_file'] # noqa: E501
3493+
3494+
body_params = None
3495+
# HTTP header `Accept`
3496+
header_params['Accept'] = self.api_client.select_header_accept(
3497+
['application/json']) # noqa: E501
3498+
3499+
# HTTP header `Content-Type`
3500+
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
3501+
['multipart/form-data']) # noqa: E501
3502+
3503+
# Authentication setting
3504+
auth_settings = ['ApiKeyAuth', 'Bearer'] # noqa: E501
3505+
3506+
return self.api_client.call_api(
3507+
'/run/ondemand', 'POST',
3508+
path_params,
3509+
query_params,
3510+
header_params,
3511+
body=body_params,
3512+
post_params=form_params,
3513+
files=local_var_files,
3514+
response_type='RunResult', # noqa: E501
3515+
auth_settings=auth_settings,
3516+
async_req=params.get('async_req'),
3517+
_return_http_data_only=params.get('_return_http_data_only'),
3518+
_preload_content=params.get('_preload_content', True),
3519+
_request_timeout=params.get('_request_timeout'),
3520+
collection_formats=collection_formats)
3521+
34163522
def run_now(self, job_id, **kwargs): # noqa: E501
34173523
"""Bypass scheduling cretirias and start the job # noqa: E501
34183524

src/clients/ctm_saas_client/api/run_api.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2723,6 +2723,112 @@ def run_jobs_with_http_info(self, job_definitions_file, **kwargs): # noqa: E501
27232723
_request_timeout=params.get('_request_timeout'),
27242724
collection_formats=collection_formats)
27252725

2726+
def run_on_demand(self, definitions_file, annotation=None, **kwargs): # noqa: E501
2727+
"""Run a job on demand # noqa: E501
2728+
2729+
Run a job on demand # noqa: E501
2730+
This method makes a synchronous HTTP request by default. To make an
2731+
asynchronous HTTP request, please pass async_req=True
2732+
>>> thread = api.run_on_demand(definitions_file, async_req=True)
2733+
>>> result = thread.get()
2734+
2735+
:param async_req bool
2736+
:param file definitions_file: A file that contains jobs definitions to be run on demand (required)
2737+
:param file deploy_descriptor_file: Deploy Descriptor JSON file.
2738+
:param annotation - user annotation
2739+
:return: RunResult
2740+
If the method is called asynchronously,
2741+
returns the request thread.
2742+
"""
2743+
kwargs['_return_http_data_only'] = True
2744+
if kwargs.get('async_req'):
2745+
return self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501
2746+
else:
2747+
(data) = self.run_on_demand_with_http_info(definitions_file, annotation, **kwargs) # noqa: E501
2748+
return data
2749+
2750+
2751+
def run_on_demand_with_http_info(self, definitions_file, annotation, **kwargs): # noqa: E501
2752+
"""Run a job on demand # noqa: E501
2753+
2754+
Run a job on demand # noqa: E501
2755+
This method makes a synchronous HTTP request by default. To make an
2756+
asynchronous HTTP request, please pass async_req=True
2757+
>>> thread = api.run_on_demand_with_http_info(definitions_file, async_req=True)
2758+
>>> result = thread.get()
2759+
2760+
:param async_req bool
2761+
:param file definitions_file: A file that contains jobs definitions to be run on demand (required)
2762+
:param file deploy_descriptor_file: Deploy Descriptor JSON file.
2763+
:param annotation - user annotation
2764+
:return: RunResult
2765+
If the method is called asynchronously,
2766+
returns the request thread.
2767+
"""
2768+
2769+
all_params = ['definitions_file', 'deploy_descriptor_file'] # noqa: E501
2770+
all_params.append('async_req')
2771+
all_params.append('_return_http_data_only')
2772+
all_params.append('_preload_content')
2773+
all_params.append('_request_timeout')
2774+
2775+
params = locals()
2776+
for key, val in six.iteritems(params['kwargs']):
2777+
if key not in all_params:
2778+
raise TypeError(
2779+
"Got an unexpected keyword argument '%s'"
2780+
" to method run_on_demand" % key
2781+
)
2782+
params[key] = val
2783+
del params['kwargs']
2784+
# verify the required parameter 'definitions_file' is set
2785+
if self.api_client.client_side_validation and ('definitions_file' not in params or
2786+
params['definitions_file'] is None): # noqa: E501
2787+
raise ValueError("Missing the required parameter `definitions_file` when calling `run_on_demand`") # noqa: E501
2788+
2789+
collection_formats = {}
2790+
2791+
path_params = {}
2792+
2793+
query_params = []
2794+
2795+
header_params = annotation.get_annotation() if annotation else {}
2796+
2797+
form_params = []
2798+
local_var_files = {}
2799+
if 'definitions_file' in params:
2800+
local_var_files['definitionsFile'] = params['definitions_file'] # noqa: E501
2801+
if 'deploy_descriptor_file' in params:
2802+
local_var_files['deployDescriptorFile'] = params['deploy_descriptor_file'] # noqa: E501
2803+
2804+
body_params = None
2805+
# HTTP header `Accept`
2806+
header_params['Accept'] = self.api_client.select_header_accept(
2807+
['application/json']) # noqa: E501
2808+
2809+
# HTTP header `Content-Type`
2810+
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
2811+
['multipart/form-data']) # noqa: E501
2812+
2813+
# Authentication setting
2814+
auth_settings = ['ApiKeyAuth', 'Bearer'] # noqa: E501
2815+
2816+
return self.api_client.call_api(
2817+
'/run/ondemand', 'POST',
2818+
path_params,
2819+
query_params,
2820+
header_params,
2821+
body=body_params,
2822+
post_params=form_params,
2823+
files=local_var_files,
2824+
response_type='RunResult', # noqa: E501
2825+
auth_settings=auth_settings,
2826+
async_req=params.get('async_req'),
2827+
_return_http_data_only=params.get('_return_http_data_only'),
2828+
_preload_content=params.get('_preload_content', True),
2829+
_request_timeout=params.get('_request_timeout'),
2830+
collection_formats=collection_formats)
2831+
27262832
def run_now(self, job_id, **kwargs): # noqa: E501
27272833
"""Bypass scheduling cretirias and start the job # noqa: E501
27282834

src/ctm_python_client/core/workflow.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ class WorkflowDefaults:
6767
run_as: str = attrs.field(default=None, metadata={'applyon': [
6868
'Folder', 'SimpleFolder', 'SubFolder', 'Job']})
6969
host: str = attrs.field(default=None, metadata={'applyon': ['Job']})
70-
when: Job.When = attrs.field(default=None, metadata={
71-
'applyon': ['Folder', 'Job']})
70+
when: Job.When = attrs.field(default=None, metadata={'applyon': ['Folder', 'Job']})
7271
application: str = attrs.field(default=None, metadata={'applyon': [
7372
'Folder', 'SimpleFolder', 'SubFolder', 'Job']})
7473
sub_application: str = attrs.field(default=None, metadata={'applyon': [
@@ -129,7 +128,7 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True):
129128
pass
130129

131130
if inpath:
132-
if isinstance(obj, SubFolder) or isinstance(obj, Job):
131+
if isinstance(obj, SubFolder) or isinstance(obj, Job) or isinstance(obj, AAPIJob):
133132
try:
134133
folder = self.get(inpath)
135134
except Exception as e:
@@ -138,15 +137,15 @@ def add(self, obj: AAPIObject, inpath: str = None, allow_creation: bool = True):
138137
else:
139138
raise e
140139
if folder:
141-
if isinstance(obj, Job):
140+
if isinstance(obj, Job) or isinstance(obj, AAPIJob):
142141
folder.job_list.append(obj)
143142
else:
144143
folder.sub_folder_list.append(obj)
145144
else:
146145
# Folder does not exist, create folder
147146
folder, parent_folder = self.create_folder_hierarchy(inpath)
148147
self._apply_defaults_for_folder(folder)
149-
if isinstance(obj, Job):
148+
if isinstance(obj, Job) or isinstance(obj, AAPIJob):
150149
parent_folder.job_list.append(obj)
151150
else:
152151
parent_folder.sub_folder_list.append(obj)
@@ -408,13 +407,40 @@ def run(self, skip_login: bool = False, file_path: str = None, delete_afterwards
408407

409408
try:
410409
res = self.aapiclient.run_api.run_jobs(fpath.resolve())
411-
# return res
412410
run_ = RunMonitor(res.run_id, self.aapiclient, monitor_page_uri= res.monitor_page_uri)
413-
if open_in_browser:
411+
if open_in_browser:
414412
run_.open_in_browser()
415413
return run_
416414
except Exception as e:
417-
raise e
415+
errors = [err.get('message', '') + ' ' + err.get('item', '')
416+
for err in json.loads(e.body)['errors']]
417+
raise RuntimeError(f"AAPI request failed: {', '.join(errors)}")
418+
finally:
419+
if delete_afterwards:
420+
fpath.unlink()
421+
422+
def run_on_demand(self, skip_login: bool = False, file_path: str = None, delete_afterwards: bool = True, open_in_browser=False) -> RunMonitor:
423+
if not skip_login:
424+
self.aapiclient.authenticate()
425+
426+
if not file_path:
427+
file_path = f'{tempfile.gettempdir()}/temp_{random.randint(1000,9999)}.json'
428+
429+
fpath = pathlib.Path(file_path)
430+
431+
with open(fpath.resolve(), 'w') as f:
432+
self.dump_json(f)
433+
434+
try:
435+
res = self.aapiclient.run_api.run_on_demand(fpath.resolve())
436+
run_ = RunMonitor(res.run_id, self.aapiclient, monitor_page_uri= res.monitor_page_uri)
437+
if open_in_browser:
438+
run_.open_in_browser()
439+
return run_
440+
except Exception as e:
441+
errors = [err.get('message', '') + ' ' + err.get('item', '')
442+
for err in json.loads(e.body)['errors']]
443+
raise RuntimeError(f"AAPI request failed: {', '.join(errors)}")
418444
finally:
419445
if delete_afterwards:
420446
fpath.unlink()

0 commit comments

Comments
 (0)