Skip to content

Commit 23cbbc1

Browse files
author
sergey.komissarov
committed
Merge branch '6-drop-notifications' into 'master'
Ability to skip intermediate notifications in the subscription. See merge request sd/pseven/DjangoChannelsGraphqlWs!13
2 parents 25ec3c1 + f0b49fe commit 23cbbc1

File tree

4 files changed

+181
-10
lines changed

4 files changed

+181
-10
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ import graphene
115115
class MySubscription(channels_graphql_ws.Subscription):
116116
"""Simple GraphQL subscription."""
117117

118+
# Leave only latest 64 messages in the server queue.
119+
notification_queue_limit = 64
120+
118121
# Subscription payload.
119122
event = graphene.String()
120123

channels_graphql_ws/graphql_ws_consumer.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -421,11 +421,14 @@ async def _process_broadcast(self, message):
421421
except asyncio.QueueFull:
422422
# The queue is full - issue a warning and throw away
423423
# the oldest item from the queue.
424-
LOG.warning(
425-
"Subscription notification dropped!"
426-
" Subscription operation id: %s.",
427-
sid,
428-
)
424+
# NOTE: Queue with the size 1 means that it is safe
425+
# to drop intermediate notifications.
426+
if subinf.notification_queue.maxsize != 1:
427+
LOG.warning(
428+
"Subscription notification dropped!"
429+
" Subscription operation id: %s.",
430+
sid,
431+
)
429432
subinf.notification_queue.get_nowait()
430433

431434
async def unsubscribe(self, message):
@@ -671,7 +674,12 @@ def register_middleware(next_middleware, root, info, *args, **kwds):
671674
await self._send_gql_complete(operation_id)
672675

