1
+ package org .asynchttpclient .extra ;
2
+
3
+ import java .util .concurrent .Semaphore ;
4
+ import java .util .concurrent .TimeUnit ;
5
+
6
+ import org .asynchttpclient .filter .FilterContext ;
7
+ import org .asynchttpclient .filter .FilterException ;
8
+ import org .asynchttpclient .filter .RequestFilter ;
9
+ import org .slf4j .Logger ;
10
+ import org .slf4j .LoggerFactory ;
11
+
12
+ import com .google .common .util .concurrent .RateLimiter ;
13
+
14
+ /**
15
+ * A {@link org.asynchttpclient.filter.RequestFilter} that extends the capability of
16
+ * {@link ThrottleRequestFilter} by allowing rate limiting per second in addition to the
17
+ * number of concurrent connections.
18
+ *
19
+ * The <code>maxWaitMs</code> argument is respected accross both permit acquistions. For
20
+ * example, if 1000 ms is given, and the filter spends 500 ms waiting for a connection,
21
+ * it will only spend another 500 ms waiting for the rate limiter.
22
+ */
23
+ public class RateLimitedThrottleRequestFilter implements RequestFilter {
24
+ private final static Logger logger = LoggerFactory .getLogger (RateLimitedThrottleRequestFilter .class );
25
+ private final Semaphore available ;
26
+ private final int maxWaitMs ;
27
+ private final RateLimiter rateLimiter ;
28
+
29
+ public RateLimitedThrottleRequestFilter (int maxConnections , double rateLimitPerSecond ) {
30
+ this (maxConnections , rateLimitPerSecond , Integer .MAX_VALUE );
31
+ }
32
+
33
+ public RateLimitedThrottleRequestFilter (int maxConnections , double rateLimitPerSecond , int maxWaitMs ) {
34
+ this .maxWaitMs = maxWaitMs ;
35
+ this .rateLimiter = RateLimiter .create (rateLimitPerSecond );
36
+ available = new Semaphore (maxConnections , true );
37
+ }
38
+
39
+ /**
40
+ * {@inheritDoc}
41
+ */
42
+ @ Override
43
+ public <T > FilterContext <T > filter (FilterContext <T > ctx ) throws FilterException {
44
+ try {
45
+ if (logger .isDebugEnabled ()) {
46
+ logger .debug ("Current Throttling Status {}" , available .availablePermits ());
47
+ }
48
+
49
+ long startOfWait = System .currentTimeMillis ();
50
+ attemptConcurrencyPermitAcquistion (ctx );
51
+
52
+ attemptRateLimitedPermitAcquistion (ctx , startOfWait );
53
+ } catch (InterruptedException e ) {
54
+ throw new FilterException (String .format ("Interrupted Request %s with AsyncHandler %s" , ctx .getRequest (),
55
+ ctx .getAsyncHandler ()));
56
+ }
57
+
58
+ return new FilterContext .FilterContextBuilder <T >(ctx ).asyncHandler (
59
+ new AsyncHandlerWrapper <T >(ctx .getAsyncHandler (), available )).build ();
60
+ }
61
+
62
+ private <T > void attemptRateLimitedPermitAcquistion (FilterContext <T > ctx , long startOfWait ) throws FilterException {
63
+ long wait = getMillisRemainingInMaxWait (startOfWait );
64
+
65
+ if (!rateLimiter .tryAcquire (wait , TimeUnit .MILLISECONDS )) {
66
+ throw new FilterException (String .format (
67
+ "Wait for rate limit exceeded during processing Request %s with AsyncHandler %s" , ctx .getRequest (),
68
+ ctx .getAsyncHandler ()));
69
+ }
70
+ }
71
+
72
+ private <T > void attemptConcurrencyPermitAcquistion (FilterContext <T > ctx ) throws InterruptedException ,
73
+ FilterException {
74
+ if (!available .tryAcquire (maxWaitMs , TimeUnit .MILLISECONDS )) {
75
+ throw new FilterException (String .format ("No slot available for processing Request %s with AsyncHandler %s" ,
76
+ ctx .getRequest (), ctx .getAsyncHandler ()));
77
+ }
78
+ }
79
+
80
+ private long getMillisRemainingInMaxWait (long startOfWait ) {
81
+ int MINUTE_IN_MILLIS = 60000 ;
82
+ long durationLeft = maxWaitMs - (System .currentTimeMillis () - startOfWait );
83
+ long nonNegativeDuration = Math .max (durationLeft , 0 );
84
+
85
+ // have to reduce the duration because there is a boundary case inside the Guava
86
+ // rate limiter where if the duration to wait is near Long.MAX_VALUE, the rate
87
+ // limiter's internal calculations can exceed Long.MAX_VALUE resulting in a
88
+ // negative number which causes the tryAcquire() method to fail unexpectedly
89
+ if (Long .MAX_VALUE - nonNegativeDuration < MINUTE_IN_MILLIS ) {
90
+ return nonNegativeDuration - MINUTE_IN_MILLIS ;
91
+ }
92
+
93
+ return nonNegativeDuration ;
94
+ }
95
+ }
0 commit comments