Skip to content

Commit 4c3fd21

Browse files
committed
Reverting changes
1 parent 6ed5c2a commit 4c3fd21

File tree

2 files changed

+270
-0
lines changed

2 files changed

+270
-0
lines changed

wes_service/cwl_runner.py

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
from __future__ import print_function
2+
import json
3+
import os
4+
import subprocess
5+
import urllib
6+
import uuid
7+
8+
from wes_service.util import WESBackend
9+
10+
11+
class Workflow(object):
12+
def __init__(self, run_id):
13+
super(Workflow, self).__init__()
14+
self.run_id = run_id
15+
self.workdir = os.path.join(os.getcwd(), "workflows", self.run_id)
16+
17+
def run(self, request, opts):
18+
"""
19+
Constructs a command to run a cwl/json from requests and opts,
20+
runs it, and deposits the outputs in outdir.
21+
22+
Runner:
23+
opts.getopt("runner", default="cwl-runner")
24+
25+
CWL (url):
26+
request["workflow_url"] == a url to a cwl file
27+
or
28+
request["workflow_descriptor"] == input cwl text (written to a file and a url constructed for that file)
29+
30+
JSON File:
31+
request["workflow_params"] == input json text (to be written to a file)
32+
33+
:param dict request: A dictionary containing the cwl/json information.
34+
:param wes_service.util.WESBackend opts: contains the user's arguments;
35+
specifically the runner and runner options
36+
:return: {"run_id": self.run_id, "state": state}
37+
"""
38+
os.makedirs(self.workdir)
39+
outdir = os.path.join(self.workdir, "outdir")
40+
os.mkdir(outdir)
41+
42+
with open(os.path.join(self.workdir, "request.json"), "w") as f:
43+
json.dump(request, f)
44+
45+
with open(os.path.join(
46+
self.workdir, "cwl.input.json"), "w") as inputtemp:
47+
json.dump(request["workflow_params"], inputtemp)
48+
49+
if request.get("workflow_descriptor"):
50+
workflow_descriptor = request.get('workflow_descriptor')
51+
with open(os.path.join(self.workdir, "workflow.cwl"), "w") as f:
52+
f.write(workflow_descriptor)
53+
workflow_url = urllib.pathname2url(os.path.join(self.workdir, "workflow.cwl"))
54+
else:
55+
workflow_url = request.get("workflow_url")
56+
57+
output = open(os.path.join(self.workdir, "cwl.output.json"), "w")
58+
stderr = open(os.path.join(self.workdir, "stderr"), "w")
59+
60+
runner = opts.getopt("runner", default="cwl-runner")
61+
extra = opts.getoptlist("extra")
62+
command_args = [runner] + extra + [workflow_url, inputtemp.name]
63+
proc = subprocess.Popen(command_args,
64+
stdout=output,
65+
stderr=stderr,
66+
close_fds=True,
67+
cwd=outdir)
68+
output.close()
69+
stderr.close()
70+
with open(os.path.join(self.workdir, "pid"), "w") as pid:
71+
pid.write(str(proc.pid))
72+
73+
return self.getstatus()
74+
75+
def getstate(self):
76+
"""
77+
Returns RUNNING, -1
78+
COMPLETE, 0
79+
or
80+
EXECUTOR_ERROR, 255
81+
"""
82+
state = "RUNNING"
83+
exit_code = -1
84+
85+
exitcode_file = os.path.join(self.workdir, "exit_code")
86+
pid_file = os.path.join(self.workdir, "pid")
87+
88+
if os.path.exists(exitcode_file):
89+
with open(exitcode_file) as f:
90+
exit_code = int(f.read())
91+
elif os.path.exists(pid_file):
92+
with open(pid_file, "r") as pid:
93+
pid = int(pid.read())
94+
try:
95+
(_pid, exit_status) = os.waitpid(pid, os.WNOHANG)
96+
if _pid != 0:
97+
exit_code = exit_status >> 8
98+
with open(exitcode_file, "w") as f:
99+
f.write(str(exit_code))
100+
os.unlink(pid_file)
101+
except OSError:
102+
os.unlink(pid_file)
103+
exit_code = 255
104+
105+
if exit_code == 0:
106+
state = "COMPLETE"
107+
elif exit_code != -1:
108+
state = "EXECUTOR_ERROR"
109+
110+
return state, exit_code
111+
112+
def getstatus(self):
113+
state, exit_code = self.getstate()
114+
115+
return {
116+
"run_id": self.run_id,
117+
"state": state
118+
}
119+
120+
def getlog(self):
121+
state, exit_code = self.getstate()
122+
123+
with open(os.path.join(self.workdir, "request.json"), "r") as f:
124+
request = json.load(f)
125+
126+
with open(os.path.join(self.workdir, "stderr"), "r") as f:
127+
stderr = f.read()
128+
129+
outputobj = {}
130+
if state == "COMPLETE":
131+
output_path = os.path.join(self.workdir, "cwl.output.json")
132+
with open(output_path, "r") as outputtemp:
133+
outputobj = json.load(outputtemp)
134+
135+
return {
136+
"run_id": self.run_id,
137+
"request": request,
138+
"state": state,
139+
"workflow_log": {
140+
"cmd": [""],
141+
"start_time": "",
142+
"end_time": "",
143+
"stdout": "",
144+
"stderr": stderr,
145+
"exit_code": exit_code
146+
},
147+
"task_logs": [],
148+
"outputs": outputobj
149+
}
150+
151+
def cancel(self):
152+
pass
153+
154+
155+
class CWLRunnerBackend(WESBackend):
156+
def GetServiceInfo(self):
157+
return {
158+
"workflow_type_versions": {
159+
"CWL": {"workflow_type_version": ["v1.0"]}
160+
},
161+
"supported_wes_versions": ["0.3.0"],
162+
"supported_filesystem_protocols": ["file", "http", "https"],
163+
"engine_versions": "cwl-runner",
164+
"system_state_counts": {},
165+
"key_values": {}
166+
}
167+
168+
def ListRuns(self, page_size=None, page_token=None, state_search=None):
169+
# FIXME #15 results don't page
170+
wf = []
171+
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
172+
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
173+
wf.append(Workflow(l))
174+
175+
workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA
176+
return {
177+
"workflows": workflows,
178+
"next_page_token": ""
179+
}
180+
181+
def RunWorkflow(self, **args):
182+
tempdir, body = self.collect_attachments()
183+
184+
run_id = uuid.uuid4().hex
185+
job = Workflow(run_id)
186+
187+
job.run(body, self)
188+
return {"run_id": run_id}
189+
190+
def GetRunLog(self, run_id):
191+
job = Workflow(run_id)
192+
return job.getlog()
193+
194+
def CancelRun(self, run_id):
195+
job = Workflow(run_id)
196+
job.cancel()
197+
return {"run_id": run_id}
198+
199+
def GetRunStatus(self, run_id):
200+
job = Workflow(run_id)
201+
return job.getstatus()
202+
203+
204+
def create_backend(app, opts):
205+
return CWLRunnerBackend(opts)

