From e263fc7783507092eb9529e52508d68c0380058c Mon Sep 17 00:00:00 2001 From: Erik Dubbelboer Date: Fri, 12 Jun 2015 08:40:53 +0000 Subject: [PATCH] Added nice parameter to control the nice level --- lib/disco/worker/__init__.py | 7 +++++++ lib/disco/worker/classic/worker.py | 2 ++ master/include/pipeline.hrl | 2 ++ master/src/disco_worker.erl | 4 ++-- master/src/job_coordinator.erl | 2 ++ master/src/jobpack.erl | 25 +++++++++++++++++++++++++ 6 files changed, 40 insertions(+), 2 deletions(-) diff --git a/lib/disco/worker/__init__.py b/lib/disco/worker/__init__.py index bfb74bc5d..ba8f5d549 100644 --- a/lib/disco/worker/__init__.py +++ b/lib/disco/worker/__init__.py @@ -104,6 +104,11 @@ class Worker(dict): The :class:`Worker` base class defines the following parameters: + :type nice: int + :param nice: niceness of the job (default 19). + Niceness values range from -20 (most favorable to the process) to 19 (least favorable to the process). + Values lower than 0 need root. + :type save_results: bool :param save_results: whether or not to save the output to :ref:`DDFS`. @@ -148,6 +153,7 @@ def defaults(self): :return: dict of default values for the :class:`Worker`. """ return {'save_results': False, + 'nice': 19, 'profile': False, 'required_files': {}, 'required_modules': None} @@ -198,6 +204,7 @@ def jobdict(self, job, **jobargs): :return: :ref:`jobdict` dict. """ return {'prefix': self.getitem('name', job, jobargs), + 'nice': self.getitem('nice', job, jobargs, 19), 'save_results': self.getitem('save_results', job, jobargs, False), 'save_info': self.getitem('save_info', job, jobargs, "ddfs"), 'scheduler': self.getitem('scheduler', job, jobargs, {}), diff --git a/lib/disco/worker/classic/worker.py b/lib/disco/worker/classic/worker.py index ff477d843..e1eb176e4 100644 --- a/lib/disco/worker/classic/worker.py +++ b/lib/disco/worker/classic/worker.py @@ -261,6 +261,7 @@ def get(key, default=None): has_reduce = bool(get('reduce')) reduce_shuffle = bool(get('reduce_shuffle')) job_input = get('input', []) + nice = get('nice', 19) has_save_results = get('save', False) or get('save_results', False) if not isiterable(job_input): @@ -292,6 +293,7 @@ def get(key, default=None): 'reduce?': has_reduce, 'reduce_shuffle?': reduce_shuffle, 'nr_reduces': nr_reduces, + 'nice': nice, 'save_results': has_save_results}) return jobdict diff --git a/master/include/pipeline.hrl b/master/include/pipeline.hrl index be72f34fe..c3da20f9f 100644 --- a/master/include/pipeline.hrl +++ b/master/include/pipeline.hrl @@ -102,6 +102,7 @@ schedule :: task_schedule(), input :: [input_id()], all_inputs:: boolean(), + nice :: integer(), save_outputs :: boolean(), save_info :: string()}). -type task_spec() :: #task_spec{}. @@ -136,6 +137,7 @@ inputs = [] :: [task_output()], pipeline = [] :: pipeline(), schedule :: task_schedule(), + nice = 19 :: integer(), save_results = false :: boolean(), save_info :: string()}). -type jobinfo() :: #jobinfo{}. diff --git a/master/src/disco_worker.erl b/master/src/disco_worker.erl index 233f13876..9bf131901 100644 --- a/master/src/disco_worker.erl +++ b/master/src/disco_worker.erl @@ -128,10 +128,10 @@ handle_cast(start, #state{task = Task, master = Master} = State) -> {stop, {shutdown, {error, E}}, State} end; handle_cast(work, #state{task = T, port = none} = State) -> - {#task_spec{jobname = JobName, worker = W, jobenvs = JE}, #task_run{}} = T, + {#task_spec{jobname = JobName, nice = Nice, worker = W, jobenvs = JE}, #task_run{}} = T, JobHome = jobhome(JobName), Worker = filename:join(JobHome, binary_to_list(W)), - Command = "nice -n 19 " ++ Worker, + Command = "nice -n " ++ integer_to_list(Nice) ++ " " ++ Worker, JobEnvs = [{S, false} || S <- disco:settings()] ++ JE, Options = [{cd, JobHome}, stream, diff --git a/master/src/job_coordinator.erl b/master/src/job_coordinator.erl index fd098855a..dae47bb59 100644 --- a/master/src/job_coordinator.erl +++ b/master/src/job_coordinator.erl @@ -723,6 +723,7 @@ make_stage_tasks(Stage, _Grouping, [], make_stage_tasks(Stage, Grouping, [{G, Inputs}|Rest], #state{jobinfo = #jobinfo{jobname = JN, jobenvs = JE, + nice = Nice, save_info = SaveInfo, save_results = Save, worker = W}, @@ -755,6 +756,7 @@ make_stage_tasks(Stage, Grouping, [{G, Inputs}|Rest], grouping = Grouping, job_coord = self(), schedule = Schedule, + nice = Nice, save_outputs = SaveOutputs, save_info = SaveInfo}, diff --git a/master/src/jobpack.erl b/master/src/jobpack.erl index 4d0680f70..cbe0b8548 100644 --- a/master/src/jobpack.erl +++ b/master/src/jobpack.erl @@ -76,11 +76,13 @@ jobdict(< {jobname(), jobinfo()}. core_jobinfo(JobPack, JobDict) -> Prefix = find(<<"prefix">>, JobDict), + Nice = find(<<"nice">>, JobDict, 19), SaveResults = find(<<"save_results">>, JobDict, false), SaveInfo = find(<<"save_info">>, JobDict, "ddfs"), JobInfo = #jobinfo{jobenvs = jobenvs(JobPack), worker = find(<<"worker">>, JobDict), owner = find(<<"owner">>, JobDict), + nice = validate_nice(Nice), save_info = validate_save_info(SaveInfo), save_results = validate_save_results(SaveResults)}, {validate_prefix(Prefix), JobInfo}. @@ -220,6 +222,29 @@ validate_inputs(Inputs) -> || [L, Sz, Urls] <- Inputs] end. +-spec validate_nice(term()) -> integer(). +validate_nice(S) -> + case json_validator:validate(integer, S) of + {error, E} -> + lager:warning("Invalid nice in jobpack: ~s", + json_validator:error_msg(E)), + throw({error, invalid_job_nice}); + _ -> + case S > 19 of + true -> + lager:warning("Invalid nice in jobpack: ~s is too high", S), + throw({error, invalid_job_nice}); + false -> + case S < -20 of + true -> + lager:warning("Invalid nice in jobpack: ~s is too low", S), + throw({error, invalid_job_nice}); + false -> + S + end + end + end. + -spec validate_save_info(binary() | list()) -> string(). validate_save_info(S) when is_binary(S)-> validate_save_info(binary_to_list(S));