Skip to content

RFC: Reusable Source Executor#72

Open
fuyufjh wants to merge 4 commits intomainfrom
eric/reuse-source
Open

RFC: Reusable Source Executor#72
fuyufjh wants to merge 4 commits intomainfrom
eric/reuse-source

Conversation

@fuyufjh
Copy link
Copy Markdown
Contributor

@fuyufjh fuyufjh commented Aug 18, 2023

No description provided.

Copy link
Copy Markdown
Contributor

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really love this idea! This makes "Source" in our system much more well-defined.

let source_stream = the stream from upstream SourceExecutor i.e. new events
let backfill_stream = read from message brokers with user-defined scan_startup_mode) i.e. historical events

// In fact, these should be per kafka partition
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth noting that split changes should also be applied to Backfill executors, and...

  • we need to persist the backfill offsets in this case,
  • aligning the distribution/scheduling of Backfill and Source could make life much easier.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for both 2 items


So how to combine the backfill data with incoming data without duplicating or missing events?

Since everything happens inside the `BackfillExecutor`, the implementation is not difficult. Here is the pseudo-code.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this seems to be another implementation besides MV-on-MV backfill and CDC backfill, since each of them seems to have a slightly different algorithm?

Hope that there's some abstraction for this new backfill executor, so that we don't have to introduce a physical executor every time we support a new source connector. 🥵

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there is already pretty lots of duplication between MV-on-MV backfill and CDC backfill. I am not sure whether it's possible to DRY it. Anyway, either DRY or not is acceptable to me.


Here,

- `KafkaSourceExecutor` is created on executing `CREATE SOURCE` statement, although a trivial optimization is doing nothing when there is no downstream operator attached.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this "optimization" is necessary, as there're kinds of sources that do not support rewinding the historical data like Nexmark or Datagen. So it can be confusing if the source executor runs itself and loses some records in the downstream materialized views created later.

Copy link
Copy Markdown
Contributor

@st1page st1page left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love this idea. Some supplements:

  1. For the watermark on the source, We need to add the WatermarkFilter in the source fragment during the CREATE SOURCE statement. And I think we do not need to support do the watermark logic in the backfill part. because
    • we have not supported and do not have a design about the MV on MV's watermark in the backfill phase. risingwavelabs/risingwave#8375 So we should assume the downstream can accept there could be no watermark for a long time.
    • Our watermark filter algorithm is non-deterministic among different splits. So even when we filter the out-date data in backfill. Backfills in different streaming queries are still inconsistency.
  2. Based on this design, we will have a global unique instance for the source. And we can easily insert some common logic on the source and show it to users naturally. E.g, an internal table includes the record filtered by watermark. An internal table that includes the when parsing from the connector.

@BugenZhao
Copy link
Copy Markdown
Contributor

I personally prefer putting one WatermarkFilter after each Backfill, as different materialized views indeed filter out different records considering TTL on external sources. This also matches the current behavior that sources instantiated in different streaming jobs has their own watermarks.

let backfill_offset = 0;
let backfill_completed = false;

next_event = select! {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this require back fill executor has same parallelism as source executor?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am concerning that backfill a source may delay the barrier event. If we bias for source_stream, the backfill progress may not catch up with upstream and hurt freshness, if we bias for backfill_stream, the barrier will be queued in the source. So may be we need a customized round-robin select! here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the barrier should be passed directly (aka. "stealing")

This is same with our design of any other source - the barrier will always be injected whenever the barrier comes

Conceptually, a source is very different from a table or a sink in terms of whether it is instantiated. A source defined by a `CREATE SOURCE` statement is not an instance, it's just a set of metadata stored in the catalog until it's referenced by a materialized view, at which point a `SourceExecutor` is instantiated.

As a result, when creating multiple materialized views on top of one source, the `SourceExecutor` is not reused between them. It leads to multiple times of resource utilization and multiple consuming offsets, metrics, etc. Furthermore, it becomes a bug if the source definition uses some exclusive property, such as the consumer group of Kafka source.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should include @xxchan's comment in this RFC: https://github.com/risingwavelabs/rfcs/pull/72/files#r1388947714

I think currently for N stream jobs on a source, we have N consumers. With this design, we have 1+N consumers during backfilling, and only 1 in steady-state after backfill is finished.

It makes it a lot clearer.

Suggested change
Consider that now, for N stream jobs on a source, we have N consumers. With this design, we have 1+N consumers during backfilling (1 source, N backfill), and only source executor in steady-state after backfill is finished.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update it to elaborate more the possible benefits.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants