Skip to content

Commit 22ac9b4

Browse files
committed
Merge pull request AsyncHttpClient#336 from mgreene/master
Add a rate limited request filter
2 parents 67e9bc7 + 358a58e commit 22ac9b4

File tree

4 files changed

+209
-113
lines changed

4 files changed

+209
-113
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.asynchttpclient.extra;
2+
3+
import java.util.concurrent.Semaphore;
4+
5+
import org.asynchttpclient.AsyncHandler;
6+
import org.asynchttpclient.HttpResponseBodyPart;
7+
import org.asynchttpclient.HttpResponseHeaders;
8+
import org.asynchttpclient.HttpResponseStatus;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
public class AsyncHandlerWrapper<T> implements AsyncHandler<T> {
13+
14+
private final static Logger logger = LoggerFactory.getLogger(AsyncHandlerWrapper.class);
15+
private final AsyncHandler<T> asyncHandler;
16+
private final Semaphore available;
17+
18+
public AsyncHandlerWrapper(AsyncHandler<T> asyncHandler, Semaphore available) {
19+
this.asyncHandler = asyncHandler;
20+
this.available = available;
21+
}
22+
23+
/**
24+
* {@inheritDoc}
25+
*/
26+
/* @Override */
27+
public void onThrowable(Throwable t) {
28+
try {
29+
asyncHandler.onThrowable(t);
30+
} finally {
31+
available.release();
32+
if (logger.isDebugEnabled()) {
33+
logger.debug("Current Throttling Status after onThrowable {}", available.availablePermits());
34+
}
35+
}
36+
}
37+
38+
/**
39+
* {@inheritDoc}
40+
*/
41+
/* @Override */
42+
public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
43+
return asyncHandler.onBodyPartReceived(bodyPart);
44+
}
45+
46+
/**
47+
* {@inheritDoc}
48+
*/
49+
/* @Override */
50+
public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
51+
return asyncHandler.onStatusReceived(responseStatus);
52+
}
53+
54+
/**
55+
* {@inheritDoc}
56+
*/
57+
/* @Override */
58+
public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
59+
return asyncHandler.onHeadersReceived(headers);
60+
}
61+
62+
/**
63+
* {@inheritDoc}
64+
*/
65+
/* @Override */
66+
public T onCompleted() throws Exception {
67+
available.release();
68+
if (logger.isDebugEnabled()) {
69+
logger.debug("Current Throttling Status {}", available.availablePermits());
70+
}
71+
return asyncHandler.onCompleted();
72+
}
73+
}

api/src/main/java/org/asynchttpclient/extra/ThrottleRequestFilter.java

Lines changed: 40 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -12,126 +12,54 @@
1212
*/
1313
package org.asynchttpclient.extra;
1414

15-
import org.asynchttpclient.AsyncHandler;
16-
import org.asynchttpclient.HttpResponseBodyPart;
17-
import org.asynchttpclient.HttpResponseHeaders;
18-
import org.asynchttpclient.HttpResponseStatus;
19-
import org.asynchttpclient.filter.FilterContext;
20-
import org.asynchttpclient.filter.FilterException;
21-
import org.asynchttpclient.filter.RequestFilter;
15+
import java.util.concurrent.Semaphore;
16+
import java.util.concurrent.TimeUnit;
17+
2218
import org.asynchttpclient.filter.FilterContext;
2319
import org.asynchttpclient.filter.FilterException;
2420
import org.asynchttpclient.filter.RequestFilter;
2521
import org.slf4j.Logger;
2622
import org.slf4j.LoggerFactory;
2723

