|
| 1 | + ------ |
| 2 | + Async Http Client - Using Filters |
| 3 | + ------ |
| 4 | + Jeanfrancois Arcand |
| 5 | + ------ |
| 6 | + 2012 |
| 7 | + |
| 8 | +Using Filters |
| 9 | + |
| 10 | + The library supports three types of <<<Filter>>> who can intercept, transform, decorate and replay transactions: |
| 11 | + <<<Request>>>, <<<Response>>> and <<<IOException>>>. |
| 12 | + |
| 13 | +* Request Filter |
| 14 | + |
| 15 | + Request Filters are useful if you need to manipulate the Request or AsyncHandler object before the request is made. As an example, you can throttle requests using the following RequestFilter implementation: |
| 16 | + |
| 17 | ++-----+ |
| 18 | +public class ThrottleRequestFilter implements RequestFilter { |
| 19 | + private final int maxConnections; |
| 20 | + private final Semaphore available; |
| 21 | + private final int maxWait; |
| 22 | + |
| 23 | + public ThrottleRequestFilter(int maxConnections) { |
| 24 | + this.maxConnections = maxConnections; |
| 25 | + this.maxWait = Integer.MAX_VALUE; |
| 26 | + available = new Semaphore(maxConnections, true); |
| 27 | + } |
| 28 | + |
| 29 | + public ThrottleRequestFilter(int maxConnections, int maxWait) { |
| 30 | + this.maxConnections = maxConnections; |
| 31 | + this.maxWait = maxWait; |
| 32 | + available = new Semaphore(maxConnections, true); |
| 33 | + } |
| 34 | + |
| 35 | + public FilterContext filter(FilterContext ctx) throws FilterException { |
| 36 | + try { |
| 37 | + if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) |
| 38 | + throw new FilterException(String.format("No slot available for Request %s with AsyncHandler %s", |
| 39 | + ctx.getRequest(), |
| 40 | + ctx.getAsyncHandler())); |
| 41 | + } |
| 42 | + } catch (InterruptedException e) { |
| 43 | + throw new FilterException( String.format("Interrupted Request %s with AsyncHandler %s", |
| 44 | + ctx.getRequest(), |
| 45 | + ctx.getAsyncHandler())); |
| 46 | + } |
| 47 | + |
| 48 | + return new FilterContext(new AsyncHandlerWrapper(ctx.getAsyncHandler()), ctx.getRequest()); |
| 49 | + } |
| 50 | + |
| 51 | +} |
| 52 | + |
| 53 | +private class AsyncHandlerWrapper implements AsyncHandler<T> { |
| 54 | + private final AsyncHandler asyncHandler; |
| 55 | + |
| 56 | + public AsyncHandlerWrapper(AsyncHandler asyncHandler) { |
| 57 | + this.asyncHandler = asyncHandler; |
| 58 | + } |
| 59 | + |
| 60 | + public void onThrowable(Throwable t) { |
| 61 | + asyncHandler.onThrowable(t); |
| 62 | + } |
| 63 | + |
| 64 | + public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { |
| 65 | + return asyncHandler.onBodyPartReceived(bodyPart); |
| 66 | + } |
| 67 | + |
| 68 | + public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception { |
| 69 | + return asyncHandler.onStatusReceived(responseStatus); |
| 70 | + } |
| 71 | + |
| 72 | + public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception { |
| 73 | + return asyncHandler.onHeadersReceived(headers); } |
| 74 | + |
| 75 | + public T onCompleted() throws Exception { |
| 76 | + available.release(); |
| 77 | + return asyncHandler.onCompleted(); |
| 78 | + } |
| 79 | +} |
| 80 | ++-----+ |
| 81 | + |
| 82 | + In the above, we decorate the original <<<AsyncHandler>>> and use semaphore to throttle requests. |
| 83 | + To add <<<RequestFilter>>>, all you need to do is to configure it on the <<<AsyncHttpClientConfig>>>: |
| 84 | + |
| 85 | ++-----+ |
| 86 | +AsyncHttpClientConfig.Builder b = new AsyncHttpClientConfig.Builder(); |
| 87 | +b.addRequestFilter(new ThrottleRequestFilter(100)); |
| 88 | +AsyncHttpClient c = new AsyncHttpClient(b.build()); |
| 89 | ++-----+ |
| 90 | + |
| 91 | + * Response Filter |
| 92 | + |
| 93 | + Like with <<<Request>>>, you can also filter the <<<Response>>>'s bytes before an <<<AsyncHandler>>> gets called. |
| 94 | + <<<Response Filters>>> are always invoked before the library executes the logic for authentication, proxy challenging, |
| 95 | + redirection etc. That means an application can takes control of those operations at any moment using a <<<Response Filter>>>. |
| 96 | + |
| 97 | + As an example, the following <<<Response Filter>>> redirect request from <<<google.ca>>> to <<<google.com>>> in case |
| 98 | + <<<.ca>>> is not responding: |
| 99 | + |
| 100 | ++-----+ |
| 101 | +AsyncHttpClientConfig.Builder b = new AsyncHttpClientConfig.Builder(); |
| 102 | +b.addResponseFilter(new ResponseFilter() { |
| 103 | + public FilterContext filter(FilterContext ctx) throws FilterException { |
| 104 | + if (ctx.getResponseStatus().getStatusCode() == 503) { |
| 105 | + return new FilterContext.FilterContextBuilder(ctx) |
| 106 | + .request(new RequestBuilder("GET") |
| 107 | + .setUrl("http://google.com").build()) |
| 108 | + .build(); |
| 109 | + } |
| 110 | + } |
| 111 | +}); |
| 112 | +AsyncHttpClient c = new AsyncHttpClient(b.build()); |
| 113 | ++-----+ |
| 114 | + |
| 115 | +* IOException Filter |
| 116 | + |
| 117 | + The AsyncHttpClient library support <<<IOExceptionFilter>>> that can be used to replay a request in case server a |
| 118 | + server goes down or unresponsive, a network outage occurs, or nay kind of I/O abnormal situation. |
| 119 | + |
| 120 | + In those cases, the library will catch the <<<IOException>>> and delegate the <<<IOException>>> handling to the <<<Filter>>>. |
| 121 | + |
| 122 | + As an example, the following filter will resume an interrupted download instead of restarting downloading the file |
| 123 | + from the beginning: |
| 124 | + |
| 125 | ++-----+ |
| 126 | +AsyncHttpClient c = new AsyncHttpClient( |
| 127 | + new AsyncHttpClientConfig.Builder() |
| 128 | + .addIOExceptionFilter(new ResumableIOExceptionFilter()).build()); |
| 129 | + |
| 130 | +Response r = c.prepareGet("http://host:port/LargeFile.avi").execute(new AsyncHandler(){...}).get(); |
| 131 | ++-----+ |
| 132 | + |
| 133 | + The <<<IOExceptionFilter>>> is defined as |
| 134 | + |
| 135 | ++-----+ |
| 136 | +public class ResumableIOExceptionFilter implements IOExceptionFilter { |
| 137 | + public FilterContext filter(FilterContext ctx) throws FilterException { |
| 138 | + if (ctx.getIOException() != null ) { |
| 139 | + Request request = new RequestBuilder(ctx.getRequest()).setRangeOffset(file.length()); |
| 140 | + return new FilterContext.FilterContextBuilder(ctx) |
| 141 | + .request(request) |
| 142 | + .replayRequest(true) |
| 143 | + .build(); |
| 144 | + } |
| 145 | + return ctx; |
| 146 | + } |
| 147 | +} |
| 148 | ++-----+ |
| 149 | + |
| 150 | +In the above we just catch any <<<IOException>>> and replay the request using the <<<Range>>> header to tell the remote |
| 151 | +server to restart sending bytes at that position. This way we don't need to re download the entire file. |
0 commit comments