Skip to content

feat: key based bulk resource creation #1521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ public static <T> ReconcileResult<T> noOperation(T resource) {
return new ReconcileResult<>(resource, Operation.NONE);
}

@SafeVarargs
public static <T> ReconcileResult<T> aggregatedResult(ReconcileResult<T>... results) {
public static <T> ReconcileResult<T> aggregatedResult(List<ReconcileResult<T>> results) {
if (results == null) {
throw new IllegalArgumentException("Should provide results to aggregate");
}
if (results.length == 1) {
return results[0];
if (results.size() == 1) {
return results.get(0);
}
final Map<T, Operation> operations = new HashMap<>(results.length);
final Map<T, Operation> operations = new HashMap<>(results.size());
for (ReconcileResult<T> res : results) {
res.getSingleResource().ifPresent(r -> operations.put(r, res.getSingleOperation()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.javaoperatorsdk.operator.processing.dependent;

import java.util.Optional;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,90 +26,91 @@ public abstract class AbstractDependentResource<R, P extends HasMetadata>
protected Updater<R, P> updater;
protected BulkDependentResource<R, P> bulkDependentResource;
private ResourceDiscriminator<R, P> resourceDiscriminator;
private int currentCount;

@SuppressWarnings("unchecked")
public AbstractDependentResource() {
@SuppressWarnings({"unchecked", "rawtypes"})
protected AbstractDependentResource() {
creator = creatable ? (Creator<R, P>) this : null;
updater = updatable ? (Updater<R, P>) this : null;

bulkDependentResource = bulk ? (BulkDependentResource<R, P>) this : null;
bulkDependentResource = bulk ? (BulkDependentResource) this : null;
}


@Override
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
if (bulk) {
final var count = bulkDependentResource.count(primary, context);
deleteBulkResourcesIfRequired(count, primary, context);
@SuppressWarnings("unchecked")
final ReconcileResult<R>[] results = new ReconcileResult[count];
for (int i = 0; i < count; i++) {
results[i] = reconcileIndexAware(primary, i, context);
final var targetKeys = bulkDependentResource.targetKeys(primary, context);
Map<String, R> actualResources =
bulkDependentResource.getSecondaryResources(primary, context);

deleteBulkResourcesIfRequired(targetKeys, actualResources, primary, context);
final List<ReconcileResult<R>> results = new ArrayList<>(targetKeys.size());

for (String key : targetKeys) {
results.add(reconcileIndexAware(primary, actualResources.get(key), key, context));
}
currentCount = count;
return ReconcileResult.aggregatedResult(results);
} else {
return reconcileIndexAware(primary, 0, context);
var actualResource = getSecondaryResource(primary, context);
return reconcileIndexAware(primary, actualResource.orElse(null), null, context);
}
}

protected void deleteBulkResourcesIfRequired(int targetCount, P primary, Context<P> context) {
if (targetCount >= currentCount) {
return;
}
for (int i = targetCount; i < currentCount; i++) {
var resource = bulkDependentResource.getSecondaryResource(primary, i, context);
var index = i;
resource.ifPresent(
r -> bulkDependentResource.deleteBulkResourceWithIndex(primary, r, index, context));
}
@SuppressWarnings({"rawtypes"})
protected void deleteBulkResourcesIfRequired(Set targetKeys, Map<String, R> actualResources,
P primary, Context<P> context) {
actualResources.forEach((key, value) -> {
if (!targetKeys.contains(key)) {
bulkDependentResource.deleteBulkResource(primary, value, key, context);
}
});
}

protected ReconcileResult<R> reconcileIndexAware(P primary, int i, Context<P> context) {
Optional<R> maybeActual = bulk ? bulkDependentResource.getSecondaryResource(primary, i, context)
: getSecondaryResource(primary, context);
protected ReconcileResult<R> reconcileIndexAware(P primary, R resource, String key,
Context<P> context) {
if (creatable || updatable) {
if (maybeActual.isEmpty()) {
if (resource == null) {
if (creatable) {
var desired = desiredIndexAware(primary, i, context);
var desired = desiredIndexAware(primary, key, context);
throwIfNull(desired, primary, "Desired");
logForOperation("Creating", primary, desired);
var createdResource = handleCreate(desired, primary, context);
return ReconcileResult.resourceCreated(createdResource);
}
} else {
final var actual = maybeActual.get();
if (updatable) {
final Matcher.Result<R> match;
if (bulk) {
match = bulkDependentResource.match(actual, primary, i, context);
match = bulkDependentResource.match(resource, primary, key, context);
} else {
match = updater.match(actual, primary, context);
match = updater.match(resource, primary, context);
}
if (!match.matched()) {
final var desired =
match.computedDesired().orElse(desiredIndexAware(primary, i, context));
match.computedDesired().orElse(desiredIndexAware(primary, key, context));
throwIfNull(desired, primary, "Desired");
logForOperation("Updating", primary, desired);
var updatedResource = handleUpdate(actual, desired, primary, context);
var updatedResource = handleUpdate(resource, desired, primary, context);
return ReconcileResult.resourceUpdated(updatedResource);
}
} else {
log.debug("Update skipped for dependent {} as it matched the existing one", actual);
log.debug("Update skipped for dependent {} as it matched the existing one", resource);
}
}
} else {
log.debug(
"Dependent {} is read-only, implement Creator and/or Updater interfaces to modify it",
getClass().getSimpleName());
}
return ReconcileResult.noOperation(maybeActual.orElse(null));
return ReconcileResult.noOperation(resource);
}

private R desiredIndexAware(P primary, int i, Context<P> context) {
return bulk ? desired(primary, i, context) : desired(primary, context);
private R desiredIndexAware(P primary, String key, Context<P> context) {
return bulk ? bulkDependentResource.desired(primary, key, context)
: desired(primary, context);
}

@Override
public Optional<R> getSecondaryResource(P primary, Context<P> context) {
return resourceDiscriminator == null ? context.getSecondaryResource(resourceType())
: resourceDiscriminator.distinguish(resourceType(), primary, context);
Expand Down Expand Up @@ -172,13 +173,10 @@ protected R desired(P primary, Context<P> context) {
"desired method must be implemented if this DependentResource can be created and/or updated");
}

protected R desired(P primary, int index, Context<P> context) {
throw new IllegalStateException("Must be implemented for bulk DependentResource creation");
}

public void delete(P primary, Context<P> context) {
if (bulk) {
deleteBulkResourcesIfRequired(0, primary, context);
var actualResources = bulkDependentResource.getSecondaryResources(primary, context);
deleteBulkResourcesIfRequired(Collections.emptySet(), actualResources, primary, context);
} else {
handleDelete(primary, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.javaoperatorsdk.operator.processing.dependent;

import java.util.Optional;
import java.util.Map;
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
Expand All @@ -13,24 +14,27 @@
* {@link Creator} and {@link Deleter} interfaces out of the box. A concrete dependent resource can
* implement additionally also {@link Updater}.
*/
public interface BulkDependentResource<R, P extends HasMetadata> extends Creator<R, P>, Deleter<P> {
public interface BulkDependentResource<R, P extends HasMetadata>
extends Creator<R, P>, Deleter<P> {

/**
* @return number of resources to create
*/
int count(P primary, Context<P> context);
Set<String> targetKeys(P primary, Context<P> context);

R desired(P primary, int index, Context<P> context);
Map<String, R> getSecondaryResources(P primary, Context<P> context);

R desired(P primary, String key, Context<P> context);

/**
* Used to delete resource if the desired count is lower than the actual count of a resource.
*
* @param primary resource
* @param resource actual resource from the cache for the index
* @param i index of the resource
* @param key key of the resource
* @param context actual context
*/
void deleteBulkResourceWithIndex(P primary, R resource, int i, Context<P> context);
void deleteBulkResource(P primary, R resource, String key, Context<P> context);

/**
* Determines whether the specified secondary resource matches the desired state with target index
Expand All @@ -39,14 +43,13 @@ public interface BulkDependentResource<R, P extends HasMetadata> extends Creator
*
* @param actualResource the resource we want to determine whether it's matching the desired state
* @param primary the primary resource from which the desired state is inferred
* @param key key of the resource
* @param context the context in which the resource is being matched
* @return a {@link Result} encapsulating whether the resource matched its desired state and this
* associated state if it was computed as part of the matching process. Use the static
* convenience methods ({@link Result#nonComputed(boolean)} and
* {@link Result#computed(boolean, Object)})
*/
Result<R> match(R actualResource, P primary, int index, Context<P> context);

Optional<R> getSecondaryResource(P primary, int index, Context<P> context);
Result<R> match(R actualResource, P primary, String key, Context<P> context);

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,18 @@ public Result<R> match(R actualResource, P primary, Context<P> context) {
return matcher.match(actualResource, primary, context);
}

public Result<R> match(R actualResource, P primary, int index, Context<P> context) {
final var desired = desired(primary, index, context);
return GenericKubernetesResourceMatcher.match(desired, actualResource, false);
public Result<R> match(R actualResource, P primary, String key, Context<P> context) {
final var desired = bulkDependentResource.desired(primary, key, context);
return GenericKubernetesResourceMatcher.match((R) desired, actualResource, false);
}

protected void handleDelete(P primary, Context<P> context) {
var resource = getSecondaryResource(primary, context);
resource.ifPresent(r -> client.resource(r).delete());
}

public void deleteBulkResourceWithIndex(P primary, R resource, int i, Context<P> context) {

public void deleteBulkResource(P primary, R resource, String key, Context<P> context) {
client.resource(resource).delete();
}

Expand Down Expand Up @@ -229,11 +230,6 @@ protected R desired(P primary, Context<P> context) {
return super.desired(primary, context);
}

@Override
protected R desired(P primary, int index, Context<P> context) {
return super.desired(primary, index, context);
}

private void prepareEventFiltering(R desired, ResourceID resourceID) {
((InformerEventSource<R, P>) eventSource().orElseThrow())
.prepareForCreateOrUpdateEventFiltering(resourceID, desired);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.javaoperatorsdk.operator.sample.bulkdependent;

import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.*;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
Expand All @@ -27,43 +25,49 @@ public class ConfigMapDeleterBulkDependentResource
public static final String LABEL_KEY = "bulk";
public static final String LABEL_VALUE = "true";
public static final String ADDITIONAL_DATA_KEY = "additionalData";
public static final String INDEX_DELIMITER = "-";

public ConfigMapDeleterBulkDependentResource() {
super(ConfigMap.class);
}

@Override
public ConfigMap desired(BulkDependentTestCustomResource primary,
int index, Context<BulkDependentTestCustomResource> context) {
public Set<String> targetKeys(BulkDependentTestCustomResource primary,
Context<BulkDependentTestCustomResource> context) {
var number = primary.getSpec().getNumberOfResources();
Set<String> res = new HashSet<>();
for (int i = 0; i < number; i++) {
res.add(Integer.toString(i));
}
return res;
}

@Override
public ConfigMap desired(BulkDependentTestCustomResource primary, String key,
Context<BulkDependentTestCustomResource> context) {
ConfigMap configMap = new ConfigMap();
configMap.setMetadata(new ObjectMetaBuilder()
.withName(primary.getMetadata().getName() + "-" + index)
.withName(primary.getMetadata().getName() + INDEX_DELIMITER + key)
.withNamespace(primary.getMetadata().getNamespace())
.withLabels(Map.of(LABEL_KEY, LABEL_VALUE))
.build());
configMap.setData(
Map.of("number", "" + index, ADDITIONAL_DATA_KEY, primary.getSpec().getAdditionalData()));
Map.of("number", "" + key, ADDITIONAL_DATA_KEY, primary.getSpec().getAdditionalData()));
return configMap;
}

@Override
public int count(BulkDependentTestCustomResource primary,
public Map<String, ConfigMap> getSecondaryResources(BulkDependentTestCustomResource primary,
Context<BulkDependentTestCustomResource> context) {
return primary.getSpec().getNumberOfResources();
}

@Override
public Optional<ConfigMap> getSecondaryResource(BulkDependentTestCustomResource primary,
int index, Context<BulkDependentTestCustomResource> context) {
var resources = context.getSecondaryResources(resourceType()).stream()
.filter(r -> r.getMetadata().getName().endsWith("-" + index))
.collect(Collectors.toList());
if (resources.isEmpty()) {
return Optional.empty();
} else if (resources.size() > 1) {
throw new IllegalStateException("More than one resource found for index:" + index);
} else {
return Optional.of(resources.get(0));
}
var configMaps = context.getSecondaryResources(ConfigMap.class);
Map<String, ConfigMap> result = new HashMap<>(configMaps.size());
configMaps.forEach(cm -> {
String name = cm.getMetadata().getName();
if (name.startsWith(primary.getMetadata().getName())) {
String key = name.substring(name.lastIndexOf(INDEX_DELIMITER) + 1);
result.put(key, cm);
}
});
return result;
}
}
Loading