From 9cc31e5f83037bdc2c865d2cd88d6dcd99d52d40 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 31 Oct 2025 11:31:55 -0700 Subject: [PATCH 1/4] xds: Avoid default bootstrap when global override in XdsNameResolver This fixes a regression with an internal API from 27d150890 where overridding the global bootstrap didn't impact parsing the default bootstrap. So if no global bootstrap was available XdsNameResolver would fail to start even though an override was in place in SharedXdsClientPoolProvider. Instead of dealing with the override in SharedXdsClientPoolProvider, do it in GrpcBootstrapperImpl so XdsNameResolver is ignorant of the source of the default bootstrap. We want all of this to go away in favor of XDS_CLIENT_SUPPLIER injection, but there needs to be some overlap for migration. cl/826085025 --- .../io/grpc/xds/GrpcBootstrapperImpl.java | 12 ++++++++++- .../InternalSharedXdsClientPoolProvider.java | 2 +- .../grpc/xds/SharedXdsClientPoolProvider.java | 21 +++++++------------ 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/GrpcBootstrapperImpl.java b/xds/src/main/java/io/grpc/xds/GrpcBootstrapperImpl.java index fdcf3a972b5..494e95a58f6 100644 --- a/xds/src/main/java/io/grpc/xds/GrpcBootstrapperImpl.java +++ b/xds/src/main/java/io/grpc/xds/GrpcBootstrapperImpl.java @@ -100,12 +100,22 @@ protected Object getImplSpecificConfig(Map serverConfig, String serve return getChannelCredentials(serverConfig, serverUri); } + @GuardedBy("GrpcBootstrapperImpl.class") + private static Map defaultBootstrapOverride; @GuardedBy("GrpcBootstrapperImpl.class") private static BootstrapInfo defaultBootstrap; + static synchronized void setDefaultBootstrapOverride(Map rawBootstrap) { + defaultBootstrapOverride = rawBootstrap; + } + static synchronized BootstrapInfo defaultBootstrap() throws XdsInitializationException { if (defaultBootstrap == null) { - defaultBootstrap = new GrpcBootstrapperImpl().bootstrap(); + if (defaultBootstrapOverride == null) { + defaultBootstrap = new GrpcBootstrapperImpl().bootstrap(); + } else { + defaultBootstrap = new GrpcBootstrapperImpl().bootstrap(defaultBootstrapOverride); + } } return defaultBootstrap; } diff --git a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java index 5eb36a498cf..cc5ff128274 100644 --- a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java @@ -41,7 +41,7 @@ private InternalSharedXdsClientPoolProvider() {} */ @Deprecated public static void setDefaultProviderBootstrapOverride(Map bootstrap) { - SharedXdsClientPoolProvider.getDefaultProvider().setBootstrapOverride(bootstrap); + GrpcBootstrapperImpl.setDefaultBootstrapOverride(bootstrap); } /** diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 29b6870ab97..4cafbd1a762 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -55,29 +54,24 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER = new ExponentialBackoffPolicy.Provider(); + @Nullable private final Bootstrapper bootstrapper; private final Object lock = new Object(); - private final AtomicReference> bootstrapOverride = new AtomicReference<>(); private final Map> targetToXdsClientMap = new ConcurrentHashMap<>(); SharedXdsClientPoolProvider() { - this(new GrpcBootstrapperImpl()); + this(null); } @VisibleForTesting - SharedXdsClientPoolProvider(Bootstrapper bootstrapper) { - this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper"); + SharedXdsClientPoolProvider(@Nullable Bootstrapper bootstrapper) { + this.bootstrapper = bootstrapper; } static SharedXdsClientPoolProvider getDefaultProvider() { return SharedXdsClientPoolProviderHolder.instance; } - @Deprecated - public void setBootstrapOverride(Map bootstrap) { - bootstrapOverride.set(bootstrap); - } - @Override @Nullable public ObjectPool get(String target) { @@ -89,11 +83,10 @@ public ObjectPool getOrCreate( String target, MetricRecorder metricRecorder, CallCredentials transportCallCredentials) throws XdsInitializationException { BootstrapInfo bootstrapInfo; - Map rawBootstrap = bootstrapOverride.get(); - if (rawBootstrap != null) { - bootstrapInfo = bootstrapper.bootstrap(rawBootstrap); - } else { + if (bootstrapper != null) { bootstrapInfo = bootstrapper.bootstrap(); + } else { + bootstrapInfo = GrpcBootstrapperImpl.defaultBootstrap(); } return getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials); } From 21696cd3dffc3b85b5fa3e9ac4464fa24d78de4f Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 31 Oct 2025 15:24:57 -0700 Subject: [PATCH 2/4] xds: Detect negative ref count for xds client If the refcount goes negative, then the next getObject() will return null. This was noticed during code inspection when investigating a NullPointerException in b/454396128, although it is unclear if this is actually happening. --- .../main/java/io/grpc/xds/SharedXdsClientPoolProvider.java | 4 ++++ xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 4cafbd1a762..45c379244af 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -202,6 +202,10 @@ public XdsClient returnObject(Object object) { metricReporter = null; targetToXdsClientMap.remove(target); scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); + } else if (refCount < 0) { + assert false; // We want our tests to fail + log.log(Level.SEVERE, "Negative reference count. File a bug", new Exception()); + refCount = 0; } return null; } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java index 09b4c5d8637..b2b4e2c14cf 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java @@ -186,8 +186,8 @@ public void setUp() throws XdsInitializationException { @After public void cleanUp() { - if (xdsClientPool != null) { - xdsClientPool.returnObject(xdsClient); + if (xdsClient != null) { + xdsClient = xdsClientPool.returnObject(xdsClient); } CommonBootstrapperTestUtils.setEnableXdsFallback(originalEnableXdsFallback); } From 795ce0280a6ca9c21eec7bb0c284914fe2dfbecd Mon Sep 17 00:00:00 2001 From: Kannan J Date: Tue, 4 Nov 2025 07:26:04 +0000 Subject: [PATCH 3/4] rls: Add route lookup reason to request whether it is due to a cache miss or stale cache entry (#12442) b/348690462 --- .../java/io/grpc/rls/CachingRlsLbClient.java | 134 +++++++------ .../java/io/grpc/rls/RlsProtoConverters.java | 5 +- .../main/java/io/grpc/rls/RlsProtoData.java | 30 ++- .../java/io/grpc/rls/RlsRequestFactory.java | 12 +- .../io/grpc/rls/CachingRlsLbClientTest.java | 181 +++++++++++------- .../java/io/grpc/rls/RlsLoadBalancerTest.java | 12 +- .../io/grpc/rls/RlsProtoConvertersTest.java | 4 +- .../io/grpc/rls/RlsRequestFactoryTest.java | 26 +-- 8 files changed, 246 insertions(+), 158 deletions(-) diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index cc3ac9f516e..2c26d29f14d 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -61,6 +61,7 @@ import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter; import io.grpc.rls.RlsProtoData.RouteLookupConfig; import io.grpc.rls.RlsProtoData.RouteLookupRequest; +import io.grpc.rls.RlsProtoData.RouteLookupRequestKey; import io.grpc.rls.RlsProtoData.RouteLookupResponse; import io.grpc.stub.StreamObserver; import io.grpc.util.ForwardingLoadBalancerHelper; @@ -112,7 +113,7 @@ final class CachingRlsLbClient { private final Future periodicCleaner; // any RPC on the fly will cached in this map @GuardedBy("lock") - private final Map pendingCallCache = new HashMap<>(); + private final Map pendingCallCache = new HashMap<>(); private final ScheduledExecutorService scheduledExecutorService; private final Ticker ticker; @@ -292,18 +293,22 @@ private void periodicClean() { /** Populates async cache entry for new request. */ @GuardedBy("lock") private CachedRouteLookupResponse asyncRlsCall( - RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) { + RouteLookupRequestKey routeLookupRequestKey, @Nullable BackoffPolicy backoffPolicy, + RouteLookupRequest.Reason routeLookupReason) { if (throttler.shouldThrottle()) { - logger.log(ChannelLogLevel.DEBUG, "[RLS Entry {0}] Throttled RouteLookup", request); + logger.log(ChannelLogLevel.DEBUG, "[RLS Entry {0}] Throttled RouteLookup", + routeLookupRequestKey); // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting // on this result return CachedRouteLookupResponse.backoffEntry(createBackOffEntry( - request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy)); + routeLookupRequestKey, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), + backoffPolicy)); } final SettableFuture response = SettableFuture.create(); - io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request); + io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert( + RouteLookupRequest.create(routeLookupRequestKey.keyMap(), routeLookupReason)); logger.log(ChannelLogLevel.DEBUG, - "[RLS Entry {0}] Starting RouteLookup: {1}", request, routeLookupRequest); + "[RLS Entry {0}] Starting RouteLookup: {1}", routeLookupRequestKey, routeLookupRequest); rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS) .routeLookup( routeLookupRequest, @@ -311,14 +316,14 @@ private CachedRouteLookupResponse asyncRlsCall( @Override public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) { logger.log(ChannelLogLevel.DEBUG, - "[RLS Entry {0}] RouteLookup succeeded: {1}", request, value); + "[RLS Entry {0}] RouteLookup succeeded: {1}", routeLookupRequestKey, value); response.set(RESPONSE_CONVERTER.reverse().convert(value)); } @Override public void onError(Throwable t) { logger.log(ChannelLogLevel.DEBUG, - "[RLS Entry {0}] RouteLookup failed: {1}", request, t); + "[RLS Entry {0}] RouteLookup failed: {1}", routeLookupRequestKey, t); response.setException(t); throttler.registerBackendResponse(true); } @@ -329,7 +334,7 @@ public void onCompleted() { } }); return CachedRouteLookupResponse.pendingResponse( - createPendingEntry(request, response, backoffPolicy)); + createPendingEntry(routeLookupRequestKey, response, backoffPolicy)); } /** @@ -338,16 +343,17 @@ public void onCompleted() { * changed after the return. */ @CheckReturnValue - final CachedRouteLookupResponse get(final RouteLookupRequest request) { + final CachedRouteLookupResponse get(final RouteLookupRequestKey routeLookupRequestKey) { synchronized (lock) { final CacheEntry cacheEntry; - cacheEntry = linkedHashLruCache.read(request); + cacheEntry = linkedHashLruCache.read(routeLookupRequestKey); if (cacheEntry == null) { - PendingCacheEntry pendingEntry = pendingCallCache.get(request); + PendingCacheEntry pendingEntry = pendingCallCache.get(routeLookupRequestKey); if (pendingEntry != null) { return CachedRouteLookupResponse.pendingResponse(pendingEntry); } - return asyncRlsCall(request, /* backoffPolicy= */ null); + return asyncRlsCall(routeLookupRequestKey, /* backoffPolicy= */ null, + RouteLookupRequest.Reason.REASON_MISS); } if (cacheEntry instanceof DataCacheEntry) { @@ -383,13 +389,14 @@ void requestConnection() { @GuardedBy("lock") private PendingCacheEntry createPendingEntry( - RouteLookupRequest request, + RouteLookupRequestKey routeLookupRequestKey, ListenableFuture pendingCall, @Nullable BackoffPolicy backoffPolicy) { - PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy); + PendingCacheEntry entry = new PendingCacheEntry(routeLookupRequestKey, pendingCall, + backoffPolicy); // Add the entry to the map before adding the Listener, because the listener removes the // entry from the map - pendingCallCache.put(request, entry); + pendingCallCache.put(routeLookupRequestKey, entry); // Beware that the listener can run immediately on the current thread pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor()); return entry; @@ -397,17 +404,18 @@ private PendingCacheEntry createPendingEntry( private void pendingRpcComplete(PendingCacheEntry entry) { synchronized (lock) { - boolean clientClosed = pendingCallCache.remove(entry.request) == null; + boolean clientClosed = pendingCallCache.remove(entry.routeLookupRequestKey) == null; if (clientClosed) { return; } try { - createDataEntry(entry.request, Futures.getDone(entry.pendingCall)); + createDataEntry(entry.routeLookupRequestKey, Futures.getDone(entry.pendingCall)); // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to // reattempt picks when the child LB is done connecting } catch (Exception e) { - createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy); + createBackOffEntry(entry.routeLookupRequestKey, Status.fromThrowable(e), + entry.backoffPolicy); // Cache updated. updateBalancingState() to reattempt picks helper.triggerPendingRpcProcessing(); } @@ -416,21 +424,21 @@ private void pendingRpcComplete(PendingCacheEntry entry) { @GuardedBy("lock") private DataCacheEntry createDataEntry( - RouteLookupRequest request, RouteLookupResponse routeLookupResponse) { + RouteLookupRequestKey routeLookupRequestKey, RouteLookupResponse routeLookupResponse) { logger.log( ChannelLogLevel.DEBUG, "[RLS Entry {0}] Transition to data cache: routeLookupResponse={1}", - request, routeLookupResponse); - DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse); + routeLookupRequestKey, routeLookupResponse); + DataCacheEntry entry = new DataCacheEntry(routeLookupRequestKey, routeLookupResponse); // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until // this cache update because the lock is held - linkedHashLruCache.cacheAndClean(request, entry); + linkedHashLruCache.cacheAndClean(routeLookupRequestKey, entry); return entry; } @GuardedBy("lock") - private BackoffCacheEntry createBackOffEntry( - RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) { + private BackoffCacheEntry createBackOffEntry(RouteLookupRequestKey routeLookupRequestKey, + Status status, @Nullable BackoffPolicy backoffPolicy) { if (backoffPolicy == null) { backoffPolicy = backoffProvider.get(); } @@ -438,12 +446,12 @@ private BackoffCacheEntry createBackOffEntry( logger.log( ChannelLogLevel.DEBUG, "[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}", - request, status, delayNanos); - BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy); + routeLookupRequestKey, status, delayNanos); + BackoffCacheEntry entry = new BackoffCacheEntry(routeLookupRequestKey, status, backoffPolicy); // Lock is held, so the task can't execute before the assignment entry.scheduledFuture = scheduledExecutorService.schedule( () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS); - linkedHashLruCache.cacheAndClean(request, entry); + linkedHashLruCache.cacheAndClean(routeLookupRequestKey, entry); return entry; } @@ -455,9 +463,10 @@ private void refreshBackoffEntry(BackoffCacheEntry entry) { return; } logger.log(ChannelLogLevel.DEBUG, - "[RLS Entry {0}] Calling RLS for transition to pending", entry.request); - linkedHashLruCache.invalidate(entry.request); - asyncRlsCall(entry.request, entry.backoffPolicy); + "[RLS Entry {0}] Calling RLS for transition to pending", entry.routeLookupRequestKey); + linkedHashLruCache.invalidate(entry.routeLookupRequestKey); + asyncRlsCall(entry.routeLookupRequestKey, entry.backoffPolicy, + RouteLookupRequest.Reason.REASON_MISS); } } @@ -590,15 +599,15 @@ public String toString() { /** A pending cache entry when the async RouteLookup RPC is still on the fly. */ static final class PendingCacheEntry { private final ListenableFuture pendingCall; - private final RouteLookupRequest request; + private final RouteLookupRequestKey routeLookupRequestKey; @Nullable private final BackoffPolicy backoffPolicy; PendingCacheEntry( - RouteLookupRequest request, + RouteLookupRequestKey routeLookupRequestKey, ListenableFuture pendingCall, @Nullable BackoffPolicy backoffPolicy) { - this.request = checkNotNull(request, "request"); + this.routeLookupRequestKey = checkNotNull(routeLookupRequestKey, "request"); this.pendingCall = checkNotNull(pendingCall, "pendingCall"); this.backoffPolicy = backoffPolicy; } @@ -606,7 +615,7 @@ static final class PendingCacheEntry { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("request", request) + .add("routeLookupRequestKey", routeLookupRequestKey) .toString(); } } @@ -614,10 +623,10 @@ public String toString() { /** Common cache entry data for {@link RlsAsyncLruCache}. */ abstract static class CacheEntry { - protected final RouteLookupRequest request; + protected final RouteLookupRequestKey routeLookupRequestKey; - CacheEntry(RouteLookupRequest request) { - this.request = checkNotNull(request, "request"); + CacheEntry(RouteLookupRequestKey routeLookupRequestKey) { + this.routeLookupRequestKey = checkNotNull(routeLookupRequestKey, "request"); } abstract int getSizeBytes(); @@ -640,8 +649,9 @@ final class DataCacheEntry extends CacheEntry { private final List childPolicyWrappers; // GuardedBy CachingRlsLbClient.lock - DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) { - super(request); + DataCacheEntry(RouteLookupRequestKey routeLookupRequestKey, + final RouteLookupResponse response) { + super(routeLookupRequestKey); this.response = checkNotNull(response, "response"); checkState(!response.targets().isEmpty(), "No targets returned by RLS"); childPolicyWrappers = @@ -669,13 +679,14 @@ final class DataCacheEntry extends CacheEntry { */ void maybeRefresh() { synchronized (lock) { // Lock is already held, but ErrorProne can't tell - if (pendingCallCache.containsKey(request)) { + if (pendingCallCache.containsKey(routeLookupRequestKey)) { // pending already requested return; } logger.log(ChannelLogLevel.DEBUG, - "[RLS Entry {0}] Cache entry is stale, refreshing", request); - asyncRlsCall(request, /* backoffPolicy= */ null); + "[RLS Entry {0}] Cache entry is stale, refreshing", routeLookupRequestKey); + asyncRlsCall(routeLookupRequestKey, /* backoffPolicy= */ null, + RouteLookupRequest.Reason.REASON_STALE); } } @@ -745,7 +756,7 @@ void cleanup() { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("request", request) + .add("request", routeLookupRequestKey) .add("response", response) .add("expireTime", expireTime) .add("staleTime", staleTime) @@ -764,8 +775,9 @@ private static final class BackoffCacheEntry extends CacheEntry { private final BackoffPolicy backoffPolicy; private Future scheduledFuture; - BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) { - super(request); + BackoffCacheEntry(RouteLookupRequestKey routeLookupRequestKey, Status status, + BackoffPolicy backoffPolicy) { + super(routeLookupRequestKey); this.status = checkNotNull(status, "status"); this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy"); } @@ -792,7 +804,7 @@ void cleanup() { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("request", request) + .add("request", routeLookupRequestKey) .add("status", status) .toString(); } @@ -811,7 +823,7 @@ static final class Builder { private Throttler throttler = new HappyThrottler(); private ResolvedAddressFactory resolvedAddressFactory; private Ticker ticker = Ticker.systemTicker(); - private EvictionListener evictionListener; + private EvictionListener evictionListener; private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider(); Builder setHelper(Helper helper) { @@ -845,7 +857,7 @@ Builder setTicker(Ticker ticker) { } Builder setEvictionListener( - @Nullable EvictionListener evictionListener) { + @Nullable EvictionListener evictionListener) { this.evictionListener = evictionListener; return this; } @@ -867,17 +879,17 @@ CachingRlsLbClient build() { * CacheEntry#cleanup()} after original {@link EvictionListener} is finished. */ private static final class AutoCleaningEvictionListener - implements EvictionListener { + implements EvictionListener { - private final EvictionListener delegate; + private final EvictionListener delegate; AutoCleaningEvictionListener( - @Nullable EvictionListener delegate) { + @Nullable EvictionListener delegate) { this.delegate = delegate; } @Override - public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) { + public void onEviction(RouteLookupRequestKey key, CacheEntry value, EvictionType cause) { if (delegate != null) { delegate.onEviction(key, value, cause); } @@ -902,29 +914,29 @@ public void registerBackendResponse(boolean throttled) { /** Implementation of {@link LinkedHashLruCache} for RLS. */ private static final class RlsAsyncLruCache - extends LinkedHashLruCache { + extends LinkedHashLruCache { private final RlsLbHelper helper; RlsAsyncLruCache(long maxEstimatedSizeBytes, - @Nullable EvictionListener evictionListener, + @Nullable EvictionListener evictionListener, Ticker ticker, RlsLbHelper helper) { super(maxEstimatedSizeBytes, evictionListener, ticker); this.helper = checkNotNull(helper, "helper"); } @Override - protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) { + protected boolean isExpired(RouteLookupRequestKey key, CacheEntry value, long nowNanos) { return value.isExpired(nowNanos); } @Override - protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) { + protected int estimateSizeOf(RouteLookupRequestKey key, CacheEntry value) { return value.getSizeBytes(); } @Override protected boolean shouldInvalidateEldestEntry( - RouteLookupRequest eldestKey, CacheEntry eldestValue, long now) { + RouteLookupRequestKey eldestKey, CacheEntry eldestValue, long now) { if (!eldestValue.isOldEnoughToBeEvicted(now)) { return false; } @@ -933,7 +945,7 @@ protected boolean shouldInvalidateEldestEntry( return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes(); } - public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) { + public CacheEntry cacheAndClean(RouteLookupRequestKey key, CacheEntry value) { CacheEntry newEntry = cache(key, value); // force cleanup if new entry pushed cache over max size (in bytes) @@ -989,9 +1001,9 @@ final class RlsPicker extends SubchannelPicker { public PickResult pickSubchannel(PickSubchannelArgs args) { String serviceName = args.getMethodDescriptor().getServiceName(); String methodName = args.getMethodDescriptor().getBareMethodName(); - RouteLookupRequest request = + RlsProtoData.RouteLookupRequestKey lookupRequestKey = requestFactory.create(serviceName, methodName, args.getHeaders()); - final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request); + final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(lookupRequestKey); if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) { Metadata headers = args.getHeaders(); diff --git a/rls/src/main/java/io/grpc/rls/RlsProtoConverters.java b/rls/src/main/java/io/grpc/rls/RlsProtoConverters.java index aa5147449c4..70f9fb4d891 100644 --- a/rls/src/main/java/io/grpc/rls/RlsProtoConverters.java +++ b/rls/src/main/java/io/grpc/rls/RlsProtoConverters.java @@ -64,7 +64,9 @@ static final class RouteLookupRequestConverter @Override protected RlsProtoData.RouteLookupRequest doForward(RouteLookupRequest routeLookupRequest) { return RlsProtoData.RouteLookupRequest.create( - ImmutableMap.copyOf(routeLookupRequest.getKeyMapMap())); + ImmutableMap.copyOf(routeLookupRequest.getKeyMapMap()), + RlsProtoData.RouteLookupRequest.Reason.valueOf(routeLookupRequest.getReason().name()) + ); } @Override @@ -72,6 +74,7 @@ protected RouteLookupRequest doBackward(RlsProtoData.RouteLookupRequest routeLoo return RouteLookupRequest.newBuilder() .setTargetType("grpc") + .setReason(RouteLookupRequest.Reason.valueOf(routeLookupRequest.reason().name())) .putAllKeyMap(routeLookupRequest.keyMap()) .build(); } diff --git a/rls/src/main/java/io/grpc/rls/RlsProtoData.java b/rls/src/main/java/io/grpc/rls/RlsProtoData.java index 49f32c6b6e3..39c404870f9 100644 --- a/rls/src/main/java/io/grpc/rls/RlsProtoData.java +++ b/rls/src/main/java/io/grpc/rls/RlsProtoData.java @@ -27,16 +27,42 @@ final class RlsProtoData { private RlsProtoData() {} + /** A key object for the Rls route lookup data cache. */ + @AutoValue + @Immutable + abstract static class RouteLookupRequestKey { + + /** Returns a map of key values extracted via key builders for the gRPC or HTTP request. */ + abstract ImmutableMap keyMap(); + + static RouteLookupRequestKey create(ImmutableMap keyMap) { + return new AutoValue_RlsProtoData_RouteLookupRequestKey(keyMap); + } + } + /** A request object sent to route lookup service. */ @AutoValue @Immutable abstract static class RouteLookupRequest { + /** Names should match those in {@link io.grpc.lookup.v1.RouteLookupRequest.Reason}. */ + enum Reason { + /** Unused. */ + REASON_UNKNOWN, + /** No data available in local cache. */ + REASON_MISS, + /** Data in local cache is stale. */ + REASON_STALE; + } + + /** Reason for making this request. */ + abstract Reason reason(); + /** Returns a map of key values extracted via key builders for the gRPC or HTTP request. */ abstract ImmutableMap keyMap(); - static RouteLookupRequest create(ImmutableMap keyMap) { - return new AutoValue_RlsProtoData_RouteLookupRequest(keyMap); + static RouteLookupRequest create(ImmutableMap keyMap, Reason reason) { + return new AutoValue_RlsProtoData_RouteLookupRequest(reason, keyMap); } } diff --git a/rls/src/main/java/io/grpc/rls/RlsRequestFactory.java b/rls/src/main/java/io/grpc/rls/RlsRequestFactory.java index e26e49979e1..1fed78f4df3 100644 --- a/rls/src/main/java/io/grpc/rls/RlsRequestFactory.java +++ b/rls/src/main/java/io/grpc/rls/RlsRequestFactory.java @@ -27,13 +27,13 @@ import io.grpc.rls.RlsProtoData.GrpcKeyBuilder.Name; import io.grpc.rls.RlsProtoData.NameMatcher; import io.grpc.rls.RlsProtoData.RouteLookupConfig; -import io.grpc.rls.RlsProtoData.RouteLookupRequest; +import io.grpc.rls.RlsProtoData.RouteLookupRequestKey; import java.util.HashMap; import java.util.List; import java.util.Map; /** - * A RlsRequestFactory creates {@link RouteLookupRequest} using key builder map from {@link + * A RlsRequestFactory creates {@link RouteLookupRequestKey} using key builder map from {@link * RouteLookupConfig}. */ final class RlsRequestFactory { @@ -61,9 +61,9 @@ private static Map createKeyBuilderTable( return table; } - /** Creates a {@link RouteLookupRequest} for given request's metadata. */ + /** Creates a {@link RouteLookupRequestKey} for the given request lookup metadata. */ @CheckReturnValue - RouteLookupRequest create(String service, String method, Metadata metadata) { + RouteLookupRequestKey create(String service, String method, Metadata metadata) { checkNotNull(service, "service"); checkNotNull(method, "method"); String path = "/" + service + "/" + method; @@ -73,7 +73,7 @@ RouteLookupRequest create(String service, String method, Metadata metadata) { grpcKeyBuilder = keyBuilderTable.get("/" + service + "/*"); } if (grpcKeyBuilder == null) { - return RouteLookupRequest.create(ImmutableMap.of()); + return RouteLookupRequestKey.create(ImmutableMap.of()); } ImmutableMap.Builder rlsRequestHeaders = createRequestHeaders(metadata, grpcKeyBuilder.headers()); @@ -89,7 +89,7 @@ RouteLookupRequest create(String service, String method, Metadata metadata) { rlsRequestHeaders.put(extraKeys.method(), method); } rlsRequestHeaders.putAll(constantKeys); - return RouteLookupRequest.create(rlsRequestHeaders.buildOrThrow()); + return RouteLookupRequestKey.create(rlsRequestHeaders.buildOrThrow()); } private ImmutableMap.Builder createRequestHeaders( diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index 4f086abc4a2..93c7d0f00ff 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -128,7 +128,7 @@ public class CachingRlsLbClientTest { public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); @Mock - private EvictionListener evictionListener; + private EvictionListener evictionListener; @Mock private SocketAddress socketAddress; @Mock @@ -200,14 +200,14 @@ public void tearDown() throws Exception { } private CachedRouteLookupResponse getInSyncContext( - final RouteLookupRequest request) + final RlsProtoData.RouteLookupRequestKey routeLookupRequestKey) throws ExecutionException, InterruptedException, TimeoutException { final SettableFuture responseSettableFuture = SettableFuture.create(); syncContext.execute(new Runnable() { @Override public void run() { - responseSettableFuture.set(rlsLbClient.get(request)); + responseSettableFuture.set(rlsLbClient.get(routeLookupRequestKey)); } }); return responseSettableFuture.get(5, TimeUnit.SECONDS); @@ -217,48 +217,53 @@ public void run() { public void get_noError_lifeCycle() throws Exception { setUpRlsLbClient(); InOrder inOrder = inOrder(evictionListener); - RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of( - "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); rlsServerImpl.setLookupTable( ImmutableMap.of( - routeLookupRequest, + routeLookupRequestKey, RouteLookupResponse.create(ImmutableList.of("target"), "header"))); // initial request - CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.isPending()).isTrue(); // server response fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isTrue(); // cache hit for staled entry fakeClock.forwardTime(ROUTE_LOOKUP_CONFIG.staleAgeInNanos(), TimeUnit.NANOSECONDS); - resp = getInSyncContext(routeLookupRequest); + rlsServerImpl.routeLookupReason = null; + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isTrue(); // async refresh finishes fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); inOrder .verify(evictionListener) - .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.REPLACED)); + .onEviction(eq(routeLookupRequestKey), any(CacheEntry.class), eq(EvictionType.REPLACED)); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); + assertThat(rlsServerImpl.routeLookupReason).isEqualTo( + io.grpc.lookup.v1.RouteLookupRequest.Reason.REASON_STALE); assertThat(resp.hasData()).isTrue(); // existing cache expired fakeClock.forwardTime(ROUTE_LOOKUP_CONFIG.maxAgeInNanos(), TimeUnit.NANOSECONDS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.isPending()).isTrue(); inOrder .verify(evictionListener) - .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPIRED)); + .onEviction(eq(routeLookupRequestKey), any(CacheEntry.class), eq(EvictionType.EXPIRED)); inOrder.verifyNoMoreInteractions(); } @@ -287,22 +292,27 @@ public void rls_withCustomRlsChannelServiceConfig() throws Exception { .setThrottler(fakeThrottler) .setTicker(fakeClock.getTicker()) .build(); - RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of( - "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); rlsServerImpl.setLookupTable( ImmutableMap.of( - routeLookupRequest, + routeLookupRequestKey, RouteLookupResponse.create(ImmutableList.of("target"), "header"))); + rlsServerImpl.routeLookupReason = null; // initial request - CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.isPending()).isTrue(); // server response fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isTrue(); + assertThat(rlsServerImpl.routeLookupReason).isEqualTo( + io.grpc.lookup.v1.RouteLookupRequest.Reason.REASON_MISS); assertThat(rlsChannelOverriddenAuthority).isEqualTo("bigtable.googleapis.com:443"); assertThat(rlsChannelServiceConfig).isEqualTo(routeLookupChannelServiceConfig); @@ -311,26 +321,28 @@ public void rls_withCustomRlsChannelServiceConfig() throws Exception { @Test public void get_throttledAndRecover() throws Exception { setUpRlsLbClient(); - RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of( - "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); rlsServerImpl.setLookupTable( ImmutableMap.of( - routeLookupRequest, + routeLookupRequestKey, RouteLookupResponse.create(ImmutableList.of("target"), "header"))); fakeThrottler.nextResult = true; fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS); - CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasError()).isTrue(); fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); // initially backed off entry is backed off again verify(evictionListener) - .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPLICIT)); + .onEviction(eq(routeLookupRequestKey), any(CacheEntry.class), eq(EvictionType.EXPLICIT)); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasError()).isTrue(); @@ -338,14 +350,17 @@ public void get_throttledAndRecover() throws Exception { fakeThrottler.nextResult = false; fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.isPending()).isTrue(); + rlsServerImpl.routeLookupReason = null; // server responses fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + assertThat(rlsServerImpl.routeLookupReason).isEqualTo( + io.grpc.lookup.v1.RouteLookupRequest.Reason.REASON_MISS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isTrue(); } @@ -354,21 +369,24 @@ public void get_throttledAndRecover() throws Exception { public void get_updatesLbState() throws Exception { setUpRlsLbClient(); InOrder inOrder = inOrder(helper); - RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of( - "server", "bigtable.googleapis.com", "service-key", "service1", "method-key", "create")); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "service1", + "method-key", "create")); rlsServerImpl.setLookupTable( ImmutableMap.of( - routeLookupRequest, + routeLookupRequestKey, RouteLookupResponse.create( ImmutableList.of("primary.cloudbigtable.googleapis.com"), "header-rls-data-value"))); // valid channel - CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.isPending()).isTrue(); fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isTrue(); ArgumentCaptor pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class); @@ -393,13 +411,13 @@ public void get_updatesLbState() throws Exception { // move backoff further back to only test error behavior fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS); // try to get invalid - RouteLookupRequest invalidRouteLookupRequest = - RouteLookupRequest.create(ImmutableMap.of()); - CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequest); + RlsProtoData.RouteLookupRequestKey invalidRouteLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create(ImmutableMap.of()); + CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequestKey); assertThat(errorResp.isPending()).isTrue(); fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - errorResp = getInSyncContext(invalidRouteLookupRequest); + errorResp = getInSyncContext(invalidRouteLookupRequestKey); assertThat(errorResp.hasError()).isTrue(); // Channel is still READY because the subchannel for method /service1/create is still READY. @@ -423,21 +441,24 @@ public void get_updatesLbState() throws Exception { @Test public void timeout_not_changing_picked_subchannel() throws Exception { setUpRlsLbClient(); - RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of( - "server", "bigtable.googleapis.com", "service-key", "service1", "method-key", "create")); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "service1", + "method-key", "create")); rlsServerImpl.setLookupTable( ImmutableMap.of( - routeLookupRequest, + routeLookupRequestKey, RouteLookupResponse.create( ImmutableList.of("primary.cloudbigtable.googleapis.com", "target2", "target3"), "header-rls-data-value"))); // valid channel - CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isFalse(); fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isTrue(); ArgumentCaptor pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class); @@ -493,21 +514,24 @@ public void get_withAdaptiveThrottler() throws Exception { .setTicker(fakeClock.getTicker()) .build(); InOrder inOrder = inOrder(helper); - RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of( - "server", "bigtable.googleapis.com", "service-key", "service1", "method-key", "create")); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "service1", + "method-key", "create")); rlsServerImpl.setLookupTable( ImmutableMap.of( - routeLookupRequest, + routeLookupRequestKey, RouteLookupResponse.create( ImmutableList.of("primary.cloudbigtable.googleapis.com"), "header-rls-data-value"))); // valid channel - CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.isPending()).isTrue(); fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isTrue(); ArgumentCaptor pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class); @@ -524,13 +548,13 @@ public void get_withAdaptiveThrottler() throws Exception { // move backoff further back to only test error behavior fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS); // try to get invalid - RouteLookupRequest invalidRouteLookupRequest = - RouteLookupRequest.create(ImmutableMap.of()); - CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequest); + RlsProtoData.RouteLookupRequestKey invalidRouteLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create(ImmutableMap.of()); + CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequestKey); assertThat(errorResp.isPending()).isTrue(); fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - errorResp = getInSyncContext(invalidRouteLookupRequest); + errorResp = getInSyncContext(invalidRouteLookupRequestKey); assertThat(errorResp.hasError()).isTrue(); // Channel is still READY because the subchannel for method /service1/create is still READY. @@ -560,22 +584,26 @@ private PickSubchannelArgsImpl getInvalidArgs(Metadata headers) { @Test public void get_childPolicyWrapper_reusedForSameTarget() throws Exception { setUpRlsLbClient(); - RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of( - "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); - RouteLookupRequest routeLookupRequest2 = RouteLookupRequest.create(ImmutableMap.of( - "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "baz")); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey2 = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "baz")); rlsServerImpl.setLookupTable( ImmutableMap.of( - routeLookupRequest, + routeLookupRequestKey, RouteLookupResponse.create(ImmutableList.of("target"), "header"), - routeLookupRequest2, + routeLookupRequestKey2, RouteLookupResponse.create(ImmutableList.of("target"), "header2"))); - CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.isPending()).isTrue(); fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isTrue(); assertThat(resp.getHeaderData()).isEqualTo("header"); @@ -585,11 +613,11 @@ public void get_childPolicyWrapper_reusedForSameTarget() throws Exception { assertThat(childPolicyWrapper.getPicker()).isNotInstanceOf(RlsPicker.class); // request2 has same target, it should reuse childPolicyWrapper - CachedRouteLookupResponse resp2 = getInSyncContext(routeLookupRequest2); + CachedRouteLookupResponse resp2 = getInSyncContext(routeLookupRequestKey2); assertThat(resp2.isPending()).isTrue(); fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - resp2 = getInSyncContext(routeLookupRequest2); + resp2 = getInSyncContext(routeLookupRequestKey2); assertThat(resp2.hasData()).isTrue(); assertThat(resp2.getHeaderData()).isEqualTo("header2"); assertThat(resp2.getChildPolicyWrapper()).isEqualTo(resp.getChildPolicyWrapper()); @@ -598,20 +626,22 @@ public void get_childPolicyWrapper_reusedForSameTarget() throws Exception { @Test public void get_childPolicyWrapper_multiTarget() throws Exception { setUpRlsLbClient(); - RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of( - "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar")); rlsServerImpl.setLookupTable( ImmutableMap.of( - routeLookupRequest, + routeLookupRequestKey, RouteLookupResponse.create( ImmutableList.of("target1", "target2", "target3"), "header"))); - CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.isPending()).isTrue(); fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); - resp = getInSyncContext(routeLookupRequest); + resp = getInSyncContext(routeLookupRequestKey); assertThat(resp.hasData()).isTrue(); List policyWrappers = new ArrayList<>(); @@ -680,14 +710,15 @@ public void metricGauges() throws ExecutionException, InterruptedException, Time .recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")), eq(0L), any(), any()); - RouteLookupRequest routeLookupRequest = RouteLookupRequest.create( - ImmutableMap.of("server", "bigtable.googleapis.com", "service-key", "foo", "method-key", - "bar")); - rlsServerImpl.setLookupTable(ImmutableMap.of(routeLookupRequest, + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of("server", "bigtable.googleapis.com", "service-key", "foo", "method-key", + "bar")); + rlsServerImpl.setLookupTable(ImmutableMap.of(routeLookupRequestKey, RouteLookupResponse.create(ImmutableList.of("target"), "header"))); // Make a request that will populate the cache with an entry - getInSyncContext(routeLookupRequest); + getInSyncContext(routeLookupRequestKey); fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); // Gauge values should reflect the new cache entry. @@ -857,7 +888,9 @@ private static final class StaticFixedDelayRlsServerImpl private final long responseDelayNano; private final ScheduledExecutorService scheduledExecutorService; - private Map lookupTable = ImmutableMap.of(); + private Map lookupTable = + ImmutableMap.of(); + io.grpc.lookup.v1.RouteLookupRequest.Reason routeLookupReason; public StaticFixedDelayRlsServerImpl( long responseDelayNano, ScheduledExecutorService scheduledExecutorService) { @@ -867,7 +900,8 @@ public StaticFixedDelayRlsServerImpl( checkNotNull(scheduledExecutorService, "scheduledExecutorService"); } - private void setLookupTable(Map lookupTable) { + private void setLookupTable(Map lookupTable) { this.lookupTable = checkNotNull(lookupTable, "lookupTable"); } @@ -879,8 +913,11 @@ public void routeLookup(final io.grpc.lookup.v1.RouteLookupRequest request, new Runnable() { @Override public void run() { + routeLookupReason = request.getReason(); RouteLookupResponse response = - lookupTable.get(REQUEST_CONVERTER.convert(request)); + lookupTable.get( + RlsProtoData.RouteLookupRequestKey.create( + REQUEST_CONVERTER.convert(request).keyMap())); if (response == null) { responseObserver.onError(new RuntimeException("not found")); } else { diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 354466f3caf..c180935b153 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -166,15 +166,19 @@ public void setUp() { .build(); fakeRlsServerImpl.setLookupTable( ImmutableMap.of( - RouteLookupRequest.create(ImmutableMap.of( + RouteLookupRequest.create( + ImmutableMap.of( "server", "fake-bigtable.googleapis.com", "service-key", "com.google", - "method-key", "Search")), + "method-key", "Search"), + RouteLookupRequest.Reason.REASON_MISS), RouteLookupResponse.create(ImmutableList.of("wilderness"), "where are you?"), - RouteLookupRequest.create(ImmutableMap.of( + RouteLookupRequest.create( + ImmutableMap.of( "server", "fake-bigtable.googleapis.com", "service-key", "com.google", - "method-key", "Rescue")), + "method-key", "Rescue"), + RouteLookupRequest.Reason.REASON_MISS), RouteLookupResponse.create(ImmutableList.of("civilization"), "you are safe"))); rlsLb = (RlsLoadBalancer) provider.newLoadBalancer(helper); diff --git a/rls/src/test/java/io/grpc/rls/RlsProtoConvertersTest.java b/rls/src/test/java/io/grpc/rls/RlsProtoConvertersTest.java index fc5fdb59f21..ad1ce8c363e 100644 --- a/rls/src/test/java/io/grpc/rls/RlsProtoConvertersTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsProtoConvertersTest.java @@ -61,12 +61,14 @@ public void convert_toRequestObject() { Converter converter = new RouteLookupRequestConverter().reverse(); RlsProtoData.RouteLookupRequest requestObject = - RlsProtoData.RouteLookupRequest.create(ImmutableMap.of("key1", "val1")); + RlsProtoData.RouteLookupRequest.create(ImmutableMap.of("key1", "val1"), + RlsProtoData.RouteLookupRequest.Reason.REASON_MISS); RouteLookupRequest proto = converter.convert(requestObject); assertThat(proto.getTargetType()).isEqualTo("grpc"); assertThat(proto.getKeyMapMap()).containsExactly("key1", "val1"); + assertThat(proto.getReason()).isEqualTo(RouteLookupRequest.Reason.REASON_MISS); } @Test diff --git a/rls/src/test/java/io/grpc/rls/RlsRequestFactoryTest.java b/rls/src/test/java/io/grpc/rls/RlsRequestFactoryTest.java index 6ee2c01af8a..2b900994ed9 100644 --- a/rls/src/test/java/io/grpc/rls/RlsRequestFactoryTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsRequestFactoryTest.java @@ -26,7 +26,6 @@ import io.grpc.rls.RlsProtoData.GrpcKeyBuilder.Name; import io.grpc.rls.RlsProtoData.NameMatcher; import io.grpc.rls.RlsProtoData.RouteLookupConfig; -import io.grpc.rls.RlsProtoData.RouteLookupRequest; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; @@ -82,8 +81,9 @@ public void create_pathMatches() { metadata.put(Metadata.Key.of("X-Google-Id", Metadata.ASCII_STRING_MARSHALLER), "123"); metadata.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar"); - RouteLookupRequest request = factory.create("com.google.service1", "Create", metadata); - assertThat(request.keyMap()).containsExactly( + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + factory.create("com.google.service1", "Create", metadata); + assertThat(routeLookupRequestKey.keyMap()).containsExactly( "user", "test", "id", "123", "server-1", "bigtable.googleapis.com", @@ -97,9 +97,10 @@ public void create_pathFallbackMatches() { metadata.put(Metadata.Key.of("Password", Metadata.ASCII_STRING_MARSHALLER), "hunter2"); metadata.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar"); - RouteLookupRequest request = factory.create("com.google.service1" , "Update", metadata); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + factory.create("com.google.service1" , "Update", metadata); - assertThat(request.keyMap()).containsExactly( + assertThat(routeLookupRequestKey.keyMap()).containsExactly( "user", "test", "password", "hunter2", "service-2", "com.google.service1", @@ -113,9 +114,10 @@ public void create_pathFallbackMatches_optionalHeaderMissing() { metadata.put(Metadata.Key.of("X-Google-Id", Metadata.ASCII_STRING_MARSHALLER), "123"); metadata.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar"); - RouteLookupRequest request = factory.create("com.google.service1", "Update", metadata); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + factory.create("com.google.service1", "Update", metadata); - assertThat(request.keyMap()).containsExactly( + assertThat(routeLookupRequestKey.keyMap()).containsExactly( "user", "test", "service-2", "com.google.service1", "const-key-2", "const-value-2"); @@ -128,8 +130,9 @@ public void create_unknownPath() { metadata.put(Metadata.Key.of("X-Google-Id", Metadata.ASCII_STRING_MARSHALLER), "123"); metadata.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar"); - RouteLookupRequest request = factory.create("abc.def.service999", "Update", metadata); - assertThat(request.keyMap()).isEmpty(); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + factory.create("abc.def.service999", "Update", metadata); + assertThat(routeLookupRequestKey.keyMap()).isEmpty(); } @Test @@ -139,9 +142,10 @@ public void create_noMethodInRlsConfig() { metadata.put(Metadata.Key.of("X-Google-Id", Metadata.ASCII_STRING_MARSHALLER), "123"); metadata.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar"); - RouteLookupRequest request = factory.create("com.google.service3", "Update", metadata); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + factory.create("com.google.service3", "Update", metadata); - assertThat(request.keyMap()).containsExactly( + assertThat(routeLookupRequestKey.keyMap()).containsExactly( "user", "test", "const-key-4", "const-value-4"); } } From 5e8af564ec1a5eeabc00578267f7101c57144951 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 3 Nov 2025 13:12:50 -0800 Subject: [PATCH 4/4] core: Fix NPE during address update with Happy Eyeballs Fixes #12168 --- .../internal/PickFirstLeafLoadBalancer.java | 8 +++- .../PickFirstLeafLoadBalancerTest.java | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index ebe329ca591..2689d7d2308 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -137,9 +137,13 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { final ImmutableList newImmutableAddressGroups = ImmutableList.builder().addAll(cleanServers).build(); - if (rawConnectivityState == READY || rawConnectivityState == CONNECTING) { + if (rawConnectivityState == READY + || (rawConnectivityState == CONNECTING + && (!enableHappyEyeballs || addressIndex.isValid()))) { // If the previous ready (or connecting) subchannel exists in new address list, - // keep this connection and don't create new subchannels + // keep this connection and don't create new subchannels. Happy Eyeballs is excluded when + // connecting, because it allows multiple attempts simultaneously, thus is fine to start at + // the beginning. SocketAddress previousAddress = addressIndex.getCurrentAddress(); addressIndex.updateGroups(newImmutableAddressGroups); if (addressIndex.seekTo(previousAddress)) { diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java index 0e902bfdd56..8b09fce2aa2 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java @@ -1872,6 +1872,45 @@ public void updateAddresses_identical_transient_failure() { assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); } + @Test + public void updateAddresses_identicalSingleAddress_connecting() { + // Creating first set of endpoints/addresses + List oldServers = Lists.newArrayList(servers.get(0)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // First connection attempt is successful + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + fakeClock.forwardTime(CONNECTION_DELAY_INTERVAL_MS, TimeUnit.MILLISECONDS); + + // verify that picker returns no subchannel + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withNoResult(), picker.pickSubchannel(mockArgs)); + + // Accept same resolved addresses to update + reset(mockHelper); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + fakeClock.forwardTime(CONNECTION_DELAY_INTERVAL_MS, TimeUnit.MILLISECONDS); + + // Verify that no new subchannels were created or started + verify(mockSubchannel2, never()).start(any()); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // verify that picker hasn't changed via checking mock helper's interactions + verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care + verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verifyNoMoreInteractions(mockHelper); + } + @Test public void twoAddressesSeriallyConnect() { // Starting first connection attempt