Skip to content

Commit 0b59e66

Browse files
committed
feat(WebWorker) Add channel support to MessageBus
closes angular#3661 and angular#3686
1 parent 104302a commit 0b59e66

36 files changed

+1148
-746
lines changed

modules/angular2/src/facade/async.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,13 @@ class ObservableWrapper {
8181
}
8282

8383
class EventEmitter extends Stream {
84-
StreamController<String> _controller;
84+
StreamController<dynamic> _controller;
8585

8686
EventEmitter() {
8787
_controller = new StreamController.broadcast();
8888
}
8989

90-
StreamSubscription listen(void onData(String line),
90+
StreamSubscription listen(void onData(dynamic line),
9191
{void onError(Error error), void onDone(), bool cancelOnError}) {
9292
return _controller.stream.listen(onData,
9393
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
library angular2.src.web_workers.shared.isolate_message_bus;
2+
3+
import 'dart:isolate';
4+
import 'dart:async';
5+
import 'dart:core';
6+
import 'package:angular2/src/web-workers/shared/message_bus.dart'
7+
show MessageBus, MessageBusSink, MessageBusSource;
8+
import 'package:angular2/src/facade/async.dart';
9+
10+
class IsolateMessageBus implements MessageBus {
11+
final IsolateMessageBusSink sink;
12+
final IsolateMessageBusSource source;
13+
14+
IsolateMessageBus(IsolateMessageBusSink sink, IsolateMessageBusSource source)
15+
: sink = sink,
16+
source = source;
17+
18+
EventEmitter from(String channel) {
19+
return source.from(channel);
20+
}
21+
22+
EventEmitter to(String channel) {
23+
return sink.to(channel);
24+
}
25+
}
26+
27+
class IsolateMessageBusSink implements MessageBusSink {
28+
final SendPort _port;
29+
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
30+
31+
IsolateMessageBusSink(SendPort port) : _port = port;
32+
33+
EventEmitter to(String channel) {
34+
if (_channels.containsKey(channel)) {
35+
return _channels[channel];
36+
} else {
37+
var emitter = new EventEmitter();
38+
emitter.listen((message) {
39+
_port.send({'channel': channel, 'message': message});
40+
});
41+
_channels[channel] = emitter;
42+
return emitter;
43+
}
44+
}
45+
}
46+
47+
class IsolateMessageBusSource extends MessageBusSource {
48+
final Stream rawDataStream;
49+
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
50+
51+
IsolateMessageBusSource(ReceivePort port)
52+
: rawDataStream = port.asBroadcastStream() {
53+
rawDataStream.listen((message) {
54+
if (message is SendPort){
55+
return;
56+
}
57+
58+
if (message.containsKey("channel")) {
59+
var channel = message['channel'];
60+
if (_channels.containsKey(channel)) {
61+
_channels[channel].add(message['message']);
62+
}
63+
}
64+
});
65+
}
66+
67+
EventEmitter from(String channel) {
68+
if (_channels.containsKey(channel)) {
69+
return _channels[channel];
70+
} else {
71+
var emitter = new EventEmitter();
72+
_channels[channel] = emitter;
73+
return emitter;
74+
}
75+
}
76+
}
Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,52 @@
1-
// TODO(jteplitz602) to be idiomatic these should be releated to Observable's or Streams
1+
import {EventEmitter} from 'angular2/src/facade/async';
2+
import {BaseException} from 'angular2/src/facade/lang';
3+
// TODO(jteplitz602): Replace both the interface and the exported class with an abstract class #3683
4+
5+
function _abstract() {
6+
throw new BaseException("This method is abstract");
7+
}
8+
29
/**
3-
* Message Bus is a low level API used to communicate between the UI and the worker.
4-
* It smooths out the differences between Javascript's postMessage and Dart's Isolate
5-
* allowing you to work with one consistent API.
10+
* Message Bus is a low level API used to communicate between the UI and the background.
11+
* Communication is based on a channel abstraction. Messages published in a
12+
* given channel to one MessageBusSink are received on the same channel
13+
* by the corresponding MessageBusSource.
14+
* TODO(jteplitz602): This should just extend both the source and the sink once
15+
* https://github.com/angular/ts2dart/issues/263 is closed.
616
*/
7-
export interface MessageBus {
8-
sink: MessageBusSink;
9-
source: MessageBusSource;
10-
}
17+
export interface MessageBusInterface {
18+
/**
19+
* Returns an {@link EventEmitter} that emits every time a messsage
20+
* is received on the given channel.
21+
*/
22+
from(channel: string): EventEmitter;
1123

12-
export interface SourceListener {
13-
(data: any): void; // TODO: Replace this any type with the type of a real messaging protocol
24+
/**
25+
* Returns an {@link EventEmitter} for the given channel
26+
* To publish methods to that channel just call next (or add in dart) on the returned emitter
27+
*/
28+
to(channel: string): EventEmitter;
1429
}
1530

1631
export interface MessageBusSource {
1732
/**
18-
* Attaches the SourceListener to this source.
19-
* The SourceListener will get called whenever the bus receives a message
20-
* Returns a listener id that can be passed to {removeListener}
33+
* Returns an {@link EventEmitter} that emits every time a messsage
34+
* is received on the given channel.
2135
*/
22-
addListener(fn: SourceListener): number;
23-
removeListener(index: number);
36+
from(channel: string): EventEmitter;
2437
}
2538

26-
export interface MessageBusSink { send(message: Object): void; }
39+
export interface MessageBusSink {
40+
/**
41+
* Returns an {@link EventEmitter} for the given channel
42+
* To publish methods to that channel just call next (or add in dart) on the returned emitter
43+
*/
44+
to(channel: string): EventEmitter;
45+
}
46+
47+
// TODO(jteplitz602): Remove this class once we have abstract classes #3683
48+
export class MessageBus implements MessageBusInterface {
49+
from(channel: string): EventEmitter { throw _abstract(); }
50+
51+
to(channel: string): EventEmitter { throw _abstract(); }
52+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/**
2+
* All channels used by angular's WebWorker components are listed here.
3+
* You should not use these channels in your application code.
4+
*/
5+
export const SETUP_CHANNEL = "ng-WebWorkerSetup";
6+
export const RENDER_COMPILER_CHANNEL = "ng-RenderCompiler";
7+
export const RENDERER_CHANNEL = "ng-Renderer";
8+
export const XHR_CHANNEL = "ng-XHR";
9+
export const EVENT_CHANNEL = "ng-events";
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
// PostMessageBus can't be implemented in dart since dart doesn't use postMessage
2+
// This file is only here to prevent ts2dart from trying to transpile the PostMessageBus
3+
library angular2.src.web_workers.shared.post_message_bus;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import {
2+
MessageBusInterface,
3+
MessageBusSource,
4+
MessageBusSink
5+
} from "angular2/src/web-workers/shared/message_bus";
6+
import {EventEmitter} from 'angular2/src/facade/async';
7+
import {StringMap, StringMapWrapper} from 'angular2/src/facade/collection';
8+
import {Injectable} from "angular2/di";
9+
10+
/**
11+
* A TypeScript implementation of {@link MessageBus} for communicating via JavaScript's
12+
* postMessage API.
13+
*/
14+
@Injectable()
15+
export class PostMessageBus implements MessageBusInterface {
16+
constructor(private _sink: PostMessageBusSink, private _source: PostMessageBusSource) {}
17+
18+
from(channel: string): EventEmitter { return this._source.from(channel); }
19+
20+
to(channel: string): EventEmitter { return this._sink.to(channel); }
21+
}
22+
23+
export class PostMessageBusSink implements MessageBusSink {
24+
private _channels: StringMap<string, EventEmitter> = StringMapWrapper.create();
25+
26+
constructor(private _postMessageTarget: PostMessageTarget) {}
27+
28+
public to(channel: string): EventEmitter {
29+
if (StringMapWrapper.contains(this._channels, channel)) {
30+
return this._channels[channel];
31+
} else {
32+
var emitter = new EventEmitter();
33+
emitter.observer({
34+
next: (message: Object) => {
35+
this._postMessageTarget.postMessage({channel: channel, message: message});
36+
}
37+
});
38+
return emitter;
39+
}
40+
}
41+
}
42+
43+
export class PostMessageBusSource implements MessageBusSource {
44+
private _channels: StringMap<string, EventEmitter> = StringMapWrapper.create();
45+
46+
constructor(eventTarget?: EventTarget) {
47+
if (eventTarget) {
48+
eventTarget.addEventListener("message", (ev: MessageEvent) => this._handleMessage(ev));
49+
} else {
50+
// if no eventTarget is given we assume we're in a WebWorker and listen on the global scope
51+
addEventListener("message", (ev: MessageEvent) => this._handleMessage(ev));
52+
}
53+
}
54+
55+
private _handleMessage(ev: MessageEvent) {
56+
var data = ev.data;
57+
var channel = data.channel;
58+
if (StringMapWrapper.contains(this._channels, channel)) {
59+
this._channels[channel].next(data.message);
60+
}
61+
}
62+
63+
public from(channel: string): EventEmitter {
64+
if (StringMapWrapper.contains(this._channels, channel)) {
65+
return this._channels[channel];
66+
} else {
67+
var emitter = new EventEmitter();
68+
this._channels[channel] = emitter;
69+
return emitter;
70+
}
71+
}
72+
}
73+
74+
// TODO(jteplitz602) Replace this with the definition in lib.webworker.d.ts(#3492)
75+
export interface PostMessageTarget { postMessage: (message: any, transfer?:[ArrayBuffer]) => void; }

modules/angular2/src/web-workers/ui/application.dart

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import 'dart:isolate';
44
import 'dart:async';
55
import 'dart:core';
66
import 'package:angular2/src/web-workers/shared/message_bus.dart'
7-
show MessageBus, MessageBusSink, MessageBusSource;
7+
show MessageBus;
88
import 'package:angular2/src/web-workers/ui/impl.dart' show bootstrapUICommon;
9+
import 'package:angular2/src/web-workers/shared/isolate_message_bus.dart';
910

1011
/**
1112
* Bootstrapping a WebWorker
@@ -23,63 +24,22 @@ Future<MessageBus> bootstrap(String uri) {
2324
/**
2425
* To be called from the main thread to spawn and communicate with the worker thread
2526
*/
26-
Future<UIMessageBus> spawnWebWorker(Uri uri) {
27+
Future<MessageBus> spawnWebWorker(Uri uri) {
2728
var receivePort = new ReceivePort();
2829
var isolateEndSendPort = receivePort.sendPort;
2930
return Isolate.spawnUri(uri, const [], isolateEndSendPort).then((_) {
3031
var source = new UIMessageBusSource(receivePort);
3132
return source.sink.then((sendPort) {
32-
var sink = new UIMessageBusSink(sendPort);
33-
return new UIMessageBus(sink, source);
33+
var sink = new IsolateMessageBusSink(sendPort);
34+
return new IsolateMessageBus(sink, source);
3435
});
3536
});
3637
}
3738

38-
class UIMessageBus extends MessageBus {
39-
final UIMessageBusSink sink;
40-
final UIMessageBusSource source;
41-
42-
UIMessageBus(UIMessageBusSink sink, UIMessageBusSource source)
43-
: sink = sink,
44-
source = source;
45-
}
46-
47-
class UIMessageBusSink extends MessageBusSink {
48-
final SendPort _port;
49-
50-
UIMessageBusSink(SendPort port) : _port = port;
51-
52-
void send(message) {
53-
_port.send(message);
54-
}
55-
}
56-
57-
class UIMessageBusSource extends MessageBusSource {
58-
final ReceivePort _port;
59-
final Stream rawDataStream;
60-
Map<int, StreamSubscription> _listenerStore =
61-
new Map<int, StreamSubscription>();
62-
int _numListeners = 0;
63-
64-
UIMessageBusSource(ReceivePort port)
65-
: _port = port,
66-
rawDataStream = port.asBroadcastStream();
39+
class UIMessageBusSource extends IsolateMessageBusSource {
40+
UIMessageBusSource(ReceivePort port) : super(port);
6741

6842
Future<SendPort> get sink => rawDataStream.firstWhere((message) {
6943
return message is SendPort;
7044
});
71-
72-
int addListener(Function fn) {
73-
var subscription = rawDataStream.listen((message) {
74-
fn({"data": message});
75-
});
76-
77-
_listenerStore[++_numListeners] = subscription;
78-
return _numListeners;
79-
}
80-
81-
void removeListener(int index) {
82-
_listenerStore[index].cancel();
83-
_listenerStore.remove(index);
84-
}
8545
}
Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import {
2-
MessageBus,
3-
MessageBusSource,
4-
MessageBusSink,
5-
SourceListener
6-
} from "angular2/src/web-workers/shared/message_bus";
2+
PostMessageBus,
3+
PostMessageBusSink,
4+
PostMessageBusSource
5+
} from 'angular2/src/web-workers/shared/post_message_bus';
6+
import {MessageBus} from 'angular2/src/web-workers/shared/message_bus';
77
import {BaseException} from "angular2/src/facade/lang";
88
import {bootstrapUICommon} from "angular2/src/web-workers/ui/impl";
99

@@ -23,33 +23,7 @@ export function bootstrap(uri: string): MessageBus {
2323

2424
export function spawnWebWorker(uri: string): MessageBus {
2525
var webWorker: Worker = new Worker(uri);
26-
return new UIMessageBus(new UIMessageBusSink(webWorker), new UIMessageBusSource(webWorker));
27-
}
28-
29-
export class UIMessageBus implements MessageBus {
30-
constructor(public sink: UIMessageBusSink, public source: UIMessageBusSource) {}
31-
}
32-
33-
export class UIMessageBusSink implements MessageBusSink {
34-
constructor(private _webWorker: Worker) {}
35-
36-
send(message: Object): void { this._webWorker.postMessage(message); }
37-
}
38-
39-
export class UIMessageBusSource implements MessageBusSource {
40-
private _listenerStore: Map<int, SourceListener> = new Map<int, SourceListener>();
41-
private _numListeners: int = 0;
42-
43-
constructor(private _webWorker: Worker) {}
44-
45-
public addListener(fn: SourceListener): int {
46-
this._webWorker.addEventListener("message", fn);
47-
this._listenerStore[++this._numListeners] = fn;
48-
return this._numListeners;
49-
}
50-
51-
public removeListener(index: int): void {
52-
removeEventListener("message", this._listenerStore[index]);
53-
this._listenerStore.delete(index);
54-
}
26+
var sink = new PostMessageBusSink(webWorker);
27+
var source = new PostMessageBusSource(webWorker);
28+
return new PostMessageBus(sink, source);
5529
}

0 commit comments

Comments
 (0)