|
7 | 7 | import com.koushikdutta.async.callback.CompletedCallback;
|
8 | 8 | import com.koushikdutta.async.future.Cancellable;
|
9 | 9 | import com.koushikdutta.async.future.DependentCancellable;
|
| 10 | +import com.koushikdutta.async.future.FutureCallback; |
| 11 | +import com.koushikdutta.async.future.TransformFuture; |
10 | 12 | import com.koushikdutta.async.http.AsyncHttpClient;
|
11 | 13 | import com.koushikdutta.async.http.AsyncHttpResponse;
|
12 | 14 | import com.koushikdutta.async.http.WebSocket;
|
@@ -85,66 +87,63 @@ public void disconnect(SocketIOClient client) {
|
85 | 87 | webSocket = null;
|
86 | 88 | }
|
87 | 89 |
|
| 90 | + Cancellable connecting; |
88 | 91 | void reconnect(final DependentCancellable child) {
|
89 | 92 | if (isConnected()) {
|
90 | 93 | return;
|
91 | 94 | }
|
92 | 95 |
|
| 96 | + // if a connection is in progress, just wait. |
| 97 | + if (connecting != null && !connecting.isDone() && !connecting.isCancelled()) { |
| 98 | + if (child != null) |
| 99 | + child.setParent(connecting); |
| 100 | + return; |
| 101 | + } |
| 102 | + |
93 | 103 | request.logi("Reconnecting socket.io");
|
94 | 104 |
|
95 | 105 | // dont invoke onto main handler, as it is unnecessary until a session is ready or failed
|
96 | 106 | request.setHandler(null);
|
97 |
| - // initiate a session |
98 |
| - Cancellable cancel = httpClient.executeString(request, new AsyncHttpClient.StringCallback() { |
| 107 | + |
| 108 | + Cancellable connecting = httpClient.executeString(request) |
| 109 | + .then(new TransformFuture<WebSocket, String>() { |
99 | 110 | @Override
|
100 |
| - public void onCompleted(final Exception e, AsyncHttpResponse response, String result) { |
101 |
| - request.logi("socket.io session received"); |
| 111 | + protected void transform(String result) throws Exception { |
| 112 | + String[] parts = result.split(":"); |
| 113 | + String session = parts[0]; |
| 114 | + if (!"".equals(parts[1])) |
| 115 | + heartbeat = Integer.parseInt(parts[1]) / 2 * 1000; |
| 116 | + else |
| 117 | + heartbeat = 0; |
| 118 | + |
| 119 | + String transportsLine = parts[3]; |
| 120 | + String[] transports = transportsLine.split(","); |
| 121 | + HashSet<String> set = new HashSet<String>(Arrays.asList(transports)); |
| 122 | + if (!set.contains("websocket")) |
| 123 | + throw new Exception("websocket not supported"); |
| 124 | + |
| 125 | + final String sessionUrl = request.getUri().toString() + "websocket/" + session + "/"; |
| 126 | + |
| 127 | + httpClient.websocket(sessionUrl, null, null) |
| 128 | + .setCallback(getCompletionCallback()); |
| 129 | + } |
| 130 | + }) |
| 131 | + .setCallback(new FutureCallback<WebSocket>() { |
| 132 | + @Override |
| 133 | + public void onCompleted(Exception e, WebSocket result) { |
102 | 134 | if (e != null) {
|
103 | 135 | reportDisconnect(e);
|
104 | 136 | return;
|
105 | 137 | }
|
106 | 138 |
|
107 |
| - try { |
108 |
| - String[] parts = result.split(":"); |
109 |
| - String session = parts[0]; |
110 |
| - if (!"".equals(parts[1])) |
111 |
| - heartbeat = Integer.parseInt(parts[1]) / 2 * 1000; |
112 |
| - else |
113 |
| - heartbeat = 0; |
114 |
| - |
115 |
| - String transportsLine = parts[3]; |
116 |
| - String[] transports = transportsLine.split(","); |
117 |
| - HashSet<String> set = new HashSet<String>(Arrays.asList(transports)); |
118 |
| - if (!set.contains("websocket")) |
119 |
| - throw new Exception("websocket not supported"); |
120 |
| - |
121 |
| - final String sessionUrl = request.getUri().toString() + "websocket/" + session + "/"; |
122 |
| - |
123 |
| - Cancellable cancel = httpClient.websocket(sessionUrl, null, new AsyncHttpClient.WebSocketConnectCallback() { |
124 |
| - @Override |
125 |
| - public void onCompleted(Exception ex, WebSocket webSocket) { |
126 |
| - if (ex != null) { |
127 |
| - reportDisconnect(ex); |
128 |
| - return; |
129 |
| - } |
130 |
| - |
131 |
| - reconnectDelay = 1000L; |
132 |
| - SocketIOConnection.this.webSocket = webSocket; |
133 |
| - attach(); |
134 |
| - } |
135 |
| - }); |
136 |
| - |
137 |
| - if (child != null) |
138 |
| - child.setParent(cancel); |
139 |
| - } |
140 |
| - catch (Exception ex) { |
141 |
| - reportDisconnect(ex); |
142 |
| - } |
| 139 | + reconnectDelay = 1000L; |
| 140 | + SocketIOConnection.this.webSocket = result; |
| 141 | + attach(); |
143 | 142 | }
|
144 | 143 | });
|
145 | 144 |
|
146 | 145 | if (child != null)
|
147 |
| - child.setParent(cancel); |
| 146 | + child.setParent(connecting); |
148 | 147 | }
|
149 | 148 |
|
150 | 149 | void setupHeartbeat() {
|
|
0 commit comments