|
1 | 1 | package io.javaoperatorsdk.operator.processing;
|
2 | 2 |
|
| 3 | +import java.util.ArrayList; |
| 4 | +import java.util.HashMap; |
| 5 | +import java.util.List; |
3 | 6 | import java.util.Map;
|
4 | 7 | import java.util.Optional;
|
5 | 8 | import java.util.Set;
|
|
32 | 35 | import io.javaoperatorsdk.operator.api.reconciler.Ignore;
|
33 | 36 | import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
|
34 | 37 | import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
|
35 |
| -import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware; |
| 38 | +import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceNotFoundException; |
36 | 39 | import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
|
| 40 | +import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer; |
37 | 41 | import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
|
38 | 42 | import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow;
|
39 | 43 | import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult;
|
@@ -214,25 +218,40 @@ public void initAndRegisterEventSources(EventSourceContext<P> context) {
|
214 | 218 | final var ownSources = provider.prepareEventSources(context);
|
215 | 219 | ownSources.forEach(eventSourceManager::registerEventSource);
|
216 | 220 | }
|
217 |
| - managedWorkflow |
218 |
| - .getDependentResourcesByName().entrySet().stream() |
219 |
| - .forEach(drEntry -> { |
220 |
| - if (drEntry.getValue() instanceof EventSourceProvider) { |
221 |
| - final var provider = (EventSourceProvider) drEntry.getValue(); |
222 |
| - final var source = provider.initEventSource(context); |
223 |
| - eventSourceManager.registerEventSource(drEntry.getKey(), source); |
224 |
| - } else { |
225 |
| - Optional<ResourceEventSource> eventSource = |
226 |
| - drEntry.getValue().eventSource(context); |
227 |
| - eventSource.ifPresent(es -> { |
228 |
| - eventSourceManager.registerEventSource(drEntry.getKey(), es); |
229 |
| - }); |
230 |
| - } |
231 |
| - }); |
232 |
| - managedWorkflow.getDependentResourcesByName().entrySet().stream().map(Map.Entry::getValue) |
233 |
| - .filter(EventSourceAware.class::isInstance) |
234 |
| - .forEach(dr -> ((EventSourceAware) dr) |
235 |
| - .selectEventSources(eventSourceManager)); |
| 221 | + |
| 222 | + // register created event sources |
| 223 | + final var dependentResourcesByName = managedWorkflow.getDependentResourcesByName(); |
| 224 | + final var size = dependentResourcesByName.size(); |
| 225 | + if (size > 0) { |
| 226 | + dependentResourcesByName.forEach((key, value) -> { |
| 227 | + if (value instanceof EventSourceProvider) { |
| 228 | + final var provider = (EventSourceProvider) value; |
| 229 | + final var source = provider.initEventSource(context); |
| 230 | + eventSourceManager.registerEventSource(key, source); |
| 231 | + } else { |
| 232 | + Optional<ResourceEventSource> eventSource = value.eventSource(context); |
| 233 | + eventSource.ifPresent(es -> eventSourceManager.registerEventSource(key, es)); |
| 234 | + } |
| 235 | + }); |
| 236 | + |
| 237 | + // resolve event sources referenced by name for dependents that reuse an existing event source |
| 238 | + final Map<String, List<EventSourceReferencer>> unresolvable = new HashMap<>(size); |
| 239 | + dependentResourcesByName.values().stream() |
| 240 | + .filter(EventSourceReferencer.class::isInstance) |
| 241 | + .map(EventSourceReferencer.class::cast) |
| 242 | + .forEach(dr -> { |
| 243 | + try { |
| 244 | + ((EventSourceReferencer<P>) dr) |
| 245 | + .resolveEventSource(eventSourceManager); |
| 246 | + } catch (EventSourceNotFoundException e) { |
| 247 | + unresolvable.computeIfAbsent(e.getEventSourceName(), s -> new ArrayList<>()).add(dr); |
| 248 | + } |
| 249 | + }); |
| 250 | + if (!unresolvable.isEmpty()) { |
| 251 | + throw new IllegalStateException( |
| 252 | + "Couldn't resolve referenced EventSources: " + unresolvable); |
| 253 | + } |
| 254 | + } |
236 | 255 | }
|
237 | 256 |
|
238 | 257 | @Override
|
|
0 commit comments