-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Immediately close channel when subscribers cancel #1459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
try { | ||
future.done(); | ||
} catch (Exception t) { | ||
// Never propagate exception once we know we are done. | ||
logger.debug(t.getMessage(), t); | ||
logger.error(t.getMessage(), t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why raise logging level to error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seemed like a pretty severe error to me - but I can reduce it back to debug.
} | ||
|
||
// The subscriber cancelled early - this channel is dead and should be closed. | ||
channelManager.releaseChannelLock(future.getPartitionKey()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Releasing the lock is already done on future.done() above. You're most likely double releasing here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove.
channelManager.releaseChannelLock(future.getPartitionKey()); | ||
channelManager.closeChannel(channel); | ||
|
||
channel.pipeline().remove(StreamedResponsePublisher.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we close the channel, I think it's useless to remove the publisher from the pipeline. The channel will be garbage collected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove.
This commit includes two changes: - More aggressively mark the ResponseFuture as done to avoid threading issues that still read the old state - Directly invoke channel closing, since there is no guarantee of another channelRead occuring (e.g. if the stream was hard closed)
Addressed all comments by @slandelle (from this PR and master version) |
try { | ||
future.done(); | ||
} catch (Exception t) { | ||
// Never propagate exception once we know we are done. | ||
logger.debug(t.getMessage(), t); | ||
} | ||
|
||
// The subscriber cancelled early - this channel is dead and should be closed. | ||
channelManager.closeChannel(channel); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why you moved this after completing the future?
I think it should be moved back where it was first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually yes. There are other places that read the channel which check the future status.
In the previous version, it was possible that channelManager.closeChannel was invoked, the scheduler switches over to another thread, and now that other thread tries to read from the channel that is currently being closed (because its future.isDone check returned false)
Pushing the future.done() further up will avoid that potential threading issue.
This commit includes two changes: - More aggressively mark the ResponseFuture as done to avoid threading issues that still read the old state - Directly invoke channel closing, since there is no guarantee of another channelRead occuring (e.g. if the stream was hard closed)
Cherry-picked. Release 2.0.36 is under way. |
This commit includes two changes:
to avoid threading issues that still read the old
state
no guarantee of another channelRead occuring (e.g.
if the stream was hard closed)