Skip to content

Commit e7ce0b0

Browse files
committed
Always subscribe to all ACKs on all servers
Prior to this fix, Groups.Add would fail to complete when there was no subscription configured to receive the ACK on the server Groups.Add was called on.
1 parent 57691b9 commit e7ce0b0

File tree

14 files changed

+440
-188
lines changed

14 files changed

+440
-188
lines changed

src/Microsoft.AspNet.SignalR.Core/DefaultDependencyResolver.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ private void RegisterDefaultServices()
6363
var ackHandler = new Lazy<AckHandler>();
6464
Register(typeof(IAckHandler), () => ackHandler.Value);
6565

66+
var serverMessageHandler = new Lazy<AckSubscriber>(() => new AckSubscriber(this));
67+
Register(typeof(AckSubscriber), () => serverMessageHandler.Value);
68+
6669
var perfCounterWriter = new Lazy<PerformanceCounterManager>(() => new PerformanceCounterManager(this));
6770
Register(typeof(IPerformanceCounterManager), () => perfCounterWriter.Value);
6871

src/Microsoft.AspNet.SignalR.Core/Hubs/HubDispatcher.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,8 +408,7 @@ protected override IList<string> GetSignals(string userId, string connectionId)
408408
})
409409
.Concat(new[]
410410
{
411-
PrefixHelper.GetConnectionId(connectionId),
412-
PrefixHelper.GetAck(connectionId)
411+
PrefixHelper.GetConnectionId(connectionId)
413412
});
414413

415414
return signals.ToList();
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.md in the project root for license information.
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.IO;
6+
using System.Threading.Tasks;
7+
using Microsoft.AspNet.SignalR.Messaging;
8+
9+
namespace Microsoft.AspNet.SignalR.Infrastructure
10+
{
11+
/// <summary>
12+
/// A singleton that subscribes to all ACKs sent over the
13+
/// <see cref="Microsoft.AspNet.SignalR.Messaging.IMessageBus"/> and
14+
/// triggers any corresponding ACKs on the <see cref="IAckHandler"/>.
15+
/// </summary>
16+
internal class AckSubscriber : ISubscriber, IDisposable
17+
{
18+
private readonly IMessageBus _messageBus;
19+
private readonly IAckHandler _ackHandler;
20+
private IDisposable _subscription;
21+
22+
private const int MaxMessages = 10;
23+
24+
private static readonly string[] ServerSignals = new[] { Signal };
25+
26+
public AckSubscriber(IDependencyResolver resolver) :
27+
this(resolver.Resolve<IMessageBus>(),
28+
resolver.Resolve<IAckHandler>())
29+
{
30+
}
31+
32+
public AckSubscriber(IMessageBus messageBus, IAckHandler ackHandler)
33+
{
34+
_messageBus = messageBus;
35+
_ackHandler = ackHandler;
36+
37+
Identity = Guid.NewGuid().ToString();
38+
39+
ProcessMessages();
40+
}
41+
42+
// The signal for all signalr servers
43+
public const string Signal = "__SIGNALR__SERVER__";
44+
45+
public IList<string> EventKeys
46+
{
47+
get { return ServerSignals; }
48+
}
49+
50+
public event Action<ISubscriber, string> EventKeyAdded
51+
{
52+
add { }
53+
remove { }
54+
}
55+
56+
public event Action<ISubscriber, string> EventKeyRemoved
57+
{
58+
add { }
59+
remove { }
60+
}
61+
62+
public Action<TextWriter> WriteCursor { get; set; }
63+
64+
public string Identity { get; private set; }
65+
66+
public Subscription Subscription { get; set; }
67+
68+
public void Dispose()
69+
{
70+
if (_subscription != null)
71+
{
72+
_subscription.Dispose();
73+
}
74+
}
75+
76+
private void ProcessMessages()
77+
{
78+
// Process messages that come from the bus for servers
79+
_subscription = _messageBus.Subscribe(this, cursor: null, callback: TriggerAcks, maxMessages: MaxMessages, state: null);
80+
}
81+
82+
private Task<bool> TriggerAcks(MessageResult result, object state)
83+
{
84+
result.Messages.Enumerate<object>(m => m.IsAck,
85+
(s, m) => ((IAckHandler)s).TriggerAck(m.CommandId),
86+
state: _ackHandler);
87+
88+
return TaskAsyncHelper.True;
89+
}
90+
}
91+
}

src/Microsoft.AspNet.SignalR.Core/Infrastructure/Connection.cs

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ public string Identity
9494
}
9595
}
9696

