Skip to content

Commit 24eba2a

Browse files
authored
Merge pull request dotnet#247 from akarnokd/WithLatestFromFix
Fix WithLatestFrom potential memory visibility issues
2 parents bd48f48 + eb639bb commit 24eba2a

File tree

1 file changed

+22
-3
lines changed

1 file changed

+22
-3
lines changed

Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/WithLatestFrom.cs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#if !NO_PERF
66
using System;
77
using System.Reactive.Disposables;
8+
using System.Threading;
89

910
namespace System.Reactive.Linq.ObservableImpl
1011
{
@@ -42,9 +43,12 @@ public _(WithLatestFrom<TFirst, TSecond, TResult> parent, IObserver<TResult> obs
4243
private volatile bool _hasLatest;
4344
private TSecond _latest;
4445

46+
private object _latestGate;
47+
4548
public IDisposable Run()
4649
{
4750
_gate = new object();
51+
_latestGate = new object();
4852

4953
var sndSubscription = new SingleAssignmentDisposable();
5054

@@ -88,11 +92,19 @@ public void OnNext(TFirst value)
8892
{
8993
if (_parent._hasLatest) // Volatile read
9094
{
95+
96+
TSecond latest;
97+
98+
lock (_parent._latestGate)
99+
{
100+
latest = _parent._latest;
101+
}
102+
91103
var res = default(TResult);
92104

93105
try
94106
{
95-
res = _parent._parent._resultSelector(value, _parent._latest);
107+
res = _parent._parent._resultSelector(value, latest);
96108
}
97109
catch (Exception ex)
98110
{
@@ -140,8 +152,15 @@ public void OnError(Exception error)
140152

141153
public void OnNext(TSecond value)
142154
{
143-
_parent._latest = value;
144-
_parent._hasLatest = true; // Volatile write
155+
lock (_parent._latestGate)
156+
{
157+
_parent._latest = value;
158+
}
159+
160+
if (!_parent._hasLatest)
161+
{
162+
_parent._hasLatest = true;
163+
}
145164
}
146165
}
147166
}

0 commit comments

Comments
 (0)