-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbalance_queue.py
More file actions
301 lines (258 loc) · 11.6 KB
/
balance_queue.py
File metadata and controls
301 lines (258 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
"""
Distribute priority jobs among accounts. To distribute non-priority jobs see myslurm.Squeue.balance.
###
# purpose: evenly redistributes jobs across available slurm accounts. Jobs are
# found via searching for the keyword among the squeue output fields;
# Helps speed up effective run time by spreading out the load.
###
###
# usage: python balance_queue.py [keyword] [parentdir]
#
# keyword is used to search across column data in queue
# parentdir is used to either find a previously saved list of accounts
# or is set to 'choose' so the user can run from command line
# and manually choose which accounts are used
#
# accounts are those slurm accounts that do not end with '_gpu' returned from the command:
# sshare -U --user $USER --format=Account
#
# to manually balance queue using all available accounts:
# python balance_queue.py
# to manually balance queue and choose among available accounts:
# python balance_queue.py $USER choose
# python balance_queue.py keyword choose
# as run in pipeline when balancing trim jobs from 01_trim.py:
# this looks for accounts.pkl in parentdir to determine accounts saved in 00_start.py
# python balance_queue.py trim /path/to/parentdir
#
# because of possible exit() commands in balance_queue, this should be run
# as a main program, or as a subprocess when run inside another python
# script.
###
### assumes
# export SQUEUE_FORMAT="%i %u %a %j %t %S %L %D %C %b %m %N (%r)"
###
# FUN FACTS
# 🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁
# balance_queue.py originated as part of the CoAdapTree project: github.com/CoAdapTree/varscan_pipeline
# 🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁🇨🇦🍁
"""
import os, shutil, sys, math, subprocess, time
from random import shuffle
from collections import Counter
from collections import defaultdict
import pythonimports as pyimp
import myslurm
def announceacctlens(accounts, fin, priority=True):
    """Print how many queued jobs each account currently holds.

    Positional arguments:
    accounts - dictionary with key = account_name, val = dict with key = pid and val = job info (squeue output)
    fin - True if this is the final job announcement, otherwise the first announcement

    Keyword arguments:
    priority - when True, each printed line notes that the jobs have Priority status
    """
    label = "first"
    if fin is True:
        label = "final"
    print("\t%s job announcement" % label)

    if fin is True:
        # brief pause before printing the final counts
        # NOTE(review): presumably gives slurm a moment to register account changes - confirm
        time.sleep(1)

    # the status text is the same for every account, so build it once
    status = "with Priority status " if priority is True else ""
    for account, jobs in accounts.items():
        print(f"\t{len(jobs)} jobs {status}on {account}")
def adjustjob(acct, jobid):
    """Reassign a queued job to a different slurm account via scontrol.

    Positional arguments:
    acct - name of the slurm account that should take the job
    jobid - slurm job id of the job being reassigned
    """
    scontrol_cmd = [
        shutil.which("scontrol"),
        "update",
        f"Account={acct!s}",
        f"JobId={jobid!s}",
    ]
    # launched without waiting on the child process, so this returns immediately
    subprocess.Popen(scontrol_cmd)
def getaccounts(sq, stage, user_accts):
    """
    Group queued jobs by slurm account and decide whether balancing should stop.

    Only jobs billed to an account listed in user_accts are kept; jobs on any
    other account are ignored so they cannot skew the early-exit check or the
    balance computed from the returned dictionary.

    Positional arguments:
    sq - Squeue class object, dict-like: keys for slurm_job_ids, values=info
         (each info object must expose an `account` attribute)
    stage - "final" marks the announcement made after balancing; any other
            value is treated as the initial pass
    user_accts - list of slurm accounts to use in balancing

    Returns:
    accounts - defaultdict with key = account_name, val = dict with key = pid and val = job info
    early_exit_decision - True when stage is "final", or when every account in
                          user_accts already holds at least one matching job
    """
    accounts = defaultdict(dict)
    for pid, info in sq.items():
        account = info.account
        if account not in user_accts:
            # not an account we are balancing across; leave the job alone
            continue
        accounts[account][pid] = info

    if stage == "final":
        early_exit_decision = True
    elif len(accounts) == len(user_accts):
        # every usable account already has jobs waiting, nothing to gain by moving them
        print("\tall accounts have low priority, leaving queue as-is")
        early_exit_decision = True
    else:
        early_exit_decision = False

    return accounts, early_exit_decision
def getbalance(accounts, num):
    """Compute the per-account job ceiling used when balancing.

    Positional arguments:
    accounts - dictionary with key = account_name, val = dict with key = pid and val = job info (squeue output)
    num - number of accounts to balance among

    Returns the total number of jobs divided by num, rounded up.
    """
    total_jobs = sum(len(jobs) for jobs in accounts.values())
    return math.ceil(total_jobs / num)
def choose_accounts(accts):
    """Interactively confirm which slurm accounts balance_queue.py may use.

    When more than one account is offered, the user is asked about each one
    (answering 'yes' or 'no'); the questions are repeated until at least one
    account has been kept. A single account is returned without asking.

    Positional arguments:
    accts - list of candidate slurm account names

    Returns the list of accounts to use (accts itself when it has one entry).

    Raises:
    ValueError - if accts is empty
    """
    if len(accts) == 0:
        # an empty list can never yield a selection, so fail fast instead of
        # re-prompting without end
        raise ValueError("choose_accounts needs at least one slurm account to choose from")

    while True:
        print(
            pyimp.ColorText(
                "\nDetermining which slurm accounts are available for use by balance_queue.py"
            ).bold()
        )

        if len(accts) == 1:
            # no need to ask if they want to use the only account that's available
            return accts

        keep = []
        for acct in accts:
            # keep asking about this account until the answer is usable
            while True:
                inp = input(
                    pyimp.ColorText(
                        f"\tINPUT NEEDED: Do you want to use this account: {acct}? (yes | no): "
                    ).warn()
                ).lower()
                if inp == "yes":
                    print("\t\tkeeping %s" % acct)
                    keep.append(acct)
                    break
                if inp == "no":
                    print("\t\tignoring %s" % acct)
                    break
                print(pyimp.ColorText("Please respond with 'yes' or 'no'").fail())

        if len(keep) > 0:
            return keep

        # make sure they've chosen at least one account
        print(pyimp.ColorText("FAIL: You need to specify at least one account. Revisiting accounts...").fail())
def get_avail_accounts(parentdir=None, save=False):
    """Query slurm with sshare command to determine accounts available.

    If called with parentdir=None, return all available accounts.
        - Meant to be called from command line outside of pipeline. See also sys.argv input.
    If called with parentdir='choose', allow user to choose accounts.
        - Meant to be called from command line outside of pipeline. See also sys.argv input.
    If called with save=True, confirm each account with user and save .pkl file in parentdir.
        - save=True is only called from 00_start.py

    Accounts are taken from the first field of each sshare output line; lines
    containing '_gpu', blank lines, the header row, and the dashed rule under
    the header are skipped.

    Returns a list of accounts to balance queue (None when save=True).
    """
    if parentdir is not None and save is False:
        # if the accounts have already been chosen, just return them right away
        # keep 'save is False' so 00_start can overwrite previous pkl and skip here
        pkl = os.path.join(parentdir, "accounts.pkl")
        if os.path.exists(pkl):
            # NOTE(review): pklload presumably unpickles the file - only use a trusted parentdir
            return pyimp.pklload(pkl)

    # get a list of all available accounts
    sshare_output = subprocess.check_output(
        [shutil.which("sshare"), "-U", "--user", os.environ["USER"], "--format=Account"]
    ).decode("utf-8")

    accts = []
    for line in sshare_output.split("\n"):
        fields = line.split()
        if len(fields) == 0:
            # blank line (the output's trailing newline always produces one)
            continue
        if "_gpu" in line:
            continue
        name = fields[0]
        if name == "Account" or set(name) == {"-"}:
            # column header and the dashed rule sshare prints beneath it
            continue
        accts.append(name)

    # for running outside of the pipeline:
    if parentdir is None:
        # to manually run on command line, using all accounts
        return accts
    if parentdir == "choose":
        # to manually run on command line, choose accounts
        return choose_accounts(accts)

    # save if necessary
    if save is True:
        # called from 00_start.py
        keep = choose_accounts(accts)
        pyimp.pkldump(keep, os.path.join(parentdir, "accounts.pkl"))
        # no return necessary for 00_start.py
        return

    return accts
def redistribute_jobs(accts, user_accts, balance):
    """Redistribute priority jobs to other accounts without high priority.

    Accounts holding more than `balance` jobs give up their excess; accounts
    holding fewer (or none) take jobs until nothing moveable is left. If there
    are jobs to give away but no account can take them, nothing is moved.

    Positional arguments:
    accts - dict: key = account, value = dict with key = pid, value = squeue output
    user_accts - list of all available slurm accounts
    balance - int; ceiling number of jobs each account should have after balancing
    """
    # which jobs can be moved, which accounts need jobs?
    moveable = []
    takers = []
    for account in user_accts:
        if account not in accts:
            # account has zero priority jobs
            takers.append(account)
            continue

        pids = list(accts[account].keys())
        if len(pids) > balance:
            # account has excess, give away jobs
            numtomove = len(pids) - balance
            print("\t%s is giving up %s jobs" % (account, numtomove))
            moveable.extend(pids[-numtomove:])  # move newest jobs, hopefully old will schedule
        elif len(pids) < balance:
            # keep track of accounts that need jobs
            takers.append(account)
        elif len(pids) == 1 and balance == 1 and len(accts.keys()) < len(user_accts):
            # if numjobs and balance == 1 but not all accounts have low priority, give up the job
            moveable.append(pids[0])

    if len(moveable) > 0 and len(takers) == 0:
        # without this guard the loop below would never empty `moveable`
        print("\tno accounts are available to take jobs, leaving %s jobs as-is" % len(moveable))
        return

    # shuffle list(takers) to avoid passing only to accounts that appear early in the list
    shuffle(takers)

    # redistribute jobs
    taken = Counter()
    while len(moveable) > 0:
        for taker in takers:
            # determine numtotake from the job count the taker started with
            numheld = len(accts[taker].keys()) if taker in accts else 0
            numtotake = balance - numheld
            if balance == 1 and numheld == 1:
                numtotake = 1
            # give numtotake to taker; the slice is a copy, so removing from
            # `moveable` inside the loop is safe
            for pid in moveable[-numtotake:]:
                adjustjob(taker, pid)
                taken[taker] += 1  # needs to be above .remove because of while()
                moveable.remove(pid)

    for taker, count in taken.items():
        print("\t%s has taken %s jobs" % (taker, count))
def main(keyword, parentdir):
    """Balance queued jobs with Priority status across the available slurm accounts.

    Exits the interpreter (status 0) when there is nothing to do: fewer than
    two accounts, no matching jobs in the queue, or every account already
    holding matching jobs.

    Positional arguments:
    keyword - string used to search the squeue output for jobs to balance
    parentdir - None, 'choose', or a directory passed through to get_avail_accounts
    """
    print(pyimp.ColorText("\nStarting balance_queue.py").bold())

    # get accounts available for billing
    user_accts = get_avail_accounts(parentdir)

    # with no accounts there is nothing to balance among (and the balance
    # computation below would divide by zero)
    if len(user_accts) == 0:
        print("\tthere are no accounts available to balance queue.")
        print("\texiting balance_queue.py")
        sys.exit(0)

    # if only one account, skip balancing
    if len(user_accts) == 1:
        print("\tthere is only one account (%s), no more accounts to balance queue." % user_accts[0])
        print("\texiting balance_queue.py")
        sys.exit(0)

    # get priority jobs from the queue
    sq = myslurm.Squeue(grepping=[keyword, "Priority"])
    if sq is None or len(sq) == 0:
        print("\texiting balance_queue.py")
        sys.exit(0)

    # get per-account lists of jobs in Priority pending status, exit if all accounts have low priority
    accts, early_exit_decision = getaccounts(sq, "", user_accts)
    announceacctlens(accts, early_exit_decision)  # TODO: announce all accounts, not just accts with priority jobs
    if early_exit_decision is True:
        sys.exit(0)

    # determine number of jobs to redistribute to each account
    balance = getbalance(accts, len(user_accts))

    # redistribute
    redistribute_jobs(accts, user_accts, balance)

    # announce final job counts from a fresh look at the queue
    final_sq = myslurm.Squeue(grepping=[keyword, "Priority"])
    announceacctlens(*getaccounts(final_sq, "final", user_accts))
if __name__ == "__main__":
    # usage: python balance_queue.py [keyword] [parentdir]
    parentdir = None
    if len(sys.argv) == 2:
        # keyword only: run from command line without a parentdir
        thisfile, keyword = sys.argv
    elif len(sys.argv) == 1:
        # no arguments: balance the full queue by searching for the user's name
        keyword = os.environ["USER"]
    else:
        thisfile, keyword, parentdir = sys.argv

    main(keyword, parentdir)