Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions doc/architectural_decisions/009-kafka-streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ central Kafka instance is not currently considered "reliable" in an experiment c
streaming the documents will allow testing to be done. Kafka will eventually be deployed in a "reliable"
way accessible to each instrument.
- We will encode messages from bluesky using `msgpack` (with the `msgpack-numpy` extension), because:
- It is the default encoder used by the upstream `bluesky-kafka` integration
- It is the default encoder used by the upstream `bluesky-kafka` integration (though we are not using
that integration directly, as it has been soft-deprecated upstream)
- It is a schema-less encoder, meaning we do not have to write/maintain fixed schemas for all the
documents allowed by `event-model`
- It has reasonable performance in terms of encoding speed and message size
Expand All @@ -41,7 +42,7 @@ Encoding bluesky documents into JSON and then wrapping them in the
was considered.

We chose `msgpack` instead of json strings + flatbuffers because:
- It is more standard in the bluesky community (e.g. it is the default used in `bluesky-kafka`)
- It is more standard in the bluesky community
- Bluesky events will be streamed to a dedicated topic, which is unlikely to be confused with data
using any other schema.

Expand Down
2 changes: 1 addition & 1 deletion doc/dev/kafka.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Kafka

`ibex_bluesky_core` uses [the `bluesky-kafka` library](https://github.com/bluesky/bluesky-kafka) to send documents
`ibex_bluesky_core` uses {py:obj}`~ibex_bluesky_core.callbacks.KafkaCallback` to send documents
emitted by the {py:obj}`~bluesky.run_engine.RunEngine` to Kafka. The Kafka callback is automatically added by
{py:obj}`ibex_bluesky_core.run_engine.get_run_engine`, and so no user configuration is required - the callback is always
enabled.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ classifiers = [

dependencies = [
"bluesky", # Bluesky framework
"bluesky-kafka", # Bluesky-kafka integration
"confluent-kafka", # Kafka producer
"ophyd-async[ca] == 0.14.2", # Device abstraction. When changing, also change in doc/conf.py
"lmfit", # Fitting
"matplotlib", # Plotting
Expand Down
2 changes: 2 additions & 0 deletions src/ibex_bluesky_core/callbacks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
LiveFit,
LiveFitLogger,
)
from ibex_bluesky_core.callbacks._kafka import KafkaCallback
from ibex_bluesky_core.callbacks._plotting import LivePColorMesh, LivePlot, PlotPNGSaver, show_plot
from ibex_bluesky_core.callbacks._utils import get_default_output_path
from ibex_bluesky_core.fitting import FitMethod
Expand All @@ -49,6 +50,7 @@
"DocLoggingCallback",
"HumanReadableFileCallback",
"ISISCallbacks",
"KafkaCallback",
"LiveFit",
"LiveFitLogger",
"LivePColorMesh",
Expand Down
80 changes: 80 additions & 0 deletions src/ibex_bluesky_core/callbacks/_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging
import os
import socket
from typing import Any

import msgpack_numpy
from bluesky.callbacks import CallbackBase
from confluent_kafka import Producer

logger = logging.getLogger(__name__)


DEFAULT_KAFKA_BROKER = "livedata.isis.cclrc.ac.uk:31092"


def get_kafka_topic_name() -> str:
"""Get the name of the bluesky Kafka topic for this machine."""
computer_name = os.environ.get("COMPUTERNAME", socket.gethostname()).upper()
computer_name = computer_name.upper()
if computer_name.startswith(("NDX", "NDH")):
name = computer_name[3:]
else:
name = computer_name

return f"{name}_bluesky"


class KafkaCallback(CallbackBase):
"""Forward all bluesky documents to Kafka.

Documents are sent to Kafka encoded using the MsgPack format with
the ``msgpack_numpy`` extension to allow efficiently encoding arrays.

.. note::

This callback is automatically configured by
:py:obj:`ibex_bluesky_core.run_engine.get_run_engine`, and does not need
to be configured manually.
"""

def __init__(
self,
*,
bootstrap_servers: list[str] | None = None,
topic: str | None = None,
key: str,
kafka_config: dict[str, Any],
) -> None:
super().__init__()

self._topic = topic or get_kafka_topic_name()
self._key = msgpack_numpy.dumps(key)

if "bootstrap.servers" in kafka_config:
raise ValueError(
"Do not specify bootstrap.servers in kafka config, use bootstrap_servers argument."
)

if bootstrap_servers is None:
bootstrap_servers = [
os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER)
]

kafka_config["bootstrap.servers"] = ",".join(bootstrap_servers)

self._producer = Producer(kafka_config)

def __call__(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just thinking - we don't want to flush here (to make the producer synchronous) but we could flush the producer in a __del__() function.

Copy link
Member Author

@Tom-Willemsen Tom-Willemsen Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't think __del__ works as this callback will always be subsribed (so never deleted).

We could flush after a stop document. Do you think that's better? Still need to test how that would interact with network loss/broker being down/broker being borked.

self, name: str, doc: dict[str, Any], validate: bool = False
) -> tuple[str, dict[str, Any]]:
try:
data = msgpack_numpy.dumps([name, doc])
self._producer.produce(topic=self._topic, key=self._key, value=data)
except Exception:
# If we can't produce to kafka, log and carry on. We don't want
# kafka failures to kill a scan - kafka is currently considered
# 'non-critical'.
logger.exception("Failed to publish Kafka message")

return name, doc
30 changes: 4 additions & 26 deletions src/ibex_bluesky_core/run_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,27 @@
import asyncio
import functools
import logging
import os
import socket
from collections.abc import Generator
from functools import cache
from threading import Event, Lock
from typing import Any, cast

import bluesky.preprocessors as bpp
import msgpack_numpy
from bluesky.run_engine import RunEngine, RunEngineResult
from bluesky.utils import DuringTask, Msg, RunEngineControlException, RunEngineInterrupted
from bluesky_kafka import Publisher

from ibex_bluesky_core.callbacks import DocLoggingCallback
from ibex_bluesky_core.callbacks import DocLoggingCallback, KafkaCallback
from ibex_bluesky_core.plan_stubs import CALL_QT_AWARE_MSG_KEY, CALL_SYNC_MSG_KEY
from ibex_bluesky_core.preprocessors import add_rb_number_processor
from ibex_bluesky_core.run_engine._msg_handlers import call_qt_aware_handler, call_sync_handler
from ibex_bluesky_core.utils import is_matplotlib_backend_qt
from ibex_bluesky_core.version import version

__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"]
__all__ = ["get_run_engine", "run_plan"]

logger = logging.getLogger(__name__)


DEFAULT_KAFKA_BROKER = "livedata.isis.cclrc.ac.uk:31092"


def get_kafka_topic_name() -> str:
"""Get the name of the bluesky Kafka topic for this machine."""
computer_name = os.environ.get("COMPUTERNAME", socket.gethostname()).upper()
computer_name = computer_name.upper()
if computer_name.startswith(("NDX", "NDH")):
name = computer_name[3:]
else:
name = computer_name

return f"{name}_bluesky"


class _DuringTask(DuringTask):
def block(self, blocking_event: Event) -> None:
"""On windows, event.wait() on the main thread is not interruptible by a CTRL-C.
Expand Down Expand Up @@ -120,12 +101,9 @@ def get_run_engine() -> RunEngine:
log_callback = DocLoggingCallback()
RE.subscribe(log_callback)

kafka_callback = Publisher(
topic=get_kafka_topic_name(),
bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER),
kafka_callback = KafkaCallback(
key="doc",
serializer=msgpack_numpy.dumps,
producer_config={
kafka_config={
"enable.idempotence": True,
"log_level": 0,
"log.connection.close": False,
Expand Down
35 changes: 35 additions & 0 deletions tests/callbacks/test_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import re
from unittest import mock

import pytest

from ibex_bluesky_core.callbacks._kafka import KafkaCallback, get_kafka_topic_name


def test_get_kafka_topic_name():
with mock.patch("ibex_bluesky_core.callbacks._kafka.os.environ.get", return_value="FOO"):
assert get_kafka_topic_name() == "FOO_bluesky"

with mock.patch("ibex_bluesky_core.callbacks._kafka.os.environ.get", return_value="NDXBAR"):
assert get_kafka_topic_name() == "BAR_bluesky"

with mock.patch("ibex_bluesky_core.callbacks._kafka.os.environ.get", return_value="NDHBAZ"):
assert get_kafka_topic_name() == "BAZ_bluesky"


def test_init_kafka_callback_with_duplicate_bootstrap_servers():
with pytest.raises(
ValueError,
match=re.escape(
"Do not specify bootstrap.servers in kafka config, use bootstrap_servers argument."
),
):
KafkaCallback(bootstrap_servers=["abc"], kafka_config={"bootstrap.servers": "foo"}, key="")


def test_exceptions_suppressed():
cb = KafkaCallback(bootstrap_servers=["abc"], kafka_config={}, key="")
with mock.patch(
"ibex_bluesky_core.callbacks._kafka.msgpack_numpy.dumps", side_effect=ValueError
):
cb("start", {})
14 changes: 1 addition & 13 deletions tests/test_run_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import threading
from collections.abc import Generator
from typing import Any
from unittest import mock
from unittest.mock import MagicMock

import bluesky.plan_stubs as bps
Expand All @@ -12,7 +11,7 @@
from bluesky.run_engine import RunEngineResult
from bluesky.utils import Msg, RequestAbort, RunEngineInterrupted

from ibex_bluesky_core.run_engine import _DuringTask, get_kafka_topic_name, get_run_engine, run_plan
from ibex_bluesky_core.run_engine import _DuringTask, get_run_engine, run_plan
from ibex_bluesky_core.version import version


Expand Down Expand Up @@ -147,14 +146,3 @@ def plan():
result = run_plan(plan())
assert result.plan_result == "happy_path_result"
assert result.exit_status == "success"


def test_get_kafka_topic_name():
with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="FOO"):
assert get_kafka_topic_name() == "FOO_bluesky"

with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDXBAR"):
assert get_kafka_topic_name() == "BAR_bluesky"

with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDHBAZ"):
assert get_kafka_topic_name() == "BAZ_bluesky"