From f3de8c18a507ee00c2d5c705678083aeabf542a5 Mon Sep 17 00:00:00 2001 From: Mark Goldstein Date: Thu, 22 Apr 2021 21:09:54 -0400 Subject: [PATCH 01/11] code: create contrib folder and app config --- health_check/contrib/beat_health_check/__init__.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 health_check/contrib/beat_health_check/__init__.py diff --git a/health_check/contrib/beat_health_check/__init__.py b/health_check/contrib/beat_health_check/__init__.py new file mode 100644 index 00000000..6d51797b --- /dev/null +++ b/health_check/contrib/beat_health_check/__init__.py @@ -0,0 +1 @@ +default_app_config = "contrib.beat_health_check.apps.HealthchecksConfig" From 0698f1f9880a14065b72c40f2c3d863658eaf217 Mon Sep 17 00:00:00 2001 From: Mark Goldstein Date: Thu, 22 Apr 2021 21:10:06 -0400 Subject: [PATCH 02/11] code: beat_health_check app --- health_check/contrib/beat_health_check/apps.py | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 health_check/contrib/beat_health_check/apps.py diff --git a/health_check/contrib/beat_health_check/apps.py b/health_check/contrib/beat_health_check/apps.py new file mode 100644 index 00000000..a758131d --- /dev/null +++ b/health_check/contrib/beat_health_check/apps.py @@ -0,0 +1,11 @@ +from django.apps import AppConfig +from health_check.plugins import plugin_dir + + +class HealthchecksConfig(AppConfig): + name = "contrib.beat_health_check" + + def ready(self): + from .backends import CeleryBeatHealthCheck + + plugin_dir.register(CeleryBeatHealthCheck) From 92c9bf177152223612ca0bf5faa2de4a727ffedf Mon Sep 17 00:00:00 2001 From: Mark Goldstein Date: Thu, 22 Apr 2021 21:10:38 -0400 Subject: [PATCH 03/11] code: beat_health_check backends with implemented check_status --- .../contrib/beat_health_check/backends.py | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 health_check/contrib/beat_health_check/backends.py diff --git a/health_check/contrib/beat_health_check/backends.py b/health_check/contrib/beat_health_check/backends.py new file mode 100644 index 00000000..1bef8a3c --- /dev/null +++ b/health_check/contrib/beat_health_check/backends.py @@ -0,0 +1,100 @@ +from datetime import timedelta, timezone +from typing import Any, Dict, List, Union + +from celery.beat import Service +from celery.schedules import crontab, solar, schedule +from django.conf import settings +from django.utils.module_loading import import_string +from django_celery_beat.models import CrontabSchedule, IntervalSchedule, SolarSchedule +from django_celery_beat.schedulers import ModelEntry +from health_check.backends import BaseHealthCheckBackend +from health_check.exceptions import ServiceReturnedUnexpectedResult, ServiceUnavailable + + +class CeleryBeatHealthCheck(BaseHealthCheckBackend): + def check_status(self): + """ + Checks for overdue tasks in a celery beat scheduler. + Uses the scheduler module dotted path that is specified in settings.py with `CELERY_BEAT_SCHEDULER`. + If not specified, defaults to `django_celery_beat`'s `django_celery_beat.schedulers.DatabaseScheduler`. + Allows a custom buffer to be set using `BEAT_HEALTH_CHECK_BUFFER_SECONDS` in settings.py. The buffer + defaults to 30 seconds if not defined. The buffer will offset the scheduler interval for when due + tasks are processed. Using a buffer avoids false positives, such as the case where a task is + technically due according to the scheduler, but that's only because the scheduler has not hit its + interval to check and process due tasks. + """ + from celery.app import default_app as celery_app + + # Dotted path to the Celery beat scheduler. Uses django_celery_beat scheduler at default. + scheduler_module_path = getattr(settings, "CELERY_BEAT_SCHEDULER", "django_celery_beat.schedulers.DatabaseScheduler") + scheduler = import_string(scheduler_module_path) + try: + # Get the celery scheduler for the current app and scheduler class via a beat Service + schedule: Dict = ( + Service(app=celery_app, scheduler_cls=scheduler).get_scheduler().schedule + ) + + schedule_tasks: Union[ModelEntry, Any] = schedule.values() + tasks_due: List[Union[ModelEntry, Any]] = [] + + for task in schedule_tasks: + if self.is_overdue(task): + tasks_due.append(task) + + if tasks_due: + self.add_error( + ServiceReturnedUnexpectedResult( + f"{len(tasks_due)} task(s) due:{[task.name for task in tasks_due]} " + ) + ) + except BaseException as e: + self.add_error( + ServiceUnavailable("Encountered unexpected error while checking Beat Tasks"), e + ) + + @staticmethod + def is_overdue(task: ModelEntry) -> bool: + """Determines if a task is overdue by checking if a task is overdue more than x seconds. + Use BEAT_HEALTH_CHECK_BUFFER_SECONDS or defaults to 30 second, when checking if a task should + be considered overdue. + Uses the ScheduleEntry.last run at, and the task's schedule in seconds to calculate the + next time the task is due. If the time that the task is due is less than the current time + plus the buffer, we say it's overdue. Otherwise, it's not. + + See celery.schedules.schedule.is_due for celery's implementation of determining when a + entry is due. + + Args: + task: ScheduleEntry + + Returns: + bool: If the task is overdue by at least the buffer. + """ + EXPECTED_SCHEDULE_CLASSES = [solar, crontab, schedule] + DEFAULT_DUE_BUFFER_SECONDS = 30 + buffer_seconds = getattr( + settings, "BEAT_HEALTH_CHECK_BUFFER_SECONDS", DEFAULT_DUE_BUFFER_SECONDS + ) + # django_celery_beat.schedulers.ModelEntry.to_model_schedule returns a Tuple: model_schedule, model_field + task_schedule: Union[ + CrontabSchedule, IntervalSchedule, SolarSchedule + ] = task.to_model_schedule(task.schedule)[0] + + # Get the celery schedule from the Modelentry. + celery_scheduler = task_schedule.schedule + if celery_scheduler.__class__ not in EXPECTED_SCHEDULE_CLASSES: + raise ServiceUnavailable( + f"Encountered unexpected celery schedule class: {celery_scheduler}" + ) + try: + celery_schedule: Union[solar, crontab, schedule] = task_schedule.schedule + due_in = celery_schedule.remaining_estimate(task.last_run_at) + except BaseException: + raise ServiceUnavailable("Encountered an error when determining if a task is overdue.") + + next_due_buffered_seconds = (due_in + timedelta(seconds=buffer_seconds)).total_seconds() + # If the task is due in the past, even with the buffer, we consider it due. + if next_due_buffered_seconds < 0: + return True + else: + return False From 5d61c5e37073ea5b80c9fc9110bade8aecbc0c82 Mon Sep 17 00:00:00 2001 From: Mark Goldstein Date: Thu, 22 Apr 2021 21:10:58 -0400 Subject: [PATCH 04/11] docs: add beat_health_check to contrib docs --- docs/contrib.rst | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/docs/contrib.rst b/docs/contrib.rst index c74ce5ba..afa17053 100644 --- a/docs/contrib.rst +++ b/docs/contrib.rst @@ -54,5 +54,15 @@ which don't require that tasks are executed almost instantly but require that th to be executed in sometime the future i.e. that the worker process is alive and processing tasks all the time. -You may also use both of them. To use these checks add them to `INSTALLED_APPS` in your -Django settings module. +`health_check.contrib.beat_health_check` Checks for overdue tasks in a celery beat scheduler. +Uses the scheduler module dotted path that is specified in settings.py with `CELERY_BEAT_SCHEDULER`. +If not specified, defaults to `django_celery_beat`'s `django_celery_beat.schedulers.DatabaseScheduler`. +Allows a custom buffer to be set using `BEAT_HEALTH_CHECK_BUFFER_SECONDS` in settings.py. The buffer +defaults to 30 seconds if not defined. The buffer will offset the scheduler interval for when due +tasks are processed. Using a buffer avoids false positives, such as the case where a task is +technically due according to the scheduler, but that's only because the scheduler has not hit its +interval to check and process due tasks. + +To use any of the above checks, add the full dotted path to `INSTALLED_APPS` in your +Django settings module. Example: add `health_check.contrib.beat_health_check` to `INSTALLED_APPS` +to use `beat_health_check`. From 9c73a204ec115249fb0ec702a18ed18887ba8bbe Mon Sep 17 00:00:00 2001 From: Mark Goldstein Date: Thu, 22 Apr 2021 21:11:11 -0400 Subject: [PATCH 05/11] docs: specify settings --- docs/settings.rst | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/docs/settings.rst b/docs/settings.rst index 96be9a44..133df575 100644 --- a/docs/settings.rst +++ b/docs/settings.rst @@ -90,3 +90,25 @@ Using `django.settings` you may exert more fine-grained control over the behavio - Number - 3 - Specifies the maximum total time for a task to complete and return a result, including queue time. + + +Beat Health Check +---------------------- +Use `django.settings` to customize the target scheduler and buffer of the celery beat health check + +.. list-table:: Additional Settings + :widths: 25 10 10 55 + :header-rows: 1 + + * - Name + - Type + - Default + - Description + * - `CELERY_BEAT_SCHEDULER` + - String + - `django_celery_beat.schedulers.DatabaseScheduler` + - The Scheduler module dotted path + * - `BEAT_HEALTH_CHECK_BUFFER_SECONDS` + - Number + - 30 + - The number of seconds a task needs to be overdue for the heath check to fail From 7173309d2bc126e0da4f592d0bd26b9371ca4c30 Mon Sep 17 00:00:00 2001 From: Mark Goldstein Date: Thu, 22 Apr 2021 21:35:17 -0400 Subject: [PATCH 06/11] docs: add celery beat to read me --- README.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.rst b/README.rst index 0625ecda..4aa663de 100644 --- a/README.rst +++ b/README.rst @@ -16,9 +16,11 @@ The following health checks are bundled with this project: - AWS S3 storage - Celery task queue - Celery ping +- Celery Beat Health Check (via `django_celery_beat`) - RabbitMQ - Migrations +View use instructions for contrib health checks in `docs/contrib.rst` and `docs/settings.rst` Writing your own custom health checks is also very quick and easy. We also like contributions, so don't be afraid to make a pull request. @@ -72,6 +74,7 @@ Add the ``health_check`` applications to your ``INSTALLED_APPS``: 'health_check.contrib.migrations', 'health_check.contrib.celery', # requires celery 'health_check.contrib.celery_ping', # requires celery + 'health_check.contrib.beat_health_check', # requires django_celery_beat 'health_check.contrib.psutil', # disk and memory utilization; requires psutil 'health_check.contrib.s3boto3_storage', # requires boto3 and S3BotoStorage backend 'health_check.contrib.rabbitmq', # requires RabbitMQ broker From 2e0eb94a09ee4f0ea54274726a6519a43a4b0656 Mon Sep 17 00:00:00 2001 From: Mark Goldstein Date: Thu, 22 Apr 2021 21:35:29 -0400 Subject: [PATCH 07/11] code: remove unused import --- health_check/contrib/beat_health_check/backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/health_check/contrib/beat_health_check/backends.py b/health_check/contrib/beat_health_check/backends.py index 1bef8a3c..ee8914ca 100644 --- a/health_check/contrib/beat_health_check/backends.py +++ b/health_check/contrib/beat_health_check/backends.py @@ -1,4 +1,4 @@ -from datetime import timedelta, timezone +from datetime import timedelta from typing import Any, Dict, List, Union from celery.beat import Service From 0cff64964da60460a5c3ab0f7493f741e5afd64b Mon Sep 17 00:00:00 2001 From: Mark <40143016+Maker-Mark@users.noreply.github.com> Date: Wed, 9 Feb 2022 09:49:16 -0500 Subject: [PATCH 08/11] Update README.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Łukasz Skarżyński --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 4aa663de..30be3576 100644 --- a/README.rst +++ b/README.rst @@ -20,7 +20,7 @@ The following health checks are bundled with this project: - RabbitMQ - Migrations -View use instructions for contrib health checks in `docs/contrib.rst` and `docs/settings.rst` +View usage instructions of contrib health checks in `docs/contrib.rst` and `docs/settings.rst` Writing your own custom health checks is also very quick and easy. We also like contributions, so don't be afraid to make a pull request. From 482f1be4ec86ea7c49e8e7fc053dd8662b07c761 Mon Sep 17 00:00:00 2001 From: Mark <40143016+Maker-Mark@users.noreply.github.com> Date: Wed, 9 Feb 2022 09:49:58 -0500 Subject: [PATCH 09/11] Update health_check/contrib/beat_health_check/backends.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Łukasz Skarżyński --- health_check/contrib/beat_health_check/backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/health_check/contrib/beat_health_check/backends.py b/health_check/contrib/beat_health_check/backends.py index ee8914ca..913e9bf9 100644 --- a/health_check/contrib/beat_health_check/backends.py +++ b/health_check/contrib/beat_health_check/backends.py @@ -55,7 +55,7 @@ def check_status(self): @staticmethod def is_overdue(task: ModelEntry) -> bool: """Determines if a task is overdue by checking if a task is overdue more than x seconds. - Use BEAT_HEALTH_CHECK_BUFFER_SECONDS or defaults to 30 second, when checking if a task should + Use `BEAT_HEALTH_CHECK_BUFFER_SECONDS` (defaults to 30 seconds) when checking if a task should be considered overdue. Uses the ScheduleEntry.last run at, and the task's schedule in seconds to calculate the next time the task is due. If the time that the task is due is less than the current time From 5a70b5f6c60732e450f1cb14a23ed0a12c5bdd8e Mon Sep 17 00:00:00 2001 From: Mark <40143016+Maker-Mark@users.noreply.github.com> Date: Wed, 9 Feb 2022 09:50:08 -0500 Subject: [PATCH 10/11] Update health_check/contrib/beat_health_check/backends.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Łukasz Skarżyński --- health_check/contrib/beat_health_check/backends.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/health_check/contrib/beat_health_check/backends.py b/health_check/contrib/beat_health_check/backends.py index 913e9bf9..c4ca31f7 100644 --- a/health_check/contrib/beat_health_check/backends.py +++ b/health_check/contrib/beat_health_check/backends.py @@ -94,7 +94,4 @@ def is_overdue(task: ModelEntry) -> bool: next_due_buffered_seconds = (due_in + timedelta(seconds=buffer_seconds)).total_seconds() # If the task is due in the past, even with the buffer, we consider it due. - if next_due_buffered_seconds < 0: - return True - else: - return False + return next_due_buffered_seconds < 0 From e2478a0b63ae1cb1994357a8f0665fda71b6e423 Mon Sep 17 00:00:00 2001 From: Mark <40143016+Maker-Mark@users.noreply.github.com> Date: Wed, 9 Feb 2022 09:50:18 -0500 Subject: [PATCH 11/11] Update health_check/contrib/beat_health_check/backends.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Łukasz Skarżyński --- health_check/contrib/beat_health_check/backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/health_check/contrib/beat_health_check/backends.py b/health_check/contrib/beat_health_check/backends.py index c4ca31f7..a99c4b0b 100644 --- a/health_check/contrib/beat_health_check/backends.py +++ b/health_check/contrib/beat_health_check/backends.py @@ -57,7 +57,7 @@ def is_overdue(task: ModelEntry) -> bool: """Determines if a task is overdue by checking if a task is overdue more than x seconds. Use `BEAT_HEALTH_CHECK_BUFFER_SECONDS` (defaults to 30 seconds) when checking if a task should be considered overdue. - Uses the ScheduleEntry.last run at, and the task's schedule in seconds to calculate the + Uses the `ScheduleEntry.last_run_at`, and the task's schedule in seconds to calculate the next time the task is due. If the time that the task is due is less than the current time plus the buffer, we say it's overdue. Otherwise, it's not.