673676
async def _register_subscription(
674-
self, operation_id, groups, publish_callback, unsubscribed_callback
677+
self,
678+
operation_id,
679+
groups,
680+
publish_callback,
681+
unsubscribed_callback,
682+
notification_queue_limit=None,
675683
):
676684
"""Register a new subscription when client subscribes.
677685
@@ -689,6 +697,8 @@ async def _register_subscription(
689697
subscription groups current subscription belongs to.
690698
unsubscribed_callback: Called to notify when a client
691699
unsubscribes.
700+
notification_queue_limit: LImit for the subscribtion
701+
notification queue. Default is used if not set.
692702
693703
"""
694704
# NOTE: It is important to invoke `group_add` from an
@@ -705,9 +715,10 @@ async def _register_subscription(
705715
trigger = rx.subjects.Subject()
706716

707717
# The subscription notification queue.
708-
notification_queue = asyncio.Queue(
709-
maxsize=self.subscription_notification_queue_limit
710-
)
718+
queue_size = notification_queue_limit
719+
if not queue_size or queue_size < 0:
720+
queue_size = self.subscription_notification_queue_limit
721+
notification_queue = asyncio.Queue(maxsize=queue_size)
711722

712723
# Start an endless task which listens the `notification_queue`
713724
# and invokes subscription "resolver" on new notifications.

channels_graphql_ws/subscription.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,15 @@ class Subscription(graphene.ObjectType):
148148
# Return this from the `publish` to suppress the notification.
149149
SKIP = GraphqlWsConsumer.SKIP
150150

151+
# Subscription notifications queue limit. Set this to control the
152+
# amount of notifications server keeps in queue when notifications
153+
# come faster than server processing them. Set this limit to 1 drops
154+
# all notifications in queue except the latest one. Use this only if
155+
# you are sure that subscription always returns all required state
156+
# to the client and client does not loose information when
157+
# intermediate notification is missed.
158+
notification_queue_limit = None
159+
151160
@classmethod
152161
def broadcast(cls, *, group=None, payload=None):
153162
"""Call this method to notify all subscriptions in the group.
@@ -422,7 +431,12 @@ def unsubscribed_callback():
422431
# `subscribe`.
423432
return result
424433

425-
return register_subscription(groups, publish_callback, unsubscribed_callback)
434+
return register_subscription(
435+
groups,
436+
publish_callback,
437+
unsubscribed_callback,
438+
cls.notification_queue_limit,
439+
)
426440

427441
@classmethod
428442
def _group_name(cls, group=None):
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# Copyright (C) DATADVANCE, 2010-2021
2+
#
3+
# Permission is hereby granted, free of charge, to any person obtaining
4+
# a copy of this software and associated documentation files (the
5+
# "Software"), to deal in the Software without restriction, including
6+
# without limitation the rights to use, copy, modify, merge, publish,
7+
# distribute, sublicense, and/or sell copies of the Software, and to
8+
# permit persons to whom the Software is furnished to do so, subject to
9+
# the following conditions:
10+
#
11+
# The above copyright notice and this permission notice shall be
12+
# included in all copies or substantial portions of the Software.
13+
#
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16+
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
17+
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
18+
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
19+
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
20+
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21+
22+
"""Test that intermediate notifications skipped."""
23+
24+
import time
25+
26+
import graphene
27+
import pytest
28+
29+
import channels_graphql_ws
30+
31+
32+
@pytest.mark.asyncio
33+
async def test_notification_queue_limit(gql):
34+
"""Test it is possible to skip intermediate notifications.
35+
36+
Here we start subscription which send 10 messages and server took
37+
1 second to process each message. All messages except the first and
38+
the last will be skipped by the server. Because subscription class
39+
sets notification_queue_limit to 1.
40+
"""
41+
42+
print("Prepare the test setup: GraphQL backend classes.")
43+
44+
class SendMessages(graphene.Mutation):
45+
"""Send message mutation."""
46+
47+
is_ok = graphene.Boolean()
48+
49+
@staticmethod
50+
def mutate(root, info):
51+
"""Broadcast 10 messages."""
52+
del root, info
53+
for idx in range(10):
54+
OnNewMessage.broadcast(payload={"message": str(idx)})
55+
return SendMessages(is_ok=True)
56+
57+
class OnNewMessage(channels_graphql_ws.Subscription):
58+
"""Triggered by `SendMessage` on every new message."""
59+
60+
# Leave only the last message in the server queue.
61+
notification_queue_limit = 1
62+
63+
message = graphene.String()
64+
65+
@staticmethod
66+
def publish(payload, info):
67+
"""Notify all clients except the author of the message."""
68+
del info
69+
# Emulate server high load. It is bad to use sleep in the
70+
# tests but here it seems ok. If test is running within
71+
# high load builder it will behave the same and skip
72+
# notifications it is unable to process.
73+
time.sleep(1)
74+
return OnNewMessage(message=payload["message"])
75+
76+
class Subscription(graphene.ObjectType):
77+
"""Root subscription."""
78+
79+
on_new_message = OnNewMessage.Field()
80+
81+
class Mutation(graphene.ObjectType):
82+
"""Root mutation."""
83+
84+
send_messages = SendMessages.Field()
85+
86+
print("Establish & initialize WebSocket GraphQL connections.")
87+
88+
comm1 = gql(
89+
mutation=Mutation,
90+
subscription=Subscription,
91+
consumer_attrs={"strict_ordering": True},
92+
)
93+
await comm1.connect_and_init()
94+
95+
comm2 = gql(
96+
mutation=Mutation,
97+
subscription=Subscription,
98+
consumer_attrs={"strict_ordering": True},
99+
)
100+
await comm2.connect_and_init()
101+
102+
print("Subscribe to receive a new message notifications.")
103+
104+
sub_op_id = await comm2.send(
105+
msg_type="start",
106+
payload={
107+
"query": "subscription op_name { on_new_message { message } }",
108+
"variables": {},
109+
"operationName": "op_name",
110+
},
111+
)
112+
await comm2.assert_no_messages("Subscribe responded with a message!")
113+
114+
print("Start sending notifications.")
115+
116+
mut_op_id = await comm1.send(
117+
msg_type="start",
118+
payload={
119+
"query": """mutation op_name { send_messages { is_ok } }""",
120+
"variables": {},
121+
"operationName": "op_name",
122+
},
123+
)
124+
await comm1.receive(assert_id=mut_op_id, assert_type="data")
125+
await comm1.receive(assert_id=mut_op_id, assert_type="complete")
126+
127+
await comm1.assert_no_messages("Self-notification happened!")
128+
129+
# Client will receive only the first and the last notifications.
130+
resp = await comm2.receive(assert_id=sub_op_id, assert_type="data")
131+
assert resp["data"]["on_new_message"]["message"] == "0"
132+
133+
resp = await comm2.receive(assert_id=sub_op_id, assert_type="data")
134+
assert resp["data"]["on_new_message"]["message"] == "9"
135+
136+
await comm1.assert_no_messages(
137+
"Unexpected message received at the end of the test!"
138+
)
139+
await comm2.assert_no_messages(
140+
"Unexpected message received at the end of the test!"
141+
)
142+
await comm1.finalize()
143+
await comm2.finalize()

0 commit comments

Comments
 (0)