NettyResponseFuture#abort invokes callbacks before isDone() returns true #1368

Closed
@joedj

Description

This behaviour makes it difficult to integrate NettyResponseFuture with, for example, Guava's ListenableFuture.

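The current behaviour is incompatible with com.google.common.util.concurrent.Futures#getDone(Future): getDone(future) checks future.isDone() before calling future.get(), and throws IllegalStateException if the future is not yet done. That is exactly the state a listener observes when it is invoked before isDone() returns true.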
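To make the contract concrete, here is a minimal sketch of the check that getDone performs. It is a hypothetical stand-in written against plain java.util.concurrent, not Guava's actual source; the class name `GetDoneSketch` is made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class GetDoneSketch {

    // Hypothetical stand-in for Guava's Futures.getDone(Future):
    // refuse to read the result unless isDone() already reports true.
    static <V> V getDone(Future<V> future) throws ExecutionException {
        if (!future.isDone()) {
            throw new IllegalStateException("Future was expected to be done: " + future);
        }
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    // Should not block, because the future reported itself done.
                    return future.get();
                } catch (InterruptedException e) {
                    // Remember the interrupt and retry, then restore the flag below.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

A listener that calls a method like this on a future whose isDone() still returns false gets an IllegalStateException, even though future.get() would have returned (or thrown) immediately.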

I believe this could be fixed by swapping the order of future.completeExceptionally(t) and terminateAndExit() in NettyResponseFuture#abort, or possibly by changing NettyResponseFuture#isDone() to delegate to the internal future.isDone().
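The ordering problem can be illustrated with a toy model. This is only a sketch under the assumption, taken from the description above, that a separate "done" flag is set by a different step than the one that completes the internal future. `ToyFuture` and its methods are hypothetical and are not the actual NettyResponseFuture code.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class AbortOrderSketch {

    // Hypothetical toy future: a standalone "done" flag next to an internal CompletableFuture.
    static final class ToyFuture {
        private final AtomicBoolean done = new AtomicBoolean();
        private final CompletableFuture<String> inner = new CompletableFuture<>();

        boolean isDone() {
            return done.get();
        }

        void addListener(Runnable listener) {
            // Runs synchronously on the completing thread once inner completes.
            inner.whenComplete((value, error) -> listener.run());
        }

        // Order described in the issue: listeners fire before the flag is set.
        void abortCallbacksFirst(Throwable t) {
            inner.completeExceptionally(t);
            done.set(true);
        }

        // Swapped order: the flag is set before listeners fire.
        void abortFlagFirst(Throwable t) {
            done.set(true);
            inner.completeExceptionally(t);
        }
    }

    // Returns what isDone() reported from inside the listener.
    static boolean listenerSawDone(boolean flagFirst) {
        ToyFuture future = new ToyFuture();
        AtomicBoolean observed = new AtomicBoolean();
        future.addListener(() -> observed.set(future.isDone()));
        Throwable cause = new ConnectException("simulated connection failure");
        if (flagFirst) {
            future.abortFlagFirst(cause);
        } else {
            future.abortCallbacksFirst(cause);
        }
        return observed.get();
    }
}
```

In this model `listenerSawDone(false)` returns false and `listenerSawDone(true)` returns true. Delegating isDone() to the internal future would sidestep the ordering question in the same way, since the listener can only run after the internal future has completed.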

Here is a test case that demonstrates the issue:

import com.google.common.util.concurrent.MoreExecutors;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.junit.Test;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.*;

public class NettyResponseFutureTest {

    // this test always fails, because isDone() is called on the netty callback thread
    @Test
    public void testIsDoneBeforeCallbacks() throws Exception {
        checkFutureIsDone(MoreExecutors.directExecutor());
    }

    // this test usually passes, because there's a race between netty setting isDone
    // and the listener being invoked on the executor thread
    @Test
    public void testIsDoneBeforeCallbacksInAnotherThread() throws InterruptedException {
        checkFutureIsDone(Executors.newSingleThreadExecutor());
    }

    private void checkFutureIsDone(Executor executor) throws InterruptedException {
        AsyncHttpClient client = new DefaultAsyncHttpClient();
        // the intent here is to generate ConnectException
        ListenableFuture<Response> future = client.prepareGet("http://localhost:12345/").execute();

        AtomicBoolean done = new AtomicBoolean();
        AtomicBoolean complete = new AtomicBoolean();

        future.addListener(() -> {
            done.set(future.isDone());
            synchronized (complete) {
                complete.set(true);
                complete.notifyAll();
            }
        }, executor);

        synchronized (complete) {
            while (!complete.get()) {
                complete.wait();
            }
        }

        assertTrue("Listener called before future.isDone()", done.get());
    }

}
