Skip to content

Commit ce127f0

Browse files
Implement connection draining feature
1 parent be13f0b commit ce127f0

File tree

1 file changed

+41
-8
lines changed

1 file changed

+41
-8
lines changed

src/Microsoft.AspNetCore.NodeServices/NodeServicesImpl.cs

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@ namespace Microsoft.AspNetCore.NodeServices
1414
/// class will create a new instance and dispatch future calls to it, while keeping the old instance
1515
/// alive for a defined period so that any in-flight RPC calls can complete. This latter feature is
1616
/// analogous to the "connection draining" feature implemented by HTTP load balancers.
17-
///
18-
/// TODO: Implement everything in the preceding paragraph.
1917
/// </summary>
2018
/// <seealso cref="Microsoft.AspNetCore.NodeServices.INodeServices" />
2119
internal class NodeServicesImpl : INodeServices
2220
{
21+
private static TimeSpan ConnectionDrainingTimespan = TimeSpan.FromSeconds(15);
2322
private NodeServicesOptions _options;
2423
private Func<INodeInstance> _nodeInstanceFactory;
2524
private INodeInstance _currentNodeInstance;
2625
private object _currentNodeInstanceAccessLock = new object();
26+
private Exception _instanceDelayedDisposalException;
2727

2828
internal NodeServicesImpl(NodeServicesOptions options, Func<INodeInstance> nodeInstanceFactory)
2929
{
@@ -43,6 +43,7 @@ public Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionNa
4343

4444
public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, string exportedFunctionName, object[] args, bool allowRetry)
4545
{
46+
ThrowAnyOutstandingDelayedDisposalException();
4647
var nodeInstance = GetOrCreateCurrentNodeInstance();
4748

4849
try
@@ -56,11 +57,13 @@ public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, st
5657
if (allowRetry && ex.NodeInstanceUnavailable)
5758
{
5859
// Perform the retry after clearing away the old instance
60+
// Since we disposal is delayed even though the node instance is replaced immediately, this produces the
61+
// "connection draining" feature whereby in-flight RPC calls are given a certain period to complete.
5962
lock (_currentNodeInstanceAccessLock)
6063
{
6164
if (_currentNodeInstance == nodeInstance)
6265
{
63-
DisposeNodeInstance(_currentNodeInstance);
66+
DisposeNodeInstance(_currentNodeInstance, delay: ConnectionDrainingTimespan);
6467
_currentNodeInstance = null;
6568
}
6669
}
@@ -83,17 +86,47 @@ public void Dispose()
8386
{
8487
if (_currentNodeInstance != null)
8588
{
86-
DisposeNodeInstance(_currentNodeInstance);
89+
DisposeNodeInstance(_currentNodeInstance, delay: TimeSpan.Zero);
8790
_currentNodeInstance = null;
8891
}
8992
}
9093
}
9194

92-
private static void DisposeNodeInstance(INodeInstance nodeInstance)
95+
private void DisposeNodeInstance(INodeInstance nodeInstance, TimeSpan delay)
96+
{
97+
if (delay == TimeSpan.Zero)
98+
{
99+
nodeInstance.Dispose();
100+
}
101+
else
102+
{
103+
Task.Run(async () => {
104+
try
105+
{
106+
await Task.Delay(delay);
107+
nodeInstance.Dispose();
108+
}
109+
catch(Exception ex)
110+
{
111+
// Nothing's waiting for the delayed disposal task, so any exceptions in it would
112+
// by default just get ignored. To make these discoverable, capture them here so
113+
// they can be rethrown to the next caller to InvokeExportAsync.
114+
_instanceDelayedDisposalException = ex;
115+
}
116+
});
117+
}
118+
}
119+
120+
private void ThrowAnyOutstandingDelayedDisposalException()
93121
{
94-
// TODO: Implement delayed disposal for connection draining
95-
// Or consider having the delayedness of it being a responsibility of the INodeInstance
96-
nodeInstance.Dispose();
122+
if (_instanceDelayedDisposalException != null)
123+
{
124+
var ex = _instanceDelayedDisposalException;
125+
_instanceDelayedDisposalException = null;
126+
throw new AggregateException(
127+
"A previous attempt to dispose a Node instance failed. See InnerException for details.",
128+
ex);
129+
}
97130
}
98131

99132
private INodeInstance GetOrCreateCurrentNodeInstance()

0 commit comments

Comments
 (0)