Skip to content

Kafka backend listener not starting #2

@franagustin

Description

@franagustin

Error description

Hi!
We've noticed that using this bundle with a Kafka backend configured was working randomly. This means that it sometimes worked fine and sometimes the events weren't being registered.

After some debugging, I tracked down the issue to this line: https://github.com/applauncher-team/remote_event_bundle/blob/master/remote_event_bundle/backend/kafka_backend.py#L23

kernel.run_service(lambda event_list, gid: self.kafka.subscribe(
    topics=[i.name for i in event_list],
    group_id=gid,
    consumer_callback=self.callback,
    poll_timeout=2
), events, self.group_id)

Even though you may see nothing wrong at first sight, let's check this self.kafka.subscribe method defined in Kafka Bundle.

def subscribe(self, topics, group_id, consumer_callback, poll_timeout=1):
    self.kernel.run_service(
        self._subscribe_service,
        topics,
        group_id,
        consumer_callback,
        poll_timeout
    )

It calls run_service method on self.kernel which is instantiated as None: https://github.com/applauncher-team/kafka_bundle/blob/5182b57f57917a674653bc0920118bbf1e978127/kafka_bundle/bundle.py#L17

However, provided that it sometimes works, it must be populated somehow. This is happening in KafkaBundle.injector_ready method: https://github.com/applauncher-team/kafka_bundle/blob/5182b57f57917a674653bc0920118bbf1e978127/kafka_bundle/bundle.py#L109

@inject.params(kernel=Kernel, kafka=KafkaManager)
def injector_ready(self, event, kernel, kafka):
    kafka.kernel = kernel

What is wrong then?

Applauncher uses Blinker's signals for running the event listeners such as injector_ready.
And here the main problem: [Blinker does not guarantee any particular ordering in which to run signals.

We have the following two functions depending on the same signal:

  • KafkaBundle.injector_ready: It populates the KafkaManager.kernel attribute.
  • RemoteEventBundle.injector_ready: It register events, hence calling the aforementioned KafkaManager.subscribe method which needs its kernel attribute to be populated beforehand.

Clearly, the former should run first and the latter afterwards. Nevertheless, this will happen randomly since blinker does not guarantee it.
Whenever they run in the specified order, the events are registered and the listeners begin running, whereas when inverted, the KafkaManager.subscribe method fails because the kernel attribute is not populated yet.

What can be done

I believe the most logical approach would be running those signals on different events instead of the same.
However, we can't change the KafkaBundle.injector_ready to run before (in the ConfigurationReadyEvent) since it needs to inject the Kernel.

Hence, my approach would be change the RemoteEventBundle.injector_ready to run at KernelReadyEvent.
I've already tried it and it seems to work alright.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions