From dde1d484ffb1585b63c375d717e50a80412ed98f Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 6 Aug 2020 17:27:57 +0200 Subject: [PATCH 0001/2246] Update control changes --- .../operator/api/UpdateControl.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java b/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java index 76adf14559..4c94ee5f5e 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java @@ -2,11 +2,14 @@ import io.fabric8.kubernetes.client.CustomResource; +import java.util.concurrent.TimeUnit; + public class UpdateControl { private final T customResource; private final boolean updateStatusSubResource; private final boolean updateCustomResource; + private long reprocessDelay = -1; private UpdateControl(T customResource, boolean updateStatusSubResource, boolean updateCustomResource) { if ((updateCustomResource || updateStatusSubResource) && customResource == null) { @@ -41,4 +44,17 @@ public boolean isUpdateCustomResource() { return updateCustomResource; } + public long getReprocessDelay() { + return reprocessDelay; + } + + public UpdateControl reprocessAfter(long milliseconds) { + this.reprocessDelay = milliseconds; + return this; + } + + public UpdateControl reprocessAfter(long delay, TimeUnit timeUnit) { + this.reprocessDelay = timeUnit.toMillis(delay); + return this; + } } From 4e98deaff08e99930189c9bf2e95c138eee20536 Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 6 Aug 2020 17:54:27 +0200 Subject: [PATCH 0002/2246] - create dispatch control class --- .../operator/processing/DispatchControl.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/DispatchControl.java diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/DispatchControl.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/DispatchControl.java new file mode 100644 index 0000000000..29a8f36807 --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/DispatchControl.java @@ -0,0 +1,26 @@ +package com.github.containersolutions.operator.processing; + +public class DispatchControl { + + public static DispatchControl reprocessAfter(long milliseconds) { + return new DispatchControl(milliseconds); + } + + public static DispatchControl defaultDispatch() { + return new DispatchControl(-1L); + } + + private DispatchControl(long reprocessDelay) { + this.reprocessDelay = reprocessDelay; + } + + private long reprocessDelay = -1; + + public boolean reprocessEvent() { + return reprocessDelay > 0; + } + + public long getReprocessDelay() { + return reprocessDelay; + } +} From a4e67736e530be880b0564432e224596265c408d Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 24 Aug 2020 22:09:27 +0200 Subject: [PATCH 0003/2246] - intermediate structure for event propagation --- .../operator/api/UpdateControl.java | 3 +++ .../operator/processing/EventConsumer.java | 18 ++++---------- .../operator/processing/EventDispatcher.java | 19 ++++++++++++--- .../operator/processing/EventScheduler.java | 24 +++++++++++++++---- 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java b/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java index 4c94ee5f5e..1582ceebe9 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java @@ -47,6 +47,9 @@ public boolean isUpdateCustomResource() { public long getReprocessDelay() { return reprocessDelay; } + public boolean isForReprocess() { + return reprocessDelay > 0; + } public UpdateControl reprocessAfter(long milliseconds) { this.reprocessDelay = milliseconds; diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java index 766b4f8c07..b8409ac754 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java @@ -21,23 +21,13 @@ class EventConsumer implements Runnable { @Override public void run() { log.debug("Processing event started: {}", event); - if (processEvent()) { - eventScheduler.eventProcessingFinishedSuccessfully(event); - log.debug("Event processed successfully: {}", event); - } else { - this.eventScheduler.eventProcessingFailed(event); - log.debug("Event processed failed: {}", event); - } - } - - @SuppressWarnings("unchecked") - private boolean processEvent() { try { - eventDispatcher.handleEvent(event); + DispatchControl dispatchControl = eventDispatcher.handleEvent(event); + eventScheduler.eventProcessingFinishedSuccessfully(event, dispatchControl); + log.debug("Event processed successfully: {}", event); } catch (RuntimeException e) { log.error("Processing event {} failed.", event, e); - return false; + this.eventScheduler.eventProcessingFailed(event); } - return true; } } diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java index 0d6f5346d1..62bd6a9aab 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java @@ -35,17 +35,17 @@ public EventDispatcher(ResourceController controller, this.generationAware = generationAware; } - public void handleEvent(CustomResourceEvent event) { + public DispatchControl handleEvent(CustomResourceEvent event) { Watcher.Action action = event.getAction(); CustomResource resource = event.getResource(); log.info("Handling {} event for resource {}", action, resource.getMetadata()); if (Watcher.Action.ERROR == action) { log.error("Received error for resource: {}", resource.getMetadata().getName()); - return; + return DispatchControl.defaultDispatch(); } if (markedForDeletion(resource) && !ControllerUtils.hasDefaultFinalizer(resource, resourceDefaultFinalizer)) { log.debug("Skipping event dispatching since its marked for deletion but has no default finalizer: {}", event); - return; + return DispatchControl.defaultDispatch(); } Context context = new DefaultContext(new RetryInfo(event.getRetryCount(), event.getRetryExecution().isLastExecution())); if (markedForDeletion(resource)) { @@ -58,6 +58,7 @@ public void handleEvent(CustomResourceEvent event) { removeFinalizer, hasDefaultFinalizer); } cleanup(resource); + return DispatchControl.defaultDispatch(); } else { if (!ControllerUtils.hasDefaultFinalizer(resource, resourceDefaultFinalizer) && !markedForDeletion(resource)) { /* We always add the default finalizer if missing and not marked for deletion. @@ -66,7 +67,9 @@ public void handleEvent(CustomResourceEvent event) { there is a finalizer. */ updateCustomResourceWithFinalizer(resource); + return DispatchControl.defaultDispatch(); } else { + // todo generation awareness on rescheduled event if (!generationAware || largerGenerationThenProcessedBefore(resource)) { UpdateControl updateControl = controller.createOrUpdateResource(resource, context); if (updateControl.isUpdateStatusSubResource()) { @@ -75,13 +78,23 @@ public void handleEvent(CustomResourceEvent event) { updateCustomResource(updateControl.getCustomResource()); } markLastGenerationProcessed(resource); + return updateControlToDispatchControl(updateControl); } else { log.debug("Skipping processing since generation not increased. Event: {}", event); + return DispatchControl.defaultDispatch(); } } } } + private DispatchControl updateControlToDispatchControl(UpdateControl updateControl) { + if (updateControl.isForReprocess()) { + return DispatchControl.reprocessAfter(updateControl.getReprocessDelay()); + } else { + return DispatchControl.defaultDispatch(); + } + } + public boolean largerGenerationThenProcessedBefore(CustomResource resource) { Long lastGeneration = lastGenerationProcessedSuccessfully.get(resource.getMetadata().getUid()); if (lastGeneration == null) { diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java index b72ba8da69..11d955d849 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java @@ -88,29 +88,45 @@ void scheduleEventFromApi(CustomResourceEvent event) { private void scheduleEventForExecution(CustomResourceEvent event) { try { lock.lock(); - log.trace("Current queue size {}", executor.getQueue().size()); - log.debug("Scheduling event for execution: {}", event); Optional nextBackOff = event.nextBackOff(); if (!nextBackOff.isPresent()) { log.warn("Event max retry limit reached. Will be discarded. {}", event); return; } + scheduleEventForExecutionWithBackoff(event, nextBackOff.get()); + } finally { + lock.unlock(); + } + } + + private void scheduleEventForExecutionWithBackoff(CustomResourceEvent event, long backOff) { + try { + lock.lock(); + log.trace("Current queue size {}", executor.getQueue().size()); + log.debug("Scheduling event for execution: {}", event); + eventStore.addEventUnderProcessing(event); executor.schedule(new EventConsumer(event, eventDispatcher, this), - nextBackOff.get(), TimeUnit.MILLISECONDS); + backOff, TimeUnit.MILLISECONDS); log.trace("Scheduled task for event: {}", event); } finally { lock.unlock(); } } - void eventProcessingFinishedSuccessfully(CustomResourceEvent event) { + + void eventProcessingFinishedSuccessfully(CustomResourceEvent event, DispatchControl dispatchControl) { try { lock.lock(); eventStore.removeEventUnderProcessing(event.resourceUid()); if (eventStore.containsNotScheduledEvent(event.resourceUid())) { log.debug("Scheduling recent event for processing: {}", event); scheduleNotYetScheduledEventForExecution(event.resourceUid()); + } else if (dispatchControl.reprocessEvent()) { + // reset retry - if putting back an event the old retries info does not make sense anymore. + CustomResourceEvent eventWithRetryReset + = new CustomResourceEvent(event.getAction(), event.getResource(), retry); + scheduleEventForExecutionWithBackoff(eventWithRetryReset, dispatchControl.getReprocessDelay()); } } finally { lock.unlock(); From 5aed01663df4a9cde8507a2f78798c1cdd696e7a Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 28 Aug 2020 11:14:37 +0200 Subject: [PATCH 0004/2246] - steps for reprocessing related changes --- .../operator/api/DeleteControl.java | 28 ++++++ .../operator/api/ReprocessControl.java | 25 ++++++ .../operator/api/ResourceController.java | 2 +- .../operator/api/UpdateControl.java | 20 +---- .../processing/CustomResourceEvent.java | 9 ++ .../operator/processing/DispatchControl.java | 22 +++-- .../operator/processing/EventConsumer.java | 20 +++-- .../operator/processing/EventDispatcher.java | 88 +++++++++++-------- .../operator/processing/EventScheduler.java | 84 +++++++++--------- .../operator/processing/ResourceCache.java | 20 +++++ 10 files changed, 206 insertions(+), 112 deletions(-) create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/api/DeleteControl.java create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/api/ReprocessControl.java create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/ResourceCache.java diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/api/DeleteControl.java b/operator-framework/src/main/java/com/github/containersolutions/operator/api/DeleteControl.java new file mode 100644 index 0000000000..691889cd5c --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/api/DeleteControl.java @@ -0,0 +1,28 @@ +package com.github.containersolutions.operator.api; + +public class DeleteControl extends ReprocessControl { + + private boolean removeFinalizer = true; + + public boolean getRemoveFinalizer() { + return removeFinalizer; + } + + public DeleteControl setRemoveFinalizer(boolean removeFinalizer) { + this.removeFinalizer = removeFinalizer; + validate(); + return this; + } + + public ReprocessControl reprocessAfter(long milliseconds) { + super.reprocessAfter(milliseconds); + validate(); + return this; + } + + private void validate() { + if (super.isForReprocess() && removeFinalizer) { + throw new IllegalStateException("If finalizer is to be removed, cannot reprocess."); + } + } +} diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/api/ReprocessControl.java b/operator-framework/src/main/java/com/github/containersolutions/operator/api/ReprocessControl.java new file mode 100644 index 0000000000..fa627c78d8 --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/api/ReprocessControl.java @@ -0,0 +1,25 @@ +package com.github.containersolutions.operator.api; + +import java.util.concurrent.TimeUnit; + +public class ReprocessControl { + + private long reprocessDelay = -1; + + public long getReprocessDelay() { + return reprocessDelay; + } + public boolean isForReprocess() { + return reprocessDelay > 0; + } + + public ReprocessControl reprocessAfter(long milliseconds) { + this.reprocessDelay = milliseconds; + return this; + } + + public ReprocessControl reprocessAfter(long delay, TimeUnit timeUnit) { + reprocessAfter(timeUnit.toMillis(delay)); + return this; + } +} diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/api/ResourceController.java b/operator-framework/src/main/java/com/github/containersolutions/operator/api/ResourceController.java index 62b204df2a..ea1cd74543 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/api/ResourceController.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/api/ResourceController.java @@ -15,7 +15,7 @@ public interface ResourceController { * @return true - so the finalizer is automatically removed after the call. * false if you don't want to remove the finalizer. Note that this is ALMOST NEVER the case. */ - boolean deleteResource(R resource, Context context); + DeleteControl deleteResource(R resource, Context context); /** * The implementation of this operation is required to be idempotent. diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java b/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java index 1582ceebe9..c072f5032e 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/api/UpdateControl.java @@ -4,12 +4,12 @@ import java.util.concurrent.TimeUnit; -public class UpdateControl { +public class UpdateControl extends ReprocessControl { private final T customResource; private final boolean updateStatusSubResource; private final boolean updateCustomResource; - private long reprocessDelay = -1; + private UpdateControl(T customResource, boolean updateStatusSubResource, boolean updateCustomResource) { if ((updateCustomResource || updateStatusSubResource) && customResource == null) { @@ -44,20 +44,4 @@ public boolean isUpdateCustomResource() { return updateCustomResource; } - public long getReprocessDelay() { - return reprocessDelay; - } - public boolean isForReprocess() { - return reprocessDelay > 0; - } - - public UpdateControl reprocessAfter(long milliseconds) { - this.reprocessDelay = milliseconds; - return this; - } - - public UpdateControl reprocessAfter(long delay, TimeUnit timeUnit) { - this.reprocessDelay = timeUnit.toMillis(delay); - return this; - } } diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java index 433131a312..4331ce084d 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java @@ -13,6 +13,7 @@ public class CustomResourceEvent { private final Watcher.Action action; private final CustomResource resource; private int retryCount = -1; + private boolean processRegardlessOfGeneration = false; public CustomResourceEvent(Watcher.Action action, CustomResource resource, Retry retry) { this.action = action; @@ -20,6 +21,14 @@ public CustomResourceEvent(Watcher.Action action, CustomResource resource, Retry this.retryExecution = retry.initExecution(); } + public boolean isProcessRegardlessOfGeneration() { + return processRegardlessOfGeneration; + } + + public void setProcessRegardlessOfGeneration(boolean processRegardlessOfGeneration) { + this.processRegardlessOfGeneration = processRegardlessOfGeneration; + } + public Watcher.Action getAction() { return action; } diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/DispatchControl.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/DispatchControl.java index 29a8f36807..575ec9ac78 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/DispatchControl.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/DispatchControl.java @@ -2,20 +2,28 @@ public class DispatchControl { + private final long reprocessDelay; + private final boolean error; + + // todo validate state on setters + + public static DispatchControl errorDuringDispatch() { + return new DispatchControl(-1L, true); + } + public static DispatchControl reprocessAfter(long milliseconds) { - return new DispatchControl(milliseconds); + return new DispatchControl(milliseconds, false); } public static DispatchControl defaultDispatch() { - return new DispatchControl(-1L); + return new DispatchControl(-1L, false); } - private DispatchControl(long reprocessDelay) { + private DispatchControl(long reprocessDelay, boolean error) { this.reprocessDelay = reprocessDelay; + this.error = error; } - private long reprocessDelay = -1; - public boolean reprocessEvent() { return reprocessDelay > 0; } @@ -23,4 +31,8 @@ public boolean reprocessEvent() { public long getReprocessDelay() { return reprocessDelay; } + + public boolean isError() { + return error; + } } diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java index b8409ac754..f803e03552 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java @@ -20,14 +20,16 @@ class EventConsumer implements Runnable { @Override public void run() { - log.debug("Processing event started: {}", event); - try { - DispatchControl dispatchControl = eventDispatcher.handleEvent(event); - eventScheduler.eventProcessingFinishedSuccessfully(event, dispatchControl); - log.debug("Event processed successfully: {}", event); - } catch (RuntimeException e) { - log.error("Processing event {} failed.", event, e); - this.eventScheduler.eventProcessingFailed(event); - } + DispatchControl dispatchControl = eventDispatcher.handleEvent(event); + eventScheduler.eventProcessingFinished(event, dispatchControl); +// log.debug("Processing event started: {}", event); +// try { +// DispatchControl dispatchControl = eventDispatcher.handleEvent(event); +// eventScheduler.eventProcessingFinishedSuccessfully(event, dispatchControl); +// log.debug("Event processed successfully: {}", event); +// } catch (RuntimeException e) { +// log.error("Processing event {} failed.", event, e); +// this.eventScheduler.eventProcessingFailed(event); +// } } } diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java index 62bd6a9aab..6c2b80f4b4 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java @@ -36,10 +36,18 @@ public EventDispatcher(ResourceController controller, } public DispatchControl handleEvent(CustomResourceEvent event) { - Watcher.Action action = event.getAction(); + try { + return handDispatch(event); + } catch (RuntimeException e) { + log.error("Error during event processing {} failed.", event, e); + return DispatchControl.errorDuringDispatch(); + } + } + + private DispatchControl handDispatch(CustomResourceEvent event) { CustomResource resource = event.getResource(); - log.info("Handling {} event for resource {}", action, resource.getMetadata()); - if (Watcher.Action.ERROR == action) { + log.info("Handling {} event for resource {}", event.getAction(), resource.getMetadata()); + if (Watcher.Action.ERROR == event.getAction()) { log.error("Received error for resource: {}", resource.getMetadata().getName()); return DispatchControl.defaultDispatch(); } @@ -49,45 +57,55 @@ public DispatchControl handleEvent(CustomResourceEvent event) { } Context context = new DefaultContext(new RetryInfo(event.getRetryCount(), event.getRetryExecution().isLastExecution())); if (markedForDeletion(resource)) { - boolean removeFinalizer = controller.deleteResource(resource, context); - boolean hasDefaultFinalizer = ControllerUtils.hasDefaultFinalizer(resource, resourceDefaultFinalizer); - if (removeFinalizer && hasDefaultFinalizer) { - removeDefaultFinalizer(resource); - } else { - log.debug("Skipping finalizer remove. removeFinalizer: {}, hasDefaultFinalizer: {} ", - removeFinalizer, hasDefaultFinalizer); - } - cleanup(resource); + return handleDelete(resource, context); + } else { + return handleCreateOrUpdate(event, resource, context); + } + } + + private DispatchControl handleCreateOrUpdate(CustomResourceEvent event, CustomResource resource, Context context) { + if (!ControllerUtils.hasDefaultFinalizer(resource, resourceDefaultFinalizer) && !markedForDeletion(resource)) { + /* We always add the default finalizer if missing and not marked for deletion. + We execute the controller processing only for processing the event sent as a results + of the finalizer add. This will make sure that the resources are not created before + there is a finalizer. + */ + updateCustomResourceWithFinalizer(resource); return DispatchControl.defaultDispatch(); } else { - if (!ControllerUtils.hasDefaultFinalizer(resource, resourceDefaultFinalizer) && !markedForDeletion(resource)) { - /* We always add the default finalizer if missing and not marked for deletion. - We execute the controller processing only for processing the event sent as a results - of the finalizer add. This will make sure that the resources are not created before - there is a finalizer. - */ - updateCustomResourceWithFinalizer(resource); - return DispatchControl.defaultDispatch(); - } else { - // todo generation awareness on rescheduled event - if (!generationAware || largerGenerationThenProcessedBefore(resource)) { - UpdateControl updateControl = controller.createOrUpdateResource(resource, context); - if (updateControl.isUpdateStatusSubResource()) { - customResourceFacade.updateStatus(updateControl.getCustomResource()); - } else if (updateControl.isUpdateCustomResource()) { - updateCustomResource(updateControl.getCustomResource()); - } - markLastGenerationProcessed(resource); - return updateControlToDispatchControl(updateControl); - } else { - log.debug("Skipping processing since generation not increased. Event: {}", event); - return DispatchControl.defaultDispatch(); + // todo generation awareness on rescheduled event + // todo test regardless generation + if (!generationAware || largerGenerationThenProcessedBefore(resource) || event.isProcessRegardlessOfGeneration()) { + UpdateControl updateControl = controller.createOrUpdateResource(resource, context); + if (updateControl.isUpdateStatusSubResource()) { + customResourceFacade.updateStatus(updateControl.getCustomResource()); + } else if (updateControl.isUpdateCustomResource()) { + updateCustomResource(updateControl.getCustomResource()); } + markLastGenerationProcessed(resource); + return reprocessControlToDispatchControl(updateControl); + } else { + log.debug("Skipping processing since generation not increased. Event: {}", event); + return DispatchControl.defaultDispatch(); } } } - private DispatchControl updateControlToDispatchControl(UpdateControl updateControl) { + private DispatchControl handleDelete(CustomResource resource, Context context) { + // todo unit test new cases + DeleteControl deleteControl = controller.deleteResource(resource, context); + boolean hasDefaultFinalizer = ControllerUtils.hasDefaultFinalizer(resource, resourceDefaultFinalizer); + if (deleteControl.getRemoveFinalizer() && hasDefaultFinalizer) { + removeDefaultFinalizer(resource); + cleanup(resource); + } else { + log.debug("Skipping finalizer remove. removeFinalizer: {}, hasDefaultFinalizer: {} ", + deleteControl.getRemoveFinalizer(), hasDefaultFinalizer); + } + return reprocessControlToDispatchControl(deleteControl); + } + + private DispatchControl reprocessControlToDispatchControl(ReprocessControl updateControl) { if (updateControl.isForReprocess()) { return DispatchControl.reprocessAfter(updateControl.getReprocessDelay()); } else { diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java index 11d955d849..5d179efa09 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java @@ -36,12 +36,14 @@ public class EventScheduler implements Watcher { private final static Logger log = LoggerFactory.getLogger(EventScheduler.class); + private final EventStore eventStore = new EventStore(); + private final ResourceCache resourceCache = new ResourceCache(); + private final EventDispatcher eventDispatcher; private final ScheduledThreadPoolExecutor executor; - private final EventStore eventStore = new EventStore(); private final Retry retry; - private ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantLock(); public EventScheduler(EventDispatcher eventDispatcher, Retry retry) { this.eventDispatcher = eventDispatcher; @@ -54,15 +56,15 @@ public EventScheduler(EventDispatcher eventDispatcher, Retry retry) { public void eventReceived(Watcher.Action action, CustomResource resource) { log.debug("Event received for action: {}, {}: {}", action.toString().toLowerCase(), resource.getClass().getSimpleName(), resource.getMetadata().getName()); - CustomResourceEvent event = new CustomResourceEvent(action, resource, retry); - scheduleEventFromApi(event); + resourceCache.cacheResource(resource); // always store the latest event. Outside the sync block is intentional. + scheduleEventFromApi(new CustomResourceEvent(action, resource, retry)); } void scheduleEventFromApi(CustomResourceEvent event) { try { lock.lock(); - log.debug("Scheduling event from Api: {}", event); if (event.getAction() == Action.DELETED) { + // todo cancel retries and reprocessing log.debug("Skipping delete event for event: {}", event); return; } @@ -86,66 +88,60 @@ void scheduleEventFromApi(CustomResourceEvent event) { } private void scheduleEventForExecution(CustomResourceEvent event) { - try { - lock.lock(); - Optional nextBackOff = event.nextBackOff(); - if (!nextBackOff.isPresent()) { - log.warn("Event max retry limit reached. Will be discarded. {}", event); - return; - } - scheduleEventForExecutionWithBackoff(event, nextBackOff.get()); - } finally { - lock.unlock(); - } - } - - private void scheduleEventForExecutionWithBackoff(CustomResourceEvent event, long backOff) { try { lock.lock(); log.trace("Current queue size {}", executor.getQueue().size()); log.debug("Scheduling event for execution: {}", event); - eventStore.addEventUnderProcessing(event); - executor.schedule(new EventConsumer(event, eventDispatcher, this), - backOff, TimeUnit.MILLISECONDS); - log.trace("Scheduled task for event: {}", event); + executor.execute(new EventConsumer(event, eventDispatcher, this)); } finally { lock.unlock(); } } - - void eventProcessingFinishedSuccessfully(CustomResourceEvent event, DispatchControl dispatchControl) { + void eventProcessingFinished(CustomResourceEvent event, DispatchControl dispatchControl) { try { + // todo log debug messages lock.lock(); eventStore.removeEventUnderProcessing(event.resourceUid()); if (eventStore.containsNotScheduledEvent(event.resourceUid())) { - log.debug("Scheduling recent event for processing: {}", event); scheduleNotYetScheduledEventForExecution(event.resourceUid()); - } else if (dispatchControl.reprocessEvent()) { - // reset retry - if putting back an event the old retries info does not make sense anymore. - CustomResourceEvent eventWithRetryReset - = new CustomResourceEvent(event.getAction(), event.getResource(), retry); - scheduleEventForExecutionWithBackoff(eventWithRetryReset, dispatchControl.getReprocessDelay()); + } else { + // todo reprocess even is there is an event scheduled? + if (dispatchControl.reprocessEvent()) { + scheduleEventForReprocessing(event); + } + if (dispatchControl.isError()) { + scheduleEventForRetry(event); + } } } finally { lock.unlock(); } } - void eventProcessingFailed(CustomResourceEvent event) { - try { - lock.lock(); - eventStore.removeEventUnderProcessing(event.resourceUid()); - if (eventStore.containsNotScheduledEvent(event.resourceUid())) { - log.debug("Event processing failed. Scheduling the most recent event. Failed event: {}", event); - scheduleNotYetScheduledEventForExecution(event.resourceUid()); - } else { - log.debug("Event processing failed. Attempting to re-schedule the event: {}", event); - scheduleEventForExecution(event); - } - } finally { - lock.unlock(); + + private void scheduleEventForReprocessing(CustomResourceEvent event) { + + } + + private void scheduleEventForRetry(CustomResourceEvent event) { + + } + + private class ReprocessSupport implements Runnable { + + @Override + public void run() { + + } + } + + private class RetrySupport implements Runnable { + + @Override + public void run() { + } } diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/ResourceCache.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/ResourceCache.java new file mode 100644 index 0000000000..cd38512298 --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/ResourceCache.java @@ -0,0 +1,20 @@ +package com.github.containersolutions.operator.processing; + +import io.fabric8.kubernetes.client.CustomResource; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +public class ResourceCache { + + private final Map resources = new ConcurrentHashMap<>(); + + public void cacheResource(CustomResource resource) { + resources.put(resource.getMetadata().getUid(), resource); + } + + public Optional getLatestResource(String uuid) { + return Optional.ofNullable(resources.get(uuid)); + } +} From 47b12b3f793fa72e4f006ff809187afef70f65c0 Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 28 Aug 2020 13:35:06 +0200 Subject: [PATCH 0005/2246] design-docs --- docs/event-design.xml | 1 + 1 file changed, 1 insertion(+) create mode 100644 docs/event-design.xml diff --git a/docs/event-design.xml b/docs/event-design.xml new file mode 100644 index 0000000000..3bb7125b02 --- /dev/null +++ b/docs/event-design.xml @@ -0,0 +1 @@ +5Vtbc5s4GP01nuk+pMNVxo+1k24zu9104nS7fZRB2JpixICc2P31K0DCIGGHOMI4bV5ihBBwdI6+mxjZs/X2zxQmq88kQNHIMoLtyL4eWexvbLF/ecuubPE8s2xYpjgom2oNc/wT8UaDt25wgLJGR0pIRHHSbPRJHCOfNtpgmpKnZreQRM27JnCJlIa5DyO19RsO6Iq/hWvs2z8hvFyJO5sGP7OGojNvyFYwIE+1JvtmZM9SQmj5a72doSgHT+BSXvfxwNnqwVIU0y4X3AB0FX76h/67u/K//WV4d3c/H674KI8w2vAXvnnMx7OMub9CwSZCKX96uhOQpGQTBygf1RjZ06cVpmieQD8/+8RIwNpWdB2xI5P95OOjlKLtwQc3KzgYjxBZI5ruWBdBIpsjyClkAX78tJ8Qc8zbVvXJEHMBOQmW1dh7nNgPDtULYLMOwnaNswRSht3l4eaMB8fNUXHbIn9DMYlZM8dQgg3FwYdcyOzIj2CWYb+JFNpi+l8O6nuXH32vnbnecryLg504iNnb1C7KD7/Xz+0vK47EdeXDoUBZM6Q5YS9ANqmPjkAhFjOYLhF9jmrqHNfm0G2ZQtGWoghS/Nh83LZp5Xf4QnBB461ED0EhR2JG+Zr8qvriIw1UrYJ8IHsiDVTioAxUsKx67dOJBw6vc+VEWcZnGLPF/BJk6zag8lpUC86q2rEC3u8iUtBRpAdm9DwidSSRmrK2uorUlURqymrvWaTC56sR7UtKmIsFKdqbB+PdbJNRsr5HfPqsmTj5h8pLxoK/4YI5pQ06wggv45yr7CImeXuaKxYzr+8DP7HGQZCPMU1Rhn/CRTFeTq4kf/kCDnc6cq+PSZ67pPziUbUE1ol4WG8H14erXA6NadLDouZCL808CcMM9TLpk2dX5oHXY9dsqqLNi2pTt6xCbcuxqTrt33KXsyaRC4HOAR089zbsvN6wUz13TbbMOMmWmWd0OLt6nJMhjdlYClps90RjNvakgWT3qG9jpsY69xxJWaUgYhhOF8z1BMv817vb3CrFMFLt2bkVDC5Pwe4wCtapRGdQhTnSlJ6qMGBIfgg4s8LUoM48RgWSoFjiQSwyjswu2NOMPTAVvQMM1yQOHlY4FqdEX0c0fMRRNRQjxpzflqR0RZaECfhm31r5nBEKaYvHuSCUObU1cpp1alZEfc68NAKlva25nFDpgHURzq3ZXLS1MN6x2z24F8dHk+Pj9M13NQ5/wGtmDCzjHiUp8VGWqcbkDRmb8QF867bGOqut8TTYGiFN82XSrGzUC5YBnTZq/Do9n8mWSU6eErd2lbYnLRFKDqVvbath8IwwPpDoEiswoGsFprcYWCwD585JalSY1VFg5qAKM6WcILBOVJhcSnDPbD0tNR6bYrrY+D/yvJbWzEkAs1XRV5P8JtZYSvlVucTnTKPskusT4EBhWLs5PWMaRfDosg0jkJKWzqk+r5wAcGT99y1bncWntxVRCZ/gWaaBIZnmAk0GQqZsVwPBph7uat14heawRVPy+e5xwyX1N+3GPhv2o3wCvbRXI497tMRZXq1SjdWRkljZhcV2AK5zMxUvsqSi6VsvlFnHK2XGe9tymnUt4XW9kvRekxEiTuy/WmartZ+7OGdAngnQ7Mbo8FykiL5tL8NZ3Rbb1mdO3liCrnNEbw+6m8E03CZnquDzxRFHtSNG6FROFPXsu9iqjzzH8TLKhXobZxTG/gVq1pMLZ5MW0YIqIqlTQLbY+mSrpvpPlu0b24Jkdo03SrYNJVuZNY55omoncgH4zFUluyUTJzlXxixFkOaJ99dINcRRNCMRSYtr7TAMLd8vKksp+YFqZwKwAC7QI27XkVMJrtMi77Zcnlwm1LcpWM3lXaMkIrt1uS72gXYAkRe2og18Dy3CvtAGRke05US2PrRVD7KB9tck+GXQHg+PtvUbod21KtAf2qp3X6uMHvi+4DVQIzNw0bgN6gkY27C3RdszB4daTac3iD2nkG6yiuF6Ce4iL3DaUPeshQ16Qx24g6Ou+sFzuE6iehjxCTNvhd+rjjhDgjZhbcIXkxhJWPOm7kmgtnlsznQvE9P6ZZPVEqD058No2Ckw0h3rD1aVl7I+JxcfJodCUc0pYU/KYYu9or2meB010LhOSZIUadAOm0SLzsXun9tQ9M+KyfPRIy5OLFBI0nywRHxGEQy/y0euKNltScE2dvaWXXDVAOTF4v3VswvOoOVMT9qF4J6aXfDkDxq1ZRfY4f5b7rL7/ot4++Z/ \ No newline at end of file From d25a8c6a050a7904681d67d050bf024c59ab6ddf Mon Sep 17 00:00:00 2001 From: csviri Date: Sun, 6 Sep 2020 20:45:39 +0200 Subject: [PATCH 0006/2246] impl skeleton --- .../operator/api/Context.java | 7 ++ .../processing/CustomResourceEvent.java | 22 +++---- .../operator/processing/EventConsumer.java | 9 --- .../operator/processing/EventScheduler.java | 5 ++ .../processing/event/DefaultEventManager.java | 65 +++++++++++++++++++ .../operator/processing/event/Event.java | 16 +++++ .../processing/event/EventBuffer.java | 4 ++ .../processing/event/EventHandler.java | 7 ++ .../processing/event/EventManager.java | 14 ++++ .../processing/event/EventProducer.java | 18 +++++ .../operator/EventSchedulerTest.java | 10 +-- .../spingboot/starter/TestController.java | 3 + 12 files changed, 154 insertions(+), 26 deletions(-) create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/DefaultEventManager.java create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/Event.java create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventBuffer.java create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventHandler.java create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventManager.java create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventProducer.java diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/api/Context.java b/operator-framework/src/main/java/com/github/containersolutions/operator/api/Context.java index 16cde8f72b..656d94580e 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/api/Context.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/api/Context.java @@ -1,9 +1,16 @@ package com.github.containersolutions.operator.api; +import com.github.containersolutions.operator.processing.event.Event; +import com.github.containersolutions.operator.processing.event.EventManager; import io.fabric8.kubernetes.client.CustomResource; +import java.util.List; + public interface Context { RetryInfo retryInfo(); + EventManager getEventManager(); + + List getEvents(); } diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java index 4331ce084d..1738b12745 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java @@ -1,5 +1,6 @@ package com.github.containersolutions.operator.processing; +import com.github.containersolutions.operator.processing.event.Event; import com.github.containersolutions.operator.processing.retry.Retry; import com.github.containersolutions.operator.processing.retry.RetryExecution; import io.fabric8.kubernetes.client.CustomResource; @@ -7,17 +8,18 @@ import java.util.Optional; -public class CustomResourceEvent { +public class CustomResourceEvent extends Event { private final RetryExecution retryExecution; private final Watcher.Action action; - private final CustomResource resource; + private int retryCount = -1; private boolean processRegardlessOfGeneration = false; public CustomResourceEvent(Watcher.Action action, CustomResource resource, Retry retry) { + super(resource); this.action = action; - this.resource = resource; + this.retryExecution = retry.initExecution(); } @@ -33,12 +35,8 @@ public Watcher.Action getAction() { return action; } - public CustomResource getResource() { - return resource; - } - public String resourceUid() { - return resource.getMetadata().getUid(); + return getCustomResource().getMetadata().getUid(); } public Optional nextBackOff() { @@ -50,10 +48,10 @@ public Optional nextBackOff() { public String toString() { return "CustomResourceEvent{" + "action=" + action + - ", resource=[ name=" + resource.getMetadata().getName() + ", kind=" + resource.getKind() + - ", apiVersion=" + resource.getApiVersion() + " ,resourceVersion=" + resource.getMetadata().getResourceVersion() + - ", markedForDeletion: " + (resource.getMetadata().getDeletionTimestamp() != null - && !resource.getMetadata().getDeletionTimestamp().isEmpty()) + + ", resource=[ name=" + getCustomResource().getMetadata().getName() + ", kind=" + getCustomResource().getKind() + + ", apiVersion=" + getCustomResource().getApiVersion() + " ,resourceVersion=" + getCustomResource().getMetadata().getResourceVersion() + + ", markedForDeletion: " + (getCustomResource().getMetadata().getDeletionTimestamp() != null + && !getCustomResource().getMetadata().getDeletionTimestamp().isEmpty()) + " ], retriesIndex=" + retryCount + '}'; } diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java index f803e03552..8b7fdcbfd3 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java @@ -22,14 +22,5 @@ class EventConsumer implements Runnable { public void run() { DispatchControl dispatchControl = eventDispatcher.handleEvent(event); eventScheduler.eventProcessingFinished(event, dispatchControl); -// log.debug("Processing event started: {}", event); -// try { -// DispatchControl dispatchControl = eventDispatcher.handleEvent(event); -// eventScheduler.eventProcessingFinishedSuccessfully(event, dispatchControl); -// log.debug("Event processed successfully: {}", event); -// } catch (RuntimeException e) { -// log.error("Processing event {} failed.", event, e); -// this.eventScheduler.eventProcessingFailed(event); -// } } } diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java index 5d179efa09..b3cdc79cea 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java @@ -1,6 +1,7 @@ package com.github.containersolutions.operator.processing; +import com.github.containersolutions.operator.processing.event.Event; import com.github.containersolutions.operator.processing.retry.Retry; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.KubernetesClientException; @@ -60,6 +61,10 @@ public void eventReceived(Watcher.Action action, CustomResource resource) { scheduleEventFromApi(new CustomResourceEvent(action, resource, retry)); } + public void handleEvent(Event event) { + + } + void scheduleEventFromApi(CustomResourceEvent event) { try { lock.lock(); diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/DefaultEventManager.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/DefaultEventManager.java new file mode 100644 index 0000000000..333dbc11e3 --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/DefaultEventManager.java @@ -0,0 +1,65 @@ +package com.github.containersolutions.operator.processing.event; + +import com.github.containersolutions.operator.processing.EventScheduler; +import io.fabric8.kubernetes.api.model.EventSource; +import io.fabric8.kubernetes.client.CustomResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class DefaultEventManager implements EventHandler, EventManager { + + private static final Logger log = LoggerFactory.getLogger(DefaultEventManager.class); + + private Map> eventSources = new ConcurrentHashMap<>(); + + private EventScheduler eventScheduler; + + public DefaultEventManager(EventScheduler eventScheduler) { + this.eventScheduler = eventScheduler; + } + + @Override + public void handleEvent(Event event, EventProducer eventProducer) { + eventScheduler.handleEvent(event); + } + + // Registration should happen from the same thread within controller + @Override + public void registerEventProducer(CustomResource customResource, EventProducer eventProducer) { + List eventSourceList = eventSources.get(customResource); + if (eventSourceList == null) { + eventSourceList = new ArrayList<>(1); + eventSources.put(customResource, eventSourceList); + } + eventSourceList.add(eventProducer); + eventProducer.setEventHandler(this); + eventProducer.eventProducerRegistered(customResource); + } + + // todo think about concurrency when async de-registration happens + @Override + public void deRegisterEventProducer(CustomResource customResource, EventProducer eventProducer) { + List eventSourceList = eventSources.get(customResource); + if (eventSourceList == null || !eventSourceList.contains(eventProducer)) { + log.warn("Event producer: {} not found for custom resource: ", eventProducer, customResource); + } else { + eventSourceList.remove(eventProducer); + eventProducer.eventProducerDeRegistered(customResource); + } + } + + @Override + public List getRegisteredEventSources(CustomResource customResource) { + List eventSourceList = eventSources.get(customResource); + if (eventSourceList == null) { + return Collections.emptyList(); + } + return eventSourceList; + } +} diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/Event.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/Event.java new file mode 100644 index 0000000000..6a9eae96b1 --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/Event.java @@ -0,0 +1,16 @@ +package com.github.containersolutions.operator.processing.event; + +import io.fabric8.kubernetes.client.CustomResource; + +public class Event { + + private CustomResource customResource; + + public Event(CustomResource customResource) { + this.customResource = customResource; + } + + public CustomResource getCustomResource() { + return customResource; + } +} diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventBuffer.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventBuffer.java new file mode 100644 index 0000000000..26d49d0085 --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventBuffer.java @@ -0,0 +1,4 @@ +package com.github.containersolutions.operator.processing.event; + +public class EventBuffer { +} diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventHandler.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventHandler.java new file mode 100644 index 0000000000..8d5ec36676 --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventHandler.java @@ -0,0 +1,7 @@ +package com.github.containersolutions.operator.processing.event; + +public interface EventHandler { + + void handleEvent(Event event, EventProducer eventProducer); + +} diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventManager.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventManager.java new file mode 100644 index 0000000000..ed0edf200e --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventManager.java @@ -0,0 +1,14 @@ +package com.github.containersolutions.operator.processing.event; + +import io.fabric8.kubernetes.client.CustomResource; + +import java.util.List; + +public interface EventManager { + + void registerEventProducer(CustomResource customResource, EventProducer eventSource); + + void deRegisterEventProducer(CustomResource customResource, EventProducer eventSource); + + List getRegisteredEventSources(CustomResource customResource); +} diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventProducer.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventProducer.java new file mode 100644 index 0000000000..c2c850aa2e --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventProducer.java @@ -0,0 +1,18 @@ +package com.github.containersolutions.operator.processing.event; + +import io.fabric8.kubernetes.client.CustomResource; + +public abstract class EventProducer { + + protected EventHandler eventHandler; + + public void setEventHandler(EventHandler eventHandler) { + this.eventHandler = eventHandler; + } + + protected void eventProducerRegistered(CustomResource customResource) { + } + + protected void eventProducerDeRegistered(CustomResource customResource) { + } +} diff --git a/operator-framework/src/test/java/com/github/containersolutions/operator/EventSchedulerTest.java b/operator-framework/src/test/java/com/github/containersolutions/operator/EventSchedulerTest.java index 5dec1b5528..0e4de2405a 100644 --- a/operator-framework/src/test/java/com/github/containersolutions/operator/EventSchedulerTest.java +++ b/operator-framework/src/test/java/com/github/containersolutions/operator/EventSchedulerTest.java @@ -49,7 +49,7 @@ public void schedulesEvent() { waitMinimalTimeForExecution(); verify(eventDispatcher, times(1)).handleEvent( - argThat(event -> event.getResource().equals(resource) && event.getAction() == Watcher.Action.MODIFIED)); + argThat(event -> event.getCustomResource().equals(resource) && event.getAction() == Watcher.Action.MODIFIED)); assertThat(eventProcessingList).hasSize(1); } @@ -145,13 +145,13 @@ public void processesNewEventIfItIsReceivedAfterExecutionInError() { doAnswer(this::exceptionInExecution).when(eventDispatcher).handleEvent(ArgumentMatchers.argThat(new ArgumentMatcher() { @Override public boolean matches(CustomResourceEvent event) { - return event.getResource().equals(resource1); + return event.getCustomResource().equals(resource1); } })); doAnswer(this::normalExecution).when(eventDispatcher).handleEvent(ArgumentMatchers.argThat(new ArgumentMatcher() { @Override public boolean matches(CustomResourceEvent event) { - return event.getResource().equals(resource2); + return event.getCustomResource().equals(resource2); } })); @@ -196,7 +196,7 @@ private Object normalExecution(InvocationOnMock invocation) { Thread.sleep(INVOCATION_DURATION); LocalDateTime end = LocalDateTime.now(); eventProcessingList.add(new EventProcessingDetail(((CustomResourceEvent) args[0]).getAction(), start, end, - ((CustomResourceEvent) args[0]).getResource())); + ((CustomResourceEvent) args[0]).getCustomResource())); return null; } catch (InterruptedException e) { throw new IllegalStateException(e); @@ -219,7 +219,7 @@ private Object exceptionInExecution(InvocationOnMock invocation) { LocalDateTime end = LocalDateTime.now(); IllegalStateException exception = new IllegalStateException("Exception thrown for testing purposes"); eventProcessingList.add(new EventProcessingDetail(((CustomResourceEvent) args[0]).getAction(), start, end, - ((CustomResourceEvent) args[0]).getResource(), exception)); + ((CustomResourceEvent) args[0]).getCustomResource(), exception)); throw exception; } catch (InterruptedException e) { throw new IllegalStateException(e); diff --git a/spring-boot-starter/src/test/java/com/github/containersolutions/operator/spingboot/starter/TestController.java b/spring-boot-starter/src/test/java/com/github/containersolutions/operator/spingboot/starter/TestController.java index 4b5b94756b..4bad173ab3 100644 --- a/spring-boot-starter/src/test/java/com/github/containersolutions/operator/spingboot/starter/TestController.java +++ b/spring-boot-starter/src/test/java/com/github/containersolutions/operator/spingboot/starter/TestController.java @@ -20,6 +20,9 @@ public boolean deleteResource(CustomResource resource, Context context) { @Override public UpdateControl createOrUpdateResource(CustomResource resource, Context context) { + + + return UpdateControl.noUpdate(); } } From 49a546f360f188f4ac8029f41aec4dd7be5d1296 Mon Sep 17 00:00:00 2001 From: csviri Date: Sun, 6 Sep 2020 20:46:28 +0200 Subject: [PATCH 0007/2246] impl skeleton --- .../containersolutions/operator/processing/EventDispatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java index 6c2b80f4b4..04a7fa2397 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java @@ -45,7 +45,7 @@ public DispatchControl handleEvent(CustomResourceEvent event) { } private DispatchControl handDispatch(CustomResourceEvent event) { - CustomResource resource = event.getResource(); + CustomResource resource = event.getCustomResource(); log.info("Handling {} event for resource {}", event.getAction(), resource.getMetadata()); if (Watcher.Action.ERROR == event.getAction()) { log.error("Received error for resource: {}", resource.getMetadata().getName()); From 4c263d61635ce8f1e17a49c3ca23fa559390d080 Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 7 Sep 2020 19:37:30 +0200 Subject: [PATCH 0008/2246] impl skeleton progress --- .../operator/processing/EventBuffer.java | 44 +++++++ .../operator/processing/EventDispatcher.java | 3 +- .../operator/processing/EventScheduler.java | 114 +++++++----------- .../operator/processing/EventStore.java | 2 + .../operator/processing/ExecutionUnit.java | 33 +++++ ...nsumer.java => ExecutionUnitConsumer.java} | 9 +- .../operator/processing/ProcessingUtils.java | 11 ++ .../{ => event}/CustomResourceEvent.java | 17 ++- .../processing/event/DefaultEventManager.java | 25 ++-- .../operator/processing/event/Event.java | 11 +- .../processing/event/EventBuffer.java | 4 - .../processing/event/EventManager.java | 6 +- .../processing/event/EventProducer.java | 4 +- .../operator/EventDispatcherTest.java | 2 +- .../operator/EventSchedulerTest.java | 2 +- 15 files changed, 176 insertions(+), 111 deletions(-) create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventBuffer.java create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/ExecutionUnit.java rename operator-framework/src/main/java/com/github/containersolutions/operator/processing/{EventConsumer.java => ExecutionUnitConsumer.java} (60%) create mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/ProcessingUtils.java rename operator-framework/src/main/java/com/github/containersolutions/operator/processing/{ => event}/CustomResourceEvent.java (84%) delete mode 100644 operator-framework/src/main/java/com/github/containersolutions/operator/processing/event/EventBuffer.java diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventBuffer.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventBuffer.java new file mode 100644 index 0000000000..02bd44a000 --- /dev/null +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventBuffer.java @@ -0,0 +1,44 @@ +package com.github.containersolutions.operator.processing; + +import com.github.containersolutions.operator.processing.event.CustomResourceEvent; +import com.github.containersolutions.operator.processing.event.Event; +import io.fabric8.kubernetes.client.CustomResource; + +import java.util.*; + +public class EventBuffer { + + private Map> events = new HashMap<>(); + private Map latestCustomResourceEvent = new HashMap<>(); + + public void addEvent(Event event) { + String uid = event.getRelatedCustomResourceUid(); + List crEvents = events.get(uid); + if (crEvents == null) { + crEvents = new ArrayList<>(1); + events.put(uid, crEvents); + } + crEvents.add(event); + } + + public List getAndRemoveEventsForExecution(CustomResource resource) { + String uid = ProcessingUtils.getUID(resource); + List crEvents = events.remove(uid); + if (crEvents == null) { + crEvents = Collections.emptyList(); + } + List result = new ArrayList<>(crEvents.size()+1); + + CustomResourceEvent customResourceEvent = latestCustomResourceEvent.get(uid); + + + return result; + } + + public void addOrUpdateLatestCustomResourceEvent(CustomResourceEvent customResourceEvent) { + latestCustomResourceEvent.put( + ProcessingUtils.getUID(customResourceEvent.getCustomResource()), customResourceEvent); + } + + +} diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java index 04a7fa2397..69f17d7998 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java @@ -2,6 +2,7 @@ import com.github.containersolutions.operator.ControllerUtils; import com.github.containersolutions.operator.api.*; +import com.github.containersolutions.operator.processing.event.CustomResourceEvent; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.MixedOperation; @@ -35,7 +36,7 @@ public EventDispatcher(ResourceController controller, this.generationAware = generationAware; } - public DispatchControl handleEvent(CustomResourceEvent event) { + public DispatchControl handleEvent(ExecutionUnit event) { try { return handDispatch(event); } catch (RuntimeException e) { diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java index b3cdc79cea..e2c3145410 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java @@ -1,6 +1,7 @@ package com.github.containersolutions.operator.processing; +import com.github.containersolutions.operator.processing.event.CustomResourceEvent; import com.github.containersolutions.operator.processing.event.Event; import com.github.containersolutions.operator.processing.retry.Retry; import io.fabric8.kubernetes.client.CustomResource; @@ -9,11 +10,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Optional; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import static com.github.containersolutions.operator.processing.ProcessingUtils.*; + /** * Requirements: *