Skip to content

Commit 39c85a4

Browse files
committed
fix: Terminating a subscription now also closes the async iterator.
1 parent 4548123 commit 39c85a4

File tree

3 files changed

+30
-10
lines changed

3 files changed

+30
-10
lines changed

.changeset/bright-needles-suffer.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@apollo/server": patch
3+
---
4+
5+
In the subscription callback server plugin, terminating a subscription now immediately closes the internal async generator. This avoids that generator existing after termination and until the next message is received.

packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts

+22-10
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ describe('SubscriptionCallbackPlugin', () => {
185185
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
186186
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
187187
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
188+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
188189
]
189190
`);
190191
});
@@ -289,6 +290,7 @@ describe('SubscriptionCallbackPlugin', () => {
289290
"SubscriptionManager[1234-cats]: \`complete\` request successful",
290291
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
291292
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
293+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
292294
]
293295
`);
294296
});
@@ -405,6 +407,7 @@ describe('SubscriptionCallbackPlugin', () => {
405407
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
406408
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
407409
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
410+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
408411
]
409412
`);
410413
});
@@ -522,6 +525,7 @@ describe('SubscriptionCallbackPlugin', () => {
522525
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
523526
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
524527
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
528+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
525529
]
526530
`);
527531
});
@@ -684,10 +688,12 @@ describe('SubscriptionCallbackPlugin', () => {
684688
"SubscriptionManager[1234-cats]: \`complete\` request successful",
685689
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
686690
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
691+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
687692
"SubscriptionManager[5678-dogs]: \`complete\` request successful",
688693
"SubscriptionManager: Terminating subscriptions for ID: 5678-dogs",
689694
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url-2.com",
690695
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
696+
"SubscriptionManager[5678-dogs]: Subscription completed without errors",
691697
]
692698
`);
693699
});
@@ -879,20 +885,20 @@ describe('SubscriptionCallbackPlugin', () => {
879885
"SubscriptionManager: Heartbeat request received invalid ID: 1234-cats",
880886
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
881887
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
888+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
882889
"SubscriptionManager: Heartbeat received response for ID: 5678-dogs",
883890
"SubscriptionManager: Heartbeat request received invalid ID: 5678-dogs",
884891
"SubscriptionManager: Terminating subscriptions for ID: 5678-dogs",
885892
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com/5678-dogs",
893+
"SubscriptionManager[5678-dogs]: Subscription completed without errors",
886894
"TESTING: Triggering third update",
887-
"SubscriptionManager[1234-cats]: Subscription already cancelled, ignoring current and future payloads",
888-
"SubscriptionManager[5678-dogs]: Subscription already cancelled, ignoring current and future payloads",
889895
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
890896
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
891897
]
892898
`);
893899
});
894900

