diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 6215c20179..70f04d8f91 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -447,19 +447,6 @@ default Set> defaultNonSSAResource() { return defaultNonSSAResources(); } - /** - * If a javaoperatorsdk.io/previous annotation should be used so that the operator sdk can detect - * events from its own updates of dependent resources and then filter them. - * - *

Disable this if you want to react to your own dependent resource updates - * - * @return if special annotation should be used for dependent resource to filter events - * @since 4.5.0 - */ - default boolean previousAnnotationForDependentResourcesEventFiltering() { - return true; - } - /** * For dependent resources, the framework can add an annotation to filter out events resulting * directly from the framework's operation. There are, however, some resources that do not follow @@ -491,18 +478,16 @@ default Set> withPreviousAnnotationForDependentReso /** * If the event logic should parse the resourceVersion to determine the ordering of dependent - * resource events. This is typically not needed. + * resource events. * - *

Disabled by default as Kubernetes does not support, and discourages, this interpretation of - * resourceVersions. Enable only if your api server event processing seems to lag the operator - * logic, and you want to further minimize the amount of work done / updates issued by the - * operator. + *

Enabled by default as Kubernetes does support this interpretation of resourceVersions. + * Disable only if your api server provides non comparable resource versions.. * * @return if resource version should be parsed (as integer) * @since 4.5.0 */ default boolean parseResourceVersionsForEventFilteringAndCaching() { - return false; + return true; } /** diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 3d29bb6589..353d399988 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -331,13 +331,6 @@ public Set> defaultNonSSAResources() { defaultNonSSAResource, ConfigurationService::defaultNonSSAResources); } - @Override - public boolean previousAnnotationForDependentResourcesEventFiltering() { - return overriddenValueOrDefault( - previousAnnotationForDependentResources, - ConfigurationService::previousAnnotationForDependentResourcesEventFiltering); - } - @Override public boolean parseResourceVersionsForEventFilteringAndCaching() { return overriddenValueOrDefault( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java index bca605a41c..693910c196 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java @@ -96,18 +96,21 @@ class DefaultInformerEventSourceConfiguration private final GroupVersionKind groupVersionKind; private final InformerConfiguration informerConfig; private final KubernetesClient kubernetesClient; + private final boolean comparableResourceVersions; protected DefaultInformerEventSourceConfiguration( GroupVersionKind groupVersionKind, PrimaryToSecondaryMapper primaryToSecondaryMapper, SecondaryToPrimaryMapper secondaryToPrimaryMapper, InformerConfiguration informerConfig, - KubernetesClient kubernetesClient) { + KubernetesClient kubernetesClient, + boolean comparableResourceVersions) { this.informerConfig = Objects.requireNonNull(informerConfig); this.groupVersionKind = groupVersionKind; this.primaryToSecondaryMapper = primaryToSecondaryMapper; this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; this.kubernetesClient = kubernetesClient; + this.comparableResourceVersions = comparableResourceVersions; } @Override @@ -135,6 +138,11 @@ public Optional getGroupVersionKind() { public Optional getKubernetesClient() { return Optional.ofNullable(kubernetesClient); } + + @Override + public boolean parseResourceVersionsForEventFilteringAndCaching() { + return this.comparableResourceVersions; + } } @SuppressWarnings({"unused", "UnusedReturnValue"}) @@ -148,6 +156,7 @@ class Builder { private PrimaryToSecondaryMapper primaryToSecondaryMapper; private SecondaryToPrimaryMapper secondaryToPrimaryMapper; private KubernetesClient kubernetesClient; + private boolean comparableResourceVersions = true; private Builder(Class resourceClass, Class primaryResourceClass) { this(resourceClass, primaryResourceClass, null); @@ -285,6 +294,11 @@ public Builder withFieldSelector(FieldSelector fieldSelector) { return this; } + public Builder parseResourceVersionsForEventFilteringAndCaching(boolean parse) { + this.comparableResourceVersions = parse; + return this; + } + public void updateFrom(InformerConfiguration informerConfig) { if (informerConfig != null) { final var informerConfigName = informerConfig.getName(); @@ -324,7 +338,10 @@ public InformerEventSourceConfiguration build() { HasMetadata.getKind(primaryResourceClass), false)), config.build(), - kubernetesClient); + kubernetesClient, + comparableResourceVersions); } } + + boolean parseResourceVersionsForEventFilteringAndCaching(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index b4b3405ec4..3d46d4ca1b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -31,6 +31,7 @@ import io.fabric8.kubernetes.client.dsl.base.PatchContext; import io.fabric8.kubernetes.client.dsl.base.PatchType; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; import io.javaoperatorsdk.operator.processing.event.ResourceID; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; @@ -48,6 +49,16 @@ */ public class PrimaryUpdateAndCacheUtils { + private static final RecentOperationCacheFiller DUMMY_CACHE_FILLER = + new RecentOperationCacheFiller<>() { + @Override + public void handleRecentResourceCreate(ResourceID resourceID, Object resource) {} + + @Override + public void handleRecentResourceUpdate( + ResourceID resourceID, Object resource, Object previousVersionOfResource) {} + }; + public static final int DEFAULT_MAX_RETRY = 10; public static final int DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS = 10000; public static final int DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS = 50; @@ -143,6 +154,24 @@ public static

P updateAndCacheResource( DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS); } + // TODO: where does this belong + public static

