Skip to content

Replace AtomicBoolean with AtomicFieldUpdater #1291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

Expand Down Expand Up @@ -70,13 +68,28 @@ public final class NettyResponseFuture<V> extends AbstractListenableFuture<V> {

// state mutated from outside the event loop
// TODO check if they are indeed mutated outside the event loop
private final AtomicBoolean isDone = new AtomicBoolean(false);
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
private final AtomicBoolean inAuth = new AtomicBoolean(false);
private final AtomicBoolean inProxyAuth = new AtomicBoolean(false);
private final AtomicBoolean statusReceived = new AtomicBoolean(false);
private final AtomicBoolean contentProcessed = new AtomicBoolean(false);
private final AtomicBoolean onThrowableCalled = new AtomicBoolean(false);
private volatile int isDone = 0;
private volatile int isCancelled = 0;
private volatile int inAuth = 0;
private volatile int inProxyAuth = 0;
private volatile int statusReceived = 0;
private volatile int contentProcessed = 0;
private volatile int onThrowableCalled = 0;

private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isDoneField =
AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isCancelledField =
AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isCancelled");
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> inAuthField =
AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inAuth");
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> inProxyAuthField =
AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inProxyAuth");
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> statusReceivedField =
AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "statusReceived");
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> contentProcessedField =
AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "contentProcessed");
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> onThrowableCalledField =
AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");

