Stream larger data #1262
Conversation
| exc.sent.code if hasattr(exc, "sent") and exc.sent else None |
| ) |
| if close_code == 1009:  # MESSAGE_TOO_BIG |
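For context, 1009 is the RFC 6455 close code for "Message Too Big" — the peer received a frame larger than it is willing to process. A minimal sketch (plain Python; the dict and helper are illustrative, not part of the tiled codebase) mapping the close codes most relevant here:

```python
# RFC 6455 WebSocket close codes relevant to this discussion.
# These names and this helper are my own, for illustration only.
CLOSE_CODES = {
    1000: "NORMAL_CLOSURE",
    1001: "GOING_AWAY",
    1009: "MESSAGE_TOO_BIG",  # peer received a frame exceeding its max_size
    1011: "INTERNAL_ERROR",
}


def describe_close(code):
    """Return a human-readable name for a WebSocket close code."""
    return CLOSE_CODES.get(code, f"UNKNOWN ({code})")
```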
I'm not sure if this is the best way to handle this, but it's at least better than silently reconnecting in a loop and trying to read the same message that's too large over and over.
Indeed.
I'm not sure where we want to land on this ultimately. I'm inclined to be strict at first because we can always add affordances later. It's harder to take them away.
Logging an error describing what to do (as you have) and then permanently disconnecting feels like a safe, strict initial implementation. We can grow from there.
How would the end-of-run consumers handle MESSAGE_TOO_BIG if we do a permanent disconnect?
The process running the consumer stops, then we notice the problem because the processing job didn't start, then someone searches the logs to find the error, updates the consumer code, and then starts the service again?
Or maybe we just set the max_size to be really large so that we only get an error log when the data is too large for the consumer to handle?
I'm thinking that we should have a solid theory about how the end-of-run consumers would work with whatever design choice we make here, because I think the plan is to retire Kafka in the next cycle.
Agreed we need a solid plan for those.
I think that end-of-run consumers will be subscribed to containers (monitoring for run stops) and so the messages they receive will always be small. We will subscribe to data (arrays, tables) within the Prefect jobs themselves. Those should have high, reasonable limits. If the limits are hit, the jobs will fail, and at that point it will be good for a person to investigate whether the limit is too low or (perhaps more likely) something unexpected occurred.
7149e8b to 5bf6c20
| def subscribe( |
| self, |
| executor: Optional[concurrent.futures.Executor] = None, |
| max_size: int = 1_000_000, |
What are the design trade-offs between specifying this here and specifying it in Subscription.start()?
I remember that we initially required the start parameter here, and then we moved it from __init__ to start because we don't technically need to know it until start, and this gives the caller a little more flexibility. The code that constructs the Subscription may not be the code that runs it. (We have an example of this in napari-tiled.) Only the code that runs it and forms the WS connection needs to know the max_size.
I could also imagine eventually loosening the requirement that a Subscription can only be started once per instance. If we did relax that, this would be convenient:
sub = x.subscribe()
sub.child_created.add_callback(print)
sub.start()
# success for a while and then...
# Error! Message too large.
# Try again without remaking the Subscription.
sub.start(max_size=1_000_000_000)
There might be good reasons not to support restartable Subscriptions. I wouldn't suggest doing that now, but if we later did, specifying the max_size as late as possible would pay off.
Are there reasons to leave max_size where it is, in __init__?
I agree with your reasoning; I think start is better. I don't have a good reason for it to stay in __init__.
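The agreed shape can be sketched with a stand-in class (everything here besides the max_size-on-start placement is hypothetical, not the actual tiled Subscription implementation):

```python
class Subscription:
    """Stand-in illustrating max_size on start() instead of __init__."""

    def __init__(self, executor=None):
        # Construction needs no transport details: the code that
        # constructs a Subscription may not be the code that runs it.
        self.executor = executor
        self.max_size = None
        self.started = False

    def start(self, max_size=1_000_000):
        # Only the code that opens the WebSocket connection needs to
        # know max_size, so it is supplied as late as possible, here.
        if self.started:
            raise RuntimeError("Subscription can only be started once")
        self.max_size = max_size
        self.started = True


sub = Subscription()
sub.start(max_size=1_000_000_000)
```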
Add a max_size param for subscriptions so that clients can specify that they want to receive larger data.
This also improves the client behavior when receiving a message that is too large.
I think we can ignore the codecov failures, because the Starlette TestClient.websocket_connect() doesn't support max_size. I tested this on a local tiled and it has the behavior that I was going for.
Any thoughts about what the client behavior should be when receiving data that is too large?