Skip to content

Conversation

gustavodemorais
Copy link
Contributor

What is the purpose of the change

The MultiJoin has a JoinKeyExtractor helper that derives keys from AttributeRef mappings in the joinAttributeMap. It describes how attributes from different inputs are equated via equi-join * conditions.

The methods present in the JoinKeyExtractor should always return rows for the join conditions and commonJoinKey in the same order so we have stable a joinKey for state access and key selector for routing. This adds a fix to make sure the order is maintained.

To make the whole class easier to read, I've also refactored it, added comments and examples.

Brief change log

  • Fix stable ordering issue
  • Refactor JoinKeyExtractor for better readability + comments and examples
  • Add multiple tests

Verifying this change

  • Added semantic tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented Sep 23, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this big refactoring and adding all these Javadocs with examples. Improves the readability significantly. I had just minor comments.

* <p>Example used throughout the comments: t1.id1 = t2.user_id2 and t3.user_id3 = t2.user_id2. All
* three attributes (t1.id1, t2.user_id2, t3.user_id3) represent the same conceptual key.
*
* <p>The {@code joinAttributeMap} for this example would be structured as follows:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this example. Way easier to understand!

userIdFieldName, pkFieldName, String.format("details_%d", inputIndex)
});
} else if (inputIndex
== 3) { // Shipments: user_id (VARCHAR), pk (VARCHAR), details (BIGINT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
== 3) { // Shipments: user_id (VARCHAR), pk (VARCHAR), details (BIGINT)
== 3) { // Shipments: user_id (CHAR NOT NULL), pk (VARCHAR), details (BIGINT)

String userIdFieldName = String.format("user_id_%d", inputIndex);
String pkFieldName = String.format("pk_%d", inputIndex); // Generic PK name

if (inputIndex == 0) { // Users: user_id (VARCHAR), pk (VARCHAR), details (BIGINT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (inputIndex == 0) { // Users: user_id (VARCHAR), pk (VARCHAR), details (BIGINT)
if (inputIndex == 0) { // Users: user_id (CHAR NOT NULL), pk (VARCHAR), details (BIGINT)

+ "k3 BOOLEAN,"
+ "k2 INT,"
+ "k4 STRING")
.addOption("changelog-mode", "I")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the new addMode(ChangelogMode.insertOnly())

"k2 INT",
"k3 BOOLEAN",
"k4 STRING")
.addOption("changelog-mode", "I,UA,UB,D")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the new addMode(ChangelogMode.all())

.addSchema(
"`record_id` STRING PRIMARY KEY NOT ENFORCED",
"`user_id` INT")
.addOption("changelog-mode", "I,UA,D")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the new addMode(ChangelogMode.upsert())

@gustavodemorais
Copy link
Contributor Author

Thanks for the review, @twalthr! I've addressed the comments and replaced all changelogs with the new addMode 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants