|
1 | | -#!/usr/bin/env python |
2 | | -import urlparse |
3 | | -import pkg_resources # part of setuptools |
4 | | -import urllib |
5 | | -import json |
6 | | -import time |
7 | | -import sys |
8 | | -import os |
9 | | -import argparse |
10 | | -import logging |
11 | | -import schema_salad.ref_resolver |
12 | | -import requests |
13 | | -from wes_service.util import visit |
14 | | -from bravado.client import SwaggerClient |
15 | | -from bravado.requests_client import RequestsClient |
16 | | - |
def main(argv=None):
    """Command-line client for a GA4GH Workflow Execution Service (WES).

    Depending on the flags given this either queries the service
    (--list / --get / --log / --info / --version) or submits
    workflow_url + job_order as a new workflow run, optionally waiting
    for completion and dumping the run's outputs as JSON to stdout.

    Returns a process exit code (0 on success, 1 on failure); an
    unsupported Directory input aborts via sys.exit(33).
    """
    # Bind argv at call time: a def-time default of sys.argv[1:] would be
    # frozen to whatever sys.argv held when this module was first imported.
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description='Workflow Execution Service')
    parser.add_argument("--host", type=str, default=os.environ.get("WES_API_HOST"))
    parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_AUTH"))
    parser.add_argument("--proto", type=str, default=os.environ.get("WES_API_PROTO", "https"))
    parser.add_argument("--quiet", action="store_true", default=False)
    parser.add_argument("--outdir", type=str)  # NOTE(review): parsed but unused below
    parser.add_argument("--page", type=str, default=None)
    parser.add_argument("--page-size", type=int, default=None)

    exgroup = parser.add_mutually_exclusive_group()
    # NOTE(review): --run is accepted but never consulted; submitting a run
    # is simply the default action when no other mode flag is given.
    exgroup.add_argument("--run", action="store_true", default=False)
    exgroup.add_argument("--get", type=str, default=None)
    exgroup.add_argument("--log", type=str, default=None)
    exgroup.add_argument("--list", action="store_true", default=False)
    exgroup.add_argument("--info", action="store_true", default=False)
    exgroup.add_argument("--version", action="store_true", default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", default=True, dest="wait")

    parser.add_argument("workflow_url", type=str, nargs="?", default=None)
    parser.add_argument("job_order", type=str, nargs="?", default=None)
    args = parser.parse_args(argv)

    # Configure logging up front, before any code path that can log
    # (fixpaths below calls logging.error; previously basicConfig only ran
    # just before submission, so those errors used default settings).
    if args.quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    if args.version:
        pkg = pkg_resources.require("wes_service")
        print(u"%s %s" % (sys.argv[0], pkg[0].version))
        return 0

    http_client = RequestsClient()
    split = urlparse.urlsplit("%s://%s/" % (args.proto, args.host))

    # The auth token is sent as a raw Authorization header on every request.
    http_client.set_api_key(
        split.hostname, args.auth,
        param_name='Authorization', param_in='header')
    client = SwaggerClient.from_url(
        "%s://%s/ga4gh/wes/v1/swagger.json" % (args.proto, args.host),
        http_client=http_client, config={'use_models': False})

    if args.list:
        response = client.WorkflowExecutionService.ListWorkflows(page_token=args.page, page_size=args.page_size)
        json.dump(response.result(), sys.stdout, indent=4)
        return 0

    if args.log:
        response = client.WorkflowExecutionService.GetWorkflowLog(
            workflow_id=args.log)
        sys.stdout.write(response.result()["workflow_log"]["stderr"])
        return 0

    if args.get:
        response = client.WorkflowExecutionService.GetWorkflowLog(
            workflow_id=args.get)
        json.dump(response.result(), sys.stdout, indent=4)
        return 0

    if args.info:
        response = client.WorkflowExecutionService.GetServiceInfo()
        json.dump(response.result(), sys.stdout, indent=4)
        return 0

    # Resolve the job order document; "location"/"path" are declared as
    # JSON-LD @id fields so relative references resolve against the document.
    loader = schema_salad.ref_resolver.Loader({
        "location": {"@type": "@id"},
        "path": {"@type": "@id"}
    })
    # Renamed from "input" so the builtin input() is not shadowed.
    job_input, _ = loader.resolve_ref(args.job_order)

    basedir = os.path.dirname(args.job_order)

    def fixpaths(d):
        """Fold a CWL entry's "path" into "location" and reject unsupported inputs."""
        if isinstance(d, dict):
            if "path" in d:
                # A bare path (no scheme, ":" absent) is taken as relative to
                # the job order document and converted to a file URL.
                if ":" not in d["path"]:
                    local_path = os.path.normpath(
                        os.path.join(os.getcwd(), basedir, d["path"]))
                    d["location"] = urllib.pathname2url(local_path)
                else:
                    d["location"] = d["path"]
                del d["path"]
            loc = d.get("location", "")
            if d.get("class") == "Directory":
                if loc.startswith("http:") or loc.startswith("https:"):
                    logging.error("Directory inputs not supported with http references")
                    sys.exit(33)

    visit(job_input, fixpaths)

    workflow_url = args.workflow_url
    # A relative local path is turned into an absolute file:// URL.
    if not workflow_url.startswith("/") and ":" not in workflow_url:
        workflow_url = "file://" + os.path.abspath(workflow_url)

    parts = [
        ("workflow_params", json.dumps(job_input)),
        ("workflow_type", "CWL"),
        ("workflow_type_version", "v1.0")
    ]

    opened_files = []
    try:
        if workflow_url.startswith("file://"):
            # Attach every regular, non-hidden file in the workflow's
            # directory so the service receives the tool/workflow documents.
            rootdir = os.path.dirname(workflow_url[7:])
            for entry in os.listdir(rootdir):
                if entry.startswith("."):
                    continue
                fn = os.path.join(rootdir, entry)
                if os.path.isfile(fn):
                    fh = open(fn, "rb")
                    # Track the handle so it is closed after the POST
                    # (previously these handles were leaked).
                    opened_files.append(fh)
                    parts.append(('workflow_descriptor', (fn[len(rootdir)+1:], fh)))
            parts.append(("workflow_url", os.path.basename(workflow_url[7:])))
        else:
            parts.append(("workflow_url", workflow_url))

        postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/workflows" % (args.proto, args.host),
                                              files=parts,
                                              headers={"Authorization": args.auth})
    finally:
        for fh in opened_files:
            fh.close()

    # Check the status before parsing: an error response may not be JSON at
    # all (e.g. an HTML error page from a proxy), which previously crashed
    # the client with a decode error instead of a clean message.
    if postresult.status_code != 200:
        logging.error("%s", postresult.text)
        return 1

    r = json.loads(postresult.text)

    if args.wait:
        logging.info("Workflow id is %s", r["workflow_id"])
    else:
        sys.stdout.write(r["workflow_id"]+"\n")
        return 0

    # Poll until the workflow leaves an active state.
    r = client.WorkflowExecutionService.GetWorkflowStatus(
        workflow_id=r["workflow_id"]).result()
    while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"):
        time.sleep(8)
        r = client.WorkflowExecutionService.GetWorkflowStatus(
            workflow_id=r["workflow_id"]).result()

    logging.info("State is %s", r["state"])

    s = client.WorkflowExecutionService.GetWorkflowLog(
        workflow_id=r["workflow_id"]).result()
    # "stderr" here appears to be a URL to the log text, fetched with the
    # same Authorization header as the API calls.
    logging.info("%s", s["workflow_log"]["stderr"])
    logs = requests.get(s["workflow_log"]["stderr"], headers={"Authorization": args.auth}).text
    logging.info("Workflow log:\n"+logs)

    # Some servers return {"fields": null} inside outputs; drop it so the
    # dumped JSON reflects only real outputs.
    if "fields" in s["outputs"] and s["outputs"]["fields"] is None:
        del s["outputs"]["fields"]
    json.dump(s["outputs"], sys.stdout, indent=4)

    return 0 if r["state"] == "COMPLETE" else 1
180 | | - |
181 | | - |
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main(sys.argv[1:]))
0 commit comments