-
Notifications
You must be signed in to change notification settings - Fork 31
[FLINK-37950][Connectors/MongoDB] Supporting ordered & bypassdocumentValidation behaviour for sink writer. #58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…Validation behaviour for sink writer.
|
@Jiabao-Sun can you please help review? |
Savonitar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding support for MongoDB's ordered and bypassDocumentValidation write options!
The PR looks good overall.
However, I have concerns about the test coverage and whether the tests actually validate that these parameters work as intended when failures occur.
| } | ||
|
|
||
| @Test | ||
| void unorderedWrite() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: The naming style is slightly inconsistent (unorderedWrite vs testRecovery)
Maybe it is cleaner to pick one style?
| void bypassDocumentValidation() throws Exception { | ||
| final String collection = "test-sink-with-bypass-doc-validation"; | ||
| final MongoSink<Document> sink = | ||
| createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);
In this test, you are passing false to the bypass parameter, effectively disabling the feature you intend to test. Or I'm missing something?
| } | ||
|
|
||
| @Test | ||
| void bypassDocumentValidation() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
|
||
| env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink); | ||
| env.execute(); | ||
| assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current tests don't fully validate the behavioral difference between ordered and unordered writes.
It would be valuable to add a test case that injects a failure (e.g., a duplicate key error) in the middle of a batch.
|
|
||
| env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink); | ||
| env.execute(); | ||
| assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the current test passes regardless of the flag because the collection has no validation rules.
Maybe to verify bypassDocumentValidation works, we can create the collection with a validator and assert that writes only succeed when the bypass flag is enabled?
Improve mongodb bulkwrite performance by supporting configuration optional parameters
This is a backward compatible changes with existing behaviour of ordered writes with document validation is maintained configuration default values.