Skip to content

Commit aaea1a5

Browse files
committed
feat: key based bulk resource creation (#1521)
1 parent 59e1663 commit aaea1a5

File tree

6 files changed

+133
-131
lines changed

6 files changed

+133
-131
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/ReconcileResult.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@ public static <T> ReconcileResult<T> noOperation(T resource) {
2222
return new ReconcileResult<>(resource, Operation.NONE);
2323
}
2424

25-
@SafeVarargs
26-
public static <T> ReconcileResult<T> aggregatedResult(ReconcileResult<T>... results) {
25+
public static <T> ReconcileResult<T> aggregatedResult(List<ReconcileResult<T>> results) {
2726
if (results == null) {
2827
throw new IllegalArgumentException("Should provide results to aggregate");
2928
}
30-
if (results.length == 1) {
31-
return results[0];
29+
if (results.size() == 1) {
30+
return results.get(0);
3231
}
33-
final Map<T, Operation> operations = new HashMap<>(results.length);
32+
final Map<T, Operation> operations = new HashMap<>(results.size());
3433
for (ReconcileResult<T> res : results) {
3534
res.getSingleResource().ifPresent(r -> operations.put(r, res.getSingleOperation()));
3635
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractDependentResource.java

+40-42
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.javaoperatorsdk.operator.processing.dependent;
22

3-
import java.util.Optional;
3+
import java.util.*;
44

55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
@@ -26,90 +26,91 @@ public abstract class AbstractDependentResource<R, P extends HasMetadata>
2626
protected Updater<R, P> updater;
2727
protected BulkDependentResource<R, P> bulkDependentResource;
2828
private ResourceDiscriminator<R, P> resourceDiscriminator;
29-
private int currentCount;
3029

31-
@SuppressWarnings("unchecked")
32-
public AbstractDependentResource() {
30+
@SuppressWarnings({"unchecked", "rawtypes"})
31+
protected AbstractDependentResource() {
3332
creator = creatable ? (Creator<R, P>) this : null;
3433
updater = updatable ? (Updater<R, P>) this : null;
3534

36-
bulkDependentResource = bulk ? (BulkDependentResource<R, P>) this : null;
35+
bulkDependentResource = bulk ? (BulkDependentResource) this : null;
3736
}
3837

38+
3939
@Override
4040
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
4141
if (bulk) {
42-
final var count = bulkDependentResource.count(primary, context);
43-
deleteBulkResourcesIfRequired(count, primary, context);
44-
@SuppressWarnings("unchecked")
45-
final ReconcileResult<R>[] results = new ReconcileResult[count];
46-
for (int i = 0; i < count; i++) {
47-
results[i] = reconcileIndexAware(primary, i, context);
42+
final var targetKeys = bulkDependentResource.targetKeys(primary, context);
43+
Map<String, R> actualResources =
44+
bulkDependentResource.getSecondaryResources(primary, context);
45+
46+
deleteBulkResourcesIfRequired(targetKeys, actualResources, primary, context);
47+
final List<ReconcileResult<R>> results = new ArrayList<>(targetKeys.size());
48+
49+
for (String key : targetKeys) {
50+
results.add(reconcileIndexAware(primary, actualResources.get(key), key, context));
4851
}
49-
currentCount = count;
5052
return ReconcileResult.aggregatedResult(results);
5153
} else {
52-
return reconcileIndexAware(primary, 0, context);
54+
var actualResource = getSecondaryResource(primary, context);
55+
return reconcileIndexAware(primary, actualResource.orElse(null), null, context);
5356
}
5457
}
5558

56-
protected void deleteBulkResourcesIfRequired(int targetCount, P primary, Context<P> context) {
57-
if (targetCount >= currentCount) {
58-
return;
59-
}
60-
for (int i = targetCount; i < currentCount; i++) {
61-
var resource = bulkDependentResource.getSecondaryResource(primary, i, context);
62-
var index = i;
63-
resource.ifPresent(
64-
r -> bulkDependentResource.deleteBulkResourceWithIndex(primary, r, index, context));
65-
}
59+
@SuppressWarnings({"rawtypes"})
60+
protected void deleteBulkResourcesIfRequired(Set targetKeys, Map<String, R> actualResources,
61+
P primary, Context<P> context) {
62+
actualResources.forEach((key, value) -> {
63+
if (!targetKeys.contains(key)) {
64+
bulkDependentResource.deleteBulkResource(primary, value, key, context);
65+
}
66+
});
6667
}
6768

68-
protected ReconcileResult<R> reconcileIndexAware(P primary, int i, Context<P> context) {
69-
Optional<R> maybeActual = bulk ? bulkDependentResource.getSecondaryResource(primary, i, context)
70-
: getSecondaryResource(primary, context);
69+
protected ReconcileResult<R> reconcileIndexAware(P primary, R resource, String key,
70+
Context<P> context) {
7171
if (creatable || updatable) {
72-
if (maybeActual.isEmpty()) {
72+
if (resource == null) {
7373
if (creatable) {
74-
var desired = desiredIndexAware(primary, i, context);
74+
var desired = desiredIndexAware(primary, key, context);
7575
throwIfNull(desired, primary, "Desired");
7676
logForOperation("Creating", primary, desired);
7777
var createdResource = handleCreate(desired, primary, context);
7878
return ReconcileResult.resourceCreated(createdResource);
7979
}
8080
} else {
81-
final var actual = maybeActual.get();
8281
if (updatable) {
8382
final Matcher.Result<R> match;
8483
if (bulk) {
85-
match = bulkDependentResource.match(actual, primary, i, context);
84+
match = bulkDependentResource.match(resource, primary, key, context);
8685
} else {
87-
match = updater.match(actual, primary, context);
86+
match = updater.match(resource, primary, context);
8887
}
8988
if (!match.matched()) {
9089
final var desired =
91-
match.computedDesired().orElse(desiredIndexAware(primary, i, context));
90+
match.computedDesired().orElse(desiredIndexAware(primary, key, context));
9291
throwIfNull(desired, primary, "Desired");
9392
logForOperation("Updating", primary, desired);
94-
var updatedResource = handleUpdate(actual, desired, primary, context);
93+
var updatedResource = handleUpdate(resource, desired, primary, context);
9594
return ReconcileResult.resourceUpdated(updatedResource);
9695
}
9796
} else {
98-
log.debug("Update skipped for dependent {} as it matched the existing one", actual);
97+
log.debug("Update skipped for dependent {} as it matched the existing one", resource);
9998
}
10099
}
101100
} else {
102101
log.debug(
103102
"Dependent {} is read-only, implement Creator and/or Updater interfaces to modify it",
104103
getClass().getSimpleName());
105104
}
106-
return ReconcileResult.noOperation(maybeActual.orElse(null));
105+
return ReconcileResult.noOperation(resource);
107106
}
108107

109-
private R desiredIndexAware(P primary, int i, Context<P> context) {
110-
return bulk ? desired(primary, i, context) : desired(primary, context);
108+
private R desiredIndexAware(P primary, String key, Context<P> context) {
109+
return bulk ? bulkDependentResource.desired(primary, key, context)
110+
: desired(primary, context);
111111
}
112112

113+
@Override
113114
public Optional<R> getSecondaryResource(P primary, Context<P> context) {
114115
return resourceDiscriminator == null ? context.getSecondaryResource(resourceType())
115116
: resourceDiscriminator.distinguish(resourceType(), primary, context);
@@ -172,13 +173,10 @@ protected R desired(P primary, Context<P> context) {
172173
"desired method must be implemented if this DependentResource can be created and/or updated");
173174
}
174175

175-
protected R desired(P primary, int index, Context<P> context) {
176-
throw new IllegalStateException("Must be implemented for bulk DependentResource creation");
177-
}
178-
179176
public void delete(P primary, Context<P> context) {
180177
if (bulk) {
181-
deleteBulkResourcesIfRequired(0, primary, context);
178+
var actualResources = bulkDependentResource.getSecondaryResources(primary, context);
179+
deleteBulkResourcesIfRequired(Collections.emptySet(), actualResources, primary, context);
182180
} else {
183181
handleDelete(primary, context);
184182
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.dependent;
22

3-
import java.util.Optional;
3+
import java.util.Map;
4+
import java.util.Set;
45

56
import io.fabric8.kubernetes.api.model.HasMetadata;
67
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -13,24 +14,27 @@
1314
* {@link Creator} and {@link Deleter} interfaces out of the box. A concrete dependent resource can
1415
* implement additionally also {@link Updater}.
1516
*/
16-
public interface BulkDependentResource<R, P extends HasMetadata> extends Creator<R, P>, Deleter<P> {
17+
public interface BulkDependentResource<R, P extends HasMetadata>
18+
extends Creator<R, P>, Deleter<P> {
1719

1820
/**
1921
* @return number of resources to create
2022
*/
21-
int count(P primary, Context<P> context);
23+
Set<String> targetKeys(P primary, Context<P> context);
2224

23-
R desired(P primary, int index, Context<P> context);
25+
Map<String, R> getSecondaryResources(P primary, Context<P> context);
26+
27+
R desired(P primary, String key, Context<P> context);
2428

2529
/**
2630
* Used to delete resource if the desired count is lower than the actual count of a resource.
2731
*
2832
* @param primary resource
2933
* @param resource actual resource from the cache for the index
30-
* @param i index of the resource
34+
* @param key key of the resource
3135
* @param context actual context
3236
*/
33-
void deleteBulkResourceWithIndex(P primary, R resource, int i, Context<P> context);
37+
void deleteBulkResource(P primary, R resource, String key, Context<P> context);
3438

3539
/**
3640
* Determines whether the specified secondary resource matches the desired state with target index
@@ -39,14 +43,13 @@ public interface BulkDependentResource<R, P extends HasMetadata> extends Creator
3943
*
4044
* @param actualResource the resource we want to determine whether it's matching the desired state
4145
* @param primary the primary resource from which the desired state is inferred
46+
* @param key key of the resource
4247
* @param context the context in which the resource is being matched
4348
* @return a {@link Result} encapsulating whether the resource matched its desired state and this
4449
* associated state if it was computed as part of the matching process. Use the static
4550
* convenience methods ({@link Result#nonComputed(boolean)} and
4651
* {@link Result#computed(boolean, Object)})
4752
*/
48-
Result<R> match(R actualResource, P primary, int index, Context<P> context);
49-
50-
Optional<R> getSecondaryResource(P primary, int index, Context<P> context);
53+
Result<R> match(R actualResource, P primary, String key, Context<P> context);
5154

5255
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

+5-9
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,18 @@ public Result<R> match(R actualResource, P primary, Context<P> context) {
139139
return matcher.match(actualResource, primary, context);
140140
}
141141

142-
public Result<R> match(R actualResource, P primary, int index, Context<P> context) {
143-
final var desired = desired(primary, index, context);
144-
return GenericKubernetesResourceMatcher.match(desired, actualResource, false);
142+
public Result<R> match(R actualResource, P primary, String key, Context<P> context) {
143+
final var desired = bulkDependentResource.desired(primary, key, context);
144+
return GenericKubernetesResourceMatcher.match((R) desired, actualResource, false);
145145
}
146146

147147
protected void handleDelete(P primary, Context<P> context) {
148148
var resource = getSecondaryResource(primary, context);
149149
resource.ifPresent(r -> client.resource(r).delete());
150150
}
151151

152-
public void deleteBulkResourceWithIndex(P primary, R resource, int i, Context<P> context) {
152+
153+
public void deleteBulkResource(P primary, R resource, String key, Context<P> context) {
153154
client.resource(resource).delete();
154155
}
155156

@@ -229,11 +230,6 @@ protected R desired(P primary, Context<P> context) {
229230
return super.desired(primary, context);
230231
}
231232

232-
@Override
233-
protected R desired(P primary, int index, Context<P> context) {
234-
return super.desired(primary, index, context);
235-
}
236-
237233
private void prepareEventFiltering(R desired, ResourceID resourceID) {
238234
((InformerEventSource<R, P>) eventSource().orElseThrow())
239235
.prepareForCreateOrUpdateEventFiltering(resourceID, desired);
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package io.javaoperatorsdk.operator.sample.bulkdependent;
22

3-
import java.util.Map;
4-
import java.util.Optional;
5-
import java.util.stream.Collectors;
3+
import java.util.*;
64

75
import io.fabric8.kubernetes.api.model.ConfigMap;
86
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
@@ -27,43 +25,49 @@ public class ConfigMapDeleterBulkDependentResource
2725
public static final String LABEL_KEY = "bulk";
2826
public static final String LABEL_VALUE = "true";
2927
public static final String ADDITIONAL_DATA_KEY = "additionalData";
28+
public static final String INDEX_DELIMITER = "-";
3029

3130
public ConfigMapDeleterBulkDependentResource() {
3231
super(ConfigMap.class);
3332
}
3433

3534
@Override
36-
public ConfigMap desired(BulkDependentTestCustomResource primary,
37-
int index, Context<BulkDependentTestCustomResource> context) {
35+
public Set<String> targetKeys(BulkDependentTestCustomResource primary,
36+
Context<BulkDependentTestCustomResource> context) {
37+
var number = primary.getSpec().getNumberOfResources();
38+
Set<String> res = new HashSet<>();
39+
for (int i = 0; i < number; i++) {
40+
res.add(Integer.toString(i));
41+
}
42+
return res;
43+
}
44+
45+
@Override
46+
public ConfigMap desired(BulkDependentTestCustomResource primary, String key,
47+
Context<BulkDependentTestCustomResource> context) {
3848
ConfigMap configMap = new ConfigMap();
3949
configMap.setMetadata(new ObjectMetaBuilder()
40-
.withName(primary.getMetadata().getName() + "-" + index)
50+
.withName(primary.getMetadata().getName() + INDEX_DELIMITER + key)
4151
.withNamespace(primary.getMetadata().getNamespace())
4252
.withLabels(Map.of(LABEL_KEY, LABEL_VALUE))
4353
.build());
4454
configMap.setData(
45-
Map.of("number", "" + index, ADDITIONAL_DATA_KEY, primary.getSpec().getAdditionalData()));
55+
Map.of("number", "" + key, ADDITIONAL_DATA_KEY, primary.getSpec().getAdditionalData()));
4656
return configMap;
4757
}
4858

4959
@Override
50-
public int count(BulkDependentTestCustomResource primary,
60+
public Map<String, ConfigMap> getSecondaryResources(BulkDependentTestCustomResource primary,
5161
Context<BulkDependentTestCustomResource> context) {
52-
return primary.getSpec().getNumberOfResources();
53-
}
54-
55-
@Override
56-
public Optional<ConfigMap> getSecondaryResource(BulkDependentTestCustomResource primary,
57-
int index, Context<BulkDependentTestCustomResource> context) {
58-
var resources = context.getSecondaryResources(resourceType()).stream()
59-
.filter(r -> r.getMetadata().getName().endsWith("-" + index))
60-
.collect(Collectors.toList());
61-
if (resources.isEmpty()) {
62-
return Optional.empty();
63-
} else if (resources.size() > 1) {
64-
throw new IllegalStateException("More than one resource found for index:" + index);
65-
} else {
66-
return Optional.of(resources.get(0));
67-
}
62+
var configMaps = context.getSecondaryResources(ConfigMap.class);
63+
Map<String, ConfigMap> result = new HashMap<>(configMaps.size());
64+
configMaps.forEach(cm -> {
65+
String name = cm.getMetadata().getName();
66+
if (name.startsWith(primary.getMetadata().getName())) {
67+
String key = name.substring(name.lastIndexOf(INDEX_DELIMITER) + 1);
68+
result.put(key, cm);
69+
}
70+
});
71+
return result;
6872
}
6973
}

0 commit comments

Comments
 (0)