Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws/action-status.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def lambda_handler(event, context):
)

print("updated_response", update_response)

if failure:
print("FAILED ", failure)
status = "FAILED"
Expand Down
61 changes: 28 additions & 33 deletions aws/funcx-run.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,54 +33,49 @@ def lambda_handler(event, context):
monitor_by = body['monitor_by'] if 'monitor_by' in body else None
manage_by = body['manage_by'] if 'manage_by' in body else None

result = {
"action_id": action_id,
'status': 'ACTIVE',
'display_status': 'Function Submitted',
'details': None,
'monitor_by': monitor_by,
'manage_by': manage_by,
'start_time': now_isoformat(),
}

status_code = 202
action_status = 'ACTIVE'
display_status = 'Function Submitted'
details = None

batch_res = None
try:
batch = fxc.create_batch()

for task in body['body']['tasks']:
print(task)
payload = task.get('payload', None)
if payload:
batch.add(endpoint_id=task['endpoint'], function_id=task['function'],
**task['payload'])
else:
batch.add(endpoint_id=task['endpoint'], function_id=task['function'])
batch.add(endpoint_id=task['endpoint'], function_id=task['function'],
**task['payload'])

batch_res = fxc.batch_run(batch)
print({'action_id': action_id, 'tasks': batch_res})
result['details'] = batch_res
except Exception as eek:
print('FAILED ', eek)
result['status'] = 'FAILED'
result['display_status'] = 'Failed to submit tasks'
result['details'] = str(eek)
status_code = 400
print(batch_res)

# Create a dynamo record where the primary key is this action's ID
# Tasks is a dict by task_id and contains the eventual results from their
# execution. Where there are no more None results then the action is complete
if batch_res:
# Create a dynamo record where the primary key is this action's ID
# Tasks is a dict by task_id and contains the eventual results from their
# execution. Where there are no more None results then the action is complete
response = table.put_item(
Item={
'action-id': action_id,
'tasks': json.dumps({task_id: {"result": None, "completed": False} for task_id in batch_res})
}
)
print("Dynamo", response)

print("Status result", result)
except Exception as eek:
print("FAILED ", str(eek))
action_status = 'FAILED'
display_status = str(eek)
details = str(eek)
status_code = 400

result = {
"action_id": action_id,
'status': action_status,
'display_status': display_status,
'details': details,
'monitor_by': monitor_by,
'manage_by': manage_by,
'start_time': now_isoformat(),
}

print('Submit result', result)

return {
'statusCode': status_code,
'body': json.dumps(result)
Expand Down