Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -447,19 +447,6 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
return defaultNonSSAResources();
}

/**
* If a javaoperatorsdk.io/previous annotation should be used so that the operator sdk can detect
* events from its own updates of dependent resources and then filter them.
*
* <p>Disable this if you want to react to your own dependent resource updates
*
* @return if special annotation should be used for dependent resource to filter events
* @since 4.5.0
*/
default boolean previousAnnotationForDependentResourcesEventFiltering() {
return true;
}

/**
* For dependent resources, the framework can add an annotation to filter out events resulting
* directly from the framework's operation. There are, however, some resources that do not follow
Expand Down Expand Up @@ -491,18 +478,16 @@ default Set<Class<? extends HasMetadata>> withPreviousAnnotationForDependentReso

/**
* If the event logic should parse the resourceVersion to determine the ordering of dependent
* resource events. This is typically not needed.
* resource events.
*
* <p>Disabled by default as Kubernetes does not support, and discourages, this interpretation of
* resourceVersions. Enable only if your api server event processing seems to lag the operator
* logic, and you want to further minimize the amount of work done / updates issued by the
* operator.
* <p>Enabled by default as Kubernetes does support this interpretation of resourceVersions.
* Disable only if your api server provides non comparable resource versions..
*
* @return if resource version should be parsed (as integer)
* @since 4.5.0
*/
default boolean parseResourceVersionsForEventFilteringAndCaching() {
return false;
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,13 +331,6 @@ public Set<Class<? extends HasMetadata>> defaultNonSSAResources() {
defaultNonSSAResource, ConfigurationService::defaultNonSSAResources);
}

@Override
public boolean previousAnnotationForDependentResourcesEventFiltering() {
return overriddenValueOrDefault(
previousAnnotationForDependentResources,
ConfigurationService::previousAnnotationForDependentResourcesEventFiltering);
}

