21
21
* it will only spend another 500 ms waiting for the rate limiter.
22
22
*/
23
23
public class RateLimitedThrottleRequestFilter implements RequestFilter {
24
- private final static Logger logger = LoggerFactory .getLogger (RateLimitedThrottleRequestFilter .class );
25
- private final Semaphore available ;
26
- private final int maxWaitMs ;
27
- private final RateLimiter rateLimiter ;
28
-
29
- public RateLimitedThrottleRequestFilter (int maxConnections , double rateLimitPerSecond ) {
30
- this (maxConnections , rateLimitPerSecond , Integer .MAX_VALUE );
31
- }
32
-
33
- public RateLimitedThrottleRequestFilter (int maxConnections , double rateLimitPerSecond , int maxWaitMs ) {
34
- this .maxWaitMs = maxWaitMs ;
35
- this .rateLimiter = RateLimiter .create (rateLimitPerSecond );
36
- available = new Semaphore (maxConnections , true );
37
- }
38
-
39
- /**
40
- * {@inheritDoc}
41
- */
42
- @ Override
43
- public <T > FilterContext <T > filter (FilterContext <T > ctx ) throws FilterException {
44
- try {
45
- if (logger .isDebugEnabled ()) {
46
- logger .debug ("Current Throttling Status {}" , available .availablePermits ());
47
- }
48
-
49
- long startOfWait = System .currentTimeMillis ();
50
- attemptConcurrencyPermitAcquistion (ctx );
51
-
52
- attemptRateLimitedPermitAcquistion (ctx , startOfWait );
53
- } catch (InterruptedException e ) {
54
- throw new FilterException (String .format ("Interrupted Request %s with AsyncHandler %s" , ctx .getRequest (),
55
- ctx .getAsyncHandler ()));
56
- }
57
-
58
- return new FilterContext .FilterContextBuilder <T >(ctx ).asyncHandler (
59
- new AsyncHandlerWrapper <T >(ctx .getAsyncHandler (), available )).build ();
60
- }
61
-
62
- private <T > void attemptRateLimitedPermitAcquistion (FilterContext <T > ctx , long startOfWait ) throws FilterException {
63
- long wait = getMillisRemainingInMaxWait (startOfWait );
64
-
65
- if (!rateLimiter .tryAcquire (wait , TimeUnit .MILLISECONDS )) {
66
- throw new FilterException (String .format (
67
- "Wait for rate limit exceeded during processing Request %s with AsyncHandler %s" , ctx .getRequest (),
68
- ctx .getAsyncHandler ()));
69
- }
70
- }
71
-
72
- private <T > void attemptConcurrencyPermitAcquistion (FilterContext <T > ctx ) throws InterruptedException ,
73
- FilterException {
74
- if (!available .tryAcquire (maxWaitMs , TimeUnit .MILLISECONDS )) {
75
- throw new FilterException (String .format ("No slot available for processing Request %s with AsyncHandler %s" ,
76
- ctx .getRequest (), ctx .getAsyncHandler ()));
77
- }
78
- }
79
-
80
- private long getMillisRemainingInMaxWait (long startOfWait ) {
81
- int MINUTE_IN_MILLIS = 60000 ;
82
- long durationLeft = maxWaitMs - (System .currentTimeMillis () - startOfWait );
83
- long nonNegativeDuration = Math .max (durationLeft , 0 );
84
-
85
- // have to reduce the duration because there is a boundary case inside the Guava
86
- // rate limiter where if the duration to wait is near Long.MAX_VALUE, the rate
87
- // limiter's internal calculations can exceed Long.MAX_VALUE resulting in a
88
- // negative number which causes the tryAcquire() method to fail unexpectedly
89
- if (Long .MAX_VALUE - nonNegativeDuration < MINUTE_IN_MILLIS ) {
90
- return nonNegativeDuration - MINUTE_IN_MILLIS ;
91
- }
92
-
93
- return nonNegativeDuration ;
94
- }
95
- }
24
+ private final static Logger logger = LoggerFactory .getLogger (RateLimitedThrottleRequestFilter .class );
25
+ private final Semaphore available ;
26
+ private final int maxWaitMs ;
27
+ private final RateLimiter rateLimiter ;
28
+
29
+ public RateLimitedThrottleRequestFilter (int maxConnections , double rateLimitPerSecond ) {
30
+ this (maxConnections , rateLimitPerSecond , Integer .MAX_VALUE );
31
+ }
32
+
33
+ public RateLimitedThrottleRequestFilter (int maxConnections , double rateLimitPerSecond , int maxWaitMs ) {
34
+ this .maxWaitMs = maxWaitMs ;
35
+ this .rateLimiter = RateLimiter .create (rateLimitPerSecond );
36
+ available = new Semaphore (maxConnections , true );
37
+ }
38
+
39
+ /**
40
+ * {@inheritDoc}
41
+ */
42
+ @ Override
43
+ public <T > FilterContext <T > filter (FilterContext <T > ctx ) throws FilterException {
44
+ try {
45
+ if (logger .isDebugEnabled ()) {
46
+ logger .debug ("Current Throttling Status {}" , available .availablePermits ());
47
+ }
48
+
49
+ long startOfWait = System .currentTimeMillis ();
50
+ attemptConcurrencyPermitAcquistion (ctx );
51
+
52
+ attemptRateLimitedPermitAcquistion (ctx , startOfWait );
53
+ } catch (InterruptedException e ) {
54
+ throw new FilterException (String .format ("Interrupted Request %s with AsyncHandler %s" , ctx .getRequest (), ctx .getAsyncHandler ()));
55
+ }
56
+
57
+ return new FilterContext .FilterContextBuilder <T >(ctx ).asyncHandler (new AsyncHandlerWrapper <T >(ctx .getAsyncHandler (), available ))
58
+ .build ();
59
+ }
60
+
61
+ private <T > void attemptRateLimitedPermitAcquistion (FilterContext <T > ctx , long startOfWait ) throws FilterException {
62
+ long wait = getMillisRemainingInMaxWait (startOfWait );
63
+
64
+ if (!rateLimiter .tryAcquire (wait , TimeUnit .MILLISECONDS )) {
65
+ throw new FilterException (String .format ("Wait for rate limit exceeded during processing Request %s with AsyncHandler %s" ,
66
+ ctx .getRequest (), ctx .getAsyncHandler ()));
67
+ }
68
+ }
69
+
70
+ private <T > void attemptConcurrencyPermitAcquistion (FilterContext <T > ctx ) throws InterruptedException , FilterException {
71
+ if (!available .tryAcquire (maxWaitMs , TimeUnit .MILLISECONDS )) {
72
+ throw new FilterException (String .format ("No slot available for processing Request %s with AsyncHandler %s" , ctx .getRequest (),
73
+ ctx .getAsyncHandler ()));
74
+ }
75
+ }
76
+
77
+ private long getMillisRemainingInMaxWait (long startOfWait ) {
78
+ int MINUTE_IN_MILLIS = 60000 ;
79
+ long durationLeft = maxWaitMs - (System .currentTimeMillis () - startOfWait );
80
+ long nonNegativeDuration = Math .max (durationLeft , 0 );
81
+
82
+ // have to reduce the duration because there is a boundary case inside the Guava
83
+ // rate limiter where if the duration to wait is near Long.MAX_VALUE, the rate
84
+ // limiter's internal calculations can exceed Long.MAX_VALUE resulting in a
85
+ // negative number which causes the tryAcquire() method to fail unexpectedly
86
+ if (Long .MAX_VALUE - nonNegativeDuration < MINUTE_IN_MILLIS ) {
87
+ return nonNegativeDuration - MINUTE_IN_MILLIS ;
88
+ }
89
+
90
+ return nonNegativeDuration ;
91
+ }
92
+ }
0 commit comments