97-
[SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode", Justification = "Used for debugging purposes.")]
9897
private TraceSource Trace
9998
{
10099
get
@@ -299,34 +298,27 @@ private bool ExcludeMessage(Message message)
299298

300299
private void ProcessResults(MessageResult result)
301300
{
302-
result.Messages.Enumerate<Connection>(message => message.IsAck || message.IsCommand,
301+
result.Messages.Enumerate<Connection>(message => message.IsCommand,
303302
(connection, message) => ProcessResultsCore(connection, message), this);
304303
}
305304

306305
private static void ProcessResultsCore(Connection connection, Message message)
307306
{
308307
if (message.IsAck)
309308
{
310-
connection._ackHandler.TriggerAck(message.CommandId);
309+
connection.Trace.TraceError("Connection {0} received an unexpected ACK message.", connection.Identity);
310+
return;
311311
}
312-
else if (message.IsCommand)
313-
{
314-
var command = connection._serializer.Parse<Command>(message.Value, message.Encoding);
315-
connection.ProcessCommand(command);
316312

317-
// Only send the ack if this command is waiting for it
318-
if (message.WaitForAck)
319-
{
320-
// If we're on the same box and there's a pending ack for this command then
321-
// just trip it
322-
if (!connection._ackHandler.TriggerAck(message.CommandId))
323-
{
324-
connection._bus.Ack(
325-
acker: connection._connectionId,
326-
waiter: message.Source,
327-
commandId: message.CommandId).Catch(connection._traceSource);
328-
}
329-
}
313+
var command = connection._serializer.Parse<Command>(message.Value, message.Encoding);
314+
connection.ProcessCommand(command);
315+
316+
// Only send the ack if this command is waiting for it
317+
if (message.WaitForAck)
318+
{
319+
connection._bus.Ack(
320+
acker: connection._connectionId,
321+
commandId: message.CommandId).Catch(connection._traceSource);
330322
}
331323
}
332324

src/Microsoft.AspNet.SignalR.Core/Infrastructure/ConnectionManager.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ internal Connection GetConnectionCore(string connectionName)
120120
{
121121
IList<string> signals = connectionName == null ? ListHelper<string>.Empty : new[] { connectionName };
122122

123+
// Ensure that this server is listening for any ACKs sent over the bus.
124+
// This is important in case there are any calls to Groups.Add on a context.
125+
_resolver.Resolve<AckSubscriber>();
126+
123127
// Give this a unique id
124128
var connectionId = Guid.NewGuid().ToString();
125129
return new Connection(_resolver.Resolve<IMessageBus>(),

src/Microsoft.AspNet.SignalR.Core/Infrastructure/PrefixHelper.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ internal static class PrefixHelper
2121

2222
// Both
2323
internal const string ConnectionIdPrefix = "c-";
24-
internal const string AckPrefix = "ack-";
2524

2625
public static bool HasGroupPrefix(string value)
2726
{
@@ -64,11 +63,6 @@ public static string GetPersistentConnectionName(string connectionName)
6463
return PersistentConnectionPrefix + connectionName;
6564
}
6665

67-
public static string GetAck(string connectionId)
68-
{
69-
return AckPrefix + connectionId;
70-
}
71-
7266
public static IList<string> GetPrefixedConnectionIds(IList<string> connectionIds)
7367
{
7468
if (connectionIds.Count == 0)

src/Microsoft.AspNet.SignalR.Core/Messaging/MessageBusExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ public static Task Publish(this IMessageBus bus, string source, string key, stri
3131
return bus.Publish(new Message(source, key, value));
3232
}
3333

34-
internal static Task Ack(this IMessageBus bus, string acker, string waiter, string commandId)
34+
internal static Task Ack(this IMessageBus bus, string acker, string commandId)
3535
{
3636
// Prepare the ack
37-
var message = new Message(acker, PrefixHelper.GetAck(waiter), null);
37+
var message = new Message(acker, AckSubscriber.Signal, null);
3838
message.CommandId = commandId;
3939
message.IsAck = true;
4040
return bus.Publish(message);

src/Microsoft.AspNet.SignalR.Core/Microsoft.AspNet.SignalR.Core.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@
270270
<Compile Include="Transports\ITransportConnection.cs" />
271271
<Compile Include="Json\JsonUtility.cs" />
272272
<Compile Include="GroupManager.cs" />
273+
<Compile Include="Infrastructure\AckSubscriber.cs" />
273274
<Compile Include="Messaging\Command.cs" />
274275
<Compile Include="Resources.Designer.cs">
275276
<AutoGen>True</AutoGen>

src/Microsoft.AspNet.SignalR.Core/PersistentConnection.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public virtual void Initialize(IDependencyResolver resolver)
5959
_configurationManager = resolver.Resolve<IConfigurationManager>();
6060
_transportManager = resolver.Resolve<ITransportManager>();
6161

62+
// Ensure that this server is listening for any ACKs sent over the bus.
63+
resolver.Resolve<AckSubscriber>();
64+
6265
_initialized = true;
6366
}
6467

@@ -393,12 +396,10 @@ private IList<string> GetDefaultSignals(string userId, string connectionId)
393396
// The list of default signals this connection cares about:
394397
// 1. The default signal (the type name)
395398
// 2. The connection id (so we can message this particular connection)
396-
// 3. Ack signal
397399

398400
return new string[] {
399401
DefaultSignal,
400-
PrefixHelper.GetConnectionId(connectionId),
401-
PrefixHelper.GetAck(connectionId)
402+
PrefixHelper.GetConnectionId(connectionId)
402403
};
403404
}
404405

tests/Microsoft.AspNet.SignalR.FunctionalTests/Microsoft.AspNet.SignalR.FunctionalTests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
<Compile Include="Properties\AssemblyInfo.cs" />
9191
<Compile Include="Server\Connections\ConnectionFacts.cs" />
9292
<Compile Include="Server\Connections\DisconnectFacts.cs" />
93+
<Compile Include="Server\Connections\FarmFacts.cs" />
9394
<Compile Include="Server\Connections\PersistentConnectionFacts.cs" />
9495
<Compile Include="Server\Connections\ReconnectFacts.cs" />
9596
<Compile Include="Server\Hubs\GetHubContextFacts.cs" />

0 commit comments

Comments
 (0)