From 262058ac92319814d991b23fd78d3f32c1cef3a7 Mon Sep 17 00:00:00 2001 From: Dainer Mesa Date: Thu, 14 Jan 2021 00:33:21 -0300 Subject: [PATCH 1/4] add connection default in schedule entry to create dynamic task --- celerybeatmongo/schedulers.py | 64 +++++++++++++++++------------------ 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/celerybeatmongo/schedulers.py b/celerybeatmongo/schedulers.py index 14bcd5b..8dad39e 100644 --- a/celerybeatmongo/schedulers.py +++ b/celerybeatmongo/schedulers.py @@ -18,12 +18,35 @@ logger = get_logger(__name__) +def connect_mongo(app): + if hasattr(app.conf, "mongodb_scheduler_db"): + db = app.conf.get("mongodb_scheduler_db") + elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_DB"): + db = app.conf.CELERY_MONGODB_SCHEDULER_DB + else: + db = "celery" + if hasattr(app.conf, "mongodb_scheduler_connection_alias"): + alias = app.conf.get('mongodb_scheduler_connection_alias') + elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS"): + alias = app.conf.CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS + else: + alias = mongoengine.DEFAULT_CONNECTION_NAME + + if hasattr(app.conf, "mongodb_scheduler_url"): + host = app.conf.get('mongodb_scheduler_url') + elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_URL"): + host = app.conf.CELERY_MONGODB_SCHEDULER_URL + else: + host = None + + return mongoengine.connect(db, host=host, alias=alias) + class MongoScheduleEntry(ScheduleEntry): - def __init__(self, task): + def __init__(self, task, app=None): self._task = task - self.app = current_app._get_current_object() + self.app = current_app._get_current_object() if app is None else app self.name = self._task.name self.task = self._task.task @@ -46,6 +69,7 @@ def __init__(self, task): if not self._task.last_run_at: self._task.last_run_at = self._default_now() self.last_run_at = self._task.last_run_at + self._mongo = connect_mongo(self.app) def _default_now(self): return self.app.now() @@ -96,8 +120,7 @@ def save(self): try: self._task.save(save_condition={}) except Exception: - logger.error(traceback.format_exc()) - + print(traceback.format_exc()) class MongoScheduler(Scheduler): @@ -110,34 +133,11 @@ class MongoScheduler(Scheduler): Model = PeriodicTask def __init__(self, app, *args, **kwargs): - if hasattr(app.conf, "mongodb_scheduler_db"): - db = app.conf.get("mongodb_scheduler_db") - elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_DB"): - db = app.conf.CELERY_MONGODB_SCHEDULER_DB - else: - db = "celery" - if hasattr(app.conf, "mongodb_scheduler_connection_alias"): - alias = app.conf.get('mongodb_scheduler_connection_alias') - elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS"): - alias = app.conf.CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS - else: - alias = "default" - - if hasattr(app.conf, "mongodb_scheduler_url"): - host = app.conf.get('mongodb_scheduler_url') - elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_URL"): - host = app.conf.CELERY_MONGODB_SCHEDULER_URL - else: - host = None - - self._mongo = mongoengine.connect(db, host=host, alias=alias) - - if host: - logger.info("backend scheduler using %s/%s:%s", - host, db, self.Model._get_collection().name) - else: - logger.info("backend scheduler using %s/%s:%s", - "mongodb://localhost", db, self.Model._get_collection().name) + self._mongo = connect_mongo(app) + + logger.info("backend scheduler using %s/%s:%s", + self._mongo.address, self._mongo.get_default_database().name, self.Model._get_collection().name) + self._schedule = {} self._last_updated = None Scheduler.__init__(self, app, *args, **kwargs) From 8d4a36a769159835ce984e91cdc7ac1b1d0ae25f Mon Sep 17 00:00:00 2001 From: Dainer Mesa Date: Thu, 14 Jan 2021 00:37:15 -0300 Subject: [PATCH 2/4] return object when save task --- celerybeatmongo/models.py | 7 +++++-- celerybeatmongo/schedulers.py | 5 +++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/celerybeatmongo/models.py b/celerybeatmongo/models.py index 197f1f0..83a9e95 100644 --- a/celerybeatmongo/models.py +++ b/celerybeatmongo/models.py @@ -6,10 +6,13 @@ from datetime import datetime, timedelta -from mongoengine import * from celery import current_app import celery.schedules +from mongoengine.document import DynamicDocument, EmbeddedDocument +from mongoengine.errors import ValidationError +from mongoengine.fields import BooleanField, DateTimeField, DictField, EmbeddedDocumentField, IntField, ListField, StringField + def get_periodic_task_collection(): if hasattr(current_app.conf, "mongodb_scheduler_collection"): @@ -118,7 +121,7 @@ def save(self, force_insert=False, validate=True, clean=True, if not self.date_creation: self.date_creation = datetime.now() self.date_changed = datetime.now() - super(PeriodicTask, self).save(force_insert, validate, clean, + return super(PeriodicTask, self).save(force_insert, validate, clean, write_concern, cascade, cascade_kwargs, _refs, save_condition, signal_kwargs, **kwargs) diff --git a/celerybeatmongo/schedulers.py b/celerybeatmongo/schedulers.py index 8dad39e..39ed84b 100644 --- a/celerybeatmongo/schedulers.py +++ b/celerybeatmongo/schedulers.py @@ -118,9 +118,10 @@ def save(self): self._task.last_run_at = self.last_run_at self._task.run_immediately = False try: - self._task.save(save_condition={}) - except Exception: + return self._task.save(save_condition={}) + except Exception as ex: print(traceback.format_exc()) + raise ex class MongoScheduler(Scheduler): From 21b7644f95dec5d8553d68974a761a219d734b61 Mon Sep 17 00:00:00 2001 From: Dainer Mesa Date: Thu, 14 Jan 2021 02:06:33 -0300 Subject: [PATCH 3/4] refactor code to show message in log --- celerybeatmongo/schedulers.py | 41 +++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/celerybeatmongo/schedulers.py b/celerybeatmongo/schedulers.py index 39ed84b..848d4b7 100644 --- a/celerybeatmongo/schedulers.py +++ b/celerybeatmongo/schedulers.py @@ -18,28 +18,40 @@ logger = get_logger(__name__) -def connect_mongo(app): +def connect_mongo(app): + alias = get_alias(app) + host = get_host(app) + db = get_db(app) + + return mongoengine.connect(db, host=host, alias=alias) + +def get_host(app): + if hasattr(app.conf, "mongodb_scheduler_url"): + host = app.conf.get('mongodb_scheduler_url') + elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_URL"): + host = app.conf.CELERY_MONGODB_SCHEDULER_URL + else: + host = None + return host + +def get_db(app): if hasattr(app.conf, "mongodb_scheduler_db"): db = app.conf.get("mongodb_scheduler_db") elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_DB"): db = app.conf.CELERY_MONGODB_SCHEDULER_DB else: db = "celery" + return db + +def get_alias(app): if hasattr(app.conf, "mongodb_scheduler_connection_alias"): alias = app.conf.get('mongodb_scheduler_connection_alias') elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS"): alias = app.conf.CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS else: alias = mongoengine.DEFAULT_CONNECTION_NAME + return alias - if hasattr(app.conf, "mongodb_scheduler_url"): - host = app.conf.get('mongodb_scheduler_url') - elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_URL"): - host = app.conf.CELERY_MONGODB_SCHEDULER_URL - else: - host = None - - return mongoengine.connect(db, host=host, alias=alias) class MongoScheduleEntry(ScheduleEntry): @@ -134,10 +146,17 @@ class MongoScheduler(Scheduler): Model = PeriodicTask def __init__(self, app, *args, **kwargs): + host = get_host(app) + db = get_db(app) + self._mongo = connect_mongo(app) - logger.info("backend scheduler using %s/%s:%s", - self._mongo.address, self._mongo.get_default_database().name, self.Model._get_collection().name) + if host: + logger.info("backend scheduler using %s/%s:%s", + host, db, self.Model._get_collection().name) + else: + logger.info("backend scheduler using %s/%s:%s", + "mongodb://localhost", db, self.Model._get_collection().name) self._schedule = {} self._last_updated = None From b4ef663685e456649d43add50be7b66a4afc48db Mon Sep 17 00:00:00 2001 From: Dainer Mesa Date: Tue, 19 Jan 2021 09:13:47 -0300 Subject: [PATCH 4/4] change print error for logger error --- celerybeatmongo/schedulers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/celerybeatmongo/schedulers.py b/celerybeatmongo/schedulers.py index 848d4b7..6518930 100644 --- a/celerybeatmongo/schedulers.py +++ b/celerybeatmongo/schedulers.py @@ -132,8 +132,7 @@ def save(self): try: return self._task.save(save_condition={}) except Exception as ex: - print(traceback.format_exc()) - raise ex + logger.error(traceback.format_exc()) class MongoScheduler(Scheduler):