refactor(storage): refactor MultiRangeDownloader #13524

Conversation
Summary of Changes

Hello @tritone, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed: this pull request introduces a substantial architectural overhaul to the MultiRangeDownloader.
fa95482 to bf70b70 (force-push)
Code Review
This pull request presents a significant and well-structured refactoring of the MultiRangeDownloader. The new design, which utilizes an event loop (actor model), is an excellent approach for managing concurrency and state, resulting in more robust and comprehensible logic. The clear separation of concerns among the manager, sessions, and commands is commendable.
However, I have identified a critical issue where a blocking io.Writer could stall the entire downloader. Additionally, there are a couple of medium-severity issues concerning API design and code duplication that, if addressed, would further enhance the resilience and maintainability of this new implementation.
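For reviewers less familiar with the pattern being described, here is a minimal, hypothetical sketch of an actor-style event loop of the kind the review refers to; the names (mrdCommand, addRangeCmd, manager) are illustrative and are not taken from this PR.

```go
package mrdsketch

import "context"

// mrdCommand is a hypothetical command type; each variant is handled only
// inside the event loop, so manager state needs no mutex.
type mrdCommand interface{ isCommand() }

type addRangeCmd struct {
	offset, length int64
	done           chan error
}

func (addRangeCmd) isCommand() {}

type closeCmd struct{ done chan struct{} }

func (closeCmd) isCommand() {}

type manager struct {
	cmds    chan mrdCommand
	pending map[int64]addRangeCmd // mutable state, touched only by run()
	nextID  int64
}

// run is the single-threaded event loop: it is the only goroutine that reads
// or writes manager state, which is what makes the design race-free.
func (m *manager) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case c := <-m.cmds:
			switch cmd := c.(type) {
			case addRangeCmd:
				m.pending[m.nextID] = cmd
				m.nextID++
				cmd.done <- nil
			case closeCmd:
				close(cmd.done)
				return
			}
		}
	}
}
```

The trade-off the review goes on to discuss follows directly from this shape: any blocking work done inside run() stalls every pending command.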
storage/grpc_reader_multi_range.go (outdated)

content := dataRange.GetChecksummedData().GetContent()
req.bytesWritten += int64(len(content))
_, err := req.output.Write(content)
The req.output.Write(content) call is executed within the manager's single-threaded event loop. If a user provides an io.Writer that blocks (e.g., writing to a slow network connection or a full pipe), it will stall the entire event loop. This prevents the processing of any other events, such as receiving more data from the gRPC stream, handling new Add requests, or closing the downloader. This can lead to performance degradation and deadlocks, and may cause the stream's receive buffer to fill up, creating backpressure on the server.
To resolve this, the Write operation should be performed asynchronously, outside of the event loop. A potential solution is to use a dedicated writer goroutine for each range request that consumes data from a channel. The event loop would then push data chunks to the appropriate channel without blocking.
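As a rough sketch of that suggestion (assumed names, not code from this PR), each range could own a small writer goroutine fed by a buffered channel, so the event loop only enqueues chunks and never calls Write itself:

```go
package mrdsketch

import "io"

// rangeWriter decouples the user's io.Writer from the event loop: the loop
// sends chunks on ch and never blocks on the Write call itself.
type rangeWriter struct {
	ch  chan []byte
	out io.Writer
	err chan error // delivers the first write error, if any
}

func newRangeWriter(out io.Writer) *rangeWriter {
	w := &rangeWriter{
		ch:  make(chan []byte, 16), // bounded, so a stuck writer still applies backpressure eventually
		out: out,
		err: make(chan error, 1),
	}
	go func() {
		for buf := range w.ch {
			if _, err := w.out.Write(buf); err != nil {
				w.err <- err
				return
			}
		}
		close(w.err)
	}()
	return w
}

// enqueue is what the event loop would call instead of Write; it copies the
// chunk because the incoming gRPC buffer may be reused after the call returns.
func (w *rangeWriter) enqueue(p []byte) {
	buf := make([]byte, len(p))
	copy(buf, p)
	w.ch <- buf
}
```

Note that a bounded channel still applies backpressure if the writer stalls for long enough, so the buffer size (or an unbounded queue) would be a deliberate trade-off.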
Discussed this offline, it is the caller's responsibility to ensure a non-blocking writer. We can consider this as an option later.
Even a non-blocking writer will take some time to write, though; if this blocks Add() calls in the meantime, it may cause noticeable performance degradation for certain workloads.
Definitely worth investigating but if we are just writing messages to in-memory buffers it will be trivial. For file io I agree it will be a bigger deal perhaps.
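To make the "caller's responsibility" point concrete, one possible caller-side pattern (purely illustrative, not part of this PR) is to hand the downloader a fast in-memory buffer and do the slow file IO afterwards on the caller's own goroutine; addRange below is a hypothetical stand-in for the real Add call and its completion callback.

```go
package mrdsketch

import (
	"bytes"
	"os"
)

// downloadToMemoryThenFlush gives the downloader a fast in-memory writer and
// performs the slow disk write only after the range has completed, so no
// file IO ever runs inside the downloader's event loop.
func downloadToMemoryThenFlush(addRange func(out *bytes.Buffer, done chan error), path string) error {
	var buf bytes.Buffer
	done := make(chan error, 1)
	addRange(&buf, done) // hypothetical stand-in for mrd.Add plus its callback
	if err := <-done; err != nil {
		return err
	}
	return os.WriteFile(path, buf.Bytes(), 0o644)
}
```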
func (m *multiRangeDownloaderManager) handleStreamEnd(result mrdSessionResult) {
	m.currentSession = nil
	err := result.err

	if result.redirect != nil {
		m.readSpec.RoutingToken = result.redirect.RoutingToken
		m.readSpec.ReadHandle = result.redirect.ReadHandle
		if ensureErr := m.ensureSession(m.ctx); ensureErr != nil {
			if !m.isRetryable(ensureErr) {
				m.permanentErr = ensureErr
				m.attrsOnce.Do(func() { close(m.attrsReady) })
				m.failAllPending(m.permanentErr)
			}
		}
	} else if m.isRetryable(err) {
		if len(m.pendingRanges) > 0 {
			if ensureErr := m.ensureSession(m.ctx); ensureErr != nil {
				if !m.isRetryable(ensureErr) {
					m.permanentErr = ensureErr
					m.attrsOnce.Do(func() { close(m.attrsReady) })
					m.failAllPending(m.permanentErr)
				}
			}
		}
	} else {
		if !errors.Is(err, context.Canceled) && !errors.Is(err, errClosed) {
			if m.permanentErr == nil {
				m.permanentErr = err
			}
		} else if m.permanentErr == nil {
			m.permanentErr = errClosed
		}
		m.failAllPending(m.permanentErr)
		m.attrsOnce.Do(func() { close(m.attrsReady) })
	}
}
The logic for handling a non-retryable error from ensureSession is duplicated within both the if result.redirect != nil block and the else if m.isRetryable(err) block. This repetition makes the code more difficult to read and maintain.
This complex error-handling logic could be simplified by refactoring the duplicated code into a common path that is executed after the if/else if chain. This would improve code clarity and reduce the chance of introducing bugs in the future.
func (m *multiRangeDownloaderManager) handleStreamEnd(result mrdSessionResult) {
	m.currentSession = nil
	err := result.err
	var ensureErr error
	if result.redirect != nil {
		m.readSpec.RoutingToken = result.redirect.RoutingToken
		m.readSpec.ReadHandle = result.redirect.ReadHandle
		ensureErr = m.ensureSession(m.ctx)
	} else if m.isRetryable(err) {
		if len(m.pendingRanges) > 0 {
			ensureErr = m.ensureSession(m.ctx)
		}
	} else {
		// Non-retryable error from the stream itself.
		if !errors.Is(err, context.Canceled) && !errors.Is(err, errClosed) {
			if m.permanentErr == nil {
				m.permanentErr = err
			}
		} else if m.permanentErr == nil {
			m.permanentErr = errClosed
		}
		m.failAllPending(m.permanentErr)
		m.attrsOnce.Do(func() { close(m.attrsReady) })
		return
	}
	// Handle error from ensureSession.
	if ensureErr != nil && !m.isRetryable(ensureErr) {
		m.permanentErr = ensureErr
		m.attrsOnce.Do(func() { close(m.attrsReady) })
		m.failAllPending(m.permanentErr)
	}
}
I agree, there is a bit of code duplication here; it can be simplified.
Complete rewrite of storage.MultiRangeDownloader. The new design should be more resilient to concurrency issues, deadlocks, retries, etc.
bf70b70 to 34ac27c (force-push)
BrennaEpp left a comment
Some initial comments/questions
}
m.readIDCounter++

// Attributes should be ready if we are processing Add commands
We should also handle the case where the offset is greater than the object size: the range should be failed and not added to the stream. Otherwise, a permanent error will be set on the MRD when the server returns an OutOfRange error.
This is a bit of a tricky case: if there is another concurrent writer to the object, the Size may be out of date and these calls will in fact succeed. I don't think we validate this in the existing MRD code, and it's on the caller to decide how to handle it.
Yes, but the MRD works with a single version of the object, which is what we should stick to? Without this validation we would set a permanent error if any one invalid range is provided (and would not get data for any valid ranges provided after it).
The MRD is supposed to support an object that grows; see the tailing reads example: https://github.com/GoogleCloudPlatform/golang-samples/blob/main/storage/rapid/read_appendable_object_tail.go
Can you check with the GCSFuse team on the expected behavior here? I know they have logic in their code to recover from these types of permanent errors.
Sounds good, will confirm with them; if required, this can be fixed in a subsequent PR.
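For illustration only (and deliberately not tied to the package's exact signatures), a caller that wants to avoid the permanent-error case discussed above could guard Add with the size it last observed, refreshing attributes and retrying when the object may have grown:

```go
package mrdsketch

import "io"

// addIfInRange is a hypothetical caller-side guard: it skips ranges whose
// offset is at or beyond the size the caller last observed, instead of
// letting the server return an OutOfRange error that poisons the downloader.
// add is passed in so the sketch does not assume the real Add signature.
func addIfInRange(add func(out io.Writer, offset, length int64), out io.Writer, offset, length, knownSize int64) bool {
	if offset >= knownSize {
		// The object may have grown since knownSize was read (e.g. an
		// appendable object); the caller can refresh attributes and retry
		// rather than treating this as a permanent failure.
		return false
	}
	add(out, offset, length)
	return true
}
```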