ackCallback);
}
diff --git a/src/main/java/com/corundumstudio/socketio/ClientOperations.java b/src/main/java/com/corundumstudio/socketio/ClientOperations.java
index bbd48d46a..207c360c0 100644
--- a/src/main/java/com/corundumstudio/socketio/ClientOperations.java
+++ b/src/main/java/com/corundumstudio/socketio/ClientOperations.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/Configuration.java b/src/main/java/com/corundumstudio/socketio/Configuration.java
index 63e59bd81..c29539ed0 100644
--- a/src/main/java/com/corundumstudio/socketio/Configuration.java
+++ b/src/main/java/com/corundumstudio/socketio/Configuration.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,17 +15,18 @@
*/
package com.corundumstudio.socketio;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.List;
-
import com.corundumstudio.socketio.handler.SuccessAuthorizationListener;
import com.corundumstudio.socketio.listener.DefaultExceptionListener;
import com.corundumstudio.socketio.listener.ExceptionListener;
-import com.corundumstudio.socketio.protocol.JacksonJsonSupport;
import com.corundumstudio.socketio.protocol.JsonSupport;
import com.corundumstudio.socketio.store.MemoryStoreFactory;
import com.corundumstudio.socketio.store.StoreFactory;
+import io.netty.handler.codec.http.HttpDecoderConfig;
+
+import javax.net.ssl.KeyManagerFactory;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
public class Configuration {
@@ -44,6 +45,7 @@ public class Configuration {
private int upgradeTimeout = 10000;
private int pingTimeout = 60000;
private int pingInterval = 25000;
+ private int firstDataTimeout = 5000;
private int maxHttpContentLength = 64 * 1024;
private int maxFramePayloadLength = 64 * 1024;
@@ -58,17 +60,21 @@ public class Configuration {
private InputStream keyStore;
private String keyStorePassword;
+ private String allowHeaders;
+
private String trustStoreFormat = "JKS";
private InputStream trustStore;
private String trustStorePassword;
+ private String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
+
private boolean preferDirectBuffer = true;
private SocketConfig socketConfig = new SocketConfig();
private StoreFactory storeFactory = new MemoryStoreFactory();
- private JsonSupport jsonSupport = new JacksonJsonSupport();
+ private JsonSupport jsonSupport;
private AuthorizationListener authorizationListener = new SuccessAuthorizationListener();
@@ -78,13 +84,25 @@ public class Configuration {
private String origin;
+ private boolean enableCors = true;
+
+ private boolean httpCompression = true;
+
+ private boolean websocketCompression = true;
+
+ private boolean randomSession = false;
+
+ private boolean needClientAuth = false;
+
+ private HttpRequestDecoderConfiguration httpRequestDecoderConfiguration = new HttpRequestDecoderConfiguration();
+
public Configuration() {
}
/**
* Defend from further modifications by cloning
*
- * @param configuration - Configuration object to clone
+ * @param conf - Configuration object to clone
*/
Configuration(Configuration conf) {
setBossThreads(conf.getBossThreads());
@@ -93,10 +111,26 @@ public Configuration() {
setPingInterval(conf.getPingInterval());
setPingTimeout(conf.getPingTimeout());
+ setFirstDataTimeout(conf.getFirstDataTimeout());
setHostname(conf.getHostname());
setPort(conf.getPort());
+ if (conf.getJsonSupport() == null) {
+ try {
+ getClass().getClassLoader().loadClass("com.fasterxml.jackson.databind.ObjectMapper");
+ try {
+ Class> jjs = getClass().getClassLoader().loadClass("com.corundumstudio.socketio.protocol.JacksonJsonSupport");
+ JsonSupport js = (JsonSupport) jjs.getConstructor().newInstance();
+ conf.setJsonSupport(js);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Can't find jackson lib in classpath", e);
+ }
+ }
+
setJsonSupport(new JsonSupportWrapper(conf.getJsonSupport()));
setContext(conf.getContext());
setAllowCustomRequests(conf.isAllowCustomRequests());
@@ -107,8 +141,9 @@ public Configuration() {
setTrustStore(conf.getTrustStore());
setTrustStoreFormat(conf.getTrustStoreFormat());
setTrustStorePassword(conf.getTrustStorePassword());
+ setKeyManagerFactoryAlgorithm(conf.getKeyManagerFactoryAlgorithm());
- setTransports(conf.getTransports().toArray(new Transport[conf.getTransports().size()]));
+ setTransports(conf.getTransports().toArray(new Transport[0]));
setMaxHttpContentLength(conf.getMaxHttpContentLength());
setPackagePrefix(conf.getPackagePrefix());
@@ -123,7 +158,15 @@ public Configuration() {
setAddVersionHeader(conf.isAddVersionHeader());
setOrigin(conf.getOrigin());
+ setEnableCors(conf.isEnableCors());
+ setAllowHeaders(conf.getAllowHeaders());
setSSLProtocol(conf.getSSLProtocol());
+
+ setHttpCompression(conf.isHttpCompression());
+ setWebsocketCompression(conf.isWebsocketCompression());
+ setRandomSession(conf.randomSession);
+ setNeedClientAuth(conf.isNeedClientAuth());
+ setHttpRequestDecoderConfiguration(conf.getHttpRequestDecoderConfiguration());
}
public JsonSupport getJsonSupport() {
@@ -134,7 +177,7 @@ public JsonSupport getJsonSupport() {
* Allows to setup custom implementation of
* JSON serialization/deserialization
*
- * @param jsonSupport
+ * @param jsonSupport - json mapper
*
* @see JsonSupport
*/
@@ -150,7 +193,7 @@ public String getHostname() {
* Optional parameter. If not set then bind address
* will be 0.0.0.0 or ::0
*
- * @param hostname
+ * @param hostname - name of host
*/
public void setHostname(String hostname) {
this.hostname = hostname;
@@ -180,7 +223,7 @@ public void setWorkerThreads(int workerThreads) {
/**
* Ping interval
*
- * @param value - time in milliseconds
+ * @param heartbeatIntervalSecs - time in milliseconds
*/
public void setPingInterval(int heartbeatIntervalSecs) {
this.pingInterval = heartbeatIntervalSecs;
@@ -193,7 +236,7 @@ public int getPingInterval() {
* Ping timeout
* Use 0 to disable it
*
- * @param value - time in milliseconds
+ * @param heartbeatTimeoutSecs - time in milliseconds
*/
public void setPingTimeout(int heartbeatTimeoutSecs) {
this.pingTimeout = heartbeatTimeoutSecs;
@@ -231,7 +274,7 @@ public void setAllowCustomRequests(boolean allowCustomRequests) {
/**
* SSL key store password
*
- * @param keyStorePassword
+ * @param keyStorePassword - password of key store
*/
public void setKeyStorePassword(String keyStorePassword) {
this.keyStorePassword = keyStorePassword;
@@ -243,7 +286,7 @@ public String getKeyStorePassword() {
/**
* SSL key store stream, maybe appointed to any source
*
- * @param keyStore
+ * @param keyStore - key store input stream
*/
public void setKeyStore(InputStream keyStore) {
this.keyStore = keyStore;
@@ -255,7 +298,7 @@ public InputStream getKeyStore() {
/**
* Key store format
*
- * @param keyStoreFormat
+ * @param keyStoreFormat - key store format
*/
public void setKeyStoreFormat(String keyStoreFormat) {
this.keyStoreFormat = keyStoreFormat;
@@ -267,7 +310,7 @@ public String getKeyStoreFormat() {
/**
* Set maximum http content length limit
*
- * @param maxContentLength
+ * @param value
* the maximum length of the aggregated http content.
*/
public void setMaxHttpContentLength(int value) {
@@ -329,7 +372,7 @@ public boolean isPreferDirectBuffer() {
* Data store - used to store session data and implements distributed pubsub.
* Default is {@code MemoryStoreFactory}
*
- * @param storeFactory - implements StoreFactory
+ * @param clientStoreFactory - implements StoreFactory
*
* @see com.corundumstudio.socketio.store.MemoryStoreFactory
* @see com.corundumstudio.socketio.store.RedissonStoreFactory
@@ -344,7 +387,7 @@ public StoreFactory getStoreFactory() {
/**
* Authorization listener invoked on every handshake.
- * Accepts or denies a client by {@code AuthorizationListener.isAuthorized} method.
+ * Accepts or denies a client by {@code AuthorizationListener.getAuthorizationResult} method.
* Accepts all clients by default.
*
* @param authorizationListener - authorization listener itself
@@ -362,7 +405,7 @@ public AuthorizationListener getAuthorizationListener() {
* Exception listener invoked on any exception in
* SocketIO listener
*
- * @param exceptionListener
+ * @param exceptionListener - listener
*
* @see com.corundumstudio.socketio.listener.ExceptionListener
*/
@@ -379,7 +422,7 @@ public SocketConfig getSocketConfig() {
/**
* TCP socket configuration
*
- * @param socketConfig
+ * @param socketConfig - config
*/
public void setSocketConfig(SocketConfig socketConfig) {
this.socketConfig = socketConfig;
@@ -391,7 +434,7 @@ public void setSocketConfig(SocketConfig socketConfig) {
*
* @see AckMode
*
- * @param ackMode
+ * @param ackMode - ack mode
*/
public void setAckMode(AckMode ackMode) {
this.ackMode = ackMode;
@@ -422,10 +465,18 @@ public void setTrustStorePassword(String trustStorePassword) {
this.trustStorePassword = trustStorePassword;
}
+ public String getKeyManagerFactoryAlgorithm() {
+ return keyManagerFactoryAlgorithm;
+ }
+ public void setKeyManagerFactoryAlgorithm(String keyManagerFactoryAlgorithm) {
+ this.keyManagerFactoryAlgorithm = keyManagerFactoryAlgorithm;
+ }
+
+
/**
* Set maximum websocket frame content length limit
*
- * @param maxContentLength
+ * @param maxFramePayloadLength - length
*/
public void setMaxFramePayloadLength(int maxFramePayloadLength) {
this.maxFramePayloadLength = maxFramePayloadLength;
@@ -437,7 +488,7 @@ public int getMaxFramePayloadLength() {
/**
* Transport upgrade timeout in milliseconds
*
- * @param upgradeTimeout
+ * @param upgradeTimeout - upgrade timeout
*/
public void setUpgradeTimeout(int upgradeTimeout) {
this.upgradeTimeout = upgradeTimeout;
@@ -448,9 +499,10 @@ public int getUpgradeTimeout() {
/**
* Adds Server header with lib version to http response.
+ *
* Default is true
*
- * @param addVersionHeader
+ * @param addVersionHeader - true to add header
*/
public void setAddVersionHeader(boolean addVersionHeader) {
this.addVersionHeader = addVersionHeader;
@@ -466,18 +518,35 @@ public boolean isAddVersionHeader() {
*
* If value is null then request ORIGIN header value used.
*
- * @param origin
+ * @param origin - origin
*/
public void setOrigin(String origin) {
this.origin = origin;
}
+
public String getOrigin() {
return origin;
}
+ /**
+ * cors dispose
+ *
+ * Default is true
+ *
+ * @param enableCors enableCors
+ */
+ public void setEnableCors(boolean enableCors) {
+ this.enableCors = enableCors;
+ }
+
+ public boolean isEnableCors() {
+ return enableCors;
+ }
+
public boolean isUseLinuxNativeEpoll() {
return useLinuxNativeEpoll;
}
+
public void setUseLinuxNativeEpoll(boolean useLinuxNativeEpoll) {
this.useLinuxNativeEpoll = useLinuxNativeEpoll;
}
@@ -485,7 +554,7 @@ public void setUseLinuxNativeEpoll(boolean useLinuxNativeEpoll) {
/**
* Set the name of the requested SSL protocol
*
- * @param sslProtocol
+ * @param sslProtocol - name of protocol
*/
public void setSSLProtocol(String sslProtocol) {
this.sslProtocol = sslProtocol;
@@ -495,4 +564,96 @@ public String getSSLProtocol() {
}
+ /**
+ * Set the response Access-Control-Allow-Headers
+ * @param allowHeaders - allow headers
+ * */
+ public void setAllowHeaders(String allowHeaders) {
+ this.allowHeaders = allowHeaders;
+ }
+ public String getAllowHeaders() {
+ return allowHeaders;
+ }
+
+ /**
+ * Timeout between channel opening and first data transfer
+ * Helps to avoid 'silent channel' attack and prevents
+ * 'Too many open files' problem in this case
+ *
+ * @param firstDataTimeout - timeout value
+ */
+ public void setFirstDataTimeout(int firstDataTimeout) {
+ this.firstDataTimeout = firstDataTimeout;
+ }
+ public int getFirstDataTimeout() {
+ return firstDataTimeout;
+ }
+
+ /**
+ * Activate http protocol compression. Uses {@code gzip} or
+ * {@code deflate} encoding choice depends on the {@code "Accept-Encoding"} header value.
+ *
+ * Default is true
+ *
+ * @param httpCompression - true to use http compression
+ */
+ public void setHttpCompression(boolean httpCompression) {
+ this.httpCompression = httpCompression;
+ }
+ public boolean isHttpCompression() {
+ return httpCompression;
+ }
+
+ /**
+ * Activate websocket protocol compression.
+ * Uses {@code permessage-deflate} encoding only.
+ *
+ * Default is true
+ *
+ * @param websocketCompression - true to use websocket compression
+ */
+ public void setWebsocketCompression(boolean websocketCompression) {
+ this.websocketCompression = websocketCompression;
+ }
+ public boolean isWebsocketCompression() {
+ return websocketCompression;
+ }
+
+ public boolean isRandomSession() {
+ return randomSession;
+ }
+
+ public void setRandomSession(boolean randomSession) {
+ this.randomSession = randomSession;
+ }
+
+ /**
+ * Enable/disable client authentication.
+ * Has no effect unless a trust store has been provided.
+ *
+ * Default is false
+ *
+ * @param needClientAuth - true to use client authentication
+ */
+ public void setNeedClientAuth(boolean needClientAuth) {
+ this.needClientAuth = needClientAuth;
+ }
+ public boolean isNeedClientAuth() {
+ return needClientAuth;
+ }
+
+ public HttpRequestDecoderConfiguration getHttpRequestDecoderConfiguration() {
+ return httpRequestDecoderConfiguration;
+ }
+
+ public void setHttpRequestDecoderConfiguration(HttpRequestDecoderConfiguration httpRequestDecoderConfiguration) {
+ this.httpRequestDecoderConfiguration = httpRequestDecoderConfiguration;
+ }
+
+ public HttpDecoderConfig getHttpDecoderConfig() {
+ return new HttpDecoderConfig()
+ .setMaxInitialLineLength(httpRequestDecoderConfiguration.getMaxInitialLineLength())
+ .setMaxHeaderSize(httpRequestDecoderConfiguration.getMaxHeaderSize())
+ .setMaxChunkSize(httpRequestDecoderConfiguration.getMaxChunkSize());
+ }
}
diff --git a/src/main/java/com/corundumstudio/socketio/Disconnectable.java b/src/main/java/com/corundumstudio/socketio/Disconnectable.java
index 39840965e..83111b1fd 100644
--- a/src/main/java/com/corundumstudio/socketio/Disconnectable.java
+++ b/src/main/java/com/corundumstudio/socketio/Disconnectable.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/DisconnectableHub.java b/src/main/java/com/corundumstudio/socketio/DisconnectableHub.java
index be4e13d09..e7794d56a 100644
--- a/src/main/java/com/corundumstudio/socketio/DisconnectableHub.java
+++ b/src/main/java/com/corundumstudio/socketio/DisconnectableHub.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/HandshakeData.java b/src/main/java/com/corundumstudio/socketio/HandshakeData.java
index 6b475c503..9ed241efb 100644
--- a/src/main/java/com/corundumstudio/socketio/HandshakeData.java
+++ b/src/main/java/com/corundumstudio/socketio/HandshakeData.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,49 +21,80 @@
import java.util.List;
import java.util.Map;
+import io.netty.handler.codec.http.HttpHeaders;
+
public class HandshakeData implements Serializable {
private static final long serialVersionUID = 1196350300161819978L;
- private Map> headers;
+ private HttpHeaders headers;
private InetSocketAddress address;
private Date time = new Date();
+ private InetSocketAddress local;
private String url;
private Map> urlParams;
private boolean xdomain;
+ private Object authToken;
+ // needed for correct deserialization
public HandshakeData() {
}
- public HandshakeData(Map> headers, Map> urlParams, InetSocketAddress address, String url, boolean xdomain) {
+ public HandshakeData(HttpHeaders headers, Map> urlParams, InetSocketAddress address, String url, boolean xdomain) {
+ this(headers, urlParams, address, null, url, xdomain);
+ }
+
+ public HandshakeData(HttpHeaders headers, Map> urlParams, InetSocketAddress address, InetSocketAddress local, String url, boolean xdomain) {
super();
this.headers = headers;
this.urlParams = urlParams;
this.address = address;
+ this.local = local;
this.url = url;
this.xdomain = xdomain;
}
+ /**
+ * Client network address
+ *
+ * @return network address
+ */
public InetSocketAddress getAddress() {
return address;
}
- public Map> getHeaders() {
- return headers;
+ /**
+ * Connection local address
+ *
+ * @return local address
+ */
+ public InetSocketAddress getLocal() {
+ return local;
}
- public String getSingleHeader(String name) {
- List values = headers.get(name);
- if (values != null && values.size() == 1) {
- return values.iterator().next();
- }
- return null;
+ /**
+ * Http headers sent during first client request
+ *
+ * @return headers
+ */
+ public HttpHeaders getHttpHeaders() {
+ return headers;
}
+ /**
+ * Client connection date
+ *
+ * @return date
+ */
public Date getTime() {
return time;
}
+ /**
+ * Url used by client during first request
+ *
+ * @return url
+ */
public String getUrl() {
return url;
}
@@ -72,6 +103,11 @@ public boolean isXdomain() {
return xdomain;
}
+ /**
+ * Url params stored in url used by client during first request
+ *
+ * @return map
+ */
public Map> getUrlParams() {
return urlParams;
}
@@ -84,4 +120,11 @@ public String getSingleUrlParam(String name) {
return null;
}
+ public void setAuthToken(Object token) {
+ this.authToken = token;
+ }
+
+ public Object getAuthToken() {
+ return this.authToken;
+ }
}
diff --git a/src/main/java/com/corundumstudio/socketio/HttpRequestDecoderConfiguration.java b/src/main/java/com/corundumstudio/socketio/HttpRequestDecoderConfiguration.java
new file mode 100644
index 000000000..2897881f2
--- /dev/null
+++ b/src/main/java/com/corundumstudio/socketio/HttpRequestDecoderConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (c) 2012-2023 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.corundumstudio.socketio;
+
+public class HttpRequestDecoderConfiguration {
+
+ private int maxInitialLineLength;
+ private int maxHeaderSize;
+ private int maxChunkSize;
+
+ public HttpRequestDecoderConfiguration(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
+ this.maxInitialLineLength = maxInitialLineLength;
+ this.maxHeaderSize = maxHeaderSize;
+ this.maxChunkSize = maxChunkSize;
+ }
+
+ public HttpRequestDecoderConfiguration() {
+ this(4096, 8192, 8192);
+ }
+
+ public int getMaxInitialLineLength() {
+ return maxInitialLineLength;
+ }
+
+ public void setMaxInitialLineLength(int maxInitialLineLength) {
+ this.maxInitialLineLength = maxInitialLineLength;
+ }
+
+ public int getMaxHeaderSize() {
+ return maxHeaderSize;
+ }
+
+ public void setMaxHeaderSize(int maxHeaderSize) {
+ this.maxHeaderSize = maxHeaderSize;
+ }
+
+ public int getMaxChunkSize() {
+ return maxChunkSize;
+ }
+
+ public void setMaxChunkSize(int maxChunkSize) {
+ this.maxChunkSize = maxChunkSize;
+ }
+}
diff --git a/src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java b/src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java
index 879e4a4cc..a94d5f733 100644
--- a/src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java
+++ b/src/main/java/com/corundumstudio/socketio/JsonSupportWrapper.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@
class JsonSupportWrapper implements JsonSupport {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(JsonSupportWrapper.class);
private final JsonSupport delegate;
@@ -37,47 +37,48 @@ class JsonSupportWrapper implements JsonSupport {
this.delegate = delegate;
}
+ @Override
public AckArgs readAckArgs(ByteBufInputStream src, AckCallback> callback) throws IOException {
try {
return delegate.readAckArgs(src, callback);
} catch (Exception e) {
src.reset();
log.error("Can't read ack args: " + src.readLine() + " for type: " + callback.getResultClass(), e);
- return null;
+ throw new IOException(e);
}
}
+ @Override
public T readValue(String namespaceName, ByteBufInputStream src, Class valueType) throws IOException {
try {
return delegate.readValue(namespaceName, src, valueType);
} catch (Exception e) {
src.reset();
log.error("Can't read value: " + src.readLine() + " for type: " + valueType, e);
- return null;
+ throw new IOException(e);
}
}
+ @Override
public void writeValue(ByteBufOutputStream out, Object value) throws IOException {
try {
delegate.writeValue(out, value);
} catch (Exception e) {
log.error("Can't write value: " + value, e);
+ throw new IOException(e);
}
}
+ @Override
public void addEventMapping(String namespaceName, String eventName, Class> ... eventClass) {
delegate.addEventMapping(namespaceName, eventName, eventClass);
}
+ @Override
public void removeEventMapping(String namespaceName, String eventName) {
delegate.removeEventMapping(namespaceName, eventName);
}
- @Override
- public void writeJsonpValue(ByteBufOutputStream out, Object value) throws IOException {
- delegate.writeJsonpValue(out, value);
- }
-
@Override
public List getArrays() {
return delegate.getArrays();
diff --git a/src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java
new file mode 100644
index 000000000..59f69dd49
--- /dev/null
+++ b/src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java
@@ -0,0 +1,135 @@
+/**
+ * Copyright (c) 2012-2023 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.corundumstudio.socketio;
+
+import com.corundumstudio.socketio.protocol.Packet;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * author: liangjiaqi
+ * date: 2020/8/8 6:02 PM
+ */
+public class MultiRoomBroadcastOperations implements BroadcastOperations {
+
+ private final Collection broadcastOperations;
+
+ public MultiRoomBroadcastOperations(Collection broadcastOperations) {
+ this.broadcastOperations = broadcastOperations;
+ }
+
+ @Override
+ public Collection getClients() {
+ Set clients = new HashSet();
+ if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
+ return clients;
+ }
+ for( BroadcastOperations b : this.broadcastOperations ) {
+ clients.addAll( b.getClients() );
+ }
+ return clients;
+ }
+
+ @Override
+ public void send(Packet packet, BroadcastAckCallback ackCallback) {
+ if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
+ return;
+ }
+ for( BroadcastOperations b : this.broadcastOperations ) {
+ b.send( packet, ackCallback );
+ }
+ }
+
+ @Override
+ public void sendEvent(String name, SocketIOClient excludedClient, Object... data) {
+ Predicate excludePredicate = (socketIOClient) -> Objects.equals(
+ socketIOClient.getSessionId(), excludedClient.getSessionId()
+ );
+ sendEvent(name, excludePredicate, data);
+ }
+
+ @Override
+ public void sendEvent(String name, Predicate excludePredicate, Object... data) {
+ if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
+ return;
+ }
+ for( BroadcastOperations b : this.broadcastOperations ) {
+ b.sendEvent( name, excludePredicate, data );
+ }
+ }
+
+ @Override
+ public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) {
+ if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
+ return;
+ }
+ for( BroadcastOperations b : this.broadcastOperations ) {
+ b.sendEvent( name, data, ackCallback );
+ }
+ }
+
+ @Override
+ public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) {
+ Predicate excludePredicate = (socketIOClient) -> Objects.equals(
+ socketIOClient.getSessionId(), excludedClient.getSessionId()
+ );
+ sendEvent(name, data, excludePredicate, ackCallback);
+ }
+
+ @Override
+ public void sendEvent(String name, Object data, Predicate excludePredicate, BroadcastAckCallback ackCallback) {
+ if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
+ return;
+ }
+ for( BroadcastOperations b : this.broadcastOperations ) {
+ b.sendEvent( name, data, excludePredicate, ackCallback );
+ }
+ }
+
+ @Override
+ public void send(Packet packet) {
+ if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
+ return;
+ }
+ for( BroadcastOperations b : this.broadcastOperations ) {
+ b.send( packet );
+ }
+ }
+
+ @Override
+ public void disconnect() {
+ if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
+ return;
+ }
+ for( BroadcastOperations b : this.broadcastOperations ) {
+ b.disconnect();
+ }
+ }
+
+ @Override
+ public void sendEvent(String name, Object... data) {
+ if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
+ return;
+ }
+ for( BroadcastOperations b : this.broadcastOperations ) {
+ b.sendEvent( name, data );
+ }
+ }
+}
diff --git a/src/main/java/com/corundumstudio/socketio/MultiTypeAckCallback.java b/src/main/java/com/corundumstudio/socketio/MultiTypeAckCallback.java
index b1dd46dd1..c4ae704cb 100644
--- a/src/main/java/com/corundumstudio/socketio/MultiTypeAckCallback.java
+++ b/src/main/java/com/corundumstudio/socketio/MultiTypeAckCallback.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/MultiTypeArgs.java b/src/main/java/com/corundumstudio/socketio/MultiTypeArgs.java
index 3e8b35bb8..5741acbb4 100644
--- a/src/main/java/com/corundumstudio/socketio/MultiTypeArgs.java
+++ b/src/main/java/com/corundumstudio/socketio/MultiTypeArgs.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -50,8 +50,9 @@ public T second() {
/**
* "index out of bounds"-safe method for getting elements
*
- * @param index
- * @return
+ * @param type of argument
+ * @param index to get
+ * @return argument
*/
public T get(int index) {
if (size() <= index) {
diff --git a/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java
new file mode 100644
index 000000000..7187925ae
--- /dev/null
+++ b/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java
@@ -0,0 +1,144 @@
+/**
+ * Copyright (c) 2012-2023 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.corundumstudio.socketio;
+
+import com.corundumstudio.socketio.misc.IterableCollection;
+import com.corundumstudio.socketio.protocol.EngineIOVersion;
+import com.corundumstudio.socketio.protocol.Packet;
+import com.corundumstudio.socketio.protocol.PacketType;
+import com.corundumstudio.socketio.store.StoreFactory;
+import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
+import com.corundumstudio.socketio.store.pubsub.PubSubType;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * Author: liangjiaqi
+ * Date: 2020/8/8 6:08 PM
+ */
+public class SingleRoomBroadcastOperations implements BroadcastOperations {
+ private final String namespace;
+ private final String room;
+ private final Iterable clients;
+ private final StoreFactory storeFactory;
+
+ public SingleRoomBroadcastOperations(String namespace, String room, Iterable clients, StoreFactory storeFactory) {
+ super();
+ this.namespace = namespace;
+ this.room = room;
+ this.clients = clients;
+ this.storeFactory = storeFactory;
+ }
+
+ private void dispatch(Packet packet) {
+ this.storeFactory.pubSubStore().publish(
+ PubSubType.DISPATCH,
+ new DispatchMessage(this.room, packet, this.namespace));
+ }
+
+ @Override
+ public Collection getClients() {
+ return new IterableCollection(clients);
+ }
+
+ @Override
+ public void send(Packet packet) {
+ for (SocketIOClient client : clients) {
+ packet.setEngineIOVersion(client.getEngineIOVersion());
+ client.send(packet);
+ }
+ dispatch(packet);
+ }
+
+ @Override
+ public void send(Packet packet, BroadcastAckCallback ackCallback) {
+ for (SocketIOClient client : clients) {
+ client.send(packet, ackCallback.createClientCallback(client));
+ }
+ ackCallback.loopFinished();
+ }
+
+ @Override
+ public void disconnect() {
+ for (SocketIOClient client : clients) {
+ client.disconnect();
+ }
+ }
+
+ @Override
+ public void sendEvent(String name, SocketIOClient excludedClient, Object... data) {
+ Predicate excludePredicate = (socketIOClient) -> Objects.equals(
+ socketIOClient.getSessionId(), excludedClient.getSessionId()
+ );
+ sendEvent(name, excludePredicate, data);
+ }
+
+ @Override
+ public void sendEvent(String name, Predicate excludePredicate, Object... data) {
+ Packet packet = new Packet(PacketType.MESSAGE, EngineIOVersion.UNKNOWN);
+ packet.setSubType(PacketType.EVENT);
+ packet.setName(name);
+ packet.setData(Arrays.asList(data));
+
+ for (SocketIOClient client : clients) {
+ packet.setEngineIOVersion(client.getEngineIOVersion());
+ if (excludePredicate.test(client)) {
+ continue;
+ }
+ client.send(packet);
+ }
+ dispatch(packet);
+ }
+
+ @Override
+ public void sendEvent(String name, Object... data) {
+ Packet packet = new Packet(PacketType.MESSAGE, EngineIOVersion.UNKNOWN);
+ packet.setSubType(PacketType.EVENT);
+ packet.setName(name);
+ packet.setData(Arrays.asList(data));
+ send(packet);
+ }
+
+ @Override
+ public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) {
+ for (SocketIOClient client : clients) {
+ client.sendEvent(name, ackCallback.createClientCallback(client), data);
+ }
+ ackCallback.loopFinished();
+ }
+
+ @Override
+ public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) {
+ Predicate excludePredicate = (socketIOClient) -> Objects.equals(
+ socketIOClient.getSessionId(), excludedClient.getSessionId()
+ );
+ sendEvent(name, data, excludePredicate, ackCallback);
+ }
+
+ @Override
+ public void sendEvent(String name, Object data, Predicate excludePredicate, BroadcastAckCallback ackCallback) {
+ for (SocketIOClient client : clients) {
+ if (excludePredicate.test(client)) {
+ continue;
+ }
+ client.sendEvent(name, ackCallback.createClientCallback(client), data);
+ }
+ ackCallback.loopFinished();
+ }
+}
diff --git a/src/main/java/com/corundumstudio/socketio/SocketConfig.java b/src/main/java/com/corundumstudio/socketio/SocketConfig.java
index 2c6e9c7fb..03a34d854 100644
--- a/src/main/java/com/corundumstudio/socketio/SocketConfig.java
+++ b/src/main/java/com/corundumstudio/socketio/SocketConfig.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,6 +37,10 @@ public class SocketConfig {
private int acceptBackLog = 1024;
+ private int writeBufferWaterMarkLow = -1;
+
+ private int writeBufferWaterMarkHigh = -1;
+
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
@@ -86,4 +90,19 @@ public void setAcceptBackLog(int acceptBackLog) {
this.acceptBackLog = acceptBackLog;
}
+ public int getWriteBufferWaterMarkLow() {
+ return writeBufferWaterMarkLow;
+ }
+
+ public void setWriteBufferWaterMarkLow(int writeBufferWaterMarkLow) {
+ this.writeBufferWaterMarkLow = writeBufferWaterMarkLow;
+ }
+
+ public int getWriteBufferWaterMarkHigh() {
+ return writeBufferWaterMarkHigh;
+ }
+
+ public void setWriteBufferWaterMarkHigh(int writeBufferWaterMarkHigh) {
+ this.writeBufferWaterMarkHigh = writeBufferWaterMarkHigh;
+ }
}
diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
index 9465593b6..099a6ea0e 100644
--- a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
+++ b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,15 +15,6 @@
*/
package com.corundumstudio.socketio;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.HttpRequestDecoder;
-import io.netty.handler.codec.http.HttpResponseEncoder;
-import io.netty.handler.ssl.SslHandler;
-
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
@@ -32,6 +23,7 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
+import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,25 +36,39 @@
import com.corundumstudio.socketio.handler.PacketListener;
import com.corundumstudio.socketio.handler.WrongUrlHandler;
import com.corundumstudio.socketio.namespace.NamespacesHub;
+import com.corundumstudio.socketio.protocol.JsonSupport;
import com.corundumstudio.socketio.protocol.PacketDecoder;
import com.corundumstudio.socketio.protocol.PacketEncoder;
-import com.corundumstudio.socketio.protocol.JsonSupport;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
-import com.corundumstudio.socketio.scheduler.HashedWheelScheduler;
+import com.corundumstudio.socketio.scheduler.HashedWheelTimeoutScheduler;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.DisconnectMessage;
-import com.corundumstudio.socketio.store.pubsub.PubSubStore;
-import com.corundumstudio.socketio.transport.WebSocketTransport;
+import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.corundumstudio.socketio.transport.PollingTransport;
+import com.corundumstudio.socketio.transport.WebSocketTransport;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpContentCompressor;
+import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.ssl.SslHandler;
public class SocketIOChannelInitializer extends ChannelInitializer implements DisconnectableHub {
public static final String SOCKETIO_ENCODER = "socketioEncoder";
+ public static final String WEB_SOCKET_TRANSPORT_COMPRESSION = "webSocketTransportCompression";
public static final String WEB_SOCKET_TRANSPORT = "webSocketTransport";
+ public static final String WEB_SOCKET_AGGREGATOR = "webSocketAggregator";
public static final String XHR_POLLING_TRANSPORT = "xhrPollingTransport";
public static final String AUTHORIZE_HANDLER = "authorizeHandler";
public static final String PACKET_HANDLER = "packetHandler";
public static final String HTTP_ENCODER = "httpEncoder";
+ public static final String HTTP_COMPRESSION = "httpCompression";
public static final String HTTP_AGGREGATOR = "httpAggregator";
public static final String HTTP_REQUEST_DECODER = "httpDecoder";
public static final String SSL_HANDLER = "ssl";
@@ -70,7 +76,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl
public static final String RESOURCE_HANDLER = "resourceHandler";
public static final String WRONG_URL_HANDLER = "wrongUrlBlocker";
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(SocketIOChannelInitializer.class);
private AckManager ackManager;
@@ -81,7 +87,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl
private EncoderHandler encoderHandler;
private WrongUrlHandler wrongUrlHandler;
- private CancelableScheduler scheduler = new HashedWheelScheduler();
+ private CancelableScheduler scheduler = new HashedWheelTimeoutScheduler();
private InPacketHandler packetHandler;
private SSLContext sslContext;
@@ -99,7 +105,7 @@ public void start(Configuration configuration, NamespacesHub namespacesHub) {
JsonSupport jsonSupport = configuration.getJsonSupport();
PacketEncoder encoder = new PacketEncoder(configuration, jsonSupport);
- PacketDecoder decoder = new PacketDecoder(jsonSupport, namespacesHub, ackManager);
+ PacketDecoder decoder = new PacketDecoder(jsonSupport, ackManager);
String connectPath = configuration.getContext() + "/";
@@ -135,21 +141,54 @@ public void start(Configuration configuration, NamespacesHub namespacesHub) {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
+ addSslHandler(pipeline);
+ addSocketioHandlers(pipeline);
+ }
+ /**
+ * Adds the ssl handler
+ *
+ * @param pipeline - channel pipeline
+ */
+ protected void addSslHandler(ChannelPipeline pipeline) {
if (sslContext != null) {
SSLEngine engine = sslContext.createSSLEngine();
engine.setUseClientMode(false);
+ if (configuration.isNeedClientAuth() &&(configuration.getTrustStore() != null)) {
+ engine.setNeedClientAuth(true);
+ }
pipeline.addLast(SSL_HANDLER, new SslHandler(engine));
}
+ }
+
+ /**
+ * Adds the socketio channel handlers
+ *
+ * @param pipeline - channel pipeline
+ */
+ protected void addSocketioHandlers(ChannelPipeline pipeline) {
+ pipeline.addLast(HTTP_REQUEST_DECODER, new HttpRequestDecoder(configuration.getHttpDecoderConfig()));
+ pipeline.addLast(HTTP_AGGREGATOR, new HttpObjectAggregator(configuration.getMaxHttpContentLength()) {
+ @Override
+ protected Object newContinueResponse(HttpMessage start, int maxContentLength,
+ ChannelPipeline pipeline) {
+ return null;
+ }
- pipeline.addLast(HTTP_REQUEST_DECODER, new HttpRequestDecoder());
- pipeline.addLast(HTTP_AGGREGATOR, new HttpObjectAggregator(configuration.getMaxHttpContentLength()));
+ });
pipeline.addLast(HTTP_ENCODER, new HttpResponseEncoder());
+ if (configuration.isHttpCompression()) {
+ pipeline.addLast(HTTP_COMPRESSION, new HttpContentCompressor());
+ }
+
pipeline.addLast(PACKET_HANDLER, packetHandler);
pipeline.addLast(AUTHORIZE_HANDLER, authorizeHandler);
pipeline.addLast(XHR_POLLING_TRANSPORT, xhrPollingTransport);
+ if (configuration.isWebsocketCompression()) {
+ pipeline.addLast(WEB_SOCKET_TRANSPORT_COMPRESSION, new WebSocketServerCompressionHandler());
+ }
pipeline.addLast(WEB_SOCKET_TRANSPORT, webSocketTransport);
pipeline.addLast(SOCKETIO_ENCODER, encoderHandler);
@@ -170,7 +209,7 @@ private SSLContext createSSLContext(Configuration configuration) throws Exceptio
KeyStore ks = KeyStore.getInstance(configuration.getKeyStoreFormat());
ks.load(configuration.getKeyStore(), configuration.getKeyStorePassword().toCharArray());
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(configuration.getKeyManagerFactoryAlgorithm());
kmf.init(ks, configuration.getKeyStorePassword().toCharArray());
SSLContext serverContext = SSLContext.getInstance(configuration.getSSLProtocol());
@@ -178,12 +217,13 @@ private SSLContext createSSLContext(Configuration configuration) throws Exceptio
return serverContext;
}
+ @Override
public void onDisconnect(ClientHead client) {
ackManager.onDisconnect(client);
authorizeHandler.onDisconnect(client);
configuration.getStoreFactory().onDisconnect(client);
- configuration.getStoreFactory().pubSubStore().publish(PubSubStore.DISCONNECT, new DisconnectMessage(client.getSessionId()));
+ configuration.getStoreFactory().pubSubStore().publish(PubSubType.DISCONNECT, new DisconnectMessage(client.getSessionId()));
log.debug("Client with sessionId: {} disconnected", client.getSessionId());
}
diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java
index 45453a933..289205df2 100644
--- a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java
+++ b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
import java.util.Set;
import java.util.UUID;
+import com.corundumstudio.socketio.protocol.EngineIOVersion;
import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.store.Store;
@@ -43,6 +44,19 @@ public interface SocketIOClient extends ClientOperations, Store {
*/
Transport getTransport();
+ /**
+ * Engine IO Protocol version
+ * @return
+ */
+ EngineIOVersion getEngineIOVersion();
+
+ /**
+ * Returns true if and only if the I/O thread will perform the requested write operation immediately.
+ * Any write requests made when this method returns false are queued until the I/O thread is ready to process the queued write requests.
+ * @return
+ */
+ boolean isWritable();
+
/**
* Send event with ack callback
*
@@ -91,22 +105,45 @@ public interface SocketIOClient extends ClientOperations, Store {
/**
* Join client to room
*
- * @param room
+ * @param room - name of room
*/
void joinRoom(String room);
/**
- * Join client to room
+ * Join client to rooms
*
- * @param room
+ * @param rooms - names of rooms
+ */
+ void joinRooms(Set rooms);
+
+ /**
+ * Leave client from room
+ *
+ * @param room - name of room
*/
void leaveRoom(String room);
+ /**
+ * Leave client from rooms
+ *
+ * @param rooms - names of rooms
+ */
+ void leaveRooms(Set rooms);
+
/**
* Get all rooms a client is joined in.
*
- * @return
+ * @return name of rooms
*/
Set getAllRooms();
+ /**
+ * Get current room Size (contain in cluster)
+ *
+ * @param room - name of room
+ *
+ * @return int
+ */
+ int getCurrentRoomSize(String room);
+
}
diff --git a/src/main/java/com/corundumstudio/socketio/SocketIONamespace.java b/src/main/java/com/corundumstudio/socketio/SocketIONamespace.java
index 13843db7f..7c331444b 100644
--- a/src/main/java/com/corundumstudio/socketio/SocketIONamespace.java
+++ b/src/main/java/com/corundumstudio/socketio/SocketIONamespace.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,23 +26,28 @@
*/
public interface SocketIONamespace extends ClientListeners {
+ String getName();
+
BroadcastOperations getBroadcastOperations();
BroadcastOperations getRoomOperations(String room);
+ BroadcastOperations getRoomOperations(String... rooms);
+
/**
* Get all clients connected to namespace
*
- * @return
+ * @return collection of clients
*/
Collection getAllClients();
/**
* Get client by uuid connected to namespace
*
- * @param uuid
- * @return
+ * @param uuid - id of client
+ * @return client
*/
SocketIOClient getClient(UUID uuid);
+ void addAuthTokenListener(AuthTokenListener listener);
}
diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java
index c20e4ea91..5af4b8bd6 100644
--- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java
+++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,10 +15,9 @@
*/
package com.corundumstudio.socketio;
+import com.corundumstudio.socketio.listener.*;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -27,17 +26,14 @@
import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.corundumstudio.socketio.listener.ClientListeners;
-import com.corundumstudio.socketio.listener.ConnectListener;
-import com.corundumstudio.socketio.listener.DataListener;
-import com.corundumstudio.socketio.listener.DisconnectListener;
-import com.corundumstudio.socketio.listener.MultiTypeEventListener;
import com.corundumstudio.socketio.namespace.Namespace;
import com.corundumstudio.socketio.namespace.NamespacesHub;
@@ -47,7 +43,7 @@
*/
public class SocketIOServer implements ClientListeners {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(SocketIOServer.class);
private final Configuration configCopy;
private final Configuration configuration;
@@ -83,8 +79,8 @@ public Collection getAllClients() {
/**
* Get client by uuid from default namespace
*
- * @param uuid
- * @return
+ * @param uuid - id of client
+ * @return client
*/
public SocketIOClient getClient(UUID uuid) {
return namespacesHub.get(Namespace.DEFAULT_NAME).getClient(uuid);
@@ -100,19 +96,38 @@ public Collection getAllNamespaces() {
}
public BroadcastOperations getBroadcastOperations() {
- return new BroadcastOperations(getAllClients(), configCopy.getStoreFactory());
+ Collection namespaces = namespacesHub.getAllNamespaces();
+ List list = new ArrayList();
+ BroadcastOperations broadcast = null;
+ if( namespaces != null && namespaces.size() > 0 ) {
+ for( SocketIONamespace n : namespaces ) {
+ broadcast = n.getBroadcastOperations();
+ list.add( broadcast );
+ }
+ }
+ return new MultiRoomBroadcastOperations( list );
}
/**
* Get broadcast operations for clients within
- * room by room name
+ * rooms by rooms' names
*
- * @param room
- * @return
+ * @param rooms rooms' names
+ * @return broadcast operations
*/
- public BroadcastOperations getRoomOperations(String room) {
- Iterable clients = namespacesHub.getRoomClients(room);
- return new BroadcastOperations(clients, configCopy.getStoreFactory());
+ public BroadcastOperations getRoomOperations(String... rooms) {
+ Collection namespaces = namespacesHub.getAllNamespaces();
+ List list = new ArrayList();
+ BroadcastOperations broadcast = null;
+ if( namespaces != null && namespaces.size() > 0 ) {
+ for( SocketIONamespace n : namespaces ) {
+ for ( String room : rooms ) {
+ broadcast = n.getRoomOperations( room );
+ list.add( broadcast );
+ }
+ }
+ }
+ return new MultiRoomBroadcastOperations( list );
}
/**
@@ -124,6 +139,8 @@ public void start() {
/**
* Start server asynchronously
+ *
+ * @return void
*/
public Future startAsync() {
log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory());
@@ -131,7 +148,7 @@ public Future startAsync() {
pipelineFactory.start(configCopy, namespacesHub);
- Class channelClass = NioServerSocketChannel.class;
+ Class extends ServerChannel> channelClass = NioServerSocketChannel.class;
if (configCopy.isUseLinuxNativeEpoll()) {
channelClass = EpollServerSocketChannel.class;
}
@@ -169,9 +186,16 @@ protected void applyConnectionOptions(ServerBootstrap bootstrap) {
bootstrap.childOption(ChannelOption.SO_RCVBUF, config.getTcpReceiveBufferSize());
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(config.getTcpReceiveBufferSize()));
}
+ //default value @see WriteBufferWaterMark.DEFAULT
+ if (config.getWriteBufferWaterMarkLow() != -1 && config.getWriteBufferWaterMarkHigh() != -1) {
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
+ config.getWriteBufferWaterMarkLow(), config.getWriteBufferWaterMarkHigh()
+ ));
+ }
+
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, config.isTcpKeepAlive());
+ bootstrap.childOption(ChannelOption.SO_LINGER, config.getSoLinger());
- bootstrap.option(ChannelOption.SO_LINGER, config.getSoLinger());
bootstrap.option(ChannelOption.SO_REUSEADDR, config.isReuseAddress());
bootstrap.option(ChannelOption.SO_BACKLOG, config.getAcceptBackLog());
}
@@ -230,6 +254,18 @@ public void addEventListener(String eventName, Class eventClass, DataList
mainNamespace.addEventListener(eventName, eventClass, listener);
}
+ @Override
+ public void addEventInterceptor(EventInterceptor eventInterceptor) {
+ mainNamespace.addEventInterceptor(eventInterceptor);
+
+ }
+
+
+ @Override
+ public void removeAllListeners(String eventName) {
+ mainNamespace.removeAllListeners(eventName);
+ }
+
@Override
public void addDisconnectListener(DisconnectListener listener) {
mainNamespace.addDisconnectListener(listener);
@@ -240,13 +276,27 @@ public void addConnectListener(ConnectListener listener) {
mainNamespace.addConnectListener(listener);
}
+ @Override
+ public void addPingListener(PingListener listener) {
+ mainNamespace.addPingListener(listener);
+ }
+ @Override
+ public void addPongListener(PongListener listener) {
+ mainNamespace.addPongListener(listener);
+ }
+
@Override
public void addListeners(Object listeners) {
mainNamespace.addListeners(listeners);
}
@Override
- public void addListeners(Object listeners, Class listenersClass) {
+ public void addListeners(Iterable listeners) {
+ mainNamespace.addListeners(listeners);
+ }
+
+ @Override
+ public void addListeners(Object listeners, Class> listenersClass) {
mainNamespace.addListeners(listeners, listenersClass);
}
diff --git a/src/main/java/com/corundumstudio/socketio/Transport.java b/src/main/java/com/corundumstudio/socketio/Transport.java
index 5a3f4b230..b979731f1 100644
--- a/src/main/java/com/corundumstudio/socketio/Transport.java
+++ b/src/main/java/com/corundumstudio/socketio/Transport.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,14 +32,4 @@ public enum Transport {
public String getValue() {
return value;
}
-
- public static Transport byName(String value) {
- for (Transport t : Transport.values()) {
- if (t.getValue().equals(value)) {
- return t;
- }
- }
- throw new IllegalArgumentException("Can't find " + value + " transport");
- }
-
}
diff --git a/src/main/java/com/corundumstudio/socketio/VoidAckCallback.java b/src/main/java/com/corundumstudio/socketio/VoidAckCallback.java
index c2452732f..45402dd39 100644
--- a/src/main/java/com/corundumstudio/socketio/VoidAckCallback.java
+++ b/src/main/java/com/corundumstudio/socketio/VoidAckCallback.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/ack/AckManager.java b/src/main/java/com/corundumstudio/socketio/ack/AckManager.java
index c0bd612f8..48b61f3a0 100644
--- a/src/main/java/com/corundumstudio/socketio/ack/AckManager.java
+++ b/src/main/java/com/corundumstudio/socketio/ack/AckManager.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,33 +15,29 @@
*/
package com.corundumstudio.socketio.ack;
+import com.corundumstudio.socketio.*;
+import com.corundumstudio.socketio.handler.ClientHead;
+import com.corundumstudio.socketio.protocol.Packet;
+import com.corundumstudio.socketio.scheduler.CancelableScheduler;
+import com.corundumstudio.socketio.scheduler.SchedulerKey;
+import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
+import io.netty.util.internal.PlatformDependent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.corundumstudio.socketio.AckCallback;
-import com.corundumstudio.socketio.Disconnectable;
-import com.corundumstudio.socketio.MultiTypeAckCallback;
-import com.corundumstudio.socketio.MultiTypeArgs;
-import com.corundumstudio.socketio.SocketIOClient;
-import com.corundumstudio.socketio.handler.ClientHead;
-import com.corundumstudio.socketio.protocol.Packet;
-import com.corundumstudio.socketio.scheduler.CancelableScheduler;
-import com.corundumstudio.socketio.scheduler.SchedulerKey;
-import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
-
public class AckManager implements Disconnectable {
- class AckEntry {
+ static class AckEntry {
- final Map> ackCallbacks = new ConcurrentHashMap>();
+ final Map> ackCallbacks = PlatformDependent.newConcurrentHashMap();
final AtomicLong ackIndex = new AtomicLong(-1);
public long addAckCallback(AckCallback> callback) {
@@ -68,9 +64,9 @@ public void initAckIndex(long index) {
}
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(AckManager.class);
- private final Map ackEntries = new ConcurrentHashMap();
+ private final ConcurrentMap ackEntries = PlatformDependent.newConcurrentHashMap();
private final CancelableScheduler scheduler;
@@ -88,7 +84,7 @@ private AckEntry getAckEntry(UUID sessionId) {
AckEntry ackEntry = ackEntries.get(sessionId);
if (ackEntry == null) {
ackEntry = new AckEntry();
- AckEntry oldAckEntry = ackEntries.put(sessionId, ackEntry);
+ AckEntry oldAckEntry = ackEntries.putIfAbsent(sessionId, ackEntry);
if (oldAckEntry != null) {
ackEntry = oldAckEntry;
}
@@ -96,6 +92,7 @@ private AckEntry getAckEntry(UUID sessionId) {
return ackEntry;
}
+ @SuppressWarnings("unchecked")
public void onAck(SocketIOClient client, Packet packet) {
AckSchedulerKey key = new AckSchedulerKey(Type.ACK_TIMEOUT, client.getSessionId(), packet.getAckId());
scheduler.cancel(key);
@@ -120,7 +117,7 @@ public void onAck(SocketIOClient client, Packet packet) {
}
}
- private AckCallback removeCallback(UUID sessionId, long index) {
+ private AckCallback> removeCallback(UUID sessionId, long index) {
AckEntry ackEntry = ackEntries.get(sessionId);
// may be null if client disconnected
// before timeout occurs
@@ -135,7 +132,7 @@ public AckCallback> getCallback(UUID sessionId, long index) {
return ackEntry.getAckCallback(index);
}
- public long registerAck(UUID sessionId, AckCallback callback) {
+ public long registerAck(UUID sessionId, AckCallback> callback) {
AckEntry ackEntry = getAckEntry(sessionId);
ackEntry.initAckIndex(0);
long index = ackEntry.addAckCallback(callback);
@@ -149,7 +146,7 @@ public long registerAck(UUID sessionId, AckCallback callback) {
return index;
}
- private void scheduleTimeout(final long index, final UUID sessionId, AckCallback callback) {
+ private void scheduleTimeout(final long index, final UUID sessionId, AckCallback> callback) {
if (callback.getTimeout() == -1) {
return;
}
@@ -157,7 +154,7 @@ private void scheduleTimeout(final long index, final UUID sessionId, AckCallback
scheduler.scheduleCallback(key, new Runnable() {
@Override
public void run() {
- AckCallback cb = removeCallback(sessionId, index);
+ AckCallback> cb = removeCallback(sessionId, index);
if (cb != null) {
cb.onTimeout();
}
diff --git a/src/main/java/com/corundumstudio/socketio/ack/AckSchedulerKey.java b/src/main/java/com/corundumstudio/socketio/ack/AckSchedulerKey.java
index 9a11fc9b1..abf300385 100644
--- a/src/main/java/com/corundumstudio/socketio/ack/AckSchedulerKey.java
+++ b/src/main/java/com/corundumstudio/socketio/ack/AckSchedulerKey.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/annotation/AnnotationScanner.java b/src/main/java/com/corundumstudio/socketio/annotation/AnnotationScanner.java
index 9756841a7..b2bddb071 100644
--- a/src/main/java/com/corundumstudio/socketio/annotation/AnnotationScanner.java
+++ b/src/main/java/com/corundumstudio/socketio/annotation/AnnotationScanner.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,6 @@ public interface AnnotationScanner {
void addListener(Namespace namespace, Object object, Method method, Annotation annotation);
- void validate(Method method, Class clazz);
+ void validate(Method method, Class> clazz);
}
\ No newline at end of file
diff --git a/src/main/java/com/corundumstudio/socketio/annotation/OnConnect.java b/src/main/java/com/corundumstudio/socketio/annotation/OnConnect.java
index 8c98f0ef6..3cb42284d 100644
--- a/src/main/java/com/corundumstudio/socketio/annotation/OnConnect.java
+++ b/src/main/java/com/corundumstudio/socketio/annotation/OnConnect.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/annotation/OnConnectScanner.java b/src/main/java/com/corundumstudio/socketio/annotation/OnConnectScanner.java
index a3cbc7099..2d5cbcc43 100644
--- a/src/main/java/com/corundumstudio/socketio/annotation/OnConnectScanner.java
+++ b/src/main/java/com/corundumstudio/socketio/annotation/OnConnectScanner.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,10 +26,12 @@
public class OnConnectScanner implements AnnotationScanner {
+ @Override
public Class extends Annotation> getScanAnnotation() {
return OnConnect.class;
}
+ @Override
public void addListener(Namespace namespace, final Object object, final Method method, Annotation annotation) {
namespace.addConnectListener(new ConnectListener() {
@Override
@@ -45,19 +47,19 @@ public void onConnect(SocketIOClient client) {
});
}
- public void validate(Method method, Class clazz) {
+ @Override
+ public void validate(Method method, Class> clazz) {
if (method.getParameterTypes().length != 1) {
throw new IllegalArgumentException("Wrong OnConnect listener signature: " + clazz + "." + method.getName());
}
- boolean valid = false;
+
for (Class> eventType : method.getParameterTypes()) {
- if (eventType.equals(SocketIOClient.class)) {
- valid = true;
- }
- }
- if (!valid) {
- throw new IllegalArgumentException("Wrong OnConnect listener signature: " + clazz + "." + method.getName());
+ if (SocketIOClient.class.equals(eventType)) {
+ return;
+ }
}
+
+ throw new IllegalArgumentException("Wrong OnConnect listener signature: " + clazz + "." + method.getName());
}
}
diff --git a/src/main/java/com/corundumstudio/socketio/annotation/OnDisconnect.java b/src/main/java/com/corundumstudio/socketio/annotation/OnDisconnect.java
index d42c5c3e1..3a42eda37 100644
--- a/src/main/java/com/corundumstudio/socketio/annotation/OnDisconnect.java
+++ b/src/main/java/com/corundumstudio/socketio/annotation/OnDisconnect.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/annotation/OnDisconnectScanner.java b/src/main/java/com/corundumstudio/socketio/annotation/OnDisconnectScanner.java
index 4ed71116c..a682fc9d1 100644
--- a/src/main/java/com/corundumstudio/socketio/annotation/OnDisconnectScanner.java
+++ b/src/main/java/com/corundumstudio/socketio/annotation/OnDisconnectScanner.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -48,19 +48,18 @@ public void onDisconnect(SocketIOClient client) {
}
@Override
- public void validate(Method method, Class clazz) {
+ public void validate(Method method, Class> clazz) {
if (method.getParameterTypes().length != 1) {
throw new IllegalArgumentException("Wrong OnDisconnect listener signature: " + clazz + "." + method.getName());
}
- boolean valid = false;
+
for (Class> eventType : method.getParameterTypes()) {
- if (eventType.equals(SocketIOClient.class)) {
- valid = true;
- }
- }
- if (!valid) {
- throw new IllegalArgumentException("Wrong OnDisconnect listener signature: " + clazz + "." + method.getName());
+ if (SocketIOClient.class.equals(eventType)) {
+ return;
+ }
}
+
+ throw new IllegalArgumentException("Wrong OnDisconnect listener signature: " + clazz + "." + method.getName());
}
}
diff --git a/src/main/java/com/corundumstudio/socketio/annotation/OnEvent.java b/src/main/java/com/corundumstudio/socketio/annotation/OnEvent.java
index 821f91bbd..d237f9975 100644
--- a/src/main/java/com/corundumstudio/socketio/annotation/OnEvent.java
+++ b/src/main/java/com/corundumstudio/socketio/annotation/OnEvent.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,6 +37,8 @@
/**
* Event name
+ *
+ * @return value
*/
String value();
diff --git a/src/main/java/com/corundumstudio/socketio/annotation/OnEventScanner.java b/src/main/java/com/corundumstudio/socketio/annotation/OnEventScanner.java
index effca7906..754fb4591 100644
--- a/src/main/java/com/corundumstudio/socketio/annotation/OnEventScanner.java
+++ b/src/main/java/com/corundumstudio/socketio/annotation/OnEventScanner.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -48,7 +48,7 @@ public void addListener(Namespace namespace, final Object object, final Method m
final List dataIndexes = dataIndexes(method);
if (dataIndexes.size() > 1) {
- List classes = new ArrayList();
+ List> classes = new ArrayList>();
for (int index : dataIndexes) {
Class> param = method.getParameterTypes()[index];
classes.add(param);
@@ -77,7 +77,7 @@ public void onData(SocketIOClient client, MultiTypeArgs data, AckRequest ackSend
throw new SocketIOException(e);
}
}
- }, classes.toArray(new Class[classes.size()]));
+ }, classes.toArray(new Class[0]));
} else {
Class objectType = Void.class;
if (!dataIndexes.isEmpty()) {
@@ -113,7 +113,7 @@ public void onData(SocketIOClient client, Object data, AckRequest ackSender) {
private List dataIndexes(Method method) {
List result = new ArrayList();
int index = 0;
- for (Class type : method.getParameterTypes()) {
+ for (Class> type : method.getParameterTypes()) {
if (!type.equals(AckRequest.class) && !type.equals(SocketIOClient.class)) {
result.add(index);
}
@@ -122,9 +122,9 @@ private List dataIndexes(Method method) {
return result;
}
- private int paramIndex(Method method, Class clazz) {
+ private int paramIndex(Method method, Class> clazz) {
int index = 0;
- for (Class type : method.getParameterTypes()) {
+ for (Class> type : method.getParameterTypes()) {
if (type.equals(clazz)) {
return index;
}
@@ -134,7 +134,7 @@ private int paramIndex(Method method, Class clazz) {
}
@Override
- public void validate(Method method, Class clazz) {
+ public void validate(Method method, Class> clazz) {
int paramsCount = method.getParameterTypes().length;
final int socketIOClientIndex = paramIndex(method, SocketIOClient.class);
final int ackRequestIndex = paramIndex(method, AckRequest.class);
diff --git a/src/main/java/com/corundumstudio/socketio/annotation/ScannerEngine.java b/src/main/java/com/corundumstudio/socketio/annotation/ScannerEngine.java
index da66f6f4a..724e04e38 100644
--- a/src/main/java/com/corundumstudio/socketio/annotation/ScannerEngine.java
+++ b/src/main/java/com/corundumstudio/socketio/annotation/ScannerEngine.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,7 +28,7 @@
public class ScannerEngine {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(ScannerEngine.class);
private static final List extends AnnotationScanner> annotations =
Arrays.asList(new OnConnectScanner(), new OnDisconnectScanner(), new OnEventScanner());
@@ -36,7 +36,7 @@ public class ScannerEngine {
private Method findSimilarMethod(Class> objectClazz, Method method) {
Method[] methods = objectClazz.getDeclaredMethods();
for (Method m : methods) {
- if (equals(m, method)) {
+ if (isEquals(m, method)) {
return m;
}
}
@@ -86,7 +86,7 @@ public void scan(Namespace namespace, Object object, Class> clazz)
}
- private boolean equals(Method method1, Method method2) {
+ private boolean isEquals(Method method1, Method method2) {
if (!method1.getName().equals(method2.getName())
|| !method1.getReturnType().equals(method2.getReturnType())) {
return false;
diff --git a/src/main/java/com/corundumstudio/socketio/annotation/SpringAnnotationScanner.java b/src/main/java/com/corundumstudio/socketio/annotation/SpringAnnotationScanner.java
index 529bde646..6013f8e8a 100644
--- a/src/main/java/com/corundumstudio/socketio/annotation/SpringAnnotationScanner.java
+++ b/src/main/java/com/corundumstudio/socketio/annotation/SpringAnnotationScanner.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@
public class SpringAnnotationScanner implements BeanPostProcessor {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(SpringAnnotationScanner.class);
private final List> annotations =
Arrays.asList(OnConnect.class, OnDisconnect.class, OnEvent.class);
@@ -42,6 +42,10 @@ public class SpringAnnotationScanner implements BeanPostProcessor {
private Class originalBeanClass;
+ private Object originalBean;
+
+ private String originalBeanName;
+
public SpringAnnotationScanner(SocketIOServer socketIOServer) {
super();
this.socketIOServer = socketIOServer;
@@ -50,9 +54,10 @@ public SpringAnnotationScanner(SocketIOServer socketIOServer) {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (originalBeanClass != null) {
- socketIOServer.addListeners(bean, originalBeanClass);
- log.info("{} bean listeners added", beanName);
+ socketIOServer.addListeners(originalBean, originalBeanClass);
+ log.info("{} bean listeners added", originalBeanName);
originalBeanClass = null;
+ originalBeanName = null;
}
return bean;
}
@@ -82,6 +87,8 @@ public boolean matches(Method method) {
if (add.get()) {
originalBeanClass = bean.getClass();
+ originalBean = bean;
+ originalBeanName = beanName;
}
return bean;
}
diff --git a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
index ec7b7470f..b446a4fe8 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,25 +16,15 @@
package com.corundumstudio.socketio.handler;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.QueryStringDecoder;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import com.corundumstudio.socketio.*;
+import com.corundumstudio.socketio.protocol.EngineIOVersion;
+import com.corundumstudio.socketio.store.Store;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +35,7 @@
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.Transport;
import com.corundumstudio.socketio.ack.AckManager;
+import com.corundumstudio.socketio.messages.HttpErrorMessage;
import com.corundumstudio.socketio.namespace.Namespace;
import com.corundumstudio.socketio.namespace.NamespacesHub;
import com.corundumstudio.socketio.protocol.AuthPacket;
@@ -55,14 +46,29 @@
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.ConnectMessage;
-import com.corundumstudio.socketio.store.pubsub.PubSubStore;
+import com.corundumstudio.socketio.store.pubsub.PubSubType;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.cookie.Cookie;
+import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
@Sharable
public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Disconnectable {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(AuthorizeHandler.class);
- private final CancelableScheduler disconnectScheduler;
+ private final CancelableScheduler scheduler;
private final String connectPath;
private final Configuration configuration;
@@ -77,7 +83,7 @@ public AuthorizeHandler(String connectPath, CancelableScheduler scheduler, Confi
super();
this.connectPath = connectPath;
this.configuration = configuration;
- this.disconnectScheduler = scheduler;
+ this.scheduler = scheduler;
this.namespacesHub = namespacesHub;
this.storeFactory = storeFactory;
this.disconnectable = disconnectable;
@@ -85,26 +91,41 @@ public AuthorizeHandler(String connectPath, CancelableScheduler scheduler, Confi
this.clientsBox = clientsBox;
}
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+ SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, ctx.channel());
+ scheduler.schedule(key, new Runnable() {
+ @Override
+ public void run() {
+ ctx.channel().close();
+ log.debug("Client with ip {} opened channel but doesn't send any data! Channel closed!", ctx.channel().remoteAddress());
+ }
+ }, configuration.getFirstDataTimeout(), TimeUnit.MILLISECONDS);
+ super.channelActive(ctx);
+ }
+
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, ctx.channel());
+ scheduler.cancel(key);
+
if (msg instanceof FullHttpRequest) {
FullHttpRequest req = (FullHttpRequest) msg;
Channel channel = ctx.channel();
- QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri());
+ QueryStringDecoder queryDecoder = new QueryStringDecoder(req.uri());
if (!configuration.isAllowCustomRequests()
&& !queryDecoder.path().startsWith(connectPath)) {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
channel.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
req.release();
- log.warn("Blocked wrong request! url: {}, ip: {}", queryDecoder.path(), channel.remoteAddress());
return;
}
List sid = queryDecoder.parameters().get("sid");
if (queryDecoder.path().equals(connectPath)
&& sid == null) {
- String origin = req.headers().get(HttpHeaders.Names.ORIGIN);
+ String origin = req.headers().get(HttpHeaderNames.ORIGIN);
if (!authorize(ctx, channel, origin, queryDecoder.parameters(), req)) {
req.release();
return;
@@ -123,13 +144,17 @@ private boolean authorize(ChannelHandlerContext ctx, Channel channel, String ori
headers.put(name, values);
}
- HandshakeData data = new HandshakeData(headers, params,
- (InetSocketAddress)channel.remoteAddress(),
- req.getUri(), origin != null && !origin.equalsIgnoreCase("null"));
+ HandshakeData data = new HandshakeData(req.headers(), params,
+ (InetSocketAddress)channel.remoteAddress(),
+ (InetSocketAddress)channel.localAddress(),
+ req.uri(), origin != null && !origin.equalsIgnoreCase("null"));
boolean result = false;
+ Map storeParams = Collections.emptyMap();
try {
- result = configuration.getAuthorizationListener().isAuthorized(data);
+ AuthorizationResult authResult = configuration.getAuthorizationListener().getAuthorizationResult(data);
+ result = authResult.isAuthorized();
+ storeParams = authResult.getStoreParams();
} catch (Exception e) {
log.error("Authorization error", e);
}
@@ -142,56 +167,118 @@ private boolean authorize(ChannelHandlerContext ctx, Channel channel, String ori
return false;
}
- // TODO try to get sessionId from cookie
- UUID sessionId = UUID.randomUUID();
+ UUID sessionId = null;
+ if (configuration.isRandomSession()) {
+ sessionId = UUID.randomUUID();
+ } else {
+ sessionId = this.generateOrGetSessionIdFromRequest(req.headers());
+ }
List transportValue = params.get("transport");
if (transportValue == null) {
- log.warn("Got no transports for request {}", req.getUri());
+ log.error("Got no transports for request {}", req.uri());
+ writeAndFlushTransportError(channel, origin);
+ return false;
+ }
- HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.UNAUTHORIZED);
- channel.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
+ Transport transport = null;
+ try {
+ transport = Transport.valueOf(transportValue.get(0).toUpperCase());
+ } catch (IllegalArgumentException e) {
+ log.error("Unknown transport for request {}", req.uri());
+ writeAndFlushTransportError(channel, origin);
+ return false;
+ }
+ if (!configuration.getTransports().contains(transport)) {
+ log.error("Unsupported transport for request {}", req.uri());
+ writeAndFlushTransportError(channel, origin);
return false;
}
- Transport transport = Transport.byName(transportValue.get(0));
- ClientHead client = new ClientHead(sessionId, ackManager, disconnectable, storeFactory, data, clientsBox, transport, disconnectScheduler, configuration);
+ ClientHead client = new ClientHead(sessionId, ackManager, disconnectable, storeFactory, data, clientsBox, transport, scheduler, configuration, params);
+ Store store = client.getStore();
+ storeParams.forEach(store::set);
channel.attr(ClientHead.CLIENT).set(client);
clientsBox.addClient(client);
-
String[] transports = {};
- if (configuration.getTransports().contains(Transport.WEBSOCKET)) {
- transports = new String[] {"websocket"};
+ //:TODO lyjnew Current WEBSOCKET retrun upgrade[] engine-io protocol
+ // the test case line
+ // https://github.com/socketio/engine.io-protocol/blob/de247df875ddcd4778d1165829c8644301750e9f/test-suite/test-suite.js#L131C43-L131C43
+ if (configuration.getTransports().contains(Transport.WEBSOCKET) &&
+ !(EngineIOVersion.V4.equals(client.getEngineIOVersion()) && Transport.WEBSOCKET.equals(client.getCurrentTransport()))) {
+ transports = new String[]{"websocket"};
}
AuthPacket authPacket = new AuthPacket(sessionId, transports, configuration.getPingInterval(),
- configuration.getPingTimeout());
- Packet packet = new Packet(PacketType.OPEN);
+ configuration.getPingTimeout());
+ Packet packet = new Packet(PacketType.OPEN, client.getEngineIOVersion());
packet.setData(authPacket);
client.send(packet);
+ client.schedulePing();
client.schedulePingTimeout();
log.debug("Handshake authorized for sessionId: {}, query params: {} headers: {}", sessionId, params, headers);
return true;
}
+ private void writeAndFlushTransportError(Channel channel, String origin) {
+ Map errorData = new HashMap();
+ errorData.put("code", 0);
+ errorData.put("message", "Transport unknown");
+
+ channel.attr(EncoderHandler.ORIGIN).set(origin);
+ channel.writeAndFlush(new HttpErrorMessage(errorData));
+ }
+
+ /**
+ * This method will either generate a new random sessionId or will retrieve the value stored
+ * in the "io" cookie. Failures to parse will cause a logging warning to be generated and a
+ * random uuid to be generated instead (same as not passing a cookie in the first place).
+ */
+ private UUID generateOrGetSessionIdFromRequest(HttpHeaders headers) {
+ List values = headers.getAll("io");
+ if (values.size() == 1) {
+ try {
+ return UUID.fromString(values.get(0));
+ } catch (IllegalArgumentException iaex) {
+ log.warn("Malformed UUID received for session! io=" + values.get(0));
+ }
+ }
+
+ for (String cookieHeader : headers.getAll(HttpHeaderNames.COOKIE)) {
+ Set cookies = ServerCookieDecoder.LAX.decode(cookieHeader);
+
+ for (Cookie cookie : cookies) {
+ if (cookie.name().equals("io")) {
+ try {
+ return UUID.fromString(cookie.value());
+ } catch (IllegalArgumentException iaex) {
+ log.warn("Malformed UUID received for session! io=" + cookie.value());
+ }
+ }
+ }
+ }
+
+ return UUID.randomUUID();
+ }
+
public void connect(UUID sessionId) {
SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, sessionId);
- disconnectScheduler.cancel(key);
+ scheduler.cancel(key);
}
public void connect(ClientHead client) {
Namespace ns = namespacesHub.get(Namespace.DEFAULT_NAME);
if (!client.getNamespaces().contains(ns)) {
-// connect(client.getSessionId());
-
- Packet packet = new Packet(PacketType.MESSAGE);
+ Packet packet = new Packet(PacketType.MESSAGE, client.getEngineIOVersion());
packet.setSubType(PacketType.CONNECT);
- client.send(packet);
+ //::TODO lyjnew V4 delay send connect packet ON client add Namecapse
+ if (!EngineIOVersion.V4.equals(client.getEngineIOVersion()))
+ client.send(packet);
- configuration.getStoreFactory().pubSubStore().publish(PubSubStore.CONNECT, new ConnectMessage(client.getSessionId()));
+ configuration.getStoreFactory().pubSubStore().publish(PubSubType.CONNECT, new ConnectMessage(client.getSessionId()));
SocketIOClient nsClient = client.addNamespaceClient(ns);
ns.onConnect(nsClient);
diff --git a/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java b/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java
index 99c7f85b6..dfb1220e0 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,27 +15,6 @@
*/
package com.corundumstudio.socketio.handler;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.util.AttributeKey;
-
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.HandshakeData;
@@ -43,6 +22,7 @@
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.messages.OutPacketMessage;
import com.corundumstudio.socketio.namespace.Namespace;
+import com.corundumstudio.socketio.protocol.EngineIOVersion;
import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.protocol.PacketType;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
@@ -51,24 +31,40 @@
import com.corundumstudio.socketio.store.Store;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.transport.NamespaceClient;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.util.AttributeKey;
+import io.netty.util.internal.PlatformDependent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
public class ClientHead {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(ClientHead.class);
public static final AttributeKey CLIENT = AttributeKey.valueOf("client");
private final AtomicBoolean disconnected = new AtomicBoolean();
- private final Map namespaceClients = new ConcurrentHashMap();
- private final Map channels = new HashMap();
+ private final Map namespaceClients = PlatformDependent.newConcurrentHashMap();
+ private final Map channels = new HashMap(2);
private final HandshakeData handshakeData;
private final UUID sessionId;
+ private final EngineIOVersion engineIOVersion;
+
private final Store store;
private final DisconnectableHub disconnectableHub;
private final AckManager ackManager;
private ClientsBox clientsBox;
- private final CancelableScheduler disconnectScheduler;
+ private final CancelableScheduler scheduler;
private final Configuration configuration;
private Packet lastBinaryPacket;
@@ -77,8 +73,8 @@ public class ClientHead {
private volatile Transport currentTransport;
public ClientHead(UUID sessionId, AckManager ackManager, DisconnectableHub disconnectable,
- StoreFactory storeFactory, HandshakeData handshakeData, ClientsBox clientsBox, Transport transport, CancelableScheduler disconnectScheduler,
- Configuration configuration) {
+ StoreFactory storeFactory, HandshakeData handshakeData, ClientsBox clientsBox, Transport transport, CancelableScheduler scheduler,
+ Configuration configuration, Map> params) {
this.sessionId = sessionId;
this.ackManager = ackManager;
this.disconnectableHub = disconnectable;
@@ -86,11 +82,18 @@ public ClientHead(UUID sessionId, AckManager ackManager, DisconnectableHub disco
this.handshakeData = handshakeData;
this.clientsBox = clientsBox;
this.currentTransport = transport;
- this.disconnectScheduler = disconnectScheduler;
+ this.scheduler = scheduler;
this.configuration = configuration;
channels.put(Transport.POLLING, new TransportState());
channels.put(Transport.WEBSOCKET, new TransportState());
+
+ List versions = params.getOrDefault(EngineIOVersion.EIO, new ArrayList());
+ if (versions.isEmpty()) {
+ engineIOVersion = EngineIOVersion.UNKNOWN;
+ } else {
+ engineIOVersion = EngineIOVersion.fromValue(versions.get(0));
+ }
}
public void bindChannel(Channel channel, Transport transport) {
@@ -106,28 +109,59 @@ public void bindChannel(Channel channel, Transport transport) {
sendPackets(transport, channel);
}
+ public void releasePollingChannel(Channel channel) {
+ TransportState state = channels.get(Transport.POLLING);
+ if(channel.equals(state.getChannel())) {
+ clientsBox.remove(channel);
+ state.update(null);
+ }
+ }
+
public String getOrigin() {
- return handshakeData.getSingleHeader(HttpHeaders.Names.ORIGIN);
+ return handshakeData.getHttpHeaders().get(HttpHeaderNames.ORIGIN);
}
public ChannelFuture send(Packet packet) {
return send(packet, getCurrentTransport());
}
+ public void cancelPing() {
+ SchedulerKey key = new SchedulerKey(Type.PING, sessionId);
+ scheduler.cancel(key);
+ }
public void cancelPingTimeout() {
SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, sessionId);
- disconnectScheduler.cancel(key);
+ scheduler.cancel(key);
+ }
+
+ public void schedulePing() {
+ cancelPing();
+ final SchedulerKey key = new SchedulerKey(Type.PING, sessionId);
+ scheduler.schedule(key, new Runnable() {
+ @Override
+ public void run() {
+ ClientHead client = clientsBox.get(sessionId);
+ if (client != null) {
+ EngineIOVersion version = client.getEngineIOVersion();
+ //only send ping packet for engine.io version 4
+ if (EngineIOVersion.V4.equals(version)) {
+ client.send(new Packet(PacketType.PING, version));
+ }
+ schedulePing();
+ }
+ }
+ }, configuration.getPingInterval(), TimeUnit.MILLISECONDS);
}
public void schedulePingTimeout() {
cancelPingTimeout();
SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, sessionId);
- disconnectScheduler.schedule(key, new Runnable() {
+ scheduler.schedule(key, new Runnable() {
@Override
public void run() {
ClientHead client = clientsBox.get(sessionId);
if (client != null) {
- client.onChannelDisconnect();
+ client.disconnect();
log.debug("{} removed due to ping timeout", sessionId);
}
}
@@ -147,7 +181,6 @@ public ChannelFuture send(Packet packet, Transport transport) {
}
private ChannelFuture sendPackets(Transport transport, Channel channel) {
- // TODO promise handling
return channel.writeAndFlush(new OutPacketMessage(this, transport));
}
@@ -177,6 +210,7 @@ public boolean isConnected() {
}
public void onChannelDisconnect() {
+ cancelPing();
cancelPingTimeout();
disconnected.set(true);
@@ -207,8 +241,12 @@ public SocketAddress getRemoteAddress() {
}
public void disconnect() {
- ChannelFuture future = send(new Packet(PacketType.DISCONNECT));
- future.addListener(ChannelFutureListener.CLOSE);
+ Packet packet = new Packet(PacketType.MESSAGE, engineIOVersion);
+ packet.setSubType(PacketType.DISCONNECT);
+ ChannelFuture future = send(packet);
+ if(future != null) {
+ future.addListener(ChannelFutureListener.CLOSE);
+ }
onChannelDisconnect();
}
@@ -267,4 +305,20 @@ public Packet getLastBinaryPacket() {
return lastBinaryPacket;
}
+ public EngineIOVersion getEngineIOVersion() {
+ return engineIOVersion;
+ }
+
+ /**
+ * Returns true if and only if the I/O thread will perform the requested write operation immediately.
+ * Any write requests made when this method returns false are queued until the I/O thread is ready to process the queued write requests.
+ * @return
+ */
+ public boolean isWritable() {
+ TransportState state = channels.get(getCurrentTransport());
+ Channel channel = state.getChannel();
+ return channel != null && channel.isWritable();
+ }
+
+
}
diff --git a/src/main/java/com/corundumstudio/socketio/handler/ClientsBox.java b/src/main/java/com/corundumstudio/socketio/handler/ClientsBox.java
index fc3725a47..beafd2cef 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/ClientsBox.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/ClientsBox.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,17 +16,17 @@
package com.corundumstudio.socketio.handler;
import io.netty.channel.Channel;
+import io.netty.util.internal.PlatformDependent;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import com.corundumstudio.socketio.HandshakeData;
public class ClientsBox {
- private final Map uuid2clients = new ConcurrentHashMap();
- private final Map channel2clients = new ConcurrentHashMap();
+ private final Map uuid2clients = PlatformDependent.newConcurrentHashMap();
+ private final Map channel2clients = PlatformDependent.newConcurrentHashMap();
// TODO use storeFactory
public HandshakeData getHandshakeData(UUID sessionId) {
@@ -42,8 +42,8 @@ public void addClient(ClientHead clientHead) {
uuid2clients.put(clientHead.getSessionId(), clientHead);
}
- public void removeClient(UUID sessionId) {
- uuid2clients.remove(sessionId);
+ public ClientHead removeClient(UUID sessionId) {
+ return uuid2clients.remove(sessionId);
}
public ClientHead get(UUID sessionId) {
diff --git a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java
index b9fa45178..4d538a716 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,53 +15,56 @@
*/
package com.corundumstudio.socketio.handler;
-import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_CREDENTIALS;
-import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_HEADERS;
-import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN;
-import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
-import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static io.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+import com.corundumstudio.socketio.Configuration;
+import com.corundumstudio.socketio.Transport;
+import com.corundumstudio.socketio.messages.HttpErrorMessage;
+import com.corundumstudio.socketio.messages.HttpMessage;
+import com.corundumstudio.socketio.messages.OutPacketMessage;
+import com.corundumstudio.socketio.messages.XHROptionsMessage;
+import com.corundumstudio.socketio.messages.XHRPostMessage;
+import com.corundumstudio.socketio.protocol.Packet;
+import com.corundumstudio.socketio.protocol.PacketEncoder;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
-
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.List;
import java.util.Queue;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.corundumstudio.socketio.Configuration;
-import com.corundumstudio.socketio.Transport;
-import com.corundumstudio.socketio.messages.BaseMessage;
-import com.corundumstudio.socketio.messages.HttpMessage;
-import com.corundumstudio.socketio.messages.OutPacketMessage;
-import com.corundumstudio.socketio.messages.XHROptionsMessage;
-import com.corundumstudio.socketio.messages.XHRPostMessage;
-import com.corundumstudio.socketio.protocol.Packet;
-import com.corundumstudio.socketio.protocol.PacketEncoder;
-
@Sharable
public class EncoderHandler extends ChannelOutboundHandlerAdapter {
@@ -73,7 +76,7 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
public static final AttributeKey JSONP_INDEX = AttributeKey.valueOf("jsonpIndex");
public static final AttributeKey WRITE_ONCE = AttributeKey.valueOf("writeOnce");
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(EncoderHandler.class);
private final PacketEncoder encoder;
@@ -92,8 +95,8 @@ public EncoderHandler(Configuration configuration, PacketEncoder encoder) throws
private void readVersion() throws IOException {
Enumeration resources = getClass().getClassLoader().getResources("META-INF/MANIFEST.MF");
while (resources.hasMoreElements()) {
- try {
- Manifest manifest = new Manifest(resources.nextElement().openStream());
+ try (InputStream inputStream = resources.nextElement().openStream()){
+ Manifest manifest = new Manifest(inputStream);
Attributes attrs = manifest.getMainAttributes();
if (attrs == null) {
continue;
@@ -109,32 +112,39 @@ private void readVersion() throws IOException {
}
}
- private void write(XHROptionsMessage msg, ChannelHandlerContext ctx) {
+ private void write(XHROptionsMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
- HttpHeaders.addHeader(res, "Set-Cookie", "io=" + msg.getSessionId());
- HttpHeaders.addHeader(res, CONNECTION, KEEP_ALIVE);
- HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_HEADERS, CONTENT_TYPE);
- addOriginHeaders(ctx.channel(), res);
+ res.headers().add(HttpHeaderNames.SET_COOKIE, "io=" + msg.getSessionId())
+ .add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
+ .add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, HttpHeaderNames.CONTENT_TYPE);
+
+ String origin = ctx.channel().attr(ORIGIN).get();
+ addOriginHeaders(origin, res);
ByteBuf out = encoder.allocateBuffer(ctx.alloc());
- sendMessage(msg, ctx.channel(), out, res);
+ sendMessage(msg, ctx.channel(), out, res, promise);
}
- private void write(XHRPostMessage msg, ChannelHandlerContext ctx) {
+ private void write(XHRPostMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) {
ByteBuf out = encoder.allocateBuffer(ctx.alloc());
out.writeBytes(OK);
- sendMessage(msg, ctx.channel(), out, "text/html");
+ sendMessage(msg, ctx.channel(), out, "text/html", promise, HttpResponseStatus.OK);
}
- private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out, String type) {
- HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
+ private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out, String type, ChannelPromise promise, HttpResponseStatus status) {
+ HttpResponse res = new DefaultHttpResponse(HTTP_1_1, status);
+
+ res.headers().add(HttpHeaderNames.CONTENT_TYPE, type)
+ .add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+ if (msg.getSessionId() != null) {
+ res.headers().add(HttpHeaderNames.SET_COOKIE, "io=" + msg.getSessionId());
+ }
- res.headers().add(CONTENT_TYPE, type).add("Set-Cookie", "io=" + msg.getSessionId())
- .add(CONNECTION, KEEP_ALIVE);
+ String origin = channel.attr(ORIGIN).get();
+ addOriginHeaders(origin, res);
- addOriginHeaders(channel, res);
- HttpHeaders.setContentLength(res, out.readableBytes());
+ HttpUtil.setContentLength(res, out.readableBytes());
// prevent XSS warnings on IE
// https://github.com/LearnBoost/socket.io/pull/1333
@@ -143,46 +153,62 @@ private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out, String t
res.headers().add("X-XSS-Protection", "0");
}
- sendMessage(msg, channel, out, res);
+ sendMessage(msg, channel, out, res, promise);
}
- private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out, HttpResponse res) {
+ private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out, HttpResponse res, ChannelPromise promise) {
channel.write(res);
if (log.isTraceEnabled()) {
- log.trace("Out message: {} - sessionId: {}", out.toString(CharsetUtil.UTF_8), msg.getSessionId());
+ if (msg.getSessionId() != null) {
+ log.trace("Out message: {} - sessionId: {}", out.toString(CharsetUtil.UTF_8), msg.getSessionId());
+ } else {
+ log.trace("Out message: {}", out.toString(CharsetUtil.UTF_8));
+ }
}
if (out.isReadable()) {
- channel.write(out);
+ channel.write(new DefaultHttpContent(out));
} else {
out.release();
}
- channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
+ channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, promise).addListener(ChannelFutureListener.CLOSE);
+ }
+ private void sendError(HttpErrorMessage errorMsg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
+ final ByteBuf encBuf = encoder.allocateBuffer(ctx.alloc());
+ ByteBufOutputStream out = new ByteBufOutputStream(encBuf);
+ encoder.getJsonSupport().writeValue(out, errorMsg.getData());
+
+ sendMessage(errorMsg, ctx.channel(), encBuf, "application/json", promise, HttpResponseStatus.BAD_REQUEST);
}
- private void addOriginHeaders(Channel channel, HttpResponse res) {
+ private void addOriginHeaders(String origin, HttpResponse res) {
if (version != null) {
- res.headers().add(HttpHeaders.Names.SERVER, version);
+ res.headers().add(HttpHeaderNames.SERVER, version);
}
- if (configuration.getOrigin() != null) {
- HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, configuration.getOrigin());
- } else {
- String origin = channel.attr(ORIGIN).get();
- if (origin != null) {
- HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, origin);
- HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE);
+ if (configuration.isEnableCors()) {
+ if (configuration.getOrigin() != null) {
+ res.headers().add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, configuration.getOrigin());
+ res.headers().add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE);
} else {
- HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+ if (origin != null) {
+ res.headers().add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, origin);
+ res.headers().add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE);
+ } else {
+ res.headers().add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+ }
+ }
+ if (configuration.getAllowHeaders() != null) {
+ res.headers().add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, configuration.getAllowHeaders());
}
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if (!(msg instanceof BaseMessage)) {
+ if (!(msg instanceof HttpMessage)) {
super.write(ctx, msg, promise);
return;
}
@@ -190,35 +216,59 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (msg instanceof OutPacketMessage) {
OutPacketMessage m = (OutPacketMessage) msg;
if (m.getTransport() == Transport.WEBSOCKET) {
- handleWebsocket((OutPacketMessage) msg, ctx);
+ handleWebsocket((OutPacketMessage) msg, ctx, promise);
}
if (m.getTransport() == Transport.POLLING) {
- handleHTTP((OutPacketMessage) msg, ctx);
+ handleHTTP((OutPacketMessage) msg, ctx, promise);
}
} else if (msg instanceof XHROptionsMessage) {
- write((XHROptionsMessage) msg, ctx);
+ write((XHROptionsMessage) msg, ctx, promise);
} else if (msg instanceof XHRPostMessage) {
- write((XHRPostMessage) msg, ctx);
+ write((XHRPostMessage) msg, ctx, promise);
+ } else if (msg instanceof HttpErrorMessage) {
+ sendError((HttpErrorMessage) msg, ctx, promise);
}
}
- private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext ctx) throws IOException {
+
+ private static final int FRAME_BUFFER_SIZE = 8192;
+
+
+ private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
+ ChannelFutureList writeFutureList = new ChannelFutureList();
+
while (true) {
Queue queue = msg.getClientHead().getPacketsQueue(msg.getTransport());
Packet packet = queue.poll();
if (packet == null) {
+ writeFutureList.setChannelPromise(promise);
break;
}
- final ByteBuf out = encoder.allocateBuffer(ctx.alloc());
- encoder.encodePacket(packet, out, ctx.alloc(), true, false);
+ ByteBuf out = encoder.allocateBuffer(ctx.alloc());
+ encoder.encodePacket(packet, out, ctx.alloc(), true);
- WebSocketFrame res = new TextWebSocketFrame(out);
if (log.isTraceEnabled()) {
log.trace("Out message: {} sessionId: {}", out.toString(CharsetUtil.UTF_8), msg.getSessionId());
}
- ctx.channel().writeAndFlush(res);
- if (!out.isReadable()) {
+ if (out.isReadable() && out.readableBytes() > configuration.getMaxFramePayloadLength()) {
+ ByteBuf dstStart = out.readSlice(FRAME_BUFFER_SIZE);
+ dstStart.retain();
+ WebSocketFrame start = new TextWebSocketFrame(false, 0, dstStart);
+ ctx.channel().write(start);
+ while (out.isReadable()) {
+ int re = Math.min(out.readableBytes(), FRAME_BUFFER_SIZE);
+ ByteBuf dst = out.readSlice(re);
+ dst.retain();
+ WebSocketFrame res = new ContinuationWebSocketFrame(!out.isReadable(), 0, dst);
+ ctx.channel().write(res);
+ }
+ out.release();
+ ctx.channel().flush();
+ } else if (out.isReadable()){
+ WebSocketFrame res = new TextWebSocketFrame(out);
+ ctx.channel().writeAndFlush(res);
+ } else {
out.release();
}
@@ -229,18 +279,19 @@ private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext c
if (log.isTraceEnabled()) {
log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId());
}
- ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf));
+ writeFutureList.add(ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf)));
}
}
}
- private void handleHTTP(OutPacketMessage msg, ChannelHandlerContext ctx) throws IOException {
+ private void handleHTTP(OutPacketMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
Channel channel = ctx.channel();
Attribute attr = channel.attr(WRITE_ONCE);
Queue queue = msg.getClientHead().getPacketsQueue(msg.getTransport());
if (!channel.isActive() || queue.isEmpty() || !attr.compareAndSet(null, true)) {
+ promise.trySuccess();
return;
}
@@ -249,10 +300,67 @@ private void handleHTTP(OutPacketMessage msg, ChannelHandlerContext ctx) throws
if (b64 != null && b64) {
Integer jsonpIndex = ctx.channel().attr(EncoderHandler.JSONP_INDEX).get();
encoder.encodeJsonP(jsonpIndex, queue, out, ctx.alloc(), 50);
- sendMessage(msg, channel, out, "application/javascript");
+ String type = "application/javascript";
+ if (jsonpIndex == null) {
+ type = "text/plain";
+ }
+ sendMessage(msg, channel, out, type, promise, HttpResponseStatus.OK);
} else {
encoder.encodePackets(queue, out, ctx.alloc(), 50);
- sendMessage(msg, channel, out, "application/octet-stream");
+ sendMessage(msg, channel, out, "application/octet-stream", promise, HttpResponseStatus.OK);
+ }
+ }
+
+ /**
+ * Helper class for the handleWebsocket method, handles a list of ChannelFutures and
+ * sets the status of a promise when
+ * - any of the operations fail
+ * - all of the operations succeed
+ * The setChannelPromise method should be called after all the futures are added
+ */
+ private static class ChannelFutureList implements GenericFutureListener> {
+
+ private List futureList = new ArrayList();
+ private ChannelPromise promise = null;
+
+ private void cleanup() {
+ promise = null;
+ for (ChannelFuture f : futureList) f.removeListener(this);
+ }
+
+ private void validate() {
+ boolean allSuccess = true;
+ for (ChannelFuture f : futureList) {
+ if (f.isDone()) {
+ if (!f.isSuccess()) {
+ promise.tryFailure(f.cause());
+ cleanup();
+ return;
+ }
+ }
+ else {
+ allSuccess = false;
+ }
+ }
+ if (allSuccess) {
+ promise.trySuccess();
+ cleanup();
+ }
+ }
+
+ public void add(ChannelFuture f) {
+ futureList.add(f);
+ f.addListener(this);
+ }
+
+ public void setChannelPromise(ChannelPromise p) {
+ promise = p;
+ validate();
+ }
+
+ @Override
+ public void operationComplete(Future voidFuture) throws Exception {
+ if (promise != null) validate();
}
}
diff --git a/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java b/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java
index 2e8def00b..5e8503154 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,28 +15,29 @@
*/
package com.corundumstudio.socketio.handler;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.CharsetUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.corundumstudio.socketio.AuthTokenResult;
import com.corundumstudio.socketio.listener.ExceptionListener;
import com.corundumstudio.socketio.messages.PacketsMessage;
import com.corundumstudio.socketio.namespace.Namespace;
import com.corundumstudio.socketio.namespace.NamespacesHub;
+import com.corundumstudio.socketio.protocol.ConnPacket;
+import com.corundumstudio.socketio.protocol.EngineIOVersion;
import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.protocol.PacketDecoder;
import com.corundumstudio.socketio.protocol.PacketType;
import com.corundumstudio.socketio.transport.NamespaceClient;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.CharsetUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Sharable
public class InPacketHandler extends SimpleChannelInboundHandler {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(InPacketHandler.class);
private final PacketListener packetListener;
private final PacketDecoder decoder;
@@ -63,17 +64,51 @@ protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, PacketsM
while (content.isReadable()) {
try {
Packet packet = decoder.decodePackets(content, client);
- if (packet.hasAttachments() && !packet.isAttachmentsLoaded()) {
- return;
- }
+
Namespace ns = namespacesHub.get(packet.getNsp());
if (ns == null) {
+ if (packet.getSubType() == PacketType.CONNECT) {
+ Packet p = new Packet(PacketType.MESSAGE, client.getEngineIOVersion());
+ p.setSubType(PacketType.ERROR);
+ p.setNsp(packet.getNsp());
+ p.setData("Invalid namespace");
+ client.send(p);
+ return;
+ }
log.debug("Can't find namespace for endpoint: {}, sessionId: {} probably it was removed.", packet.getNsp(), client.getSessionId());
return;
}
if (packet.getSubType() == PacketType.CONNECT) {
client.addNamespaceClient(ns);
+ NamespaceClient nClient = client.getChildClient(ns);
+ //:TODO lyjnew client namespace send connect packet 0+namespace socket io v4
+ // https://socket.io/docs/v4/socket-io-protocol/#connection-to-a-namespace
+ if (EngineIOVersion.V4.equals(client.getEngineIOVersion())) {
+ // Check for an auth token
+ if (packet.getData() != null) {
+ final Object authData = packet.getData();
+ client.getHandshakeData().setAuthToken(authData);
+ // Call all authTokenListeners to see if one denies it
+ final AuthTokenResult allowAuth = ns.onAuthData(nClient, authData);
+ if (!allowAuth.isSuccess()) {
+ Packet p = new Packet(PacketType.MESSAGE, client.getEngineIOVersion());
+ p.setSubType(PacketType.ERROR);
+ p.setNsp(packet.getNsp());
+ final Object errorData = allowAuth.getErrorData();
+ if (errorData != null) {
+ p.setData(errorData);
+ }
+ client.send(p);
+ return;
+ }
+ }
+ Packet p = new Packet(PacketType.MESSAGE, client.getEngineIOVersion());
+ p.setSubType(PacketType.CONNECT);
+ p.setNsp(packet.getNsp());
+ p.setData(new ConnPacket(client.getSessionId()));
+ client.send(p);
+ }
}
NamespaceClient nClient = client.getChildClient(ns);
@@ -81,11 +116,14 @@ protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, PacketsM
log.debug("Can't find namespace client in namespace: {}, sessionId: {} probably it was disconnected.", ns.getName(), client.getSessionId());
return;
}
+ if (packet.hasAttachments() && !packet.isAttachmentsLoaded()) {
+ return;
+ }
packetListener.onPacket(packet, nClient, message.getTransport());
} catch (Exception ex) {
String c = content.toString(CharsetUtil.UTF_8);
log.error("Error during data processing. Client sessionId: " + client.getSessionId() + ", data: " + c, ex);
- return;
+ throw ex;
}
}
}
diff --git a/src/main/java/com/corundumstudio/socketio/handler/PacketListener.java b/src/main/java/com/corundumstudio/socketio/handler/PacketListener.java
index c3583ff3e..d0446339f 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/PacketListener.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/PacketListener.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.namespace.Namespace;
import com.corundumstudio.socketio.namespace.NamespacesHub;
+import com.corundumstudio.socketio.protocol.EngineIOVersion;
import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.protocol.PacketType;
import com.corundumstudio.socketio.scheduler.CancelableScheduler;
@@ -52,16 +53,23 @@ public void onPacket(Packet packet, NamespaceClient client, Transport transport)
switch (packet.getType()) {
case PING: {
- Packet outPacket = new Packet(PacketType.PONG);
+ Packet outPacket = new Packet(PacketType.PONG, client.getEngineIOVersion());
outPacket.setData(packet.getData());
// TODO use future
client.getBaseClient().send(outPacket, transport);
-
if ("probe".equals(packet.getData())) {
- client.getBaseClient().send(new Packet(PacketType.NOOP), Transport.POLLING);
+ client.getBaseClient().send(new Packet(PacketType.NOOP, client.getEngineIOVersion()), Transport.POLLING);
} else {
client.getBaseClient().schedulePingTimeout();
}
+ Namespace namespace = namespacesHub.get(packet.getNsp());
+ namespace.onPing(client);
+ break;
+ }
+ case PONG: {
+ client.getBaseClient().schedulePingTimeout();
+ Namespace namespace = namespacesHub.get(packet.getNsp());
+ namespace.onPong(client);
break;
}
@@ -86,10 +94,13 @@ public void onPacket(Packet packet, NamespaceClient client, Transport transport)
Namespace namespace = namespacesHub.get(packet.getNsp());
namespace.onConnect(client);
// send connect handshake packet back to client
- client.getBaseClient().send(packet, transport);
+ if (!EngineIOVersion.V4.equals(client.getEngineIOVersion())) {
+ client.getBaseClient().send(packet, transport);
+ }
}
- if (packet.getSubType() == PacketType.ACK) {
+ if (packet.getSubType() == PacketType.ACK
+ || packet.getSubType() == PacketType.BINARY_ACK) {
ackManager.onAck(client, packet);
}
diff --git a/src/main/java/com/corundumstudio/socketio/handler/ResourceHandler.java b/src/main/java/com/corundumstudio/socketio/handler/ResourceHandler.java
deleted file mode 100644
index 47eeb1878..000000000
--- a/src/main/java/com/corundumstudio/socketio/handler/ResourceHandler.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Copyright 2012 Nikita Koksharov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.corundumstudio.socketio.handler;
-
-import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import io.netty.handler.stream.ChunkedStream;
-import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.util.CharsetUtil;
-
-import java.io.InputStream;
-import java.net.URL;
-import java.net.URLConnection;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TimeZone;
-
-import javax.activation.MimetypesFileTypeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.corundumstudio.socketio.SocketIOChannelInitializer;
-
-@Sharable
-public class ResourceHandler extends ChannelInboundHandlerAdapter {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
- public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
- public static final int HTTP_CACHE_SECONDS = 60;
-
- private final Map resources = new HashMap();
-
- public ResourceHandler(String context) {
- addResource(context + "/static/flashsocket/WebSocketMain.swf", "/static/flashsocket/WebSocketMain.swf");
- addResource(context + "/static/flashsocket/WebSocketMainInsecure.swf", "/static/flashsocket/WebSocketMainInsecure.swf");
- }
-
- public void addResource(String pathPart, String resourcePath) {
- URL resUrl = getClass().getResource(resourcePath);
- if (resUrl == null) {
- log.error("The specified resource was not found: " + resourcePath);
- return;
- }
- resources.put(pathPart, resUrl);
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof FullHttpRequest) {
- FullHttpRequest req = (FullHttpRequest) msg;
- QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri());
- URL resUrl = resources.get(queryDecoder.path());
- if (resUrl != null) {
- URLConnection fileUrl = resUrl.openConnection();
- long lastModified = fileUrl.getLastModified();
- // check if file has been modified since last request
- if (isNotModified(req, lastModified)) {
- sendNotModified(ctx);
- req.release();
- return;
- }
- // create resource input-stream and check existence
- final InputStream is = fileUrl.getInputStream();
- if (is == null) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- // create ok response
- HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
- // set Content-Length header
- setContentLength(res, fileUrl.getContentLength());
- // set Content-Type header
- setContentTypeHeader(res, fileUrl);
- // set Date, Expires, Cache-Control and Last-Modified headers
- setDateAndCacheHeaders(res, lastModified);
- // write initial response header
- ctx.write(res);
-
- // write the content stream
- ctx.pipeline().addBefore(SocketIOChannelInitializer.RESOURCE_HANDLER, "chunkedWriter", new ChunkedWriteHandler());
- ChannelFuture writeFuture = ctx.channel().write(new ChunkedStream(is, fileUrl.getContentLength()));
- // add operation complete listener so we can close the channel and the input stream
- writeFuture.addListener(ChannelFutureListener.CLOSE);
- return;
- }
- }
- ctx.fireChannelRead(msg);
- }
-
- /*
- * Checks if the content has been modified sicne the date provided by the IF_MODIFIED_SINCE http header
- * */
- private boolean isNotModified(HttpRequest request, long lastModified) throws ParseException {
- String ifModifiedSince = request.headers().get(HttpHeaders.Names.IF_MODIFIED_SINCE);
- if (ifModifiedSince != null && !ifModifiedSince.equals("")) {
- SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
- Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);
-
- // Only compare up to the second because the datetime format we send to the client does
- // not have milliseconds
- long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
- long fileLastModifiedSeconds = lastModified / 1000;
- return ifModifiedSinceDateSeconds == fileLastModifiedSeconds;
- }
- return false;
- }
-
- /*
- * Sends a Not Modified response to the client
- *
- * */
- private void sendNotModified(ChannelHandlerContext ctx) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NOT_MODIFIED);
- setDateHeader(response);
-
- // Close the connection as soon as the error message is sent.
- ctx.channel().write(response).addListener(ChannelFutureListener.CLOSE);
- }
-
- /**
- * Sets the Date header for the HTTP response
- *
- * @param response
- * HTTP response
- */
- private void setDateHeader(HttpResponse response) {
- SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
- dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
-
- Calendar time = new GregorianCalendar();
- HttpHeaders.setHeader(response, HttpHeaders.Names.DATE, dateFormatter.format(time.getTime()));
- }
-
-
- /**
- * Sends an Error response with status message
- *
- * @param ctx
- * @param status
- */
- private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- HttpHeaders.setHeader(response, CONTENT_TYPE, "text/plain; charset=UTF-8");
- ByteBuf content = Unpooled.copiedBuffer( "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8);
-
- ctx.channel().write(response);
- // Close the connection as soon as the error message is sent.
- ctx.channel().write(content).addListener(ChannelFutureListener.CLOSE);
- }
-
- /**
- * Sets the Date and Cache headers for the HTTP Response
- *
- * @param response
- * HTTP response
- * @param fileToCache
- * file to extract content type
- */
- private void setDateAndCacheHeaders(HttpResponse response, long lastModified) {
- SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
- dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
-
- // Date header
- Calendar time = new GregorianCalendar();
- HttpHeaders.setHeader(response, HttpHeaders.Names.DATE, dateFormatter.format(time.getTime()));
-
- // Add cache headers
- time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
- HttpHeaders.setHeader(response, HttpHeaders.Names.EXPIRES, dateFormatter.format(time.getTime()));
- HttpHeaders.setHeader(response, HttpHeaders.Names.CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
- HttpHeaders.setHeader(response, HttpHeaders.Names.LAST_MODIFIED, dateFormatter.format(new Date(lastModified)));
- }
-
- /**
- * Sets the content type header for the HTTP Response
- *
- * @param response
- * HTTP response
- * @param file
- * file to extract content type
- */
- private void setContentTypeHeader(HttpResponse response, URLConnection resUrlConnection) {
- MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
- String resName = resUrlConnection.getURL().getFile();
- HttpHeaders.setHeader(response, HttpHeaders.Names.CONTENT_TYPE, mimeTypesMap.getContentType(resName));
- }
-}
diff --git a/src/main/java/com/corundumstudio/socketio/handler/SocketIOException.java b/src/main/java/com/corundumstudio/socketio/handler/SocketIOException.java
index b5638e798..dbbfbde43 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/SocketIOException.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/SocketIOException.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/handler/SuccessAuthorizationListener.java b/src/main/java/com/corundumstudio/socketio/handler/SuccessAuthorizationListener.java
index dfded95ce..eb536f293 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/SuccessAuthorizationListener.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/SuccessAuthorizationListener.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,13 +16,14 @@
package com.corundumstudio.socketio.handler;
import com.corundumstudio.socketio.AuthorizationListener;
+import com.corundumstudio.socketio.AuthorizationResult;
import com.corundumstudio.socketio.HandshakeData;
public class SuccessAuthorizationListener implements AuthorizationListener {
@Override
- public boolean isAuthorized(HandshakeData data) {
- return true;
+ public AuthorizationResult getAuthorizationResult(HandshakeData data) {
+ return AuthorizationResult.SUCCESSFUL_AUTHORIZATION;
}
}
diff --git a/src/main/java/com/corundumstudio/socketio/handler/TransportState.java b/src/main/java/com/corundumstudio/socketio/handler/TransportState.java
index 4850639e0..6134aacfe 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/TransportState.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/TransportState.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/handler/WrongUrlHandler.java b/src/main/java/com/corundumstudio/socketio/handler/WrongUrlHandler.java
index 078767f66..bd76f783a 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/WrongUrlHandler.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/WrongUrlHandler.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,9 +23,9 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponse;
@@ -35,20 +35,23 @@
@Sharable
public class WrongUrlHandler extends ChannelInboundHandlerAdapter {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(WrongUrlHandler.class);
+ @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest req = (FullHttpRequest) msg;
Channel channel = ctx.channel();
- QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri());
+ QueryStringDecoder queryDecoder = new QueryStringDecoder(req.uri());
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
ChannelFuture f = channel.writeAndFlush(res);
f.addListener(ChannelFutureListener.CLOSE);
req.release();
log.warn("Blocked wrong socket.io-context request! url: {}, params: {}, ip: {}", queryDecoder.path(), queryDecoder.parameters(), channel.remoteAddress());
+ return;
}
+ super.channelRead(ctx, msg);
}
}
diff --git a/src/main/java/com/corundumstudio/socketio/listener/ClientListeners.java b/src/main/java/com/corundumstudio/socketio/listener/ClientListeners.java
index 284092043..a3f24c919 100644
--- a/src/main/java/com/corundumstudio/socketio/listener/ClientListeners.java
+++ b/src/main/java/com/corundumstudio/socketio/listener/ClientListeners.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,18 +15,36 @@
*/
package com.corundumstudio.socketio.listener;
+
public interface ClientListeners {
void addMultiTypeEventListener(String eventName, MultiTypeEventListener listener, Class> ... eventClass);
void addEventListener(String eventName, Class eventClass, DataListener listener);
+ void addEventInterceptor(EventInterceptor eventInterceptor);
+
void addDisconnectListener(DisconnectListener listener);
void addConnectListener(ConnectListener listener);
+ /**
+ * from v4, ping will always be sent by server except probe ping packet sent from client,
+ * and pong will always be responded by client while receiving ping except probe pong packet responded from server
+ * it makes no more sense to listen to ping packet, instead you can listen to pong packet
+ * @deprecated use addPongListener instead
+ * @param listener
+ */
+ @Deprecated
+ void addPingListener(PingListener listener);
+ void addPongListener(PongListener listener);
+
void addListeners(Object listeners);
- void addListeners(Object listeners, Class listenersClass);
+ void addListeners(Iterable listeners);
+
+ void addListeners(Object listeners, Class> listenersClass);
+ void removeAllListeners(String eventName);
+
}
diff --git a/src/main/java/com/corundumstudio/socketio/listener/ConnectListener.java b/src/main/java/com/corundumstudio/socketio/listener/ConnectListener.java
index fff737383..f4f4b22f8 100644
--- a/src/main/java/com/corundumstudio/socketio/listener/ConnectListener.java
+++ b/src/main/java/com/corundumstudio/socketio/listener/ConnectListener.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/main/java/com/corundumstudio/socketio/listener/DataListener.java b/src/main/java/com/corundumstudio/socketio/listener/DataListener.java
index c0f100de6..4809da3af 100644
--- a/src/main/java/com/corundumstudio/socketio/listener/DataListener.java
+++ b/src/main/java/com/corundumstudio/socketio/listener/DataListener.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@ public interface DataListener {
*
* @param client - receiver
* @param data - received object
+ * @param ackSender - ack request
+ *
*/
void onData(SocketIOClient client, T data, AckRequest ackSender) throws Exception;
diff --git a/src/main/java/com/corundumstudio/socketio/listener/DefaultExceptionListener.java b/src/main/java/com/corundumstudio/socketio/listener/DefaultExceptionListener.java
index 15b1969f8..1662708f9 100644
--- a/src/main/java/com/corundumstudio/socketio/listener/DefaultExceptionListener.java
+++ b/src/main/java/com/corundumstudio/socketio/listener/DefaultExceptionListener.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2012 Nikita Koksharov
+ * Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@
public class DefaultExceptionListener extends ExceptionListenerAdapter {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log = LoggerFactory.getLogger(DefaultExceptionListener.class);
@Override
public void onEventException(Exception e, List