@Override
public boolean parseResourceVersionsForEventFilteringAndCaching() {
return overriddenValueOrDefault(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,21 @@ class DefaultInformerEventSourceConfiguration<R extends HasMetadata>
private final GroupVersionKind groupVersionKind;
private final InformerConfiguration<R> informerConfig;
private final KubernetesClient kubernetesClient;
private final boolean comparableResourceVersions;

protected DefaultInformerEventSourceConfiguration(
GroupVersionKind groupVersionKind,
PrimaryToSecondaryMapper<?> primaryToSecondaryMapper,
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper,
InformerConfiguration<R> informerConfig,
KubernetesClient kubernetesClient) {
KubernetesClient kubernetesClient,
boolean comparableResourceVersions) {
this.informerConfig = Objects.requireNonNull(informerConfig);
this.groupVersionKind = groupVersionKind;
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
this.kubernetesClient = kubernetesClient;
this.comparableResourceVersions = comparableResourceVersions;
}

@Override
Expand Down Expand Up @@ -135,6 +138,11 @@ public Optional<GroupVersionKind> getGroupVersionKind() {
public Optional<KubernetesClient> getKubernetesClient() {
return Optional.ofNullable(kubernetesClient);
}

@Override
public boolean parseResourceVersionsForEventFilteringAndCaching() {
return this.comparableResourceVersions;
}
}

@SuppressWarnings({"unused", "UnusedReturnValue"})
Expand All @@ -148,6 +156,7 @@ class Builder<R extends HasMetadata> {
private PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private KubernetesClient kubernetesClient;
private boolean comparableResourceVersions = true;

private Builder(Class<R> resourceClass, Class<? extends HasMetadata> primaryResourceClass) {
this(resourceClass, primaryResourceClass, null);
Expand Down Expand Up @@ -285,6 +294,11 @@ public Builder<R> withFieldSelector(FieldSelector fieldSelector) {
return this;
}

public Builder<R> parseResourceVersionsForEventFilteringAndCaching(boolean parse) {
this.comparableResourceVersions = parse;
return this;
}

public void updateFrom(InformerConfiguration<R> informerConfig) {
if (informerConfig != null) {
final var informerConfigName = informerConfig.getName();
Expand Down Expand Up @@ -324,7 +338,10 @@ public InformerEventSourceConfiguration<R> build() {
HasMetadata.getKind(primaryResourceClass),
false)),
config.build(),
kubernetesClient);
kubernetesClient,
comparableResourceVersions);
}
}

boolean parseResourceVersionsForEventFilteringAndCaching();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.dsl.base.PatchType;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
Expand All @@ -48,6 +49,16 @@
*/
public class PrimaryUpdateAndCacheUtils {

private static final RecentOperationCacheFiller<?> DUMMY_CACHE_FILLER =
new RecentOperationCacheFiller<>() {
@Override
public void handleRecentResourceCreate(ResourceID resourceID, Object resource) {}

@Override
public void handleRecentResourceUpdate(
ResourceID resourceID, Object resource, Object previousVersionOfResource) {}
};

public static final int DEFAULT_MAX_RETRY = 10;
public static final int DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS = 10000;
public static final int DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS = 50;
Expand Down Expand Up @@ -143,6 +154,24 @@ public static <P extends HasMetadata> P updateAndCacheResource(
DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS);
}

// TODO: where does this belong
public static <P extends HasMetadata> P updateAndCacheSecondaryResource(
P resourceToUpdate,
Context<?> context,
RecentOperationCacheFiller<P> cacheFiller,
UnaryOperator<P> updateMethod) {
return updateAndCacheResource(
resourceToUpdate,
context,
cacheFiller == null ? (RecentOperationCacheFiller<P>) DUMMY_CACHE_FILLER : cacheFiller,
null,
o -> o,
updateMethod,
0,
DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS,
DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS);
}

/**
* Modifies the primary using the specified modification function, then uses the modified resource
* for the request to update with provided update method. As the {@code resourceVersion} field of
Expand All @@ -156,7 +185,6 @@ public static <P extends HasMetadata> P updateAndCacheResource(
* @param resourceToUpdate original resource to update
* @param context of reconciliation
* @param modificationFunction modifications to make on primary
* @param updateMethod the update method implementation
* @param maxRetry maximum number of retries before giving up
* @param cachePollTimeoutMillis maximum amount of milliseconds to wait for the updated resource
* to appear in cache
Expand All @@ -172,24 +200,45 @@ public static <P extends HasMetadata> P updateAndCacheResource(
int maxRetry,
long cachePollTimeoutMillis,
long cachePollPeriodMillis) {
return updateAndCacheResource(
resourceToUpdate,
context,
context.eventSourceRetriever().getControllerEventSource(),
context.getPrimaryCache(),
modificationFunction,
updateMethod,
maxRetry,
cachePollPeriodMillis,
cachePollTimeoutMillis);
}

private static <P extends HasMetadata> P updateAndCacheResource(
P resourceToUpdate,
Context<?> context,
RecentOperationCacheFiller<P> cacheFiller,
IndexedResourceCache<P> cache,
UnaryOperator<P> modificationFunction,
UnaryOperator<P> updateMethod,
int maxRetry,
long cachePollTimeoutMillis,
long cachePollPeriodMillis) {

ResourceID id = ResourceID.fromResource(resourceToUpdate);
if (log.isDebugEnabled()) {
log.debug("Update and cache: {}", ResourceID.fromResource(resourceToUpdate));
log.debug("Update and cache: {}", id);
}
P modified = null;
int retryIndex = 0;
while (true) {
try {
cacheFiller.startModifying(id);
modified = modificationFunction.apply(resourceToUpdate);
modified
.getMetadata()
.setResourceVersion(resourceToUpdate.getMetadata().getResourceVersion());
var updated = updateMethod.apply(modified);
context
.eventSourceRetriever()
.getControllerEventSource()
.handleRecentResourceUpdate(
ResourceID.fromResource(resourceToUpdate), updated, resourceToUpdate);
cacheFiller.handleRecentResourceUpdate(
ResourceID.fromResource(resourceToUpdate), updated, resourceToUpdate);
return updated;
} catch (KubernetesClientException e) {
log.trace("Exception during patch for resource: {}", resourceToUpdate);
Expand Down Expand Up @@ -219,20 +268,26 @@ public static <P extends HasMetadata> P updateAndCacheResource(
e.getCode());
resourceToUpdate =
pollLocalCache(
context, resourceToUpdate, cachePollTimeoutMillis, cachePollPeriodMillis);
context, resourceToUpdate, cachePollTimeoutMillis, cachePollPeriodMillis, cache);
} finally {
cacheFiller.doneModifying(id);
}
}
}

private static <P extends HasMetadata> P pollLocalCache(
Context<P> context, P staleResource, long timeoutMillis, long pollDelayMillis) {
Context<?> context,
P staleResource,
long timeoutMillis,
long pollDelayMillis,
IndexedResourceCache<P> cache) {
try {
var resourceId = ResourceID.fromResource(staleResource);
var startTime = LocalTime.now();
final var timeoutTime = startTime.plus(timeoutMillis, ChronoUnit.MILLIS);
while (timeoutTime.isAfter(LocalTime.now())) {
log.debug("Polling cache for resource: {}", resourceId);
var cachedResource = context.getPrimaryCache().get(resourceId).orElseThrow();
var cachedResource = cache.get(resourceId).orElseThrow();
if (!cachedResource
.getMetadata()
.getResourceVersion()
Expand Down Expand Up @@ -451,6 +506,11 @@ public static <P extends HasMetadata> P addFinalizerWithSSA(
}
}

public static int compareResourceVersions(HasMetadata h1, HasMetadata h2) {
return compareResourceVersions(
h1.getMetadata().getResourceVersion(), h2.getMetadata().getResourceVersion());
}

public static int compareResourceVersions(String v1, String v2) {
var v1Length = v1.length();
if (v1Length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

public interface RecentOperationCacheFiller<R> {

default void startModifying(ResourceID id) {}

default void doneModifying(ResourceID id) {}

void handleRecentResourceCreate(ResourceID resourceID, R resource);

void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousVersionOfResource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ protected AbstractEventSourceHolderDependentResource(Class<R> resourceType, Stri
* that multiple event sources would be created and only one started and registered. Note that
* this method does not start the event source, so no blocking IO is involved.
*/
@Override
public synchronized Optional<T> eventSource(EventSourceContext<P> context) {
// some sub-classes (e.g. KubernetesDependentResource) can have their event source created
// before this method is called in the managed case, so only create the event source if it
Expand Down Expand Up @@ -123,13 +124,15 @@ public Optional<T> eventSource() {
return Optional.ofNullable(eventSource);
}

@Override
protected void onCreated(P primary, R created, Context<P> context) {
if (isCacheFillerEventSource) {
recentOperationCacheFiller()
.handleRecentResourceCreate(ResourceID.fromResource(primary), created);
}
}

@Override
protected void onUpdated(P primary, R updated, R actual, Context<P> context) {
if (isCacheFillerEventSource) {
recentOperationCacheFiller()
Expand All @@ -138,7 +141,7 @@ protected void onUpdated(P primary, R updated, R actual, Context<P> context) {
}

@SuppressWarnings("unchecked")
private RecentOperationCacheFiller<R> recentOperationCacheFiller() {
protected RecentOperationCacheFiller<R> recentOperationCacheFiller() {
return (RecentOperationCacheFiller<R>) eventSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
Expand Down Expand Up @@ -93,7 +94,12 @@ private void handleExplicitStateDelete(P primary, R secondary, Context<P> contex
@SuppressWarnings({"rawtypes", "unchecked", "unused"})
protected void handleExplicitStateCreation(P primary, R created, Context<P> context) {
var resource = dependentResourceWithExplicitState.stateResource(primary, created);
var stateResource = context.getClient().resource(resource).create();
var stateResource =
PrimaryUpdateAndCacheUtils.updateAndCacheSecondaryResource(
resource,
context,
(RecentOperationCacheFiller) externalStateEventSource,
toCreate -> context.getClient().resource(toCreate).create());
if (externalStateEventSource != null) {
((RecentOperationCacheFiller) externalStateEventSource)
.handleRecentResourceCreate(ResourceID.fromResource(primary), stateResource);
Expand Down
Loading