Skip to content

Commit 3c5db89

Browse files
author
Andrey Zelenchuk
committed
Introduce initial payload.
1 parent d5fea18 commit 3c5db89

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

channels_graphql_ws/graphql_ws_consumer.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,12 @@ def register_middleware(next_middleware, root, info, *args, **kwds):
665665
await self._send_gql_complete(operation_id)
666666

667667
async def _register_subscription(
668-
self, operation_id, groups, publish_callback, unsubscribed_callback
668+
self,
669+
operation_id,
670+
groups,
671+
publish_callback,
672+
unsubscribed_callback,
673+
initial_payload,
669674
):
670675
"""Register a new subscription when client subscribes.
671676
@@ -703,13 +708,25 @@ async def _register_subscription(
703708
maxsize=self.subscription_notification_queue_limit
704709
)
705710

711+
# Enqueue the initial payload.
712+
if initial_payload is not self.SKIP:
713+
notification_queue.put_nowait(Serializer.serialize(initial_payload))
714+
706715
# Start an endless task which listens the `notification_queue`
707716
# and invokes subscription "resolver" on new notifications.
708717
async def notifier():
709718
"""Watch the notification queue and notify clients."""
710719

711720
# Assert we run in a proper thread.
712721
self._assert_thread()
722+
723+
# Dirty hack to partially workaround the race between:
724+
# 1) call to `result.subscribe` in `_on_gql_start`; and
725+
# 2) call to `trigger.on_next` below in this function.
726+
# The first call must be earlier. Otherwise, first one or more notifications
727+
# may be lost.
728+
await asyncio.sleep(1)
729+
713730
while True:
714731
serialized_payload = await notification_queue.get()
715732

channels_graphql_ws/subscription.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ def __init_subclass_with_meta__(
357357
_meta.subscribe = get_function(subscribe)
358358
_meta.publish = get_function(publish)
359359
_meta.unsubscribed = get_function(unsubscribed)
360+
_meta.initial_payload = options.get("initial_payload", cls.SKIP)
360361

361362
super().__init_subclass_with_meta__(_meta=_meta, **options)
362363

@@ -423,7 +424,9 @@ def unsubscribed_callback():
423424
# `subscribe`.
424425
return result
425426

426-
return register_subscription(groups, publish_callback, unsubscribed_callback)
427+
return register_subscription(
428+
groups, publish_callback, unsubscribed_callback, cls._meta.initial_payload
429+
)
427430

428431
@classmethod
429432
def _group_name(cls, group=None):

0 commit comments

Comments
 (0)