Skip to content

Commit 41eba4f

Browse files
authored
Merge pull request dotnet#293 from danielcweber/OptimizeObservableToTask
Reduce number of allocations (closures, delegates) in TaskObservableExtensions.ToTask
2 parents 91b4800 + 7c02407 commit 41eba4f

File tree

1 file changed

+56
-39
lines changed

1 file changed

+56
-39
lines changed

Rx.NET/Source/System.Reactive.Linq/Reactive/Threading/Tasks/TaskObservableExtensions.cs

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,60 @@ public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable
287287
return observable.ToTask(cancellationToken, null);
288288
}
289289

290+
private sealed class ToTaskObserver<TResult> : IObserver<TResult>
291+
{
292+
private readonly CancellationToken _ct;
293+
private readonly IDisposable _disposable;
294+
private readonly TaskCompletionSource<TResult> _tcs;
295+
private readonly CancellationTokenRegistration _ctr = default(CancellationTokenRegistration);
296+
297+
private bool _hasValue;
298+
private TResult _lastValue;
299+
300+
public ToTaskObserver(TaskCompletionSource<TResult> tcs, IDisposable disposable, CancellationToken ct)
301+
{
302+
this._ct = ct;
303+
this._tcs = tcs;
304+
this._disposable = disposable;
305+
306+
if (ct.CanBeCanceled)
307+
{
308+
this._ctr = ct.Register(this.Cancel);
309+
}
310+
}
311+
312+
public void OnNext(TResult value)
313+
{
314+
this._hasValue = true;
315+
this._lastValue = value;
316+
}
317+
318+
public void OnError(Exception error)
319+
{
320+
this._tcs.TrySetException(error);
321+
322+
this._ctr.Dispose(); // no null-check needed (struct)
323+
this._disposable.Dispose();
324+
}
325+
326+
public void OnCompleted()
327+
{
328+
if (this._hasValue)
329+
this._tcs.TrySetResult(this._lastValue);
330+
else
331+
this._tcs.TrySetException(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
332+
333+
this._ctr.Dispose(); // no null-check needed (struct)
334+
this._disposable.Dispose();
335+
}
336+
337+
private void Cancel()
338+
{
339+
this._disposable.Dispose();
340+
this._tcs.TrySetCanceled(this._ct);
341+
}
342+
}
343+
290344
/// <summary>
291345
/// Returns a task that will receive the last value or the exception produced by the observable sequence.
292346
/// </summary>
@@ -301,49 +355,12 @@ public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable
301355
if (observable == null)
302356
throw new ArgumentNullException(nameof(observable));
303357

304-
var hasValue = false;
305-
var lastValue = default(TResult);
306-
307358
var tcs = new TaskCompletionSource<TResult>(state);
308359

309360
var disposable = new SingleAssignmentDisposable();
310361

311-
var ctr = default(CancellationTokenRegistration);
312-
313-
if (cancellationToken.CanBeCanceled)
314-
{
315-
ctr = cancellationToken.Register(() =>
316-
{
317-
disposable.Dispose();
318-
tcs.TrySetCanceled(cancellationToken);
319-
});
320-
}
321-
322-
var taskCompletionObserver = new AnonymousObserver<TResult>(
323-
value =>
324-
{
325-
hasValue = true;
326-
lastValue = value;
327-
},
328-
ex =>
329-
{
330-
tcs.TrySetException(ex);
331-
332-
ctr.Dispose(); // no null-check needed (struct)
333-
disposable.Dispose();
334-
},
335-
() =>
336-
{
337-
if (hasValue)
338-
tcs.TrySetResult(lastValue);
339-
else
340-
tcs.TrySetException(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
341-
342-
ctr.Dispose(); // no null-check needed (struct)
343-
disposable.Dispose();
344-
}
345-
);
346-
362+
var taskCompletionObserver = new ToTaskObserver<TResult>(tcs, disposable, cancellationToken);
363+
347364
//
348365
// Subtle race condition: if the source completes before we reach the line below, the SingleAssigmentDisposable
349366
// will already have been disposed. Upon assignment, the disposable resource being set will be disposed on the

0 commit comments

Comments
 (0)