Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,10 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
SENTRY_SUBNET_SECRET = os.environ.get("SENTRY_SUBNET_SECRET", None)


DEVSERVICES_TASKBROKER_GRPC_PORT = 50051
if SILO_DEVSERVER or IS_DEV:
DEVSERVICES_TASKBROKER_GRPC_PORT = 50055

# Queue configuration
from kombu import Exchange, Queue

Expand Down Expand Up @@ -2963,7 +2967,9 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
"taskbroker": lambda settings, options: (
{
"image": "ghcr.io/getsentry/taskbroker:latest",
"ports": {"50051/tcp": 50051},
"ports": {
f"{settings.DEVSERVICES_TASKBROKER_GRPC_PORT}/tcp": settings.DEVSERVICES_TASKBROKER_GRPC_PORT
},
"environment": {
"TASKBROKER_KAFKA_CLUSTER": (
"sentry_kafka"
Expand Down
15 changes: 13 additions & 2 deletions src/sentry/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,20 @@
from typing import Any

import click
from django.conf import settings

import sentry.taskworker.constants as taskworker_constants
from sentry.bgtasks.api import managed_bgtasks
from sentry.runner.decorators import configuration, log_options
from sentry.utils.kafka import run_processor_with_signals


def _get_default_rpc_host() -> str:

port = getattr(settings, "DEVSERVICES_TASKBROKER_GRPC_PORT", 50051)
return f"127.0.0.1:{port}"


DEFAULT_BLOCK_SIZE = int(32 * 1e6)
logger = logging.getLogger("sentry.runner.commands.run")

Expand Down Expand Up @@ -252,7 +260,7 @@ def worker(ignore_unknown_queues: bool, **options: Any) -> None:
@click.option(
"--rpc-host",
help="The hostname and port for the taskworker-rpc. When using num-brokers the hostname will be appended with `-{i}` to connect to individual brokers.",
default="127.0.0.1:50051",
default=None,
)
@click.option(
"--num-brokers", help="Number of brokers available to connect to", default=None, type=int
Expand Down Expand Up @@ -312,7 +320,7 @@ def taskworker(**options: Any) -> None:


def run_taskworker(
rpc_host: str,
rpc_host: str | None,
num_brokers: int | None,
rpc_host_list: str | None,
max_child_task_count: int,
Expand All @@ -329,6 +337,9 @@ def run_taskworker(
"""
taskworker factory that can be reloaded
"""
if rpc_host is None:
rpc_host = _get_default_rpc_host()

from sentry.taskworker.client.client import make_broker_hosts
from sentry.taskworker.worker import TaskWorker

Expand Down
Loading