Skip to content

Commit f3da37c

Browse files
committed
fix(WebWorker): Add zone support to MessageBus
Closes angular#4053
1 parent 3b9c086 commit f3da37c

35 files changed

+611
-348
lines changed

modules/angular2/src/core/zone/ng_zone.dart

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,14 +215,15 @@ class NgZone {
215215
_inVmTurnDone = true;
216216
parent.run(_innerZone, _onTurnDone);
217217

218-
if (_pendingMicrotasks == 0 && _onEventDone != null) {
219-
runOutsideAngular(_onEventDone);
220-
}
221218
} finally {
222219
_inVmTurnDone = false;
223220
_hasExecutedCodeInInnerZone = false;
224221
}
225222
}
223+
224+
if (_pendingMicrotasks == 0 && _onEventDone != null) {
225+
runOutsideAngular(_onEventDone);
226+
}
226227
}
227228
}
228229
}

modules/angular2/src/core/zone/ng_zone.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ export class NgZone {
100100
*
101101
* This hook is useful for validating application state (e.g. in a test).
102102
*/
103-
overrideOnEventDone(onEventDoneFn: Function, opt_waitForAsync: boolean): void {
103+
overrideOnEventDone(onEventDoneFn: Function, opt_waitForAsync: boolean = false): void {
104104
var normalizedOnEventDone = normalizeBlank(onEventDoneFn);
105105
if (opt_waitForAsync) {
106106
this._onEventDone = () => {
@@ -212,14 +212,15 @@ export class NgZone {
212212
try {
213213
this._inVmTurnDone = true;
214214
parentRun.call(ngZone._innerZone, ngZone._onTurnDone);
215-
if (ngZone._pendingMicrotasks === 0 && isPresent(ngZone._onEventDone)) {
216-
ngZone.runOutsideAngular(ngZone._onEventDone);
217-
}
218215
} finally {
219216
this._inVmTurnDone = false;
220217
ngZone._hasExecutedCodeInInnerZone = false;
221218
}
222219
}
220+
221+
if (ngZone._pendingMicrotasks === 0 && isPresent(ngZone._onEventDone)) {
222+
ngZone.runOutsideAngular(ngZone._onEventDone);
223+
}
223224
}
224225
}
225226
};
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
import {NgZone} from 'angular2/src/core/zone/ng_zone';
22

33
export class MockNgZone extends NgZone {
4+
_onEventDone: () => void;
5+
46
constructor() { super({enableLongStackTrace: false}); }
57

68
run(fn: Function): any { return fn(); }
79

810
runOutsideAngular(fn: Function): any { return fn(); }
11+
12+
overrideOnEventDone(fn: () => void, opt_waitForAsync: boolean = false): void {
13+
this._onEventDone = fn;
14+
}
15+
16+
simulateZoneExit(): void { this._onEventDone(); }
917
}
Lines changed: 72 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
library angular2.src.web_workers.debug_tools.multi_client_server_message_bus;
22

3-
import "package:angular2/src/web_workers/shared/message_bus.dart"
4-
show MessageBus, MessageBusSink, MessageBusSource;
53
import 'dart:io';
64
import 'dart:convert' show JSON;
75
import 'dart:async';
8-
import 'package:angular2/src/core/facade/async.dart' show EventEmitter;
96
import 'package:angular2/src/web_workers/shared/messaging_api.dart';
7+
import 'package:angular2/src/web_workers/shared/generic_message_bus.dart';
108

119
// TODO(jteplitz602): Remove hard coded result type and
1210
// clear messageHistory once app is done with it #3859
13-
class MultiClientServerMessageBus implements MessageBus {
14-
final MultiClientServerMessageBusSink sink;
15-
MultiClientServerMessageBusSource source;
11+
class MultiClientServerMessageBus extends GenericMessageBus {
1612
bool hasPrimary = false;
1713

18-
MultiClientServerMessageBus(this.sink, this.source);
14+
@override
15+
MultiClientServerMessageBusSink get sink => super.sink;
16+
@override
17+
MultiClientServerMessageBusSource get source => super.source;
18+
19+
MultiClientServerMessageBus(MultiClientServerMessageBusSink sink,
20+
MultiClientServerMessageBusSource source)
21+
: super(sink, source);
1922

2023
MultiClientServerMessageBus.fromHttpServer(HttpServer server)
21-
: sink = new MultiClientServerMessageBusSink() {
22-
source = new MultiClientServerMessageBusSource(resultReceived);
24+
: super(new MultiClientServerMessageBusSink(),
25+
new MultiClientServerMessageBusSource()) {
26+
source.onResult.listen(_resultReceived);
2327
server.listen((HttpRequest request) {
2428
if (request.uri.path == "/ws") {
2529
WebSocketTransformer.upgrade(request).then((WebSocket socket) {
@@ -38,18 +42,10 @@ class MultiClientServerMessageBus implements MessageBus {
3842
});
3943
}
4044

41-
void resultReceived() {
45+
void _resultReceived(_) {
4246
sink.resultReceived();
4347
}
4448

45-
EventEmitter from(String channel) {
46-
return source.from(channel);
47-
}
48-
49-
EventEmitter to(String channel) {
50-
return sink.to(channel);
51-
}
52-
5349
Function _handleDisconnect(WebSocketWrapper wrapper) {
5450
return () {
5551
sink.removeConnection(wrapper);
@@ -72,12 +68,15 @@ class WebSocketWrapper {
7268
WebSocketWrapper(this._messageHistory, this._resultMarkers, this.socket) {
7369
stream = socket.asBroadcastStream();
7470
stream.listen((encodedMessage) {
75-
var message = JSON.decode(encodedMessage)['message'];
76-
if (message is Map && message.containsKey("type")) {
77-
if (message['type'] == 'result') {
78-
resultReceived();
71+
var messages = JSON.decode(encodedMessage);
72+
messages.forEach((data) {
73+
var message = data['message'];
74+
if (message is Map && message.containsKey("type")) {
75+
if (message['type'] == 'result') {
76+
resultReceived();
77+
}
7978
}
80-
}
79+
});
8180
});
8281
}
8382

@@ -121,10 +120,9 @@ class WebSocketWrapper {
121120
}
122121
}
123122

124-
class MultiClientServerMessageBusSink implements MessageBusSink {
123+
class MultiClientServerMessageBusSink extends GenericMessageBusSink {
125124
final List<String> messageHistory = new List<String>();
126125
final Set<WebSocketWrapper> openConnections = new Set<WebSocketWrapper>();
127-
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
128126
final List<int> resultMarkers = new List<int>();
129127

130128
void resultReceived() {
@@ -141,76 +139,77 @@ class MultiClientServerMessageBusSink implements MessageBusSink {
141139
openConnections.remove(webSocket);
142140
}
143141

144-
EventEmitter to(String channel) {
145-
if (_channels.containsKey(channel)) {
146-
return _channels[channel];
147-
} else {
148-
var emitter = new EventEmitter();
149-
emitter.listen((message) {
150-
_send({'channel': channel, 'message': message});
151-
});
152-
return emitter;
153-
}
154-
}
155-
156-
void _send(dynamic message) {
157-
String encodedMessage = JSON.encode(message);
142+
@override
143+
void sendMessages(List<dynamic> messages) {
144+
String encodedMessages = JSON.encode(messages);
158145
openConnections.forEach((WebSocketWrapper webSocket) {
159146
if (webSocket.caughtUp) {
160-
webSocket.socket.add(encodedMessage);
147+
webSocket.socket.add(encodedMessages);
161148
}
162149
});
163-
messageHistory.add(encodedMessage);
150+
messageHistory.add(encodedMessages);
164151
}
165152
}
166153

167-
class MultiClientServerMessageBusSource implements MessageBusSource {
168-
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
154+
class MultiClientServerMessageBusSource extends GenericMessageBusSource {
169155
Function onResultReceived;
156+
final StreamController mainController;
157+
final StreamController resultController = new StreamController();
170158

171-
MultiClientServerMessageBusSource(this.onResultReceived);
159+
MultiClientServerMessageBusSource._(controller)
160+
: mainController = controller,
161+
super(controller.stream);
172162

173-
EventEmitter from(String channel) {
174-
if (_channels.containsKey(channel)) {
175-
return _channels[channel];
176-
} else {
177-
var emitter = new EventEmitter();
178-
_channels[channel] = emitter;
179-
return emitter;
180-
}
163+
factory MultiClientServerMessageBusSource() {
164+
return new MultiClientServerMessageBusSource._(
165+
new StreamController.broadcast());
181166
}
182167

168+
Stream get onResult => resultController.stream;
169+
183170
void addConnection(WebSocketWrapper webSocket) {
184171
if (webSocket.isPrimary) {
185-
webSocket.stream.listen((encodedMessage) {
186-
var decodedMessage = decodeMessage(encodedMessage);
187-
var channel = decodedMessage['channel'];
188-
var message = decodedMessage['message'];
189-
if (message is Map && message.containsKey("type")) {
190-
if (message['type'] == 'result') {
191-
// tell the bus that a result was received on the primary
192-
onResultReceived();
172+
webSocket.stream.listen((encodedMessages) {
173+
var decodedMessages = _decodeMessages(encodedMessages);
174+
decodedMessages.forEach((decodedMessage) {
175+
var message = decodedMessage['message'];
176+
if (message is Map && message.containsKey("type")) {
177+
if (message['type'] == 'result') {
178+
// tell the bus that a result was received on the primary
179+
resultController.add(message);
180+
}
193181
}
194-
}
182+
});
195183

196-
if (_channels.containsKey(channel)) {
197-
_channels[channel].add(message);
198-
}
184+
mainController.add(decodedMessages);
199185
});
200186
} else {
201-
webSocket.stream.listen((encodedMessage) {
202-
// handle events from non-primary browser
203-
var decodedMessage = decodeMessage(encodedMessage);
204-
var channel = decodedMessage['channel'];
205-
var message = decodedMessage['message'];
206-
if (_channels.containsKey(EVENT_CHANNEL) && channel == EVENT_CHANNEL) {
207-
_channels[channel].add(message);
187+
webSocket.stream.listen((encodedMessages) {
188+
// handle events from non-primary connection.
189+
var decodedMessages = _decodeMessages(encodedMessages);
190+
var eventMessages = new List<Map<String, dynamic>>();
191+
decodedMessages.forEach((decodedMessage) {
192+
var channel = decodedMessage['channel'];
193+
if (channel == EVENT_CHANNEL) {
194+
eventMessages.add(decodedMessage);
195+
}
196+
});
197+
if (eventMessages.length > 0) {
198+
mainController.add(eventMessages);
208199
}
209200
});
210201
}
211202
}
212203

213-
Map<String, dynamic> decodeMessage(dynamic message) {
214-
return JSON.decode(message);
204+
List<dynamic> _decodeMessages(dynamic messages) {
205+
return JSON.decode(messages);
206+
}
207+
208+
// This is a noop for the MultiClientBus because it has to decode the JSON messages before
209+
// the generic bus receives them in order to check for results and forward events
210+
// from the non-primary connection.
211+
@override
212+
List<dynamic> decodeMessages(dynamic messages) {
213+
return messages;
215214
}
216215
}

0 commit comments

Comments
 (0)