Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .env.docker
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ SYNCMASTER__BROKER__URL=amqp://guest:guest@rabbitmq:5672

# Server options
SYNCMASTER__SERVER__SESSION__SECRET_KEY=generate_some_random_string
SYNCMASTER__SERVER__LOG_URL_TEMPLATE=https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}
# !!! NEVER USE ON PRODUCTION !!!
SYNCMASTER__SERVER__DEBUG=true

# Worker options
SYNCMASTER__WORKER__LOG_URL_TEMPLATE=https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}

# Keycloak Auth
#SYNCMASTER__AUTH__PROVIDER=syncmaster.server.providers.auth.keycloak_provider.KeycloakAuthProvider
SYNCMASTER__AUTH__KEYCLOAK__SERVER_URL=http://keycloak:8080
Expand Down
4 changes: 3 additions & 1 deletion .env.local
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ export SYNCMASTER__BROKER__URL=amqp://guest:guest@localhost:5672

# Server options
export SYNCMASTER__SERVER__SESSION__SECRET_KEY=generate_some_random_string
export SYNCMASTER__SERVER__LOG_URL_TEMPLATE="https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}"
# !!! NEVER USE ON PRODUCTION !!!
export SYNCMASTER__SERVER__DEBUG=true

# Worker options
export SYNCMASTER__WORKER__LOG_URL_TEMPLATE="https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}"

# Keycloak Auth
#export SYNCMASTER__AUTH__PROVIDER=syncmaster.server.providers.auth.keycloak_provider.KeycloakAuthProvider
export SYNCMASTER__AUTH__KEYCLOAK__SERVER_URL=http://localhost:8080
Expand Down
5 changes: 2 additions & 3 deletions docs/worker/log_url.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
Setting the `Run.log_url` value
===============================

Each run in the system is linked to a log URL where the Celery worker logs are available. This log URL might point to an Elastic instance or another logging tool such as Grafana. The log URL is generated based on a template configured in the server.
Each run in the system is linked to a log URL where the Celery worker logs are available. This log URL might point to an Elastic instance or another logging tool such as Grafana. The log URL is generated based on a template configured in the configuration.

The configuration parameter is:

.. code-block:: bash

SYNCMASTER__SERVER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}
SYNCMASTER__WORKER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}

You can search for each run by either its correlation id ``x-request-id`` in http headers or the ``Run.Id``.

11 changes: 0 additions & 11 deletions syncmaster/server/api/v1/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
from datetime import datetime
from typing import Annotated

from asgi_correlation_id import correlation_id
from celery import Celery
from fastapi import APIRouter, Depends, Query
from jinja2 import Template
from kombu.exceptions import KombuError

from syncmaster.db.models import RunType, Status, User
Expand Down Expand Up @@ -122,15 +120,6 @@ async def start_run(
type=RunType.MANUAL,
)

log_url = Template(settings.server.log_url_template).render(
run=run,
correlation_id=correlation_id.get(),
)
run = await unit_of_work.run.update(
run_id=run.id,
log_url=log_url,
)

try:
await asyncio.to_thread(
celery.send_task,
Expand Down
4 changes: 0 additions & 4 deletions syncmaster/server/settings/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ class ServerSettings(BaseModel):
""",
),
)
log_url_template: str = Field(
"",
description=":ref:`URL template to access worker logs <worker-log-url>`",
)
request_id: RequestIDSettings = Field(
default_factory=RequestIDSettings,
)
Expand Down
5 changes: 5 additions & 0 deletions syncmaster/worker/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@ class WorkerSettings(BaseSettings):
.. code-block:: bash

SYNCMASTER__WORKER__CREATE_SPARK_SESSION_FUNCTION=custom_syncmaster.spark.get_worker_spark_session
SYNCMASTER__WORKER__LOG_URL_TEMPLATE=https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}
"""

CREATE_SPARK_SESSION_FUNCTION: ImportString = Field(
"syncmaster.worker.spark.get_worker_spark_session",
description="Function to create Spark session for worker",
)
log_url_template: str = Field(
"",
description=":ref:`URL template to access worker logs <worker-log-url>`",
)


class WorkerAppSettings(BaseSettings):
Expand Down
7 changes: 4 additions & 3 deletions syncmaster/worker/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime, timezone

from asgi_correlation_id import correlation_id
from asgi_correlation_id.extensions.celery import load_correlation_ids
from celery import Celery
from celery.signals import after_setup_task_logger
from celery.utils.log import get_task_logger
from jinja2 import Template
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload

Expand All @@ -20,8 +22,6 @@
logger = get_task_logger(__name__)
load_correlation_ids()

WORKER_SETTINGS = WorkerAppSettings()


@celery.task(name="run_transfer_task", bind=True, track_started=True)
def run_transfer_task(self: Celery, run_id: int) -> None:
Expand Down Expand Up @@ -50,6 +50,7 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings):

run.status = Status.STARTED
run.started_at = datetime.now(tz=timezone.utc)
run.log_url = Template(settings.worker.log_url_template).render(run=run, correlation_id=correlation_id.get())
session.add(run)
session.commit()

Expand All @@ -61,7 +62,7 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings):

try:
controller = TransferController(
settings=WORKER_SETTINGS,
settings=settings,
run=run,
source_connection=run.transfer.source_connection,
target_connection=run.transfer.target_connection,
Expand Down
2 changes: 0 additions & 2 deletions tests/test_unit/test_runs/test_create_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ async def test_superuser_can_create_run(
"type": RunType.MANUAL,
}
assert result.status_code == 200
assert "correlation_id" in response.get("log_url")
assert "run_id" in response.get("log_url")
mock_to_thread.assert_awaited_once_with(
mock_send_task,
"run_transfer_task",
Expand Down
2 changes: 2 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ async def run_transfer_and_verify(
token=user.token,
)
assert run_data["status"] == Status.FINISHED.value
assert "correlation_id" in run_data["log_url"]
assert "run_id" in run_data["log_url"]
verify_transfer_auth_data(run_data, source_auth, target_auth)

return run_data
Expand Down
Loading