P updateAndCacheSecondaryResource( + P resourceToUpdate, + Context context, + RecentOperationCacheFiller

cacheFiller, + UnaryOperator

updateMethod) { + return updateAndCacheResource( + resourceToUpdate, + context, + cacheFiller == null ? (RecentOperationCacheFiller

) DUMMY_CACHE_FILLER : cacheFiller, + null, + o -> o, + updateMethod, + 0, + DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS, + DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS); + } + /** * Modifies the primary using the specified modification function, then uses the modified resource * for the request to update with provided update method. As the {@code resourceVersion} field of @@ -156,7 +185,6 @@ public static

P updateAndCacheResource( * @param resourceToUpdate original resource to update * @param context of reconciliation * @param modificationFunction modifications to make on primary - * @param updateMethod the update method implementation * @param maxRetry maximum number of retries before giving up * @param cachePollTimeoutMillis maximum amount of milliseconds to wait for the updated resource * to appear in cache @@ -172,24 +200,45 @@ public static

P updateAndCacheResource( int maxRetry, long cachePollTimeoutMillis, long cachePollPeriodMillis) { + return updateAndCacheResource( + resourceToUpdate, + context, + context.eventSourceRetriever().getControllerEventSource(), + context.getPrimaryCache(), + modificationFunction, + updateMethod, + maxRetry, + cachePollPeriodMillis, + cachePollTimeoutMillis); + } + + private static

P updateAndCacheResource( + P resourceToUpdate, + Context context, + RecentOperationCacheFiller

cacheFiller, + IndexedResourceCache

cache, + UnaryOperator

modificationFunction, + UnaryOperator

updateMethod, + int maxRetry, + long cachePollTimeoutMillis, + long cachePollPeriodMillis) { + ResourceID id = ResourceID.fromResource(resourceToUpdate); if (log.isDebugEnabled()) { - log.debug("Update and cache: {}", ResourceID.fromResource(resourceToUpdate)); + log.debug("Update and cache: {}", id); } P modified = null; int retryIndex = 0; while (true) { try { + cacheFiller.startModifying(id); modified = modificationFunction.apply(resourceToUpdate); modified .getMetadata() .setResourceVersion(resourceToUpdate.getMetadata().getResourceVersion()); var updated = updateMethod.apply(modified); - context - .eventSourceRetriever() - .getControllerEventSource() - .handleRecentResourceUpdate( - ResourceID.fromResource(resourceToUpdate), updated, resourceToUpdate); + cacheFiller.handleRecentResourceUpdate( + ResourceID.fromResource(resourceToUpdate), updated, resourceToUpdate); return updated; } catch (KubernetesClientException e) { log.trace("Exception during patch for resource: {}", resourceToUpdate); @@ -219,20 +268,26 @@ public static

P updateAndCacheResource( e.getCode()); resourceToUpdate = pollLocalCache( - context, resourceToUpdate, cachePollTimeoutMillis, cachePollPeriodMillis); + context, resourceToUpdate, cachePollTimeoutMillis, cachePollPeriodMillis, cache); + } finally { + cacheFiller.doneModifying(id); } } } private static

P pollLocalCache( - Context

context, P staleResource, long timeoutMillis, long pollDelayMillis) { + Context context, + P staleResource, + long timeoutMillis, + long pollDelayMillis, + IndexedResourceCache

cache) { try { var resourceId = ResourceID.fromResource(staleResource); var startTime = LocalTime.now(); final var timeoutTime = startTime.plus(timeoutMillis, ChronoUnit.MILLIS); while (timeoutTime.isAfter(LocalTime.now())) { log.debug("Polling cache for resource: {}", resourceId); - var cachedResource = context.getPrimaryCache().get(resourceId).orElseThrow(); + var cachedResource = cache.get(resourceId).orElseThrow(); if (!cachedResource .getMetadata() .getResourceVersion() @@ -451,6 +506,11 @@ public static

P addFinalizerWithSSA( } } + public static int compareResourceVersions(HasMetadata h1, HasMetadata h2) { + return compareResourceVersions( + h1.getMetadata().getResourceVersion(), h2.getMetadata().getResourceVersion()); + } + public static int compareResourceVersions(String v1, String v2) { var v1Length = v1.length(); if (v1Length == 0) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationCacheFiller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationCacheFiller.java index 9cf1fce287..c8e26f5648 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationCacheFiller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationCacheFiller.java @@ -19,6 +19,10 @@ public interface RecentOperationCacheFiller { + default void startModifying(ResourceID id) {} + + default void doneModifying(ResourceID id) {} + void handleRecentResourceCreate(ResourceID resourceID, R resource); void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousVersionOfResource); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java index 7dff17baba..9fdd372d2f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java @@ -65,6 +65,7 @@ protected AbstractEventSourceHolderDependentResource(Class resourceType, Stri * that multiple event sources would be created and only one started and registered. Note that * this method does not start the event source, so no blocking IO is involved. */ + @Override public synchronized Optional eventSource(EventSourceContext

context) { // some sub-classes (e.g. KubernetesDependentResource) can have their event source created // before this method is called in the managed case, so only create the event source if it @@ -123,6 +124,7 @@ public Optional eventSource() { return Optional.ofNullable(eventSource); } + @Override protected void onCreated(P primary, R created, Context

context) { if (isCacheFillerEventSource) { recentOperationCacheFiller() @@ -130,6 +132,7 @@ protected void onCreated(P primary, R created, Context

context) { } } + @Override protected void onUpdated(P primary, R updated, R actual, Context

context) { if (isCacheFillerEventSource) { recentOperationCacheFiller() @@ -138,7 +141,7 @@ protected void onUpdated(P primary, R updated, R actual, Context

context) { } @SuppressWarnings("unchecked") - private RecentOperationCacheFiller recentOperationCacheFiller() { + protected RecentOperationCacheFiller recentOperationCacheFiller() { return (RecentOperationCacheFiller) eventSource; } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractExternalDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractExternalDependentResource.java index 2c5be82288..74356bb82d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractExternalDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractExternalDependentResource.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -93,7 +94,12 @@ private void handleExplicitStateDelete(P primary, R secondary, Context

contex @SuppressWarnings({"rawtypes", "unchecked", "unused"}) protected void handleExplicitStateCreation(P primary, R created, Context

context) { var resource = dependentResourceWithExplicitState.stateResource(primary, created); - var stateResource = context.getClient().resource(resource).create(); + var stateResource = + PrimaryUpdateAndCacheUtils.updateAndCacheSecondaryResource( + resource, + context, + (RecentOperationCacheFiller) externalStateEventSource, + toCreate -> context.getClient().resource(toCreate).create()); if (externalStateEventSource != null) { ((RecentOperationCacheFiller) externalStateEventSource) .handleRecentResourceCreate(ResourceID.fromResource(primary), stateResource); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index efe25b9a43..8ada2006af 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -31,6 +31,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.api.reconciler.Ignore; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ConfiguredDependentResource; import io.javaoperatorsdk.operator.processing.GroupVersionKind; @@ -55,7 +56,6 @@ public abstract class KubernetesDependentResource kubernetesDependentResourceConfig; private volatile Boolean useSSA; - private volatile Boolean usePreviousAnnotationForEventFiltering; public KubernetesDependentResource() {} @@ -72,6 +72,25 @@ public void configureWith(KubernetesDependentResourceConfig config) { this.kubernetesDependentResourceConfig = config; } + @Override + protected R handleCreate(R desired, P primary, Context

context) { + return PrimaryUpdateAndCacheUtils.updateAndCacheSecondaryResource( + desired, + context, + recentOperationCacheFiller(), + toCreate -> KubernetesDependentResource.super.handleCreate(toCreate, primary, context)); + } + + @Override + protected R handleUpdate(R actual, R desired, P primary, Context

context) { + return PrimaryUpdateAndCacheUtils.updateAndCacheSecondaryResource( + desired, + context, + recentOperationCacheFiller(), + toUpdate -> + KubernetesDependentResource.super.handleUpdate(actual, toUpdate, primary, context)); + } + @SuppressWarnings("unused") public R create(R desired, P primary, Context

context) { if (useSSA(context)) { @@ -158,14 +177,6 @@ protected void addMetadata( } else { annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY); } - } else if (usePreviousAnnotation(context)) { // set a new one - eventSource() - .orElseThrow() - .addPreviousAnnotation( - Optional.ofNullable(actualResource) - .map(r -> r.getMetadata().getResourceVersion()) - .orElse(null), - target); } addReferenceHandlingMetadata(target, primary); } @@ -181,22 +192,6 @@ protected boolean useSSA(Context

context) { return useSSA; } - private boolean usePreviousAnnotation(Context

context) { - if (usePreviousAnnotationForEventFiltering == null) { - usePreviousAnnotationForEventFiltering = - context - .getControllerConfiguration() - .getConfigurationService() - .previousAnnotationForDependentResourcesEventFiltering() - && !context - .getControllerConfiguration() - .getConfigurationService() - .withPreviousAnnotationForDependentResourcesBlocklist() - .contains(this.resourceType()); - } - return usePreviousAnnotationForEventFiltering; - } - @Override protected void handleDelete(P primary, R secondary, Context

context) { if (secondary != null) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index b7a6406e20..7fb298642e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -47,7 +47,14 @@ public class ControllerEventSource @SuppressWarnings({"unchecked", "rawtypes"}) public ControllerEventSource(Controller controller) { - super(NAME, controller.getCRClient(), controller.getConfiguration(), false); + super( + NAME, + controller.getCRClient(), + controller.getConfiguration(), + controller + .getConfiguration() + .getConfigurationService() + .parseResourceVersionsForEventFilteringAndCaching()); this.controller = controller; final var config = controller.getConfiguration(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index ec11db25f4..717da9193c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -17,7 +17,6 @@ import java.util.Optional; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -78,21 +77,17 @@ public class InformerEventSource // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryIndex primaryToSecondaryIndex; private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; - private final String id = UUID.randomUUID().toString(); public InformerEventSource( InformerEventSourceConfiguration configuration, EventSourceContext

context) { this( configuration, configuration.getKubernetesClient().orElse(context.getClient()), - context - .getControllerConfiguration() - .getConfigurationService() - .parseResourceVersionsForEventFilteringAndCaching()); + configuration.parseResourceVersionsForEventFilteringAndCaching()); } InformerEventSource(InformerEventSourceConfiguration configuration, KubernetesClient client) { - this(configuration, client, false); + this(configuration, client, true); } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -135,8 +130,8 @@ public void onAdd(R newResource) { newResource.getMetadata().getResourceVersion()); } primaryToSecondaryIndex.onAddOrUpdate(newResource); - onAddOrUpdate( - Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource)); + super.onAdd(newResource); + onAddOrUpdate(Operation.ADD, newResource, null); } @Override @@ -150,11 +145,8 @@ public void onUpdate(R oldObject, R newObject) { oldObject.getMetadata().getResourceVersion()); } primaryToSecondaryIndex.onAddOrUpdate(newObject); - onAddOrUpdate( - Operation.UPDATE, - newObject, - oldObject, - () -> InformerEventSource.super.onUpdate(oldObject, newObject)); + super.onUpdate(oldObject, newObject); + onAddOrUpdate(Operation.UPDATE, newObject, oldObject); } @Override @@ -180,8 +172,7 @@ public synchronized void start() { manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate); } - private synchronized void onAddOrUpdate( - Operation operation, R newObject, R oldObject, Runnable superOnOp) { + private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) { var resourceID = ResourceID.fromResource(newObject); if (canSkipEvent(newObject, oldObject, resourceID)) { @@ -190,56 +181,20 @@ private synchronized void onAddOrUpdate( + " ID: {}", operation, ResourceID.fromResource(newObject)); - superOnOp.run(); + } else if (eventAcceptedByFilter(operation, newObject, oldObject)) { + log.debug( + "Propagating event for {}, resource with same version not result of a reconciliation." + + " Resource ID: {}", + operation, + resourceID); + propagateEvent(newObject); } else { - superOnOp.run(); - if (eventAcceptedByFilter(operation, newObject, oldObject)) { - log.debug( - "Propagating event for {}, resource with same version not result of a reconciliation." - + " Resource ID: {}", - operation, - resourceID); - propagateEvent(newObject); - } else { - log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID); - } + log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID); } } private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) { - var res = temporaryResourceCache.getResourceFromCache(resourceID); - if (res.isEmpty()) { - return isEventKnownFromAnnotation(newObject, oldObject); - } - boolean resVersionsEqual = - newObject - .getMetadata() - .getResourceVersion() - .equals(res.get().getMetadata().getResourceVersion()); - log.debug( - "Resource found in temporal cache for id: {} resource versions equal: {}", - resourceID, - resVersionsEqual); - return resVersionsEqual - || temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject); - } - - private boolean isEventKnownFromAnnotation(R newObject, R oldObject) { - String previous = newObject.getMetadata().getAnnotations().get(PREVIOUS_ANNOTATION_KEY); - boolean known = false; - if (previous != null) { - String[] parts = previous.split(","); - if (id.equals(parts[0])) { - if (oldObject == null && parts.length == 1) { - known = true; - } else if (oldObject != null - && parts.length == 2 - && oldObject.getMetadata().getResourceVersion().equals(parts[1])) { - known = true; - } - } - } - return known; + return temporaryResourceCache.canSkipEvent(resourceID, newObject); } private void propagateEvent(R object) { @@ -301,11 +256,7 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { primaryToSecondaryIndex.onAddOrUpdate(newResource); - temporaryResourceCache.putResource( - newResource, - Optional.ofNullable(oldResource) - .map(r -> r.getMetadata().getResourceVersion()) - .orElse(null)); + temporaryResourceCache.putResource(newResource); } private boolean useSecondaryToPrimaryIndex() { @@ -333,22 +284,6 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) { && (genericFilter == null || genericFilter.accept(resource)); } - /** - * Add an annotation to the resource so that the subsequent will be omitted - * - * @param resourceVersion null if there is no prior version - * @param target mutable resource that will be returned - */ - public R addPreviousAnnotation(String resourceVersion, R target) { - target - .getMetadata() - .getAnnotations() - .put( - PREVIOUS_ANNOTATION_KEY, - id + Optional.ofNullable(resourceVersion).map(rv -> "," + rv).orElse("")); - return target; - } - private enum Operation { ADD, UPDATE diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index abd2b6a752..27ba5f5b16 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -221,6 +221,11 @@ public Optional get(ResourceID resourceID) { : r); } + public Optional getLastSyncResourceVersion(Optional namespace) { + return getSource(namespace.orElse(WATCH_ALL_NAMESPACES)) + .map(source -> source.getLastSyncResourceVersion()); + } + @Override public Stream keys() { return sources.values().stream().flatMap(Cache::keys); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 07db980fcd..096c201c12 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -156,6 +156,10 @@ public Optional get(ResourceID resourceID) { return Optional.ofNullable(cache.getByKey(getKey(resourceID))); } + public String getLastSyncResourceVersion() { + return this.informer.lastSyncResourceVersion(); + } + private String getKey(ResourceID resourceID) { return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName()); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 2679918b60..b3b7c6eeeb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -34,6 +34,7 @@ import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.Informable; import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; @@ -70,6 +71,16 @@ protected ManagedInformerEventSource( this.configuration = configuration; } + @Override + public void startModifying(ResourceID id) { + this.temporaryResourceCache.startModifying(id); + } + + @Override + public void doneModifying(ResourceID id) { + this.temporaryResourceCache.doneModifying(id); + } + @Override public void onAdd(R resource) { temporaryResourceCache.onAddOrUpdateEvent(resource); @@ -122,30 +133,38 @@ public synchronized void stop() { @Override public void handleRecentResourceUpdate( ResourceID resourceID, R resource, R previousVersionOfResource) { - temporaryResourceCache.putResource( - resource, previousVersionOfResource.getMetadata().getResourceVersion()); + temporaryResourceCache.putResource(resource); } @Override public void handleRecentResourceCreate(ResourceID resourceID, R resource) { - temporaryResourceCache.putAddedResource(resource); + temporaryResourceCache.putResource(resource); } @Override public Optional get(ResourceID resourceID) { + var res = cache.get(resourceID); Optional resource = temporaryResourceCache.getResourceFromCache(resourceID); - if (resource.isPresent()) { - log.debug("Resource found in temporary cache for Resource ID: {}", resourceID); + if (parseResourceVersions + && resource.isPresent() + && res.filter( + r -> + PrimaryUpdateAndCacheUtils.compareResourceVersions(r, resource.orElseThrow()) + > 0) + .isEmpty()) { + log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID); return resource; - } else { - log.debug( - "Resource not found in temporary cache reading it from informer cache," - + " for Resource ID: {}", - resourceID); - var res = cache.get(resourceID); - log.debug("Resource found in cache: {} for id: {}", res.isPresent(), resourceID); - return res; } + log.debug( + "Resource not found, or older, in temporary cache. Found in informer cache {}, for" + + " Resource ID: {}", + res.isPresent(), + resourceID); + return res; + } + + public Optional getLastSyncResourceVersion(Optional namespace) { + return cache.getLastSyncResourceVersion(namespace); } @SuppressWarnings("unused") diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 06226ae4ba..276051c636 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -15,16 +15,16 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -46,56 +46,15 @@ */ public class TemporaryResourceCache { - static class ExpirationCache { - private final LinkedHashMap cache; - private final int ttlMs; - - public ExpirationCache(int maxEntries, int ttlMs) { - this.ttlMs = ttlMs; - this.cache = - new LinkedHashMap<>() { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > maxEntries; - } - }; - } - - public void add(K key) { - clean(); - cache.putIfAbsent(key, System.currentTimeMillis()); - } - - public boolean contains(K key) { - clean(); - return cache.get(key) != null; - } - - void clean() { - if (!cache.isEmpty()) { - long currentTimeMillis = System.currentTimeMillis(); - var iter = cache.entrySet().iterator(); - // the order will already be from oldest to newest, clean a fixed number of entries to - // amortize the cost amongst multiple calls - for (int i = 0; i < 10 && iter.hasNext(); i++) { - var entry = iter.next(); - if (currentTimeMillis - entry.getValue() > ttlMs) { - iter.remove(); - } - } - } - } - } - private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); private final Map cache = new ConcurrentHashMap<>(); - // keep up to the last million deletions for up to 10 minutes - private final ExpirationCache tombstones = new ExpirationCache<>(1000000, 1200000); private final ManagedInformerEventSource managedInformerEventSource; private final boolean parseResourceVersions; + private Map activelyModifying = new ConcurrentHashMap<>(); + public TemporaryResourceCache( ManagedInformerEventSource managedInformerEventSource, boolean parseResourceVersions) { @@ -103,87 +62,110 @@ public TemporaryResourceCache( this.parseResourceVersions = parseResourceVersions; } + public void startModifying(ResourceID id) { + activelyModifying + .compute( + id, + (ignored, lock) -> { + if (lock != null) { + throw new IllegalStateException(); + } + return new ReentrantLock(); + }) + .lock(); + } + + public void doneModifying(ResourceID id) { + activelyModifying.computeIfPresent( + id, + (ignored, lock) -> { + lock.unlock(); + return null; + }); + } + public synchronized void onDeleteEvent(T resource, boolean unknownState) { - tombstones.add(resource.getMetadata().getUid()); onEvent(resource, unknownState); } - public synchronized void onAddOrUpdateEvent(T resource) { - onEvent(resource, false); + public void onAddOrUpdateEvent(T resource) { + ReentrantLock lock = activelyModifying.get(ResourceID.fromResource(resource)); + if (lock != null) { + lock.lock(); + } + try { + onEvent(resource, false); + } finally { + lock.unlock(); + } } synchronized void onEvent(T resource, boolean unknownState) { cache.computeIfPresent( ResourceID.fromResource(resource), (id, cached) -> - (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached); - } - - public synchronized void putAddedResource(T newResource) { - putResource(newResource, null); + (unknownState + || PrimaryUpdateAndCacheUtils.compareResourceVersions(resource, cached) >= 0) + ? null + : cached); } /** * put the item into the cache if the previousResourceVersion matches the current state. If not * the currently cached item is removed. - * - * @param previousResourceVersion null indicates an add */ - public synchronized void putResource(T newResource, String previousResourceVersion) { + public synchronized void putResource(T newResource) { + if (!parseResourceVersions) { + return; + } + var resourceId = ResourceID.fromResource(newResource); - var cachedResource = managedInformerEventSource.get(resourceId).orElse(null); - boolean moveAhead = false; - if (previousResourceVersion == null && cachedResource == null) { - if (tombstones.contains(newResource.getMetadata().getUid())) { - log.debug( - "Won't resurrect uid {} for resource id: {}", - newResource.getMetadata().getUid(), - resourceId); - return; - } - // we can skip further checks as this is a simple add and there's no previous entry to - // consider - moveAhead = true; + if (newResource.getMetadata().getResourceVersion() == null) { + log.warn( + "Resource {}: with no resourceVersion put in temporary cache. This is not the expected" + + " usage pattern, only resources returned from the api server should be put in the" + + " cache.", + resourceId); + return; } - if (moveAhead - || (cachedResource != null - && (cachedResource - .getMetadata() - .getResourceVersion() - .equals(previousResourceVersion)) - || isLaterResourceVersion(resourceId, newResource, cachedResource))) { + // first check against the source in general - this also prevents resurrecting resources when + // we've already seen the deletion event + String latest = + managedInformerEventSource + .getLastSyncResourceVersion(resourceId.getNamespace()) + .orElse(null); + if (latest != null + && PrimaryUpdateAndCacheUtils.compareResourceVersions( + latest, newResource.getMetadata().getResourceVersion()) + > 0) { log.debug( - "Temporarily moving ahead to target version {} for resource id: {}", + "Resource {}: resourceVersion {} is not later than latest {}", + resourceId, newResource.getMetadata().getResourceVersion(), - resourceId); - cache.put(resourceId, newResource); - } else if (cache.remove(resourceId) != null) { - log.debug("Removed an obsolete resource from cache for id: {}", resourceId); + latest); + return; } - } - /** - * @return true if {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()} - * is enabled and the resourceVersion of newResource is numerically greater than - * cachedResource, otherwise false - */ - public boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) { - try { - if (parseResourceVersions - && Long.parseLong(newResource.getMetadata().getResourceVersion()) - > Long.parseLong(cachedResource.getMetadata().getResourceVersion())) { - return true; - } - } catch (NumberFormatException e) { + var cachedResource = managedInformerEventSource.get(resourceId).orElse(null); + + if (cachedResource == null + || PrimaryUpdateAndCacheUtils.compareResourceVersions(newResource, cachedResource) >= 0) { log.debug( - "Could not compare resourceVersions {} and {} for {}", + "Temporarily moving ahead to target version {} for resource id: {}", newResource.getMetadata().getResourceVersion(), - cachedResource.getMetadata().getResourceVersion(), resourceId); + cache.put(resourceId, newResource); } - return false; + } + + public boolean canSkipEvent(ResourceID resourceID, T resource) { + return parseResourceVersions + && getResourceFromCache(resourceID) + .filter( + cached -> PrimaryUpdateAndCacheUtils.compareResourceVersions(cached, resource) >= 0) + .isPresent(); } public synchronized Optional getResourceFromCache(ResourceID resourceID) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 208d6aeaaa..9d835311c2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -94,31 +94,18 @@ public synchronized void start() {} } @Test - void skipsEventPropagationIfResourceWithSameVersionInResourceCache() { + void skipsEventPropagation() { when(temporaryResourceCacheMock.getResourceFromCache(any())) .thenReturn(Optional.of(testDeployment())); + when(temporaryResourceCacheMock.canSkipEvent(any(), any())).thenReturn(true); + informerEventSource.onAdd(testDeployment()); informerEventSource.onUpdate(testDeployment(), testDeployment()); verify(eventHandlerMock, never()).handleEvent(any()); } - @Test - void skipsAddEventPropagationViaAnnotation() { - informerEventSource.onAdd(informerEventSource.addPreviousAnnotation(null, testDeployment())); - - verify(eventHandlerMock, never()).handleEvent(any()); - } - - @Test - void skipsUpdateEventPropagationViaAnnotation() { - informerEventSource.onUpdate( - testDeployment(), informerEventSource.addPreviousAnnotation("1", testDeployment())); - - verify(eventHandlerMock, never()).handleEvent(any()); - } - @Test void processEventPropagationWithoutAnnotation() { informerEventSource.onUpdate(testDeployment(), testDeployment()); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index e3dc2c82e4..fd77888b14 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -17,9 +17,7 @@ import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,7 +25,6 @@ import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.ExpirationCache; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -46,30 +43,28 @@ class TemporaryPrimaryResourceCacheTest { @BeforeEach void setup() { informerEventSource = mock(InformerEventSource.class); - temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false); + temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true); } @Test void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion() { var testResource = testResource(); var prevTestResource = testResource(); - prevTestResource.getMetadata().setResourceVersion("0"); + prevTestResource.getMetadata().setResourceVersion("1"); when(informerEventSource.get(any())).thenReturn(Optional.of(prevTestResource)); - temporaryResourceCache.putResource(testResource, "0"); + temporaryResourceCache.putResource(testResource); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); assertThat(cached).isPresent(); } @Test - void updateNotAddsTheResourceIntoCacheIfTheInformerHasOtherVersion() { + void updateNotAddsTheResourceIntoCacheIfTheInformerHasLaterVersion() { var testResource = testResource(); - var informerCachedResource = testResource(); - informerCachedResource.getMetadata().setResourceVersion("x"); - when(informerEventSource.get(any())).thenReturn(Optional.of(informerCachedResource)); + when(informerEventSource.getLastSyncResourceVersion(any())).thenReturn(Optional.of("3")); - temporaryResourceCache.putResource(testResource, "0"); + temporaryResourceCache.putResource(testResource); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); assertThat(cached).isNotPresent(); @@ -80,7 +75,7 @@ void addOperationAddsTheResourceIfInformerCacheStillEmpty() { var testResource = testResource(); when(informerEventSource.get(any())).thenReturn(Optional.empty()); - temporaryResourceCache.putAddedResource(testResource); + temporaryResourceCache.putResource(testResource); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); assertThat(cached).isPresent(); @@ -91,7 +86,12 @@ void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() { var testResource = testResource(); when(informerEventSource.get(any())).thenReturn(Optional.of(testResource())); - temporaryResourceCache.putAddedResource(testResource); + temporaryResourceCache.putResource( + new ConfigMapBuilder(testResource) + .editMetadata() + .withResourceVersion("1") + .endMetadata() + .build()); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); assertThat(cached).isNotPresent(); @@ -101,34 +101,25 @@ void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() { void removesResourceFromCache() { ConfigMap testResource = propagateTestResourceToCache(); - temporaryResourceCache.onAddOrUpdateEvent(testResource()); - - assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) - .isNotPresent(); - } - - @Test - void resourceVersionParsing() { - this.temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true); - - ConfigMap testResource = propagateTestResourceToCache(); - - // an event with a newer version will not remove temporaryResourceCache.onAddOrUpdateEvent( new ConfigMapBuilder(testResource) .editMetadata() - .withResourceVersion("1") + .withResourceVersion("3") .endMetadata() .build()); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) - .isPresent(); + .isNotPresent(); + } + + @Test + void resourceNoVersionParsing() { + this.temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false); - // anything else will remove - temporaryResourceCache.onAddOrUpdateEvent(testResource()); + this.temporaryResourceCache.putResource(testResource()); - assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) - .isNotPresent(); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource()))) + .isEmpty(); } @Test @@ -143,45 +134,19 @@ void rapidDeletion() { .endMetadata() .build(), false); - temporaryResourceCache.putAddedResource(testResource); + when(informerEventSource.getLastSyncResourceVersion( + Optional.of(testResource.getMetadata().getNamespace()))) + .thenReturn(Optional.of("3")); + temporaryResourceCache.putResource(testResource); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isEmpty(); } - @Test - void expirationCacheMax() { - ExpirationCache cache = new ExpirationCache<>(2, Integer.MAX_VALUE); - - cache.add(1); - cache.add(2); - cache.add(3); - - assertThat(cache.contains(1)).isFalse(); - assertThat(cache.contains(2)).isTrue(); - assertThat(cache.contains(3)).isTrue(); - } - - @Test - void expirationCacheTtl() { - ExpirationCache cache = new ExpirationCache<>(2, 1); - - cache.add(1); - cache.add(2); - - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .untilAsserted( - () -> { - assertThat(cache.contains(1)).isFalse(); - assertThat(cache.contains(2)).isFalse(); - }); - } - private ConfigMap propagateTestResourceToCache() { var testResource = testResource(); when(informerEventSource.get(any())).thenReturn(Optional.empty()); - temporaryResourceCache.putAddedResource(testResource); + temporaryResourceCache.putResource(testResource); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isPresent(); return testResource; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java index d1a03b0c8e..92e1d57bb6 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java @@ -32,6 +32,7 @@ import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -104,13 +105,17 @@ private void createExternalResource( .withData(Map.of(ID_KEY, createdResource.getId())) .build(); configMap.addOwnerReference(resource); - context.getClient().configMaps().resource(configMap).create(); + configMap = context.getClient().configMaps().resource(configMap).create(); var primaryID = ResourceID.fromResource(resource); // Making sure that the created resources are in the cache for the next reconciliation. // This is critical in this case, since on next reconciliation if it would not be in the cache // it would be created again. - configMapEventSource.handleRecentResourceCreate(primaryID, configMap); + PrimaryUpdateAndCacheUtils.updateAndCacheSecondaryResource( + configMap, + context, + configMapEventSource, + toCreate -> context.getClient().configMaps().resource(toCreate).create()); externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource); } @@ -128,6 +133,7 @@ public DeleteControl cleanup( return DeleteControl.defaultDelete(); } + @Override public int getNumberOfExecutions() { return numberOfExecutions.get(); }