RFC: Refined S3 Source #76

Merged
tabVersion merged 3 commits into main from tab/refined-s3-source
Oct 18, 2023

Conversation

Contributor

@tabVersion tabVersion commented Sep 8, 2023

a combination of #22 and #74

risingwavelabs/risingwave#11391

Comment on lines +65 to +68
* Task queue is used to store the filenames that are not fetched yet.
* Task queue is initialized by `select * from state_table where partition in local_vnode`. It simply reads everything in the local vnode.
* When receiving new filenames from the message stream, it will push the filenames to the task queue.
* State cache is used to store the filenames that are fetched but not written to hummock yet.

I have two questions here:

  1. How many state tables are maintained in the Fetch executor? What are the schemas?
  2. What does the state transition look like for each file?

Here are my thoughts:

  • Two state tables, "Task Queue" and "File State", are maintained. The former is a partial index of the latter, meaning that files in "Task Queue" must also be present in "File State". Schemas:
"Task Queue"
| file_name | read_offset | enqueue_epoch? (can be used to sort tasks on recovery) |

"File State"
| file_name (bloom_filter_key) | metadata (size/enqueue_epoch/finish_epoch/...) | 
  • State transitions for a file (these are logical states and may not all exist in the implementation)
    • INIT: file name is received from the list executor, or is seen after reading from the state store on recovery
    • INIT -> FINISH: file name is present in "File State" and absent in "Task Queue"
    • INIT -> IN_PROGRESS: file name is absent in "File State". Insert into "File State" and "Task Queue".
    • IN_PROGRESS -> IN_PROGRESS: read_offset updated. Update read_offset in "Task Queue".
    • IN_PROGRESS -> FINISH: finished reading the file. Delete from "Task Queue" and update finish_epoch in "File State".
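To make the proposed transitions concrete, here is a minimal sketch in Python. All names are hypothetical and the two dicts are in-memory stand-ins for the proposed "Task Queue" and "File State" state tables; the real implementation would use RisingWave state tables.

```python
# Hypothetical in-memory stand-ins for the two proposed state tables.
file_state = {}   # file_name -> {"enqueue_epoch": int, "finish_epoch": int | None}
task_queue = {}   # file_name -> {"read_offset": int}  (partial index of file_state)

def on_file_received(name, epoch):
    """INIT: a file name arrives from the list executor (or is replayed on recovery)."""
    if name in file_state and name not in task_queue:
        return "FINISH"  # INIT -> FINISH: already fully read, ignore the duplicate
    if name not in file_state:
        # INIT -> IN_PROGRESS: insert into both "File State" and "Task Queue"
        file_state[name] = {"enqueue_epoch": epoch, "finish_epoch": None}
        task_queue[name] = {"read_offset": 0}
    return "IN_PROGRESS"

def on_read_progress(name, offset):
    """IN_PROGRESS -> IN_PROGRESS: persist the new read offset in "Task Queue"."""
    task_queue[name]["read_offset"] = offset

def on_file_finished(name, epoch):
    """IN_PROGRESS -> FINISH: delete from "Task Queue", record finish_epoch."""
    del task_queue[name]
    file_state[name]["finish_epoch"] = epoch
```

On recovery, only `task_queue` needs to be scanned, which is the point of keeping it as a separate index table.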

Contributor Author


For fetch, we just need one state table. Here is the schema.

| filename | read offset | status | 

In my design, the task queue is in memory.
Every time we insert a new file, the row will be (filename, NULL, INIT); while reading a file, the row becomes (filename, offset, GOING | DONE).

I think we don't need to maintain the order of each file in the task queue.
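The single-table alternative described above can be sketched as follows (a minimal sketch with a dict standing in for the state table; the function names are illustrative, not the actual executor API):

```python
# Sketch of the single state table: | filename | read offset | status |
state_table = {}  # filename -> (read_offset, status)

def enqueue(filename):
    """A newly listed file is persisted as (filename, NULL, INIT)."""
    if filename not in state_table:            # skip duplicates from `list`
        state_table[filename] = (None, "INIT")

def advance(filename, offset, done=False):
    """While reading, the row becomes (filename, offset, GOING | DONE)."""
    state_table[filename] = (offset, "DONE" if done else "GOING")
```

The in-memory task queue holds only the filenames still in INIT/GOING state, so no ordering needs to be persisted.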


The main benefit of keeping a persistent state of received-but-unfinished files is that we don't need to do a full table scan on the state table on recovery. In other words, this extra state table is an index. More files will keep being inserted into the file state table and their states are never cleaned up, so I expect that recovery will become slower over time without the extra index.

The order is just a side benefit of persisting the task queue, and I agree that the order doesn't affect correctness.

Contributor Author


Oh, I underestimated the overhead of doing a table scan. Then your solution fits this scenario best. We can keep the task queue at the thousand-row level with a pagination mechanism.


Oh, I underestimated the overhead of doing a table scan. Then your solution fits this scenario best. We can keep the task queue at the thousand-row level with a pagination mechanism.

SGTM.

Comment on lines +71 to +72
* When receiving a barrier, it will write the state cache to hummock and clear the state cache. (Please note: here we require hummock not to perform an update when inserting the same key.)

please note: here we require hummock not to perform an update when inserting the same key

Why is this the case? Can you explain more?

Contributor Author


The list executor has no guarantee that filenames never duplicate.
For example, file_1 has been done and has been materialized to the state table. But somehow list passed file_1 again, so the fetch executor will enqueue file_1 to the task queue and write it to the state table as a new file when a barrier comes. The default behavior of the state table is to overwrite on the same pk, which would reset file_1 to the init stage.
Or we may do a point get every time we receive a new file to check whether it is already in the state table.
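A minimal sketch of the point-get dedup check discussed here (the dict is a stand-in for the state table's storage; names are illustrative): before enqueueing a file from `list`, look it up by primary key so a finished file is not reset to INIT by an overwriting insert.

```python
# State table stand-in: file_1 has already been fully read.
state_table = {"file_1": (1024, "DONE")}  # filename -> (offset, status)
task_queue = []

def on_listed(filename):
    """Dedup check: point get on the pk before enqueueing a listed file."""
    if state_table.get(filename) is not None:
        return False                        # known file: skip, don't reset it
    state_table[filename] = (None, "INIT")  # new file: persist and enqueue
    task_queue.append(filename)
    return True
```

The cost is one point get per listed filename, which trades a small read amplification for correctness against duplicate listings.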


Or we may do a point get every time we receive a new file to check whether it is already in the state table.

+1 for this approach since it is more straightforward. Also, I think fetch needs to do a point get anyway when it receives a file from list, to determine whether it should be put into the task queue or ignored.

Let's say they are `list` and `fetch`, aligning with the names in [RFC: S3 Source with SQS](https://github.com/risingwavelabs/rfcs/pull/22),
and they do the same thing.

* `List` is unique in one source plan. It is responsible for continuously listing all files and pushing the filenames downstream.
Contributor


Which layer does List reside in?

  • A new kind of connector (split) in the source executor, which persists its state in the offset.
  • A new executor.

## List and Fetch

So I am proposing a new solution, which is to use two separate executors to complete the job.
Let's say they are `list` and `fetch`, aligning with the names in [RFC: S3 Source with SQS](https://github.com/risingwavelabs/rfcs/pull/22),
Contributor


Will we expose this two-phase implementation to users? In other words, how will users create an S3 source?

  • By `create source file_names (..)` and then `create materialized view s3_files as select fetch(file_name)`,
  • or `create source s3_files (..)` directly? And in that case, how many streaming jobs will we have, 1 or 2?

Contributor Author


For `create source`, we will only initialize the `list` part; `fetch` is created along with the materialized views built on the source.

Comment on lines +48 to +49
The `list` executor will keep listing the files in S3 bucket and try its best to de-duplicate the files (but no guarantee).
It will record the timestamp when doing the last listing and filter out the files that are created before the timestamp.
Contributor


May I ask where the duplication comes from?

Contributor Author

@tabVersion tabVersion Sep 15, 2023


list is going to keep querying, so the same file will be listed repeatedly.

Contributor


Yeah, but we'll filter the files with the created/modified date. Is it possible that the same file is included in multiple list responses?

And we can utilize the idea of [RFC: Refine S3 Source: List Objects in A Lazy Way](https://github.com/risingwavelabs/rfcs/pull/74),
not to list all pages at once, but to list the pages one by one, preventing overload downstream.

`list` executor will keep the max page `p` number as well as the last list timestamp `ts` in its state store.
Contributor


May I ask how we can find the files that have been deleted?

Contributor Author


No. `list` only handles newly added files. If `fetch` meets an invalid file path, the file is discarded.
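The lazy, paged listing described in the quoted RFC text above can be sketched as follows. The fake S3 client and all names are illustrative; only the bookkeeping of the max page `p` and last listing timestamp `ts` follows the RFC.

```python
# Sketch of the lazy listing loop: list one page per tick, filter out files
# modified before the last completed sweep, and keep (p, ts) as executor state.
class FakeS3:
    """Hypothetical stand-in for the S3 client; pages of (name, mtime) tuples."""
    def __init__(self, pages):
        self.pages = pages

    def list_page(self, p):
        return self.pages[p] if p < len(self.pages) else []

def list_tick(client, state, now):
    """One iteration of the `list` executor."""
    page = client.list_page(state["p"])
    if not page:                          # full sweep finished: reset for the next one
        state["ts"], state["p"] = now, 0
        return []
    state["p"] += 1                       # persist progress so recovery resumes here
    # best-effort dedup: drop files already seen before the last sweep's timestamp
    return [name for name, mtime in page if mtime > state["ts"]]
```

Because the `ts` filter only catches files older than the last completed sweep, the same file can still appear in two overlapping sweeps, which is why the RFC calls the de-duplication best effort with no guarantee.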

@fuyufjh
Contributor

fuyufjh commented Sep 15, 2023

Some thoughts about reusing the source (#72)

  • Need to read a snapshot from the filename -> read offset table, as the workload for FileSourceBackfillExecutor
  • Need to support hidden system columns: file name and offset

@tabVersion tabVersion merged commit 8cd318c into main Oct 18, 2023