28-
import java.util.concurrent.Semaphore;
29-
import java.util.concurrent.TimeUnit;
30-
3124
/**
3225
* A {@link org.asynchttpclient.filter.RequestFilter} throttles requests and block when the number of permits is reached, waiting for
3326
* the response to arrives before executing the next request.
3427
*/
3528
public class ThrottleRequestFilter implements RequestFilter {
36-
private final static Logger logger = LoggerFactory.getLogger(ThrottleRequestFilter.class);
37-
@SuppressWarnings("unused")
38-
private final int maxConnections;
39-
private final Semaphore available;
40-
private final int maxWait;
41-
42-
public ThrottleRequestFilter(int maxConnections) {
43-
this.maxConnections = maxConnections;
44-
this.maxWait = Integer.MAX_VALUE;
45-
available = new Semaphore(maxConnections, true);
46-
}
47-
48-
public ThrottleRequestFilter(int maxConnections, int maxWait) {
49-
this.maxConnections = maxConnections;
50-
this.maxWait = maxWait;
51-
available = new Semaphore(maxConnections, true);
52-
}
53-
54-
/**
55-
* {@inheritDoc}
56-
*/
57-
/* @Override */
58-
public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException {
59-
60-
try {
61-
if (logger.isDebugEnabled()) {
62-
logger.debug("Current Throttling Status {}", available.availablePermits());
63-
}
64-
if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
65-
throw new FilterException(
66-
String.format("No slot available for processing Request %s with AsyncHandler %s",
67-
ctx.getRequest(), ctx.getAsyncHandler()));
68-
}
69-
;
70-
} catch (InterruptedException e) {
71-
throw new FilterException(
72-
String.format("Interrupted Request %s with AsyncHandler %s", ctx.getRequest(), ctx.getAsyncHandler()));
73-
}
74-
75-
return new FilterContext.FilterContextBuilder<T>(ctx).asyncHandler(new AsyncHandlerWrapper<T>(ctx.getAsyncHandler())).build();
76-
}
77-
78-
private class AsyncHandlerWrapper<T> implements AsyncHandler<T> {
79-
80-
private final AsyncHandler<T> asyncHandler;
81-
82-
public AsyncHandlerWrapper(AsyncHandler<T> asyncHandler) {
83-
this.asyncHandler = asyncHandler;
84-
}
85-
86-
/**
87-
* {@inheritDoc}
88-
*/
89-
/* @Override */
90-
public void onThrowable(Throwable t) {
91-
try {
92-
asyncHandler.onThrowable(t);
93-
} finally {
94-
available.release();
95-
if (logger.isDebugEnabled()) {
96-
logger.debug("Current Throttling Status after onThrowable {}", available.availablePermits());
97-
}
98-
}
99-
}
100-
101-
/**
102-
* {@inheritDoc}
103-
*/
104-
/* @Override */
105-
public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
106-
return asyncHandler.onBodyPartReceived(bodyPart);
107-
}
108-
109-
/**
110-
* {@inheritDoc}
111-
*/
112-
/* @Override */
113-
public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
114-
return asyncHandler.onStatusReceived(responseStatus);
115-
}
116-
117-
/**
118-
* {@inheritDoc}
119-
*/
120-
/* @Override */
121-
public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
122-
return asyncHandler.onHeadersReceived(headers);
123-
}
124-
125-
/**
126-
* {@inheritDoc}
127-
*/
128-
/* @Override */
129-
public T onCompleted() throws Exception {
130-
available.release();
131-
if (logger.isDebugEnabled()) {
132-
logger.debug("Current Throttling Status {}", available.availablePermits());
133-
}
134-
return asyncHandler.onCompleted();
135-
}
136-
}
137-
}
29+
private final static Logger logger = LoggerFactory.getLogger(ThrottleRequestFilter.class);
30+
private final Semaphore available;
31+
private final int maxWait;
32+
33+
public ThrottleRequestFilter(int maxConnections) {
34+
this(maxConnections, Integer.MAX_VALUE);
35+
}
36+
37+
public ThrottleRequestFilter(int maxConnections, int maxWait) {
38+
this.maxWait = maxWait;
39+
available = new Semaphore(maxConnections, true);
40+
}
41+
42+
/**
43+
* {@inheritDoc}
44+
*/
45+
/* @Override */
46+
public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException {
47+
48+
try {
49+
if (logger.isDebugEnabled()) {
50+
logger.debug("Current Throttling Status {}", available.availablePermits());
51+
}
52+
if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
53+
throw new FilterException(String.format(
54+
"No slot available for processing Request %s with AsyncHandler %s", ctx.getRequest(),
55+
ctx.getAsyncHandler()));
56+
}
57+
} catch (InterruptedException e) {
58+
throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s", ctx.getRequest(),
59+
ctx.getAsyncHandler()));
60+
}
61+
62+
return new FilterContext.FilterContextBuilder<T>(ctx).asyncHandler(
63+
new AsyncHandlerWrapper<T>(ctx.getAsyncHandler(), available)).build();
64+
}
65+
}

