Skip to content

Several fixes for the Grizzly provider. #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 22, 2011
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -67,8 +68,10 @@ public GrizzlyConnectionsPool(final AsyncHttpClientConfig config) {
listener = new Connection.CloseListener() {
@Override
public void onClosed(Connection connection, Connection.CloseType closeType) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("Remote closed connection ({}). Removing from cache", connection.toString());
if (closeType == Connection.CloseType.REMOTELY) {
if (LOG.isInfoEnabled()) {
LOG.info("Remote closed connection ({}). Removing from cache", connection.toString());
}
}
GrizzlyConnectionsPool.this.removeAll(connection);
}
Expand All @@ -91,6 +94,10 @@ public boolean offer(String uri, Connection connection) {

DelayedExecutor.IdleConnectionQueue conQueue = connectionsPool.get(uri);
if (conQueue == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new Connection queue for uri [{}] and connection [{}]",
new Object[]{uri, connection});
}
DelayedExecutor.IdleConnectionQueue newPool =
delayedExecutor.createIdleConnectionQueue(timeout);
conQueue = connectionsPool.putIfAbsent(uri, newPool);
Expand All @@ -100,13 +107,21 @@ public boolean offer(String uri, Connection connection) {
}

final int size = conQueue.size();

if (maxConnectionsPerHost == -1 || size < maxConnectionsPerHost) {
conQueue.offer(connection);
connection.addCloseListener(listener);
totalCachedConnections.incrementAndGet();
final int total = totalCachedConnections.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("[offer] Pooling connection [{}] for uri [{}]. Current size (for host; before pooling): [{}]. Max size (for host): [{}]. Total number of cached connections: [{}].",
new Object[]{connection, uri, size, maxConnectionsPerHost, total});
}
return true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("[offer] Unable to pool connection [{}] for uri [{}]. Current size (for host): [{}]. Max size (for host): [{}]. Total number of cached connections: [{}].",
new Object[]{connection, uri, size, maxConnectionsPerHost, totalCachedConnections.get()});
}

return false;
}

