diff --git a/cola/core/mq/store.py b/cola/core/mq/store.py index 1087147..d7b5f5d 100755 --- a/cola/core/mq/store.py +++ b/cola/core/mq/store.py @@ -254,7 +254,8 @@ def put_one(self, obj, force=False, commit=True): if self.deduper.exist(prop): return if len(self.legal_files) == 0: - self._generate_file() + with self.lock: + self._generate_file() obj_str = self._stringfy(obj) # If no file has enough space diff --git a/cola/core/mq/utils.py b/cola/core/mq/utils.py index fd8d11a..530bd14 100644 --- a/cola/core/mq/utils.py +++ b/cola/core/mq/utils.py @@ -24,7 +24,7 @@ def labelize(obj): if isinstance(obj, str): return obj elif isinstance(obj, unicode): - return unicode.encode('utf-8') + return obj.encode('utf-8', 'ignore') else: try: return str(obj) diff --git a/cola/core/opener.py b/cola/core/opener.py index 5592b99..948d0ad 100644 --- a/cola/core/opener.py +++ b/cola/core/opener.py @@ -115,9 +115,9 @@ def __init__(self, cookie_filename=None, user_agent=None, timeout=None, **kwargs self.browser.set_handle_redirect(True) self.browser.set_handle_referer(True) self.browser.set_handle_robots(False) - self.browser.addheaders = [ - ('User-agent', user_agent)] - + self.browser.addheaders = [('User-agent', user_agent)] + self.proxies = {} + if timeout is None: self._default_timout = mechanize._sockettimeout._GLOBAL_DEFAULT_TIMEOUT else: @@ -169,14 +169,17 @@ def _clear_content(self): del self.content def close(self): + """ + clear browse history, avoid memory issue + """ self._clear_content() resp = self.browser.response() if resp is not None: resp.close() self.browser.clear_history() - + class SpynnerOpener(Opener): - def __init__(self, user_agent=None, **kwargs): + def __init__(self, user_agent=None, timeout=30, **kwargs): try: import spynner except ImportError: @@ -185,8 +188,9 @@ def __init__(self, user_agent=None, **kwargs): if user_agent is None: user_agent = 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)' - self.br = spynner.Browser(user_agent=user_agent, **kwargs) - + self.br = spynner.Browser(user_agent=user_agent) + self._default_timout = timeout + def spynner_open(self, url, data=None, headers=None, method='GET', wait_for_text=None, wait_for_selector=None, tries=None): try: @@ -207,7 +211,7 @@ def wait_callback(br): if method == 'POST': operation = QNetworkAccessManager.PostOperation self.br.load(url, wait_callback=wait_callback, tries=tries, - operation=operation, body=data, headers=headers) + operation=operation, body=data, headers=headers,load_timeout= self._default_timout) return self.br @@ -222,6 +226,12 @@ def read(self): return self.content if hasattr(self, 'content') else self.br.contents def wait_for_selector(self, selector, **kwargs): - self.br.wait_for_content( - lambda br: not br.webframe.findFirstElement(selector).isNull(), - **kwargs) \ No newline at end of file + self.br.wait_for_content(lambda br: not br.webframe.findFirstElement(selector).isNull(), + **kwargs) + + def add_proxy(self,addr, proxy_type='all', + user=None, password=None): + self.br.set_proxy(addr) + + def set_default_timeout(self, timeout): + self._default_timout = timeout \ No newline at end of file diff --git a/cola/functions/budget.py b/cola/functions/budget.py index e0c8cc7..41dbf96 100644 --- a/cola/functions/budget.py +++ b/cola/functions/budget.py @@ -35,6 +35,7 @@ SUFFICIENT, NOAPPLIED, ALLFINISHED = range(3) DEFAULT_BUDGETS = 3 BUDGET_APPLY_STATUS_FILENAME = 'budget.apply.status' +AUTO_BUDGET = 'auto' def synchronized(func): def inner(self, *args, **kw): @@ -50,8 +51,11 @@ def __init__(self, working_dir, settings, self.settings = settings self.rpc_server = rpc_server self.app_name = app_name - - self.budgets = settings.job.size + # check if auto budget setting enabled + if settings.job.size == AUTO_BUDGET: + self.budgets = len(settings.job.starts) + else: + self.budgets = settings.job.size self.limit = self.budgets >= 0 self.applied = 0 self.finished = 0 @@ -178,11 +182,11 @@ def finish(self, size=1): def error(self, size=1): return self._call('error', size) - def set_budget(self, budget): - return self._call('set_budget', budget) + def set_budgets(self, budget): + return self._call('set_budgets', budget) - def inc_budget(self, budget): - return self._call('inc_budget', budget) + def inc_budgets(self, budget): + return self._call('inc_budgets', budget) - def dec_budget(self, budget): - return self._call('dec_budget', budget) \ No newline at end of file + def dec_budgets(self, budget): + return self._call('dec_budgets', budget) \ No newline at end of file diff --git a/cola/job/__init__.py b/cola/job/__init__.py index cfe9c9e..d02da87 100644 --- a/cola/job/__init__.py +++ b/cola/job/__init__.py @@ -107,12 +107,12 @@ def run_containers(n_containers, n_instances, working_dir, job_def_path, is_local=is_local, master_ip=master_ip, task_start_id=acc) if is_multi_process: process = multiprocessing.Process(target=container.run, - args=(True, )) + args=(True,)) process.start() processes.append(process) else: thread = threading.Thread(target=container.run, - args=(True, )) + args=(True,)) thread.start() processes.append(thread) acc += n_tasks @@ -138,7 +138,7 @@ def __init__(self, ctx, job_def_path, job_name, self.job_name = job_name self.working_dir = working_dir or os.path.join(self.ctx.working_dir, self.job_name) - self.logger = get_logger(name='cola_job'+str(time.time())) + self.logger = get_logger(name='cola_job' + str(time.time())) self.job_desc = job_desc or import_job_desc(job_def_path) self.settings = self.job_desc.settings @@ -168,7 +168,7 @@ def _register_rpc(self): self.rpc_server.register_function(self.shutdown, name='shutdown', prefix=self.prefix) if self.ctx.is_local_mode: - self.rpc_server.register_function(lambda: [self.job_name, ], + self.rpc_server.register_function(lambda: [self.job_name,], name='get_jobs') def init_deduper(self): @@ -176,6 +176,8 @@ def init_deduper(self): base = 1 if not self.is_bundle else 1000 size = self.job_desc.settings.job.size + if isinstance(size, basestring): + size = len(self.job_desc.settings.job.starts) * 10 capacity = UNLIMIT_BLOOM_FILTER_CAPACITY if size > 0: capacity = max(base * size * 10, capacity) @@ -203,7 +205,7 @@ def init_mq(self): def _init_function_servers(self): budget_dir = os.path.join(self.working_dir, 'budget') - budget_cls = BudgetApplyServer if not self.is_multi_process \ + budget_cls = BudgetApplyServer if not self.is_multi_process \ else self.manager.budget_server self.budget_server = budget_cls(budget_dir, self.settings, None, self.job_name) @@ -264,8 +266,7 @@ def init(self): def run(self, block=False): self.init() try: - self.processes = run_containers( - self.n_containers, self.n_instances, self.working_dir, + self.processes = run_containers(self.n_containers, self.n_instances, self.working_dir, self.job_def_path, self.job_name, self.ctx.env, self.mq, self.counter_arg, self.budget_arg, self.speed_arg, self.stopped, self.nonsuspend, self.idle_statuses, diff --git a/cola/job/container.py b/cola/job/container.py index 4cf21e3..0c96305 100644 --- a/cola/job/container.py +++ b/cola/job/container.py @@ -31,6 +31,8 @@ from cola.functions.speed import SpeedControlClient from cola.functions.counter import CounterClient +MAX_IDLE_TIMES = 5 + class Container(object): def __init__(self, container_id, working_dir, job_path, job_name, env, mq, @@ -125,9 +127,16 @@ def sync(): def _init_idle_status_checker(self): def check(): + idle_times = 0 while not self.stopped.is_set(): self.idle_statuses[self.container_id] = \ all([task.is_idle() for task in self.tasks]) + if self.idle_statuses[self.container_id]: + idle_times += 1 + if self.job_desc.settings.job.size=='auto' and idle_times > MAX_IDLE_TIMES: + break + else: + idle_times = 0 self.stopped.wait(5) self.check_idle_t = threading.Thread(target=check) diff --git a/cola/job/executor.py b/cola/job/executor.py index ba6ff0f..d918740 100644 --- a/cola/job/executor.py +++ b/cola/job/executor.py @@ -71,8 +71,7 @@ def __init__(self, id_, job_desc, mq, is_local=False, env=None, logger=None): self.id_ = id_ self.job_desc = job_desc - self.opener = job_desc.opener_cls( - timeout=DEFAULT_OPENER_TIMEOUT) + self.opener = job_desc.opener_cls(timeout=DEFAULT_OPENER_TIMEOUT) self.mq = mq self.dir_ = working_dir self.settings = job_desc.settings @@ -116,8 +115,7 @@ def _configure_proxy(self): for p in proxies: proxy_type = p.type if p.has('type') else 'all' if p.has('addr'): - self.opener.add_proxy( - p.addr, + self.opener.add_proxy(p.addr, proxy_type=proxy_type, user=p.user if p.has('user') else None, password=p.password if p.has('password') else None) @@ -210,7 +208,7 @@ def _pack_error(self, url, msg, error, content=None, msg_filename = os.path.join(path, ERROR_MSG_FILENAME) with open(msg_filename, 'w') as f: - f.write(msg+'\n') + f.write(msg + '\n') traceback.print_exc(file=f) content_filename = os.path.join(path, @@ -314,7 +312,10 @@ def _parse(self, parser_cls, options, url): counter=ExecutorCounter(self), settings=ReadOnlySettings(self.settings), **options).parse() - return list(res) + if res: + return list(res) + else: + return list() def _log_error(self, url, e): if self.logger: @@ -350,6 +351,7 @@ def _handle_error(self, url, e, pack=True): self._error() raise UnitRetryFailed + def _clear_error(self, url): if hasattr(url, 'error_times'): del url.error_times @@ -364,7 +366,7 @@ def _parse_with_process_exception(self, parser_cls, options, url): kw = {'pages': 1, 'secs': t} self.counter_client.multi_local_inc(self.ip, self.id_, **kw) self.counter_client.multi_global_inc(**kw) - + self._clear_error(url) self._recover_normal() @@ -383,7 +385,7 @@ def _parse_with_process_exception(self, parser_cls, options, url): except Exception, e: self._handle_error(url, e) - return [url, ] + return [url,] def execute(self, url, is_inc=False): failed = False @@ -402,20 +404,22 @@ def execute(self, url, is_inc=False): parser_cls, options = self.job_desc.url_patterns.get_parser(url, options=True) if parser_cls is not None: if rates == 0: - rates, span = self.speed_client.require( - DEFAULT_SPEEED_REQUIRE_SIZE) + rates, span = self.speed_client.require(DEFAULT_SPEEED_REQUIRE_SIZE) if rates == 0: if self.stopped.wait(5): return rates -= 1 try: - next_urls = self._parse_with_process_exception( - parser_cls, options, url) + next_urls = self._parse_with_process_exception(parser_cls, options, url) next_urls = list(self.job_desc.url_patterns.matches(next_urls)) - if next_urls: self.mq.put(next_urls) + # inc budget if auto budget enabled + if self.settings.job.size == 'auto': + inc_budgets = len(next_urls) + if inc_budgets > 0: + self.budget_client.inc_budgets(inc_budgets) if hasattr(self.opener, 'close'): self.opener.close() @@ -458,8 +462,7 @@ def _parse(self, parser_cls, options, bundle, url): def _log_error(self, bundle, url, e): if self.logger: - self.logger.error('Error when handle bundle: %s, url: %s' % ( - str(bundle), str(url))) + self.logger.error('Error when handle bundle: %s, url: %s' % (str(bundle), str(url))) self.logger.exception(e) if url == getattr(bundle, 'error_url', None): bundle.error_times = getattr(bundle, 'error_times', 0) + 1 @@ -499,6 +502,9 @@ def _handle_error(self, bundle, url, e, pack=True): if ignore: bundle.error_urls.append(url) + # dec budget if auto budget enabled + if self.settings.job.size == 'auto': + self.budget_client.dec_budgets(1) return else: bundle.current_urls.insert(0, url) @@ -525,6 +531,7 @@ def _parse_with_process_exception(self, parser_cls, options, self.counter_client.multi_local_inc(self.ip, self.id_, **kw) self.counter_client.multi_global_inc(**kw) + self._clear_error(bundle) self._recover_normal() @@ -543,7 +550,7 @@ def _parse_with_process_exception(self, parser_cls, options, except Exception, e: self._handle_error(bundle, url, e) - return [url, ], [] + return [url,], [] def execute(self, bundle, max_sec, is_inc=False): failed = False @@ -565,8 +572,7 @@ def execute(self, bundle, max_sec, is_inc=False): url = bundle.current_urls.pop(0) if self.logger: - self.logger.debug('get %s url: %s' % - (bundle.label, url)) + self.logger.debug('get %s url: %s' % (bundle.label, url)) rates = 0 span = 0.0 @@ -574,16 +580,14 @@ def execute(self, bundle, max_sec, is_inc=False): options=True) if parser_cls is not None: if rates == 0: - rates, span = self.speed_client.require( - DEFAULT_SPEEED_REQUIRE_SIZE) + rates, span = self.speed_client.require(DEFAULT_SPEEED_REQUIRE_SIZE) if rates == 0: if self.stopped.wait(5): break rates -= 1 try: - next_urls, bundles = self._parse_with_process_exception( - parser_cls, options, bundle, url) + next_urls, bundles = self._parse_with_process_exception(parser_cls, options, bundle, url) next_urls = list(self.job_desc.url_patterns.matches(next_urls)) next_urls.extend(bundle.current_urls) if self.shuffle_urls: @@ -597,6 +601,12 @@ def execute(self, bundle, max_sec, is_inc=False): if bundles: self.mq.put(bundles) + # inc budget if auto budget enabled + if self.settings.job.size == 'auto': + inc_budgets = len(bundles) + if inc_budgets > 0: + self.budget_client.inc_budgets(inc_budgets) + if hasattr(self.opener, 'close'): self.opener.close() diff --git a/cola/job/task.py b/cola/job/task.py index c7ae328..e3fa6e7 100644 --- a/cola/job/task.py +++ b/cola/job/task.py @@ -64,9 +64,8 @@ def __init__(self, working_dir, job_desc, task_id, self.n_priorities = self.settings.job.priorities # the last one is the inc mq if inc=True self.full_priorities = self.n_priorities if not self.inc else \ - self.n_priorities+1 - self.priorities_secs = tuple( - [MAX_RUNNING_SECONDS/(2**i) for i in range(self.full_priorities)]) + self.n_priorities + 1 + self.priorities_secs = tuple([MAX_RUNNING_SECONDS / (2 ** i) for i in range(self.full_priorities)]) self.priorities_objs = [[] for _ in range(self.full_priorities)] self.starts = [] @@ -119,7 +118,7 @@ def prepare(self): if self.is_local: size = self.job_desc.settings.job.size - if size < 0: + if size == 'auto' or size < 0: return for obj in self.starts[size:]: @@ -203,17 +202,34 @@ def run(self): no_budgets_times = 0 self._get_unit(curr_priority, self.runnings) else: - status = self._apply(no_budgets_times) - if status == CANNOT_APPLY: - priority_deals[curr_priority] = False - break - elif status == APPLY_FAIL: - no_budgets_times += 1 - if len(self.runnings) == 0: - continue - else: - no_budgets_times = 0 + if self.settings.job.size=='auto': self._get_unit(curr_priority, self.runnings) + # if get unit success, then apply budget, in case budget are wasted + if len(self.runnings)>0: + status = self._apply(no_budgets_times) + if status == CANNOT_APPLY: + priority_deals[curr_priority] = False + break + elif status == APPLY_FAIL: + no_budgets_times += 1 + if len(self.runnings) == 0: + continue + else: + no_budgets_times = 0 + # keep compability + else: + status = self._apply(no_budgets_times) + if status == CANNOT_APPLY: + priority_deals[curr_priority] = False + break + elif status == APPLY_FAIL: + no_budgets_times += 1 + if len(self.runnings) == 0: + continue + else: + no_budgets_times = 0 + self._get_unit(curr_priority, self.runnings) + else: self._get_unit(curr_priority, self.runnings) @@ -223,8 +239,7 @@ def run(self): else: priority_deals[curr_priority] = True if self.is_bundle: - self.logger.debug( - 'process bundle from priority %s' % priority_name) + self.logger.debug('process bundle from priority %s' % priority_name) rest = min(last - clock.clock(), MAX_BUNDLE_RUNNING_SECONDS) if rest <= 0: break @@ -241,7 +256,7 @@ def run(self): finally: self.priorities_objs[curr_priority].extend(self.runnings) - curr_priority = (curr_priority+1) % self.full_priorities + curr_priority = (curr_priority + 1) % self.full_priorities finally: self.counter_client.sync() self.save()