// volatile where we need CAS ops
private volatile int redirectCount = 0;
Expand Down Expand Up @@ -124,19 +137,19 @@ public NettyResponseFuture(Request originalRequest,//

@Override
public boolean isDone() {
return isDone.get() || isCancelled();
return isDone != 0 || isCancelled();
}

@Override
public boolean isCancelled() {
return isCancelled.get();
return isCancelled != 0;
}

@Override
public boolean cancel(boolean force) {
cancelTimeouts();

if (isCancelled.getAndSet(true))
if (isCancelledField.getAndSet(this, 1) != 0)
return false;

// cancel could happen before channel was attached
Expand All @@ -145,7 +158,7 @@ public boolean cancel(boolean force) {
Channels.silentlyCloseChannel(channel);
}

if (!onThrowableCalled.getAndSet(true)) {
if (onThrowableCalledField.getAndSet(this, 1) == 0) {
try {
asyncHandler.onThrowable(new CancellationException());
} catch (Throwable t) {
Expand Down Expand Up @@ -183,11 +196,11 @@ private V getContent() throws ExecutionException {
V update = (V) CONTENT_UPDATER.get(this);
// No more retry
CURRENT_RETRY_UPDATER.set(this, maxRetry);
if (!contentProcessed.getAndSet(true)) {
if (contentProcessedField.getAndSet(this, 1) == 0) {
try {
update = asyncHandler.onCompleted();
} catch (Throwable ex) {
if (!onThrowableCalled.getAndSet(true)) {
if (onThrowableCalledField.getAndSet(this, 1) == 0) {
try {
try {
asyncHandler.onThrowable(ex);
Expand All @@ -211,7 +224,7 @@ private boolean terminateAndExit() {
cancelTimeouts();
this.channel = null;
this.reuseChannel = false;
return isDone.getAndSet(true) || isCancelled.get();
return isDoneField.getAndSet(this, 1) != 0 || isCancelled != 0;
}

public final void done() {
Expand Down Expand Up @@ -241,7 +254,7 @@ public final void abort(final Throwable t) {
if (terminateAndExit())
return;

if (onThrowableCalled.compareAndSet(false, true)) {
if (onThrowableCalledField.compareAndSet(this, 0, 1)) {
try {
asyncHandler.onThrowable(t);
} catch (Throwable te) {
Expand Down Expand Up @@ -341,12 +354,28 @@ public TimeoutsHolder getTimeoutsHolder() {
return timeoutsHolder;
}

public AtomicBoolean getInAuth() {
return inAuth;
public boolean getInAuth() {
return inAuth != 0;
}

public AtomicBoolean getInProxyAuth() {
return inProxyAuth;
public void setInAuth(boolean inAuth) {
this.inAuth = inAuth ? 1 : 0;
}

public boolean getAndSetInAuth(boolean set) {
return inAuthField.getAndSet(this, set ? 1 : 0) != 0;
}

public boolean getInProxyAuth() {
return inProxyAuth != 0;
}

public void setInProxyAuth(boolean inProxyAuth) {
this.inProxyAuth = inProxyAuth ? 1 : 0;
}

public boolean getAndSetInProxyAuth(boolean inProxyAuth) {
return inProxyAuthField.getAndSet(this, inProxyAuth ? 1 : 0) != 0;
}

public ChannelState getChannelState() {
Expand All @@ -358,7 +387,7 @@ public void setChannelState(ChannelState channelState) {
}

public boolean getAndSetStatusReceived(boolean sr) {
return statusReceived.getAndSet(sr);
return statusReceivedField.getAndSet(this, sr ? 1 : 0) != 0;
}

public boolean isStreamWasAlreadyConsumed() {
Expand Down Expand Up @@ -439,7 +468,9 @@ public void setCurrentRequest(Request currentRequest) {
* @return true if that {@link Future} cannot be recovered.
*/
public boolean canBeReplayed() {
return !isDone() && !(Channels.isChannelValid(channel) && !getUri().getScheme().equalsIgnoreCase("https")) && !inAuth.get() && !inProxyAuth.get();
return !isDone() && !(Channels.isChannelValid(channel) && !getUri().getScheme().equalsIgnoreCase("https"))
&& inAuth == 0
&& inProxyAuth == 0;
}

public long getStart() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public boolean exitAfterHandling407(//
ProxyServer proxyServer,//
HttpRequest httpRequest) {

if (future.getInProxyAuth().getAndSet(true)) {
if (future.getAndSetInProxyAuth(true)) {
LOGGER.info("Can't handle 407 as auth was already performed");
return false;
}
Expand Down Expand Up @@ -210,7 +210,7 @@ private void ntlmProxyChallenge(String authenticateHeader,//
// FIXME we might want to filter current NTLM and add (leave other
// Authorization headers untouched)
requestHeaders.set(HttpHeaders.Names.PROXY_AUTHORIZATION, "NTLM " + challengeHeader);
future.getInProxyAuth().set(false);
future.setInProxyAuth(false);

} else {
String serverChallenge = authenticateHeader.substring("NTLM ".length()).trim();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public boolean exitAfterHandlingRedirect(//

} else {
// We must allow auth handling again.
future.getInAuth().set(false);
future.getInProxyAuth().set(false);
future.setInAuth(false);
future.setInProxyAuth(false);

String originalMethod = request.getMethod();
boolean switchToGet = !originalMethod.equals(GET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public boolean exitAfterHandling401(//
return false;
}

if (future.getInAuth().getAndSet(true)) {
if (future.getAndSetInAuth(true)) {
LOGGER.info("Can't handle 401 as auth was already performed");
return false;
}
Expand Down Expand Up @@ -195,7 +195,7 @@ private void ntlmChallenge(String authenticateHeader,//
// FIXME we might want to filter current NTLM and add (leave other
// Authorization headers untouched)
requestHeaders.set(AUTHORIZATION, "NTLM " + challengeHeader);
future.getInAuth().set(false);
future.setInAuth(false);

} else {
String serverChallenge = authenticateHeader.substring("NTLM ".length()).trim();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
requestFactory.addAuthorizationHeader(headers, perConnectionAuthorizationHeader(request, proxy, realm));
requestFactory.setProxyAuthorizationHeader(headers, perConnectionProxyAuthorizationHeader(request, proxyRealm));

future.getInAuth().set(realm != null && realm.isUsePreemptiveAuth() && realm.getScheme() != AuthScheme.NTLM);
future.getInProxyAuth().set(proxyRealm != null && proxyRealm.isUsePreemptiveAuth() && proxyRealm.getScheme() != AuthScheme.NTLM);
future.setInAuth(realm != null && realm.isUsePreemptiveAuth() && realm.getScheme() != AuthScheme.NTLM);
future.setInProxyAuth(proxyRealm != null && proxyRealm.isUsePreemptiveAuth() && proxyRealm.getScheme() != AuthScheme.NTLM);

// Do not throw an exception when we need an extra connection for a redirect
// FIXME why? This violate the max connection per host handling, right?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected void operationComplete(Channel channel, Throwable cause) {
* We need to make sure we aren't in the middle of an authorization process before publishing events as we will re-publish again the same event after the authorization,
* causing unpredictable behavior.
*/
boolean startPublishing = !future.getInAuth().get() && !future.getInProxyAuth().get();
boolean startPublishing = !future.getInAuth() && !future.getInProxyAuth();
if (startPublishing) {

if (notifyHeaders) {
Expand Down