Expand All @@ -125,7 +140,7 @@ public Connection poll(String uri) {
if (conQueue != null) {
boolean poolEmpty = false;
while (!poolEmpty && connection == null) {
if (conQueue.size() > 0) {
if (!conQueue.isEmpty()) {
connection = conQueue.poll();
}

Expand All @@ -136,11 +151,18 @@ public Connection poll(String uri) {
connection = null;
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("[poll] No existing queue for uri [{}].",
new Object[]{uri});
}
}
if (connection != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("[poll] Found pooled connection [{}] for uri [{}].",
new Object[]{connection, uri});
}
totalCachedConnections.decrementAndGet();
}
if (connection != null) {
connection.removeCloseListener(listener);
}
return connection;
Expand Down Expand Up @@ -306,8 +328,8 @@ public void run() {
delayQueue.queue.offer(element);
} else {
try {
if (LOG.isInfoEnabled()) {
LOG.info("Idle connection ({}) detected. Removing from cache.", element.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Idle connection ({}) detected. Removing from cache.", element.toString());
}
element.close().markForRecycle(true);
} catch (Exception ignored) {
Expand All @@ -332,8 +354,8 @@ public void run() {


final class IdleConnectionQueue {
final BlockingQueue<Connection> queue =
DataStructures.getLTQInstance(Connection.class);
final ConcurrentLinkedQueue<Connection> queue =
new ConcurrentLinkedQueue<Connection>();


final TimeoutResolver resolver = new TimeoutResolver();
Expand All @@ -350,30 +372,34 @@ public IdleConnectionQueue(final long timeout) {
// ------------------------------------------------- Private Methods


private void offer(final Connection c) {
void offer(final Connection c) {
if (timeout >= 0) {
resolver.setTimeoutMs(c, System.currentTimeMillis() + timeout);
}
queue.offer(c);
}

private Connection poll() {
Connection poll() {
return queue.poll();
}

public boolean remove(final Connection c) {
boolean remove(final Connection c) {
if (timeout >= 0) {
resolver.removeTimeout(c);

}
return queue.remove(c);
}

public int size() {
int size() {
return queue.size();
}

boolean isEmpty() {
return queue.isEmpty();
}

public void destroy() {
void destroy() {
try {
for (Connection c : queue) {
c.close().markForRecycle(true);
Expand All @@ -390,7 +416,7 @@ public void destroy() {
// ------------------------------------------------------ Nested Classes


private static final class TimeoutResolver {
static final class TimeoutResolver {

private static final String IDLE_ATTRIBUTE_NAME = "grizzly-ahc-conn-pool-idle-attribute";
private static final Attribute<IdleRecord> IDLE_ATTR =
Expand All @@ -407,25 +433,25 @@ public IdleRecord evaluate() {
// ------------------------------------------------- Private Methods


private boolean removeTimeout(final Connection c) {
boolean removeTimeout(final Connection c) {
IDLE_ATTR.get(c).timeoutMs = 0;
return true;
}

private Long getTimeoutMs(final Connection c) {
Long getTimeoutMs(final Connection c) {
return IDLE_ATTR.get(c).timeoutMs;
}

private void setTimeoutMs(final Connection c, final long timeoutMs) {
void setTimeoutMs(final Connection c, final long timeoutMs) {
IDLE_ATTR.get(c).timeoutMs = timeoutMs;
}


// -------------------------------------------------- Nested Classes

private static final class IdleRecord {
static final class IdleRecord {

private volatile long timeoutMs;
volatile long timeoutMs;

} // END IdleRecord

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,7 @@ public boolean isDone() {

public V get() throws InterruptedException, ExecutionException {

try {
return get(60, TimeUnit.SECONDS);
} catch (TimeoutException te) {
throw new ExecutionException(te);
}
return delegate.get();

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,21 @@
*/
public class GrizzlyResponseHeaders extends HttpResponseHeaders {

private final FluentCaseInsensitiveStringsMap headers = new FluentCaseInsensitiveStringsMap();
private final FluentCaseInsensitiveStringsMap headers =
new FluentCaseInsensitiveStringsMap();
private final HttpResponsePacket response;
private volatile boolean initialized;

// ------------------------------------------------------------ Constructors


public GrizzlyResponseHeaders(final HttpResponsePacket response,
final URI uri,
final AsyncHttpProvider provider) {

super(uri, provider);
this.response = response;

final MimeHeaders headersLocal = response.getHeaders();
for (String name : headersLocal.names()) {
for (String header : headersLocal.values(name)) {
headers.add(name, header);
}
}
}


Expand All @@ -57,6 +56,19 @@ public GrizzlyResponseHeaders(final HttpResponsePacket response,
*/
@Override
public FluentCaseInsensitiveStringsMap getHeaders() {
if (!initialized) {
synchronized (headers) {
if (!initialized) {
initialized = true;
final MimeHeaders headersLocal = response.getHeaders();
for (String name : headersLocal.names()) {
for (String header : headersLocal.values(name)) {
headers.add(name, header);
}
}
}
}
}
return headers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import com.ning.http.util.ProxyUtils;
import com.ning.http.util.SslUtils;
import com.ning.http.util.UTF8UrlEncoder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -626,7 +624,6 @@ private void configure(URI uri, HttpURLConnection urlConnection, Request request
urlConnection.setRequestProperty("Content-Type", mre.getContentType());
urlConnection.setRequestProperty("Content-Length", String.valueOf(mre.getContentLength()));

ChannelBuffer b = ChannelBuffers.dynamicBuffer(lenght);
mre.writeRequest(urlConnection.getOutputStream());
} else if (request.getEntityWriter() != null) {
int lenght = (int) request.getContentLength();
Expand Down
Loading