Skip to content

Commit 3e78a04

Browse files
Markus Heidenslandelle
Markus Heiden
authored andcommitted
Make ThrottleRequestFilter preserve interfaces of AsyncHandlers, close AsyncHttpClient#1314
1 parent d59fd20 commit 3e78a04

File tree

4 files changed

+67
-94
lines changed

4 files changed

+67
-94
lines changed

client/src/main/java/org/asynchttpclient/filter/AsyncHandlerWrapper.java

Lines changed: 0 additions & 79 deletions
This file was deleted.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.asynchttpclient.filter;
2+
3+
import java.lang.reflect.InvocationHandler;
4+
import java.lang.reflect.Method;
5+
import java.lang.reflect.Proxy;
6+
import java.util.Collections;
7+
import java.util.HashSet;
8+
import java.util.Set;
9+
import java.util.concurrent.Semaphore;
10+
11+
import org.asynchttpclient.AsyncHandler;
12+
13+
/**
14+
* Wrapper for {@link AsyncHandler}s to release a permit on {@link AsyncHandler#onCompleted()}.
15+
* This is done via a dynamic proxy to preserve all interfaces of the wrapped handler.
16+
*/
17+
public class ReleasePermitOnComplete {
18+
/**
19+
* Wrap handler to release the permit of the semaphore on {@link AsyncHandler#onCompleted()}.
20+
*/
21+
public static <T> AsyncHandler<T> wrap(final AsyncHandler<T> handler, final Semaphore available) {
22+
Class<?> handlerClass = handler.getClass();
23+
ClassLoader classLoader = handlerClass.getClassLoader();
24+
Class<?>[] interfaces = allInterfaces(handlerClass);
25+
26+
return (AsyncHandler<T>) Proxy.newProxyInstance(classLoader, interfaces, new InvocationHandler() {
27+
@Override
28+
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
29+
try {
30+
return method.invoke(handler, args);
31+
} finally {
32+
if ("onCompleted".equals(method.getName())) {
33+
available.release();
34+
}
35+
}
36+
}
37+
});
38+
}
39+
40+
/**
41+
* Extract all interfaces of a class.
42+
*/
43+
static Class<?>[] allInterfaces(Class<?> handlerClass) {
44+
Set<Class<?>> allInterfaces = new HashSet<>();
45+
for (Class<?> clazz = handlerClass; clazz != null; clazz = clazz.getSuperclass()) {
46+
Collections.addAll(allInterfaces, clazz.getInterfaces());
47+
}
48+
return allInterfaces.toArray(new Class[allInterfaces.size()]);
49+
}
50+
}

client/src/main/java/org/asynchttpclient/filter/ThrottleRequestFilter.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import java.util.concurrent.TimeUnit;
2020

2121
/**
22-
* A {@link org.asynchttpclient.filter.RequestFilter} throttles requests and block when the number of permits is reached, waiting for
23-
* the response to arrives before executing the next request.
22+
* A {@link org.asynchttpclient.filter.RequestFilter} throttles requests and block when the number of permits is reached,
23+
* waiting for the response to arrives before executing the next request.
2424
*/
2525
public class ThrottleRequestFilter implements RequestFilter {
26-
private final static Logger logger = LoggerFactory.getLogger(ThrottleRequestFilter.class);
26+
private static final Logger logger = LoggerFactory.getLogger(ThrottleRequestFilter.class);
2727
private final Semaphore available;
2828
private final int maxWait;
2929

@@ -32,33 +32,34 @@ public ThrottleRequestFilter(int maxConnections) {
3232
}
3333

3434
public ThrottleRequestFilter(int maxConnections, int maxWait) {
35-
this(maxConnections, maxWait, false);
35+
this(maxConnections, maxWait, false);
3636
}
3737

3838
public ThrottleRequestFilter(int maxConnections, int maxWait, boolean fair) {
39-
this.maxWait = maxWait;
40-
available = new Semaphore(maxConnections, fair);
39+
this.maxWait = maxWait;
40+
available = new Semaphore(maxConnections, fair);
4141
}
4242

4343
/**
4444
* {@inheritDoc}
4545
*/
4646
@Override
4747
public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException {
48-
4948
try {
5049
if (logger.isDebugEnabled()) {
5150
logger.debug("Current Throttling Status {}", available.availablePermits());
5251
}
5352
if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
5453
throw new FilterException(String.format("No slot available for processing Request %s with AsyncHandler %s",
55-
ctx.getRequest(), ctx.getAsyncHandler()));
54+
ctx.getRequest(), ctx.getAsyncHandler()));
5655
}
5756
} catch (InterruptedException e) {
58-
throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s", ctx.getRequest(), ctx.getAsyncHandler()));
57+
throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s",
58+
ctx.getRequest(), ctx.getAsyncHandler()));
5959
}
6060

61-
return new FilterContext.FilterContextBuilder<>(ctx).asyncHandler(new AsyncHandlerWrapper<>(ctx.getAsyncHandler(), available))
62-
.build();
61+
return new FilterContext.FilterContextBuilder<>(ctx)
62+
.asyncHandler(ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available))
63+
.build();
6364
}
64-
}
65+
}

extras/guava/src/main/java/org/asynchttpclient/extras/guava/RateLimitedThrottleRequestFilter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package org.asynchttpclient.extras.guava;
22

3-
import org.asynchttpclient.filter.AsyncHandlerWrapper;
43
import org.asynchttpclient.filter.FilterContext;
54
import org.asynchttpclient.filter.FilterException;
5+
import org.asynchttpclient.filter.ReleasePermitOnComplete;
66
import org.asynchttpclient.filter.RequestFilter;
77
import org.asynchttpclient.filter.ThrottleRequestFilter;
88
import org.slf4j.Logger;
@@ -56,8 +56,9 @@ public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException
5656
throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s", ctx.getRequest(), ctx.getAsyncHandler()));
5757
}
5858

59-
return new FilterContext.FilterContextBuilder<>(ctx).asyncHandler(new AsyncHandlerWrapper<>(ctx.getAsyncHandler(), available))
60-
.build();
59+
return new FilterContext.FilterContextBuilder<>(ctx)
60+
.asyncHandler(ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available))
61+
.build();
6162
}
6263

6364
private <T> void attemptRateLimitedPermitAcquistion(FilterContext<T> ctx, long startOfWait) throws FilterException {

0 commit comments

Comments
 (0)