From 03d7ba7a3bd75eba470cc69c897ee9b4a5c57ca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=A7=E7=9B=9B?= Date: Sun, 17 Jul 2016 20:52:11 +0800 Subject: [PATCH 1/6] add cleanup to opener --- cola/job/executor.py | 13 +++++++++++++ cola/job/task.py | 1 + 2 files changed, 14 insertions(+) diff --git a/cola/job/executor.py b/cola/job/executor.py index f3666c2..303813d 100644 --- a/cola/job/executor.py +++ b/cola/job/executor.py @@ -197,6 +197,7 @@ def _login(self, shuffle=False): return True def clear_and_relogin(self): + self._cleanup_opener() self.opener = self.job_desc.opener_cls(timeout=DEFAULT_OPENER_TIMEOUT) self.login(random=True) @@ -299,6 +300,18 @@ def _recover_normal(self): self.counter_client.multi_local_acc(self.ip, self.id_, **kw) self.normal_pages += 1 + def _cleanup_opener(self): + if hasattr(self.opener, 'cleanup'): + try: + self.opener.cleanup() + except Exception as e: + self.logger.error('Error encountered when clean up an opener') + self.logger.exception(e) + + def close(self): + self._cleanup_opener() + + class UrlExecutor(Executor): def __init__(self, *args, **kwargs): super(UrlExecutor, self).__init__(*args, **kwargs) diff --git a/cola/job/task.py b/cola/job/task.py index c7ae328..5b4e35c 100644 --- a/cola/job/task.py +++ b/cola/job/task.py @@ -243,6 +243,7 @@ def run(self): curr_priority = (curr_priority+1) % self.full_priorities finally: + self.executor.close() self.counter_client.sync() self.save() From db3b4245f549d0ac605c98b02dc7b825887f8d40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=A7=E7=9B=9B?= Date: Sat, 30 Jul 2016 00:18:32 +0800 Subject: [PATCH 2/6] fix #59, require lock before generate a mq store --- cola/core/mq/store.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cola/core/mq/store.py b/cola/core/mq/store.py index 1087147..d7b5f5d 100755 --- a/cola/core/mq/store.py +++ b/cola/core/mq/store.py @@ -254,7 +254,8 @@ def put_one(self, obj, force=False, commit=True): if self.deduper.exist(prop): return if len(self.legal_files) == 0: - self._generate_file() + with self.lock: + self._generate_file() obj_str = self._stringfy(obj) # If no file has enough space From 9951b41e2234fd79ea2be6abd576fd3becdbd9d1 Mon Sep 17 00:00:00 2001 From: brtgpy Date: Mon, 24 Apr 2017 21:39:25 +0800 Subject: [PATCH 3/6] fix encode issue when put msg with chinese --- cola/core/mq/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cola/core/mq/utils.py b/cola/core/mq/utils.py index fd8d11a..d8e58e2 100644 --- a/cola/core/mq/utils.py +++ b/cola/core/mq/utils.py @@ -24,7 +24,7 @@ def labelize(obj): if isinstance(obj, str): return obj elif isinstance(obj, unicode): - return unicode.encode('utf-8') + return obj.encode('utf-8','ignore') else: try: return str(obj) From 54c32ac03528d2d0dc95d2a67c9bb4a4a9e48d95 Mon Sep 17 00:00:00 2001 From: brtgpy Date: Mon, 24 Apr 2017 21:40:29 +0800 Subject: [PATCH 4/6] enable add_proxy on SpynnerOpener --- cola/core/opener.py | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/cola/core/opener.py b/cola/core/opener.py index 5592b99..948d0ad 100644 --- a/cola/core/opener.py +++ b/cola/core/opener.py @@ -115,9 +115,9 @@ def __init__(self, cookie_filename=None, user_agent=None, timeout=None, **kwargs self.browser.set_handle_redirect(True) self.browser.set_handle_referer(True) self.browser.set_handle_robots(False) - self.browser.addheaders = [ - ('User-agent', user_agent)] - + self.browser.addheaders = [('User-agent', user_agent)] + self.proxies = {} + if timeout is None: self._default_timout = mechanize._sockettimeout._GLOBAL_DEFAULT_TIMEOUT else: @@ -169,14 +169,17 @@ def _clear_content(self): del self.content def close(self): + """ + clear browse history, avoid memory issue + """ self._clear_content() resp = self.browser.response() if resp is not None: resp.close() self.browser.clear_history() - + class SpynnerOpener(Opener): - def __init__(self, user_agent=None, **kwargs): + def __init__(self, user_agent=None, timeout=30, **kwargs): try: import spynner except ImportError: @@ -185,8 +188,9 @@ def __init__(self, user_agent=None, **kwargs): if user_agent is None: user_agent = 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)' - self.br = spynner.Browser(user_agent=user_agent, **kwargs) - + self.br = spynner.Browser(user_agent=user_agent) + self._default_timout = timeout + def spynner_open(self, url, data=None, headers=None, method='GET', wait_for_text=None, wait_for_selector=None, tries=None): try: @@ -207,7 +211,7 @@ def wait_callback(br): if method == 'POST': operation = QNetworkAccessManager.PostOperation self.br.load(url, wait_callback=wait_callback, tries=tries, - operation=operation, body=data, headers=headers) + operation=operation, body=data, headers=headers,load_timeout= self._default_timout) return self.br @@ -222,6 +226,12 @@ def read(self): return self.content if hasattr(self, 'content') else self.br.contents def wait_for_selector(self, selector, **kwargs): - self.br.wait_for_content( - lambda br: not br.webframe.findFirstElement(selector).isNull(), - **kwargs) \ No newline at end of file + self.br.wait_for_content(lambda br: not br.webframe.findFirstElement(selector).isNull(), + **kwargs) + + def add_proxy(self,addr, proxy_type='all', + user=None, password=None): + self.br.set_proxy(addr) + + def set_default_timeout(self, timeout): + self._default_timout = timeout \ No newline at end of file From d4f4a3efa6a7f3e83f8446a0433735823ea4f0cf Mon Sep 17 00:00:00 2001 From: brtgpy Date: Mon, 24 Apr 2017 21:44:09 +0800 Subject: [PATCH 5/6] enable auto increse/decrease budget automatically so that job can finish as soon as all URL/bundle are processed --- cola/functions/budget.py | 20 ++++++----- cola/job/__init__.py | 15 +++++---- cola/job/executor.py | 72 ++++++++++++++++++++-------------------- cola/job/task.py | 38 ++++++++++----------- 4 files changed, 75 insertions(+), 70 deletions(-) diff --git a/cola/functions/budget.py b/cola/functions/budget.py index e0c8cc7..41dbf96 100644 --- a/cola/functions/budget.py +++ b/cola/functions/budget.py @@ -35,6 +35,7 @@ SUFFICIENT, NOAPPLIED, ALLFINISHED = range(3) DEFAULT_BUDGETS = 3 BUDGET_APPLY_STATUS_FILENAME = 'budget.apply.status' +AUTO_BUDGET = 'auto' def synchronized(func): def inner(self, *args, **kw): @@ -50,8 +51,11 @@ def __init__(self, working_dir, settings, self.settings = settings self.rpc_server = rpc_server self.app_name = app_name - - self.budgets = settings.job.size + # check if auto budget setting enabled + if settings.job.size == AUTO_BUDGET: + self.budgets = len(settings.job.starts) + else: + self.budgets = settings.job.size self.limit = self.budgets >= 0 self.applied = 0 self.finished = 0 @@ -178,11 +182,11 @@ def finish(self, size=1): def error(self, size=1): return self._call('error', size) - def set_budget(self, budget): - return self._call('set_budget', budget) + def set_budgets(self, budget): + return self._call('set_budgets', budget) - def inc_budget(self, budget): - return self._call('inc_budget', budget) + def inc_budgets(self, budget): + return self._call('inc_budgets', budget) - def dec_budget(self, budget): - return self._call('dec_budget', budget) \ No newline at end of file + def dec_budgets(self, budget): + return self._call('dec_budgets', budget) \ No newline at end of file diff --git a/cola/job/__init__.py b/cola/job/__init__.py index cfe9c9e..b7b9e11 100644 --- a/cola/job/__init__.py +++ b/cola/job/__init__.py @@ -107,12 +107,12 @@ def run_containers(n_containers, n_instances, working_dir, job_def_path, is_local=is_local, master_ip=master_ip, task_start_id=acc) if is_multi_process: process = multiprocessing.Process(target=container.run, - args=(True, )) + args=(True,)) process.start() processes.append(process) else: thread = threading.Thread(target=container.run, - args=(True, )) + args=(True,)) thread.start() processes.append(thread) acc += n_tasks @@ -138,7 +138,7 @@ def __init__(self, ctx, job_def_path, job_name, self.job_name = job_name self.working_dir = working_dir or os.path.join(self.ctx.working_dir, self.job_name) - self.logger = get_logger(name='cola_job'+str(time.time())) + self.logger = get_logger(name='cola_job' + str(time.time())) self.job_desc = job_desc or import_job_desc(job_def_path) self.settings = self.job_desc.settings @@ -168,7 +168,7 @@ def _register_rpc(self): self.rpc_server.register_function(self.shutdown, name='shutdown', prefix=self.prefix) if self.ctx.is_local_mode: - self.rpc_server.register_function(lambda: [self.job_name, ], + self.rpc_server.register_function(lambda: [self.job_name,], name='get_jobs') def init_deduper(self): @@ -176,6 +176,8 @@ def init_deduper(self): base = 1 if not self.is_bundle else 1000 size = self.job_desc.settings.job.size + if isinstance(size,basestring): + size = len(self.job_desc.settings.job.starts) * 10 capacity = UNLIMIT_BLOOM_FILTER_CAPACITY if size > 0: capacity = max(base * size * 10, capacity) @@ -203,7 +205,7 @@ def init_mq(self): def _init_function_servers(self): budget_dir = os.path.join(self.working_dir, 'budget') - budget_cls = BudgetApplyServer if not self.is_multi_process \ + budget_cls = BudgetApplyServer if not self.is_multi_process \ else self.manager.budget_server self.budget_server = budget_cls(budget_dir, self.settings, None, self.job_name) @@ -264,8 +266,7 @@ def init(self): def run(self, block=False): self.init() try: - self.processes = run_containers( - self.n_containers, self.n_instances, self.working_dir, + self.processes = run_containers(self.n_containers, self.n_instances, self.working_dir, self.job_def_path, self.job_name, self.ctx.env, self.mq, self.counter_arg, self.budget_arg, self.speed_arg, self.stopped, self.nonsuspend, self.idle_statuses, diff --git a/cola/job/executor.py b/cola/job/executor.py index 95a363f..553afe2 100644 --- a/cola/job/executor.py +++ b/cola/job/executor.py @@ -71,8 +71,7 @@ def __init__(self, id_, job_desc, mq, is_local=False, env=None, logger=None): self.id_ = id_ self.job_desc = job_desc - self.opener = job_desc.opener_cls( - timeout=DEFAULT_OPENER_TIMEOUT) + self.opener = job_desc.opener_cls(timeout=DEFAULT_OPENER_TIMEOUT) self.mq = mq self.dir_ = working_dir self.settings = job_desc.settings @@ -116,8 +115,7 @@ def _configure_proxy(self): for p in proxies: proxy_type = p.type if p.has('type') else 'all' if p.has('addr'): - self.opener.add_proxy( - p.addr, + self.opener.add_proxy(p.addr, proxy_type=proxy_type, user=p.user if p.has('user') else None, password=p.password if p.has('password') else None) @@ -198,7 +196,6 @@ def _login(self, shuffle=False): return True def clear_and_relogin(self): - self._cleanup_opener() self.opener = self.job_desc.opener_cls(timeout=DEFAULT_OPENER_TIMEOUT) self.login(random=True) @@ -211,7 +208,7 @@ def _pack_error(self, url, msg, error, content=None, msg_filename = os.path.join(path, ERROR_MSG_FILENAME) with open(msg_filename, 'w') as f: - f.write(msg+'\n') + f.write(msg + '\n') traceback.print_exc(file=f) content_filename = os.path.join(path, @@ -301,22 +298,12 @@ def _recover_normal(self): self.counter_client.multi_local_acc(self.ip, self.id_, **kw) self.normal_pages += 1 - def _cleanup_opener(self): - if hasattr(self.opener, 'cleanup'): - try: - self.opener.cleanup() - except Exception as e: - self.logger.error('Error encountered when clean up an opener') - self.logger.exception(e) - - def close(self): - self._cleanup_opener() - - class UrlExecutor(Executor): def __init__(self, *args, **kwargs): super(UrlExecutor, self).__init__(*args, **kwargs) self.budges = 0 + self.url_error_times = {} + def _parse(self, parser_cls, options, url): if hasattr(self, 'content'): @@ -327,14 +314,17 @@ def _parse(self, parser_cls, options, url): counter=ExecutorCounter(self), settings=ReadOnlySettings(self.settings), **options).parse() - return list(res) + if res: + return list(res) + else: + return list() def _log_error(self, url, e): if self.logger: self.logger.error('Error when handle url: %s' % (str(url))) self.logger.exception(e) - url.error_times = getattr(url, 'error_times', 0) + 1 + self.url_error_times[url] = self.url_error_times.get(url, 0) + 1 self.counter_client.local_inc(self.ip, self.id_, 'error_urls', 1) @@ -346,7 +336,7 @@ def _handle_error(self, url, e, pack=True): self._log_error(url, e) retries, span, ignore = self._get_handle_error_params(e) - if url.error_times <= retries: + if self.url_error_times.get(url, 0) <= retries: self.stopped.wait(span) return @@ -363,6 +353,7 @@ def _handle_error(self, url, e, pack=True): self._error() raise UnitRetryFailed + def _clear_error(self, url): if hasattr(url, 'error_times'): del url.error_times @@ -377,7 +368,7 @@ def _parse_with_process_exception(self, parser_cls, options, url): kw = {'pages': 1, 'secs': t} self.counter_client.multi_local_inc(self.ip, self.id_, **kw) self.counter_client.multi_global_inc(**kw) - + self._clear_error(url) self._recover_normal() @@ -396,7 +387,7 @@ def _parse_with_process_exception(self, parser_cls, options, url): except Exception, e: self._handle_error(url, e) - return [url, ] + return [url,] def execute(self, url, is_inc=False): failed = False @@ -415,20 +406,23 @@ def execute(self, url, is_inc=False): parser_cls, options = self.job_desc.url_patterns.get_parser(url, options=True) if parser_cls is not None: if rates == 0: - rates, span = self.speed_client.require( - DEFAULT_SPEEED_REQUIRE_SIZE) + rates, span = self.speed_client.require(DEFAULT_SPEEED_REQUIRE_SIZE) if rates == 0: if self.stopped.wait(5): return rates -= 1 try: - next_urls = self._parse_with_process_exception( - parser_cls, options, url) + next_urls = self._parse_with_process_exception(parser_cls, options, url) next_urls = list(self.job_desc.url_patterns.matches(next_urls)) if next_urls: self.mq.put(next_urls) + # inc budget if auto budget enabled + if self.settings.job.size == 'auto': + inc_budgets = len(next_urls) + if inc_budgets > 0: + self.budget_client.inc_budgets(inc_budgets) if hasattr(self.opener, 'close'): self.opener.close() @@ -471,8 +465,7 @@ def _parse(self, parser_cls, options, bundle, url): def _log_error(self, bundle, url, e): if self.logger: - self.logger.error('Error when handle bundle: %s, url: %s' % ( - str(bundle), str(url))) + self.logger.error('Error when handle bundle: %s, url: %s' % (str(bundle), str(url))) self.logger.exception(e) if url == getattr(bundle, 'error_url', None): bundle.error_times = getattr(bundle, 'error_times', 0) + 1 @@ -512,6 +505,9 @@ def _handle_error(self, bundle, url, e, pack=True): if ignore: bundle.error_urls.append(url) + # dec budget if auto budget enabled + if self.settings.job.size == 'auto': + self.budget_client.dec_budgets(1) return else: bundle.current_urls.insert(0, url) @@ -538,6 +534,7 @@ def _parse_with_process_exception(self, parser_cls, options, self.counter_client.multi_local_inc(self.ip, self.id_, **kw) self.counter_client.multi_global_inc(**kw) + self._clear_error(bundle) self._recover_normal() @@ -556,7 +553,7 @@ def _parse_with_process_exception(self, parser_cls, options, except Exception, e: self._handle_error(bundle, url, e) - return [url, ], [] + return [url,], [] def execute(self, bundle, max_sec, is_inc=False): failed = False @@ -578,8 +575,7 @@ def execute(self, bundle, max_sec, is_inc=False): url = bundle.current_urls.pop(0) if self.logger: - self.logger.debug('get %s url: %s' % - (bundle.label, url)) + self.logger.debug('get %s url: %s' % (bundle.label, url)) rates = 0 span = 0.0 @@ -587,16 +583,14 @@ def execute(self, bundle, max_sec, is_inc=False): options=True) if parser_cls is not None: if rates == 0: - rates, span = self.speed_client.require( - DEFAULT_SPEEED_REQUIRE_SIZE) + rates, span = self.speed_client.require(DEFAULT_SPEEED_REQUIRE_SIZE) if rates == 0: if self.stopped.wait(5): break rates -= 1 try: - next_urls, bundles = self._parse_with_process_exception( - parser_cls, options, bundle, url) + next_urls, bundles = self._parse_with_process_exception(parser_cls, options, bundle, url) next_urls = list(self.job_desc.url_patterns.matches(next_urls)) next_urls.extend(bundle.current_urls) if self.shuffle_urls: @@ -610,6 +604,12 @@ def execute(self, bundle, max_sec, is_inc=False): if bundles: self.mq.put(bundles) + # inc budget if auto budget enabled + if self.settings.job.size == 'auto': + inc_budgets = len(bundles) + if inc_budgets > 0: + self.budget_client.inc_budgets(inc_budgets) + if hasattr(self.opener, 'close'): self.opener.close() diff --git a/cola/job/task.py b/cola/job/task.py index 5b4e35c..c4b0e19 100644 --- a/cola/job/task.py +++ b/cola/job/task.py @@ -64,9 +64,8 @@ def __init__(self, working_dir, job_desc, task_id, self.n_priorities = self.settings.job.priorities # the last one is the inc mq if inc=True self.full_priorities = self.n_priorities if not self.inc else \ - self.n_priorities+1 - self.priorities_secs = tuple( - [MAX_RUNNING_SECONDS/(2**i) for i in range(self.full_priorities)]) + self.n_priorities + 1 + self.priorities_secs = tuple([MAX_RUNNING_SECONDS / (2 ** i) for i in range(self.full_priorities)]) self.priorities_objs = [[] for _ in range(self.full_priorities)] self.starts = [] @@ -119,7 +118,7 @@ def prepare(self): if self.is_local: size = self.job_desc.settings.job.size - if size < 0: + if size == 'auto' or size < 0: return for obj in self.starts[size:]: @@ -203,17 +202,20 @@ def run(self): no_budgets_times = 0 self._get_unit(curr_priority, self.runnings) else: - status = self._apply(no_budgets_times) - if status == CANNOT_APPLY: - priority_deals[curr_priority] = False - break - elif status == APPLY_FAIL: - no_budgets_times += 1 - if len(self.runnings) == 0: - continue - else: - no_budgets_times = 0 - self._get_unit(curr_priority, self.runnings) + self._get_unit(curr_priority, self.runnings) + # if get unit success, then apply budget, in case budget are wasted + if len(self.runnings)>0: + status = self._apply(no_budgets_times) + if status == CANNOT_APPLY: + priority_deals[curr_priority] = False + break + elif status == APPLY_FAIL: + no_budgets_times += 1 + if len(self.runnings) == 0: + continue + else: + no_budgets_times = 0 + else: self._get_unit(curr_priority, self.runnings) @@ -223,8 +225,7 @@ def run(self): else: priority_deals[curr_priority] = True if self.is_bundle: - self.logger.debug( - 'process bundle from priority %s' % priority_name) + self.logger.debug('process bundle from priority %s' % priority_name) rest = min(last - clock.clock(), MAX_BUNDLE_RUNNING_SECONDS) if rest <= 0: break @@ -241,9 +242,8 @@ def run(self): finally: self.priorities_objs[curr_priority].extend(self.runnings) - curr_priority = (curr_priority+1) % self.full_priorities + curr_priority = (curr_priority + 1) % self.full_priorities finally: - self.executor.close() self.counter_client.sync() self.save() From f8f314d51253c7b0504e8e25a65c68c2ba1c450a Mon Sep 17 00:00:00 2001 From: brtgpy Date: Wed, 26 Apr 2017 23:37:06 +0800 Subject: [PATCH 6/6] move UrlExecutor.retury_error_times to Url.error_times auto close job if idle for 5 times and auto budget enabled fix lint style --- cola/core/mq/utils.py | 2 +- cola/job/__init__.py | 2 +- cola/job/container.py | 9 +++++++++ cola/job/executor.py | 7 ++----- cola/job/task.py | 20 +++++++++++++++++--- 5 files changed, 30 insertions(+), 10 deletions(-) diff --git a/cola/core/mq/utils.py b/cola/core/mq/utils.py index d8e58e2..530bd14 100644 --- a/cola/core/mq/utils.py +++ b/cola/core/mq/utils.py @@ -24,7 +24,7 @@ def labelize(obj): if isinstance(obj, str): return obj elif isinstance(obj, unicode): - return obj.encode('utf-8','ignore') + return obj.encode('utf-8', 'ignore') else: try: return str(obj) diff --git a/cola/job/__init__.py b/cola/job/__init__.py index b7b9e11..d02da87 100644 --- a/cola/job/__init__.py +++ b/cola/job/__init__.py @@ -176,7 +176,7 @@ def init_deduper(self): base = 1 if not self.is_bundle else 1000 size = self.job_desc.settings.job.size - if isinstance(size,basestring): + if isinstance(size, basestring): size = len(self.job_desc.settings.job.starts) * 10 capacity = UNLIMIT_BLOOM_FILTER_CAPACITY if size > 0: diff --git a/cola/job/container.py b/cola/job/container.py index 4cf21e3..0c96305 100644 --- a/cola/job/container.py +++ b/cola/job/container.py @@ -31,6 +31,8 @@ from cola.functions.speed import SpeedControlClient from cola.functions.counter import CounterClient +MAX_IDLE_TIMES = 5 + class Container(object): def __init__(self, container_id, working_dir, job_path, job_name, env, mq, @@ -125,9 +127,16 @@ def sync(): def _init_idle_status_checker(self): def check(): + idle_times = 0 while not self.stopped.is_set(): self.idle_statuses[self.container_id] = \ all([task.is_idle() for task in self.tasks]) + if self.idle_statuses[self.container_id]: + idle_times += 1 + if self.job_desc.settings.job.size=='auto' and idle_times > MAX_IDLE_TIMES: + break + else: + idle_times = 0 self.stopped.wait(5) self.check_idle_t = threading.Thread(target=check) diff --git a/cola/job/executor.py b/cola/job/executor.py index 553afe2..d918740 100644 --- a/cola/job/executor.py +++ b/cola/job/executor.py @@ -302,8 +302,6 @@ class UrlExecutor(Executor): def __init__(self, *args, **kwargs): super(UrlExecutor, self).__init__(*args, **kwargs) self.budges = 0 - self.url_error_times = {} - def _parse(self, parser_cls, options, url): if hasattr(self, 'content'): @@ -324,7 +322,7 @@ def _log_error(self, url, e): self.logger.error('Error when handle url: %s' % (str(url))) self.logger.exception(e) - self.url_error_times[url] = self.url_error_times.get(url, 0) + 1 + url.error_times = getattr(url, 'error_times', 0) + 1 self.counter_client.local_inc(self.ip, self.id_, 'error_urls', 1) @@ -336,7 +334,7 @@ def _handle_error(self, url, e, pack=True): self._log_error(url, e) retries, span, ignore = self._get_handle_error_params(e) - if self.url_error_times.get(url, 0) <= retries: + if url.error_times <= retries: self.stopped.wait(span) return @@ -415,7 +413,6 @@ def execute(self, url, is_inc=False): try: next_urls = self._parse_with_process_exception(parser_cls, options, url) next_urls = list(self.job_desc.url_patterns.matches(next_urls)) - if next_urls: self.mq.put(next_urls) # inc budget if auto budget enabled diff --git a/cola/job/task.py b/cola/job/task.py index c4b0e19..e3fa6e7 100644 --- a/cola/job/task.py +++ b/cola/job/task.py @@ -202,9 +202,22 @@ def run(self): no_budgets_times = 0 self._get_unit(curr_priority, self.runnings) else: - self._get_unit(curr_priority, self.runnings) - # if get unit success, then apply budget, in case budget are wasted - if len(self.runnings)>0: + if self.settings.job.size=='auto': + self._get_unit(curr_priority, self.runnings) + # if get unit success, then apply budget, in case budget are wasted + if len(self.runnings)>0: + status = self._apply(no_budgets_times) + if status == CANNOT_APPLY: + priority_deals[curr_priority] = False + break + elif status == APPLY_FAIL: + no_budgets_times += 1 + if len(self.runnings) == 0: + continue + else: + no_budgets_times = 0 + # keep compability + else: status = self._apply(no_budgets_times) if status == CANNOT_APPLY: priority_deals[curr_priority] = False @@ -215,6 +228,7 @@ def run(self): continue else: no_budgets_times = 0 + self._get_unit(curr_priority, self.runnings) else: self._get_unit(curr_priority, self.runnings)