|
1 | 1 | package io.javaoperatorsdk.operator.processing;
|
2 | 2 |
|
| 3 | +import java.util.ArrayList; |
| 4 | +import java.util.HashMap; |
| 5 | +import java.util.List; |
3 | 6 | import java.util.Map;
|
4 | 7 | import java.util.Optional;
|
5 | 8 | import java.util.Set;
|
|
32 | 35 | import io.javaoperatorsdk.operator.api.reconciler.Ignore;
|
33 | 36 | import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
|
34 | 37 | import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
|
35 |
| -import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware; |
| 38 | +import io.javaoperatorsdk.operator.api.reconciler.dependent.DeferrableEventSourceHolder; |
36 | 39 | import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
|
37 | 40 | import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
|
38 | 41 | import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow;
|
@@ -214,25 +217,35 @@ public void initAndRegisterEventSources(EventSourceContext<P> context) {
|
214 | 217 | final var ownSources = provider.prepareEventSources(context);
|
215 | 218 | ownSources.forEach(eventSourceManager::registerEventSource);
|
216 | 219 | }
|
217 |
| - managedWorkflow |
218 |
| - .getDependentResourcesByName().entrySet().stream() |
219 |
| - .forEach(drEntry -> { |
220 |
| - if (drEntry.getValue() instanceof EventSourceProvider) { |
221 |
| - final var provider = (EventSourceProvider) drEntry.getValue(); |
222 |
| - final var source = provider.initEventSource(context); |
223 |
| - eventSourceManager.registerEventSource(drEntry.getKey(), source); |
224 |
| - } else { |
225 |
| - Optional<ResourceEventSource> eventSource = |
226 |
| - drEntry.getValue().eventSource(context); |
227 |
| - eventSource.ifPresent(es -> { |
228 |
| - eventSourceManager.registerEventSource(drEntry.getKey(), es); |
229 |
| - }); |
230 |
| - } |
231 |
| - }); |
232 |
| - managedWorkflow.getDependentResourcesByName().entrySet().stream().map(Map.Entry::getValue) |
233 |
| - .filter(EventSourceAware.class::isInstance) |
234 |
| - .forEach(dr -> ((EventSourceAware) dr) |
235 |
| - .selectEventSources(eventSourceManager)); |
| 220 | + |
| 221 | + // register created event sources |
| 222 | + final var dependentResourcesByName = managedWorkflow.getDependentResourcesByName(); |
| 223 | + final var size = dependentResourcesByName.size(); |
| 224 | + if (size > 0) { |
| 225 | + dependentResourcesByName.forEach((key, value) -> { |
| 226 | + if (value instanceof EventSourceProvider) { |
| 227 | + final var provider = (EventSourceProvider) value; |
| 228 | + final var source = provider.initEventSource(context); |
| 229 | + eventSourceManager.registerEventSource(key, source); |
| 230 | + } else { |
| 231 | + Optional<ResourceEventSource> eventSource = value.eventSource(context); |
| 232 | + eventSource.ifPresent(es -> eventSourceManager.registerEventSource(key, es)); |
| 233 | + } |
| 234 | + }); |
| 235 | + |
| 236 | + // resolve event sources referenced by name for dependents that reuse an existing event source |
| 237 | + final Map<String, List<DeferrableEventSourceHolder>> unresolvable = new HashMap<>(size); |
| 238 | + dependentResourcesByName.values().stream() |
| 239 | + .filter(DeferrableEventSourceHolder.class::isInstance) |
| 240 | + .map(DeferrableEventSourceHolder.class::cast) |
| 241 | + .forEach(dr -> ((DeferrableEventSourceHolder<P>) dr) |
| 242 | + .resolveEventSource(eventSourceManager).ifPresent(unresolved -> unresolvable |
| 243 | + .computeIfAbsent(unresolved, s -> new ArrayList<>()).add(dr))); |
| 244 | + if (!unresolvable.isEmpty()) { |
| 245 | + throw new IllegalStateException( |
| 246 | + "Couldn't resolve referenced EventSources: " + unresolvable); |
| 247 | + } |
| 248 | + } |
236 | 249 | }
|
237 | 250 |
|
238 | 251 | @Override
|
|
0 commit comments