wes_service/util.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import tempfile
2+
import json
3+
import os
4+
5+
from six import itervalues
6+
import connexion
7+
from werkzeug.utils import secure_filename
8+
9+
10+
def visit(d, op):
11+
"""Recursively call op(d) for all list subelements and dictionary 'values' that d may have."""
12+
op(d)
13+
if isinstance(d, list):
14+
for i in d:
15+
visit(i, op)
16+
elif isinstance(d, dict):
17+
for i in itervalues(d):
18+
visit(i, op)
19+
20+
21+
class WESBackend(object):
22+
"""Stores and retrieves options. Intended to be inherited."""
23+
def __init__(self, opts):
24+
"""Parse and store options as a list of tuples."""
25+
self.pairs = []
26+
for o in opts if opts else []:
27+
k, v = o.split("=", 1)
28+
self.pairs.append((k, v))
29+
30+
def getopt(self, p, default=None):
31+
"""Returns the first option value stored that matches p or default."""
32+
for k, v in self.pairs:
33+
if k == p:
34+
return v
35+
return default
36+
37+
def getoptlist(self, p):
38+
"""Returns all option values stored that match p as a list."""
39+
optlist = []
40+
for k, v in self.pairs:
41+
if k == p:
42+
optlist.append(v)
43+
return optlist
44+
45+
def collect_attachments(self):
46+
tempdir = tempfile.mkdtemp()
47+
body = {}
48+
for k, ls in connexion.request.files.iterlists():
49+
for v in ls:
50+
if k == "workflow_descriptor":
51+
filename = secure_filename(v.filename)
52+
v.save(os.path.join(tempdir, filename))
53+
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
54+
body[k] = json.loads(v.read())
55+
else:
56+
body[k] = v.read()
57+
58+
if body['workflow_type'] != "CWL" or \
59+
body['workflow_type_version'] != "v1.0":
60+
return
61+
62+
if ":" not in body["workflow_url"]:
63+
body["workflow_url"] = "file://%s" % os.path.join(tempdir, secure_filename(body["workflow_url"]))
64+
65+
return (tempdir, body)

0 commit comments

Comments
 (0)