extras/guava/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
<dependency>
1818
<groupId>com.google.guava</groupId>
1919
<artifactId>guava</artifactId>
20-
<version>11.0.2</version>
20+
<version>14.0.1</version>
2121
</dependency>
2222
</dependencies>
2323
</project>
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package org.asynchttpclient.extra;
2+
3+
import java.util.concurrent.Semaphore;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.asynchttpclient.filter.FilterContext;
7+
import org.asynchttpclient.filter.FilterException;
8+
import org.asynchttpclient.filter.RequestFilter;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import com.google.common.util.concurrent.RateLimiter;
13+
14+
/**
15+
* A {@link org.asynchttpclient.filter.RequestFilter} that extends the capability of
16+
* {@link ThrottleRequestFilter} by allowing rate limiting per second in addition to the
17+
* number of concurrent connections.
18+
*
19+
* The <code>maxWaitMs</code> argument is respected accross both permit acquistions. For
20+
* example, if 1000 ms is given, and the filter spends 500 ms waiting for a connection,
21+
* it will only spend another 500 ms waiting for the rate limiter.
22+
*/
23+
public class RateLimitedThrottleRequestFilter implements RequestFilter {
24+
private final static Logger logger = LoggerFactory.getLogger(RateLimitedThrottleRequestFilter.class);
25+
private final Semaphore available;
26+
private final int maxWaitMs;
27+
private final RateLimiter rateLimiter;
28+
29+
public RateLimitedThrottleRequestFilter(int maxConnections, double rateLimitPerSecond) {
30+
this(maxConnections, rateLimitPerSecond, Integer.MAX_VALUE);
31+
}
32+
33+
public RateLimitedThrottleRequestFilter(int maxConnections, double rateLimitPerSecond, int maxWaitMs) {
34+
this.maxWaitMs = maxWaitMs;
35+
this.rateLimiter = RateLimiter.create(rateLimitPerSecond);
36+
available = new Semaphore(maxConnections, true);
37+
}
38+
39+
/**
40+
* {@inheritDoc}
41+
*/
42+
@Override
43+
public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException {
44+
try {
45+
if (logger.isDebugEnabled()) {
46+
logger.debug("Current Throttling Status {}", available.availablePermits());
47+
}
48+
49+
long startOfWait = System.currentTimeMillis();
50+
attemptConcurrencyPermitAcquistion(ctx);
51+
52+
attemptRateLimitedPermitAcquistion(ctx, startOfWait);
53+
} catch (InterruptedException e) {
54+
throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s", ctx.getRequest(),
55+
ctx.getAsyncHandler()));
56+
}
57+
58+
return new FilterContext.FilterContextBuilder<T>(ctx).asyncHandler(
59+
new AsyncHandlerWrapper<T>(ctx.getAsyncHandler(), available)).build();
60+
}
61+
62+
private <T> void attemptRateLimitedPermitAcquistion(FilterContext<T> ctx, long startOfWait) throws FilterException {
63+
long wait = getMillisRemainingInMaxWait(startOfWait);
64+
65+
if (!rateLimiter.tryAcquire(wait, TimeUnit.MILLISECONDS)) {
66+
throw new FilterException(String.format(
67+
"Wait for rate limit exceeded during processing Request %s with AsyncHandler %s", ctx.getRequest(),
68+
ctx.getAsyncHandler()));
69+
}
70+
}
71+
72+
private <T> void attemptConcurrencyPermitAcquistion(FilterContext<T> ctx) throws InterruptedException,
73+
FilterException {
74+
if (!available.tryAcquire(maxWaitMs, TimeUnit.MILLISECONDS)) {
75+
throw new FilterException(String.format("No slot available for processing Request %s with AsyncHandler %s",
76+
ctx.getRequest(), ctx.getAsyncHandler()));
77+
}
78+
}
79+
80+
private long getMillisRemainingInMaxWait(long startOfWait) {
81+
int MINUTE_IN_MILLIS = 60000;
82+
long durationLeft = maxWaitMs - (System.currentTimeMillis() - startOfWait);
83+
long nonNegativeDuration = Math.max(durationLeft, 0);
84+
85+
// have to reduce the duration because there is a boundary case inside the Guava
86+
// rate limiter where if the duration to wait is near Long.MAX_VALUE, the rate
87+
// limiter's internal calculations can exceed Long.MAX_VALUE resulting in a
88+
// negative number which causes the tryAcquire() method to fail unexpectedly
89+
if (Long.MAX_VALUE - nonNegativeDuration < MINUTE_IN_MILLIS) {
90+
return nonNegativeDuration - MINUTE_IN_MILLIS;
91+
}
92+
93+
return nonNegativeDuration;
94+
}
95+
}

0 commit comments

Comments
 (0)