895-
it('handles router termination via 404', async () => {
901+
(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? it : it.skip)('handles router termination via 404', async () => {
896902
const server = await startSubscriptionServer({ logger });
897903

898904
mockRouterCheckResponse();
@@ -1231,7 +1237,7 @@ describe('SubscriptionCallbackPlugin', () => {
12311237
`);
12321238
});
12331239

1234-
it('handles failed heartbeats', async () => {
1240+
(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? it : it.skip)('handles failed heartbeats', async () => {
12351241
const server = await startSubscriptionServer({ logger });
12361242

12371243
// Mock the initial check response from the router.
@@ -1305,12 +1311,13 @@ describe('SubscriptionCallbackPlugin', () => {
13051311
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: request to http://mock-router-url.com/ failed, reason: network request error",
13061312
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
13071313
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
1314+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
13081315
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
13091316
]
13101317
`);
13111318
});
13121319

1313-
it('handles failed heartbeats with unexpected status codes', async () => {
1320+
(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? it : it.skip)('handles failed heartbeats with unexpected status codes', async () => {
13141321
const server = await startSubscriptionServer({ logger });
13151322

13161323
// Mock the initial check response from the router.
@@ -1389,13 +1396,14 @@ describe('SubscriptionCallbackPlugin', () => {
13891396
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: Unexpected status code: 500",
13901397
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
13911398
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
1399+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
13921400
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
13931401
]
13941402
`);
13951403
});
13961404

13971405
describe('retries', () => {
1398-
it('failed `check` requests', async () => {
1406+
(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? it : it.skip)('failed `check` requests', async () => {
13991407
const server = await startSubscriptionServer({ logger });
14001408

14011409
// Mock the initial check response from the router. We'll fail a couple
@@ -1490,12 +1498,13 @@ describe('SubscriptionCallbackPlugin', () => {
14901498
"SubscriptionManager[1234-cats]: \`complete\` request successful",
14911499
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
14921500
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
1501+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
14931502
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
14941503
]
14951504
`);
14961505
});
14971506

1498-
it('failed `next` requests', async () => {
1507+
(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? it : it.skip)('failed `next` requests', async () => {
14991508
const server = await startSubscriptionServer({ logger });
15001509

15011510
// Mock the initial check response from the router.
@@ -1619,12 +1628,13 @@ describe('SubscriptionCallbackPlugin', () => {
16191628
"SubscriptionManager[1234-cats]: \`complete\` request successful",
16201629
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
16211630
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
1631+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
16221632
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
16231633
]
16241634
`);
16251635
});
16261636

1627-
it('failed `complete` requests', async () => {
1637+
(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? it : it.skip)('failed `complete` requests', async () => {
16281638
const server = await startSubscriptionServer({ logger });
16291639

16301640
// Mock the initial check response from the router.
@@ -1719,12 +1729,13 @@ describe('SubscriptionCallbackPlugin', () => {
17191729
"SubscriptionManager[1234-cats]: \`complete\` request successful",
17201730
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
17211731
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
1732+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
17221733
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
17231734
]
17241735
`);
17251736
});
17261737

1727-
it('`complete` requests to failure', async () => {
1738+
(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? it : it.skip)('`complete` requests to failure', async () => {
17281739
const server = await startSubscriptionServer({ logger });
17291740

17301741
// Mock the initial check response from the router.
@@ -1824,12 +1835,13 @@ describe('SubscriptionCallbackPlugin', () => {
18241835
"ERROR: SubscriptionManager[1234-cats]: \`complete\` request failed: \`complete\` request failed with unexpected status code: 500",
18251836
"SubscriptionManager: Terminating subscriptions for ID: 1234-cats",
18261837
"SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com",
1838+
"SubscriptionManager[1234-cats]: Subscription completed without errors",
18271839
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
18281840
]
18291841
`);
18301842
});
18311843

1832-
it('terminates subscription after max retries `next` requests', async () => {
1844+
(process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? it : it.skip)('terminates subscription after max retries `next` requests', async () => {
18331845
const server = await startSubscriptionServer({ logger });
18341846

18351847
// Mock the initial check response from the router.

packages/server/src/plugin/subscriptionCallback/index.ts

+3
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ function isAsyncIterable<T>(value: any): value is AsyncIterable<T> {
197197

198198
interface SubscriptionObject {
199199
cancelled: boolean;
200+
asyncIter: AsyncGenerator<ExecutionResult, void, void>;
200201
startConsumingSubscription: () => Promise<void>;
201202
completeSubscription: () => Promise<void>;
202203
}
@@ -523,6 +524,7 @@ class SubscriptionManager {
523524
const { subscription, heartbeat } = subscriptionInfo;
524525
if (subscription) {
525526
subscription.cancelled = true;
527+
subscription.asyncIter?.return();
526528
}
527529
// cleanup heartbeat for subscription
528530
if (heartbeat) {
@@ -551,6 +553,7 @@ class SubscriptionManager {
551553
// allows us to break out of the `for await` and ignore future payloads.
552554
const self = this;
553555
const subscriptionObject = {
556+
asyncIter: subscription,
554557
cancelled: false,
555558
async startConsumingSubscription() {
556559
self.logger?.debug(`Listening to graphql-js subscription`, id);

0 commit comments

Comments
 (0)