Skip to content

Commit 0540e99

Browse files
CDRIVER-4738 do not resume after change stream cursor is closed
1 parent 75abd6d commit 0540e99

File tree

2 files changed

+87
-0
lines changed

2 files changed

+87
-0
lines changed

src/libmongoc/src/mongoc/mongoc-change-stream.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,15 @@ mongoc_change_stream_next(mongoc_change_stream_t *stream, const bson_t **bson)
497497
goto end;
498498
}
499499

500+
/* the cursor is closed. */
501+
if (stream->cursor->cursor_id == 0) {
502+
_mongoc_set_error(&stream->err,
503+
MONGOC_ERROR_CURSOR,
504+
MONGOC_ERROR_CURSOR_INVALID_CURSOR,
505+
"Cannot advance a closed change stream.");
506+
goto end;
507+
}
508+
500509
resumable = _is_resumable_error(stream, err_doc);
501510
while (resumable) {
502511
/* recreate the cursor. */

src/libmongoc/tests/test-mongoc-change-stream.c

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1930,6 +1930,78 @@ prose_test_18(void)
19301930
mock_server_destroy(server);
19311931
}
19321932

1933+
// Test that a resume does not occur after an "invalidate" event.
1934+
static void
1935+
iterate_after_invalidate(void *test_ctx)
1936+
{
1937+
mongoc_client_t *client = test_framework_new_default_client();
1938+
mongoc_collection_t *coll = mongoc_client_get_collection(client, "db", "coll");
1939+
bson_error_t error;
1940+
int64_t start_time = bson_get_monotonic_time();
1941+
1942+
BSON_UNUSED(test_ctx);
1943+
1944+
// Insert a document into the collection to ensure the collection is created.
1945+
bool ok = mongoc_collection_insert_one(coll, tmp_bson("{'foo': 'bar'}"), NULL /* opts */, NULL /* reply */, &error);
1946+
ASSERT_OR_PRINT(ok, error);
1947+
1948+
mongoc_change_stream_t *cs = mongoc_collection_watch(coll, tmp_bson("{}"), NULL /* opts */);
1949+
1950+
ASSERT_OR_PRINT(mongoc_collection_drop(coll, &error), error);
1951+
1952+
// Iterate until the next event. Expect "drop" event.
1953+
{
1954+
const bson_t *event;
1955+
bool found_event = false;
1956+
while (!found_event) {
1957+
found_event = mongoc_change_stream_next(cs, &event);
1958+
if (!found_event) {
1959+
ASSERT_OR_PRINT(!mongoc_change_stream_error_document(cs, &error, NULL /* document */), error);
1960+
}
1961+
1962+
int64_t delta = bson_get_monotonic_time() - start_time;
1963+
if (delta > 10 * 1000 * 1000) {
1964+
test_error("test exceeded 10 seconds");
1965+
}
1966+
}
1967+
ASSERT_MATCH(event, "{'operationType': 'drop'}");
1968+
}
1969+
1970+
// Iterate until the next event. Expect "invalidate" event.
1971+
{
1972+
const bson_t *event;
1973+
bool found_event = false;
1974+
while (!found_event) {
1975+
found_event = mongoc_change_stream_next(cs, &event);
1976+
if (!found_event) {
1977+
ASSERT_OR_PRINT(!mongoc_change_stream_error_document(cs, &error, NULL /* document */), error);
1978+
}
1979+
1980+
int64_t delta = bson_get_monotonic_time() - start_time;
1981+
if (delta > 10 * 1000 * 1000) {
1982+
test_error("test exceeded 10 seconds");
1983+
}
1984+
}
1985+
ASSERT_MATCH(event, "{'operationType': 'invalidate'}");
1986+
}
1987+
1988+
// Iterate. Expect error suggesting failure to iterate a closed cursor.
1989+
{
1990+
const bson_t *event;
1991+
bool found_error = false;
1992+
while (!found_error) {
1993+
ASSERT_WITH_MSG(!mongoc_change_stream_next(cs, &event), "expected no event, got: %s", tmp_json(event));
1994+
found_error = mongoc_change_stream_error_document(cs, &error, NULL /* document */);
1995+
}
1996+
ASSERT_ERROR_CONTAINS(
1997+
error, MONGOC_ERROR_CURSOR, MONGOC_ERROR_CURSOR_INVALID_CURSOR, "Cannot advance a closed change stream");
1998+
}
1999+
2000+
mongoc_change_stream_destroy(cs);
2001+
mongoc_collection_destroy(coll);
2002+
mongoc_client_destroy(client);
2003+
}
2004+
19332005
typedef struct {
19342006
bson_t *commands[6];
19352007
size_t commands_len;
@@ -2176,6 +2248,12 @@ test_change_stream_install(TestSuite *suite)
21762248
test_framework_skip_if_not_replset);
21772249
TestSuite_AddMockServerTest(suite, "/change_streams/prose_test_17", prose_test_17);
21782250
TestSuite_AddMockServerTest(suite, "/change_streams/prose_test_18", prose_test_18);
2251+
TestSuite_AddFull(suite,
2252+
"/change_streams/iterate_after_invalidate",
2253+
iterate_after_invalidate,
2254+
NULL,
2255+
NULL,
2256+
test_framework_skip_if_not_replset);
21792257
TestSuite_AddFull(suite,
21802258
"/change_stream/batchSize0",
21812259
test_change_stream_batchSize0,

0 commit comments

Comments
 (0)