diff --git a/src/main/java/com/ning/http/client/AsyncHttpClient.java b/src/main/java/com/ning/http/client/AsyncHttpClient.java index d19c048523..1433805bb3 100755 --- a/src/main/java/com/ning/http/client/AsyncHttpClient.java +++ b/src/main/java/com/ning/http/client/AsyncHttpClient.java @@ -275,7 +275,7 @@ public Request build() { if (i >= 0) { url = url.substring(0, i); } - signatureCalculator.calculateAndAddSignature(baseURL, request, this); + signatureCalculator.calculateAndAddSignature(url, request, this); } return super.build(); } diff --git a/src/main/java/com/ning/http/client/generators/InputStreamBodyGenerator.java b/src/main/java/com/ning/http/client/generators/InputStreamBodyGenerator.java index c9787586d0..2672041da0 100644 --- a/src/main/java/com/ning/http/client/generators/InputStreamBodyGenerator.java +++ b/src/main/java/com/ning/http/client/generators/InputStreamBodyGenerator.java @@ -67,7 +67,7 @@ public long getContentLength() { public long read(ByteBuffer buffer) throws IOException { // To be safe. - chunk = new byte[buffer.capacity() - 10]; + chunk = new byte[buffer.remaining() - 10]; int read = -1; diff --git a/src/main/java/com/ning/http/client/ntlm/NTLMEngine.java b/src/main/java/com/ning/http/client/ntlm/NTLMEngine.java index 386360f1ef..1d532f3575 100644 --- a/src/main/java/com/ning/http/client/ntlm/NTLMEngine.java +++ b/src/main/java/com/ning/http/client/ntlm/NTLMEngine.java @@ -76,7 +76,7 @@ public class NTLMEngine { java.security.SecureRandom rnd = null; try { rnd = java.security.SecureRandom.getInstance("SHA1PRNG"); - } catch (Exception e) { + } catch (Exception ignored) { } RND_GEN = rnd; } @@ -753,7 +753,7 @@ String getResponse() throws UnsupportedEncodingException { } else { resp = messageContents; } - return new String(Base64.encode(resp)); + return Base64.encode(resp); } } @@ -996,8 +996,7 @@ String getResponse() throws UnsupportedEncodingException { int domainOffset = ntRespOffset + ntRespLen; int userOffset = domainOffset + domainLen; int hostOffset = userOffset + userLen; - int sessionKeyOffset = hostOffset + hostLen; - int finalLength = sessionKeyOffset + 0; + int finalLength = hostOffset + hostLen; // Start the response. Length includes signature and type prepareResponse(finalLength, 3); @@ -1127,7 +1126,6 @@ void update(byte[] input) { int transferAmt = input.length - inputIndex; System.arraycopy(input, inputIndex, dataBuffer, curBufferPos, transferAmt); count += transferAmt; - curBufferPos += transferAmt; } } diff --git a/src/main/java/com/ning/http/client/oauth/OAuthSignatureCalculator.java b/src/main/java/com/ning/http/client/oauth/OAuthSignatureCalculator.java index 9e40363d25..e33ad87c73 100644 --- a/src/main/java/com/ning/http/client/oauth/OAuthSignatureCalculator.java +++ b/src/main/java/com/ning/http/client/oauth/OAuthSignatureCalculator.java @@ -45,16 +45,16 @@ public class OAuthSignatureCalculator implements SignatureCalculator { public final static String HEADER_AUTHORIZATION = "Authorization"; - private final String KEY_OAUTH_CONSUMER_KEY = "oauth_consumer_key"; - private final String KEY_OAUTH_NONCE = "oauth_nonce"; - private final String KEY_OAUTH_SIGNATURE = "oauth_signature"; - private final String KEY_OAUTH_SIGNATURE_METHOD = "oauth_signature_method"; - private final String KEY_OAUTH_TIMESTAMP = "oauth_timestamp"; - private final String KEY_OAUTH_TOKEN = "oauth_token"; - private final String KEY_OAUTH_VERSION = "oauth_version"; + private static final String KEY_OAUTH_CONSUMER_KEY = "oauth_consumer_key"; + private static final String KEY_OAUTH_NONCE = "oauth_nonce"; + private static final String KEY_OAUTH_SIGNATURE = "oauth_signature"; + private static final String KEY_OAUTH_SIGNATURE_METHOD = "oauth_signature_method"; + private static final String KEY_OAUTH_TIMESTAMP = "oauth_timestamp"; + private static final String KEY_OAUTH_TOKEN = "oauth_token"; + private static final String KEY_OAUTH_VERSION = "oauth_version"; - private final String OAUTH_VERSION_1_0 = "1.0"; - private final String OAUTH_SIGNATURE_METHOD = "HMAC-SHA1"; + private static final String OAUTH_VERSION_1_0 = "1.0"; + private static final String OAUTH_SIGNATURE_METHOD = "HMAC-SHA1"; /** * To generate Nonce, need some (pseudo)randomness; no need for @@ -84,12 +84,12 @@ public OAuthSignatureCalculator(ConsumerKey consumerAuth, RequestToken userAuth) //@Override // silly 1.5; doesn't allow this for interfaces public void calculateAndAddSignature(String baseURL, Request request, RequestBuilderBase requestBuilder) { - String method = request.getMethod().toString(); // POST etc + String method = request.getMethod(); // POST etc String nonce = generateNonce(); long timestamp = System.currentTimeMillis() / 1000L; String signature = calculateSignature(method, baseURL, timestamp, nonce, request.getParams(), request.getQueryParams()); String headerValue = constructAuthHeader(signature, nonce, timestamp); - requestBuilder = requestBuilder.setHeader(HEADER_AUTHORIZATION, headerValue); + requestBuilder.setHeader(HEADER_AUTHORIZATION, headerValue); } /** @@ -260,5 +260,25 @@ public int compareTo(Parameter other) { public String toString() { return key + "=" + value; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Parameter parameter = (Parameter) o; + + if (!key.equals(parameter.key)) return false; + if (!value.equals(parameter.value)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = key.hashCode(); + result = 31 * result + value.hashCode(); + return result; + } } } diff --git a/src/main/java/com/ning/http/client/providers/grizzly/FeedableBodyGenerator.java b/src/main/java/com/ning/http/client/providers/grizzly/FeedableBodyGenerator.java new file mode 100644 index 0000000000..7a2601c0ee --- /dev/null +++ b/src/main/java/com/ning/http/client/providers/grizzly/FeedableBodyGenerator.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2011 Sonatype, Inc. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.ning.http.client.providers.grizzly; + +import com.ning.http.client.Body; +import com.ning.http.client.BodyGenerator; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.glassfish.grizzly.Buffer; +import org.glassfish.grizzly.filterchain.FilterChainContext; +import org.glassfish.grizzly.http.HttpContent; +import org.glassfish.grizzly.http.HttpRequestPacket; + +/** + * {@link BodyGenerator} which may return just part of the payload at the time + * handler is requesting it. If it happens - PartialBodyGenerator becomes responsible + * for finishing payload transferring asynchronously. + * + * @author The Grizzly Team + * @since 1.7.0 + */ +public class FeedableBodyGenerator implements BodyGenerator { + private final Queue queue = new ConcurrentLinkedQueue(); + private final AtomicInteger queueSize = new AtomicInteger(); + + private volatile HttpRequestPacket requestPacket; + private volatile FilterChainContext context; + + @Override + public Body createBody() throws IOException { + return new EmptyBody(); + } + + public void feed(final Buffer buffer, final boolean isLast) + throws IOException { + queue.offer(new BodyPart(buffer, isLast)); + queueSize.incrementAndGet(); + + if (context != null) { + flushQueue(); + } + } + + void initializeAsynchronousTransfer(final FilterChainContext context, + final HttpRequestPacket requestPacket) throws IOException { + this.context = context; + this.requestPacket = requestPacket; + flushQueue(); + } + + private void flushQueue() throws IOException { + if (queueSize.get() > 0) { + synchronized(this) { + while(queueSize.get() > 0) { + final BodyPart bodyPart = queue.poll(); + queueSize.decrementAndGet(); + final HttpContent content = + requestPacket.httpContentBuilder() + .content(bodyPart.buffer) + .last(bodyPart.isLast) + .build(); + context.write(content, ((!requestPacket.isCommitted()) ? + context.getTransportContext().getCompletionHandler() : + null)); + + } + } + } + } + + private final class EmptyBody implements Body { + + @Override + public long getContentLength() { + return -1; + } + + @Override + public long read(final ByteBuffer buffer) throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + context.completeAndRecycle(); + context = null; + requestPacket = null; + } + } + + private final static class BodyPart { + private final boolean isLast; + private final Buffer buffer; + + public BodyPart(final Buffer buffer, final boolean isLast) { + this.buffer = buffer; + this.isLast = isLast; + } + } +} diff --git a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java index 8623887355..0d1231fbf7 100644 --- a/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java +++ b/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java @@ -97,7 +97,6 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URLEncoder; -import java.nio.ByteBuffer; import java.security.NoSuchAlgorithmException; import java.util.Collection; import java.util.HashMap; @@ -279,57 +278,47 @@ protected ListenableFuture execute(final Connection c, protected void initializeTransport(AsyncHttpClientConfig clientConfig) { - GrizzlyAsyncHttpProviderConfig providerConfig = - (GrizzlyAsyncHttpProviderConfig) clientConfig.getAsyncHttpProviderConfig(); - if (providerConfig != null) { - final TransportCustomizer customizer = (TransportCustomizer) - providerConfig.getProperty(TRANSPORT_CUSTOMIZER); - if (customizer != null) { - customizer.customize(clientTransport); - } else { - clientTransport.setIOStrategy(SameThreadIOStrategy.getInstance()); - } - } else { - clientTransport.setIOStrategy(SameThreadIOStrategy.getInstance()); - } + final FilterChainBuilder fcb = FilterChainBuilder.stateless(); fcb.add(new AsyncHttpClientTransportFilter()); final int timeout = clientConfig.getRequestTimeoutInMs(); - int delay = 500; - if (timeout < delay) { - delay = timeout - 10; - } - timeoutExecutor = IdleTimeoutFilter.createDefaultIdleDelayedExecutor(delay, TimeUnit.MILLISECONDS); - timeoutExecutor.start(); - final IdleTimeoutFilter.TimeoutResolver timeoutResolver = - new IdleTimeoutFilter.TimeoutResolver() { - @Override - public long getTimeout(FilterChainContext ctx) { - final HttpTransactionContext context = - GrizzlyAsyncHttpProvider.this.getHttpTransactionContext(ctx.getConnection()); - if (context != null) { - final PerRequestConfig config = context.request.getPerRequestConfig(); - if (config != null) { - final long timeout = config.getRequestTimeoutInMs(); - if (timeout > 0) { - return timeout; + if (timeout > 0) { + int delay = 500; + if (timeout < delay) { + delay = timeout - 10; + } + timeoutExecutor = IdleTimeoutFilter.createDefaultIdleDelayedExecutor(delay, TimeUnit.MILLISECONDS); + timeoutExecutor.start(); + final IdleTimeoutFilter.TimeoutResolver timeoutResolver = + new IdleTimeoutFilter.TimeoutResolver() { + @Override + public long getTimeout(FilterChainContext ctx) { + final HttpTransactionContext context = + GrizzlyAsyncHttpProvider.this.getHttpTransactionContext(ctx.getConnection()); + if (context != null) { + final PerRequestConfig config = context.request.getPerRequestConfig(); + if (config != null) { + final long timeout = config.getRequestTimeoutInMs(); + if (timeout > 0) { + return timeout; + } } } + return timeout; } - return timeout; - } - }; - final IdleTimeoutFilter timeoutFilter = new IdleTimeoutFilter(timeoutExecutor, - timeoutResolver, - new IdleTimeoutFilter.TimeoutHandler() { - public void onTimeout(Connection connection) { - timeout(connection); - } - }); - fcb.add(timeoutFilter); - resolver = timeoutFilter.getResolver(); + }; + final IdleTimeoutFilter timeoutFilter = new IdleTimeoutFilter(timeoutExecutor, + timeoutResolver, + new IdleTimeoutFilter.TimeoutHandler() { + public void onTimeout(Connection connection) { + timeout(connection); + } + }); + fcb.add(timeoutFilter); + resolver = timeoutFilter.getResolver(); + } SSLContext context = clientConfig.getSSLContext(); boolean defaultSecState = (context != null); @@ -365,6 +354,21 @@ public void onTimeout(Connection connection) { } fcb.add(eventFilter); fcb.add(clientFilter); + + GrizzlyAsyncHttpProviderConfig providerConfig = + (GrizzlyAsyncHttpProviderConfig) clientConfig.getAsyncHttpProviderConfig(); + if (providerConfig != null) { + final TransportCustomizer customizer = (TransportCustomizer) + providerConfig.getProperty(TRANSPORT_CUSTOMIZER); + if (customizer != null) { + customizer.customize(clientTransport, fcb); + } else { + clientTransport.setIOStrategy(SameThreadIOStrategy.getInstance()); + } + } else { + clientTransport.setIOStrategy(SameThreadIOStrategy.getInstance()); + } + clientTransport.setProcessor(fcb.build()); } @@ -380,12 +384,16 @@ void touchConnection(final Connection c, final Request request) { final long timeout = config.getRequestTimeoutInMs(); if (timeout > 0) { final long newTimeout = System.currentTimeMillis() + timeout; - resolver.setTimeoutMillis(c, newTimeout); + if (resolver != null) { + resolver.setTimeoutMillis(c, newTimeout); + } } } else { final long timeout = clientConfig.getRequestTimeoutInMs(); if (timeout > 0) { - resolver.setTimeoutMillis(c, System.currentTimeMillis() + timeout); + if (resolver != null) { + resolver.setTimeoutMillis(c, System.currentTimeMillis() + timeout); + } } } @@ -460,11 +468,13 @@ static int getPort(final URI uri, final int p) { @SuppressWarnings({"unchecked"}) - void sendRequest(final FilterChainContext ctx, + boolean sendRequest(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { + boolean isWriteComplete = true; + if (requestHasEntityBody(request)) { final HttpTransactionContext context = getHttpTransactionContext(ctx.getConnection()); BodyHandler handler = bodyHandlerFactory.getBodyHandler(request); @@ -473,13 +483,15 @@ void sendRequest(final FilterChainContext ctx, handler = new ExpectHandler(handler); } context.bodyHandler = handler; - handler.doHandle(ctx, request, requestPacket); + isWriteComplete = handler.doHandle(ctx, request, requestPacket); } else { ctx.write(requestPacket, ctx.getTransportContext().getCompletionHandler()); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("REQUEST: " + requestPacket.toString()); } + + return isWriteComplete; } @@ -686,7 +698,9 @@ public NextAction handleWrite(final FilterChainContext ctx) Object message = ctx.getMessage(); if (message instanceof Request) { ctx.setMessage(null); - sendAsGrizzlyRequest((Request) message, ctx); + if (!sendAsGrizzlyRequest((Request) message, ctx)) { + return ctx.getSuspendAction(); + } } return ctx.getStopAction(); @@ -713,7 +727,7 @@ public NextAction handleEvent(final FilterChainContext ctx, - private void sendAsGrizzlyRequest(final Request request, + private boolean sendAsGrizzlyRequest(final Request request, final FilterChainContext ctx) throws IOException { @@ -781,7 +795,7 @@ private void sendAsGrizzlyRequest(final Request request, new FluentCaseInsensitiveStringsMap(request.getHeaders()); TransferCompletionHandler.class.cast(h).transferAdapter(new GrizzlyTransferAdapter(map)); } - sendRequest(ctx, request, requestPacket); + return sendRequest(ctx, request, requestPacket); } @@ -1056,6 +1070,7 @@ protected void onInitialLineParsed(HttpHeader httpHeader, protected void onHttpError(final HttpHeader httpHeader, final FilterChainContext ctx, final Throwable t) throws IOException { + t.printStackTrace(); httpHeader.setSkipRemainder(true); final HttpTransactionContext context = provider.getHttpTransactionContext(ctx.getConnection()); @@ -1513,7 +1528,7 @@ private static interface BodyHandler { boolean handlesBodyType(final Request request); - void doHandle(final FilterChainContext ctx, + boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException; @@ -1569,10 +1584,11 @@ public boolean handlesBodyType(Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(FilterChainContext ctx, Request request, HttpRequestPacket requestPacket) throws IOException { + public boolean doHandle(FilterChainContext ctx, Request request, HttpRequestPacket requestPacket) throws IOException { this.request = request; this.requestPacket = requestPacket; ctx.write(requestPacket, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); + return true; } public void finish(final FilterChainContext ctx) throws IOException { @@ -1592,7 +1608,7 @@ public boolean handlesBodyType(final Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(final FilterChainContext ctx, + public boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { @@ -1612,6 +1628,7 @@ public void doHandle(final FilterChainContext ctx, final HttpContent content = requestPacket.httpContentBuilder().content(gBuffer).build(); content.setLast(true); ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); + return true; } } @@ -1627,7 +1644,7 @@ public boolean handlesBodyType(final Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(final FilterChainContext ctx, + public boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { @@ -1647,6 +1664,7 @@ public void doHandle(final FilterChainContext ctx, final HttpContent content = requestPacket.httpContentBuilder().content(gBuffer).build(); content.setLast(true); ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); + return true; } } // END StringBodyHandler @@ -1663,7 +1681,7 @@ public boolean handlesBodyType(final Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(final FilterChainContext ctx, + public boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { @@ -1671,6 +1689,7 @@ public void doHandle(final FilterChainContext ctx, final HttpContent content = requestPacket.httpContentBuilder().content(Buffers.EMPTY_BUFFER).build(); content.setLast(true); ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); + return true; } } // END NoBodyHandler @@ -1688,7 +1707,7 @@ public boolean handlesBodyType(final Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(final FilterChainContext ctx, + public boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { @@ -1733,6 +1752,7 @@ public void doHandle(final FilterChainContext ctx, content.setLast(true); ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); } + return true; } } // END ParamsBodyHandler @@ -1748,7 +1768,7 @@ public boolean handlesBodyType(final Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(final FilterChainContext ctx, + public boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { @@ -1766,6 +1786,7 @@ public void doHandle(final FilterChainContext ctx, ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); } + return true; } } // END EntityWriterBodyHandler @@ -1781,7 +1802,7 @@ public boolean handlesBodyType(final Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(final FilterChainContext ctx, + public boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { @@ -1815,6 +1836,8 @@ public void doHandle(final FilterChainContext ctx, content.setLast(true); ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); } + + return true; } } // END StreamDataBodyHandler @@ -1831,7 +1854,7 @@ public boolean handlesBodyType(final Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(final FilterChainContext ctx, + public boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { @@ -1854,6 +1877,7 @@ public void doHandle(final FilterChainContext ctx, ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); } + return true; } } // END PartsBodyHandler @@ -1869,7 +1893,7 @@ public boolean handlesBodyType(final Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(final FilterChainContext ctx, + public boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { @@ -1880,23 +1904,32 @@ public void doHandle(final FilterChainContext ctx, AtomicInteger written = new AtomicInteger(); boolean last = false; requestPacket.setContentLengthLong(f.length()); - for (byte[] buf = new byte[MAX_CHUNK_SIZE]; !last; ) { - Buffer b = null; - int read; - if ((read = fis.read(buf)) < 0) { - last = true; - b = Buffers.EMPTY_BUFFER; + try { + for (byte[] buf = new byte[MAX_CHUNK_SIZE]; !last; ) { + Buffer b = null; + int read; + if ((read = fis.read(buf)) < 0) { + last = true; + b = Buffers.EMPTY_BUFFER; + } + if (b != Buffers.EMPTY_BUFFER) { + written.addAndGet(read); + b = Buffers.wrap(mm, buf, 0, read); + } + + final HttpContent content = + requestPacket.httpContentBuilder().content(b). + last(last).build(); + ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); } - if (b != Buffers.EMPTY_BUFFER) { - written.addAndGet(read); - b = Buffers.wrap(mm, buf, 0, read); + } finally { + try { + fis.close(); + } catch (IOException ignored) { } - - final HttpContent content = - requestPacket.httpContentBuilder().content(b). - last(last).build(); - ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); } + + return true; } } // END FileBodyHandler @@ -1912,7 +1945,7 @@ public boolean handlesBodyType(final Request request) { } @SuppressWarnings({"unchecked"}) - public void doHandle(final FilterChainContext ctx, + public boolean doHandle(final FilterChainContext ctx, final Request request, final HttpRequestPacket requestPacket) throws IOException { @@ -1926,21 +1959,42 @@ public void doHandle(final FilterChainContext ctx, requestPacket.setChunked(true); } + final MemoryManager mm = ctx.getMemoryManager(); boolean last = false; - for (ByteBuffer buffer = ByteBuffer.allocate(MAX_CHUNK_SIZE); !last; ) { - buffer.clear(); - if (bodyLocal.read(buffer) < 0) { - last = true; - buffer = ByteBuffer.allocate(0); + + while (!last) { + Buffer buffer = mm.allocate(MAX_CHUNK_SIZE); + buffer.allowBufferDispose(true); + + final long readBytes = bodyLocal.read(buffer.toByteBuffer()); + if (readBytes > 0) { + buffer.position((int) readBytes); + buffer.trim(); + } else { + buffer.dispose(); + + if (readBytes < 0) { + last = true; + buffer = Buffers.EMPTY_BUFFER; + } else { + // pass the context to bodyLocal to be able to + // continue body transferring once more data is available + if (generator instanceof FeedableBodyGenerator) { + ((FeedableBodyGenerator) generator).initializeAsynchronousTransfer(ctx, requestPacket); + return false; + } else { + throw new IllegalStateException("BodyGenerator unexpectedly returned 0 bytes available"); + } + } } - final MemoryManager mm = ctx.getMemoryManager(); - buffer.flip(); - Buffer b = Buffers.wrap(mm, buffer); + final HttpContent content = - requestPacket.httpContentBuilder().content(b). + requestPacket.httpContentBuilder().content(buffer). last(last).build(); ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); } + + return true; } } // END BodyGeneratorBodyHandler @@ -2082,7 +2136,9 @@ boolean returnConnection(final String url, final Connection c) { final boolean result = (DO_NOT_CACHE.get(c) == null && pool.offer(AsyncHttpProviderUtils.getBaseUrl(url), c)); if (result) { - provider.resolver.setTimeoutMillis(c, IdleTimeoutFilter.FOREVER); + if (provider.resolver != null) { + provider.resolver.setTimeoutMillis(c, IdleTimeoutFilter.FOREVER); + } } return result; diff --git a/src/main/java/com/ning/http/client/providers/grizzly/TransportCustomizer.java b/src/main/java/com/ning/http/client/providers/grizzly/TransportCustomizer.java index c72e5ff275..8c673c3add 100644 --- a/src/main/java/com/ning/http/client/providers/grizzly/TransportCustomizer.java +++ b/src/main/java/com/ning/http/client/providers/grizzly/TransportCustomizer.java @@ -13,6 +13,7 @@ package com.ning.http.client.providers.grizzly; +import org.glassfish.grizzly.filterchain.FilterChainBuilder; import org.glassfish.grizzly.nio.transport.TCPNIOTransport; /** @@ -27,10 +28,17 @@ public interface TransportCustomizer { /** * Customizes the configuration of the provided {@link TCPNIOTransport} - * instance. + * and {@link FilterChainBuilder} instances. * * @param transport the {@link TCPNIOTransport} instance for this client. + * @param filterChainBuilder the {@link FilterChainBuilder} that will + * produce the {@link org.glassfish.grizzly.filterchain.FilterChain} that + * will be used to send/receive data. The FilterChain will be populated + * with the Filters typically used for processing HTTP client requests. + * These filters should generally be left alone. But this does allow + * adding additional filters to the chain to add additional features. */ - void customize(final TCPNIOTransport transport); + void customize(final TCPNIOTransport transport, + final FilterChainBuilder filterChainBuilder); } diff --git a/src/main/java/com/ning/http/client/resumable/PropertiesBasedResumableProcessor.java b/src/main/java/com/ning/http/client/resumable/PropertiesBasedResumableProcessor.java index c0995ac012..0eba5b8824 100644 --- a/src/main/java/com/ning/http/client/resumable/PropertiesBasedResumableProcessor.java +++ b/src/main/java/com/ning/http/client/resumable/PropertiesBasedResumableProcessor.java @@ -60,9 +60,13 @@ public void save(Map map) { FileOutputStream os = null; try { - TMP.mkdirs(); + if (!TMP.mkdirs()) { + throw new IllegalStateException("Unable to create directory: " + TMP.getAbsolutePath()); + } File f = new File(TMP, storeName); - f.createNewFile(); + if (!f.createNewFile()) { + throw new IllegalStateException("Unable to create temp file: " + f.getAbsolutePath()); + } if (!f.canWrite()) { throw new IllegalStateException(); } @@ -79,7 +83,7 @@ public void save(Map map) { if (os != null) { try { os.close(); - } catch (IOException e) { + } catch (IOException ignored) { } } } diff --git a/src/test/java/com/ning/http/client/async/grizzly/GrizzlyAsyncProviderBasicTest.java b/src/test/java/com/ning/http/client/async/grizzly/GrizzlyAsyncProviderBasicTest.java index b45d01fa1a..6c68ff867a 100644 --- a/src/test/java/com/ning/http/client/async/grizzly/GrizzlyAsyncProviderBasicTest.java +++ b/src/test/java/com/ning/http/client/async/grizzly/GrizzlyAsyncProviderBasicTest.java @@ -20,6 +20,7 @@ import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider; import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig; import com.ning.http.client.providers.grizzly.TransportCustomizer; +import org.glassfish.grizzly.filterchain.FilterChainBuilder; import org.glassfish.grizzly.nio.transport.TCPNIOTransport; import org.glassfish.grizzly.strategies.SameThreadIOStrategy; @@ -41,7 +42,7 @@ protected AsyncHttpProviderConfig getProviderConfig() { final GrizzlyAsyncHttpProviderConfig config = new GrizzlyAsyncHttpProviderConfig(); config.addProperty(TRANSPORT_CUSTOMIZER, new TransportCustomizer() { @Override - public void customize(TCPNIOTransport transport) { + public void customize(TCPNIOTransport transport, FilterChainBuilder builder) { transport.setTcpNoDelay(true); transport.setIOStrategy(SameThreadIOStrategy.getInstance()); } diff --git a/src/test/java/com/ning/http/client/async/grizzly/GrizzlyChunkingTest.java b/src/test/java/com/ning/http/client/async/grizzly/GrizzlyChunkingTest.java index 8240993401..fe44f75490 100644 --- a/src/test/java/com/ning/http/client/async/grizzly/GrizzlyChunkingTest.java +++ b/src/test/java/com/ning/http/client/async/grizzly/GrizzlyChunkingTest.java @@ -18,10 +18,6 @@ import com.ning.http.client.async.ChunkingTest; import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider; -/** - * TODO: This test current fails as InputStreamBodyGenerator adds chunk - * information. - */ public class GrizzlyChunkingTest extends ChunkingTest { @Override diff --git a/src/test/java/com/ning/http/client/async/grizzly/GrizzlyRedirectConnectionUsageTest.java b/src/test/java/com/ning/http/client/async/grizzly/GrizzlyRedirectConnectionUsageTest.java index 9d78d31a9f..036220c4ad 100644 --- a/src/test/java/com/ning/http/client/async/grizzly/GrizzlyRedirectConnectionUsageTest.java +++ b/src/test/java/com/ning/http/client/async/grizzly/GrizzlyRedirectConnectionUsageTest.java @@ -20,6 +20,7 @@ import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider; import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig; import com.ning.http.client.providers.grizzly.TransportCustomizer; +import org.glassfish.grizzly.filterchain.FilterChainBuilder; import org.glassfish.grizzly.nio.transport.TCPNIOTransport; import org.glassfish.grizzly.strategies.SameThreadIOStrategy; @@ -40,7 +41,7 @@ protected AsyncHttpProviderConfig getProviderConfig() { final GrizzlyAsyncHttpProviderConfig config = new GrizzlyAsyncHttpProviderConfig(); config.addProperty(TRANSPORT_CUSTOMIZER, new TransportCustomizer() { @Override - public void customize(TCPNIOTransport transport) { + public void customize(TCPNIOTransport transport, FilterChainBuilder builder) { if (System.getProperty("blockingio") != null) { transport.configureBlocking(true); }