Skip to content

bulk dependent resources #1448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 48 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
b156a56
feat: improvements on caching and dependent resources
csviri Jul 27, 2022
e20f185
wip
csviri Jul 27, 2022
9103864
kubernetes dependent resource configuration
csviri Jul 28, 2022
0741846
IT fix
csviri Jul 28, 2022
2758551
fixed ITs
csviri Jul 29, 2022
50bd916
index based discriminator
csviri Jul 29, 2022
97ce295
IT fix
csviri Jul 29, 2022
0718875
wip
csviri Jul 29, 2022
b71c7c9
fixes from rebase from next
csviri Aug 26, 2022
6337e44
fix after rebase
csviri Sep 5, 2022
01c33ec
event source provider to context
csviri Sep 6, 2022
b701eb6
todo fixes
csviri Sep 16, 2022
562f7d2
remove void discriminator
csviri Sep 27, 2022
b414d58
fix: bulk creation of dependent resource directly in abstract resource
csviri Sep 8, 2022
3b34d1e
wip
csviri Sep 9, 2022
c7f3671
wip
csviri Sep 9, 2022
759b2e1
wip to start IT
csviri Sep 9, 2022
1b7e714
fixes, progress
csviri Sep 9, 2022
953d274
wp
csviri Sep 12, 2022
98da2e1
matcher
csviri Sep 12, 2022
40c3e47
test passes
csviri Sep 12, 2022
ceca37c
bulk dependent resource to an interface
csviri Sep 13, 2022
abccb4e
wip
csviri Sep 14, 2022
1fe3486
test improvement
csviri Sep 14, 2022
83224c8
note
csviri Sep 14, 2022
2d627f7
wip
csviri Sep 15, 2022
580c62f
rebase on next
csviri Sep 20, 2022
5339b04
increates test timeout
csviri Sep 20, 2022
9d17aba
comment
csviri Sep 20, 2022
1704be9
fix format
csviri Sep 20, 2022
fed173d
wip
csviri Sep 20, 2022
c7db869
delete, other improvements
csviri Sep 21, 2022
da8541e
manage tests, refactored ITs
csviri Sep 21, 2022
714f42c
additionl IT
csviri Sep 21, 2022
57428da
external resource
csviri Sep 21, 2022
13ff73e
external resource IT
csviri Sep 22, 2022
6984571
docs
csviri Sep 22, 2022
d5e814d
refactor: clean-up
metacosm Sep 27, 2022
4e44a50
refactor: make ReconcileResult immutable
metacosm Sep 27, 2022
408fc9c
fix: do not create map if resource is null
metacosm Sep 27, 2022
cabe07d
fix: make things work again after rebase
metacosm Sep 28, 2022
72ebce3
refactor: simplify by removing discriminator factory
metacosm Sep 28, 2022
b5f982f
fix: properly use indexed version
metacosm Sep 28, 2022
da8163b
revert strange comment
csviri Sep 29, 2022
1d303a0
fix: make it clearer when CRD file isn't found
metacosm Aug 29, 2022
e2e6b18
refactor: isolate index handling to BulkDependentResource interface
metacosm Sep 29, 2022
d30b9f7
Revert "refactor: isolate index handling to BulkDependentResource int…
csviri Sep 29, 2022
71b07e9
feat: make it clearer when the CRD file isn't found (#1503)
metacosm Sep 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@
public interface ResourceDiscriminator<R, P extends HasMetadata> {

Optional<R> distinguish(Class<R> resource, P primary, Context<P> context);

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class ReconcileResult<R> {

private final R resource;
private final Operation operation;
private final Map<R, Operation> resourceOperations;

public static <T> ReconcileResult<T> resourceCreated(T resource) {
return new ReconcileResult<>(resource, Operation.CREATED);
Expand All @@ -22,25 +22,49 @@ public static <T> ReconcileResult<T> noOperation(T resource) {
return new ReconcileResult<>(resource, Operation.NONE);
}

@SafeVarargs
public static <T> ReconcileResult<T> aggregatedResult(ReconcileResult<T>... results) {
if (results == null) {
throw new IllegalArgumentException("Should provide results to aggregate");
}
if (results.length == 1) {
return results[0];
}
final Map<T, Operation> operations = new HashMap<>(results.length);
for (ReconcileResult<T> res : results) {
res.getSingleResource().ifPresent(r -> operations.put(r, res.getSingleOperation()));
}
return new ReconcileResult<>(operations);
}

@Override
public String toString() {
return getResource()
.map(r -> r instanceof HasMetadata ? ResourceID.fromResource((HasMetadata) r) : r)
.orElse("no resource")
+ " -> " + operation;
return resourceOperations.entrySet().stream().collect(Collectors.toMap(
e -> e instanceof HasMetadata ? ResourceID.fromResource((HasMetadata) e) : e,
Map.Entry::getValue))
.toString();
}

private ReconcileResult(R resource, Operation operation) {
this.resource = resource;
this.operation = operation;
resourceOperations = resource != null ? Map.of(resource, operation) : Collections.emptyMap();
}

private ReconcileResult(Map<R, Operation> operations) {
resourceOperations = Collections.unmodifiableMap(operations);
}

public Optional<R> getSingleResource() {
return resourceOperations.entrySet().stream().findFirst().map(Map.Entry::getKey);
}

public Optional<R> getResource() {
return Optional.ofNullable(resource);
public Operation getSingleOperation() {
return resourceOperations.entrySet().stream().findFirst().map(Map.Entry::getValue)
.orElseThrow();
}

public Operation getOperation() {
return operation;
@SuppressWarnings("unused")
public Map<R, Operation> getResourceOperations() {
return resourceOperations;
}

public enum Operation {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.processing.dependent;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.slf4j.Logger;
Expand All @@ -20,25 +22,73 @@ public abstract class AbstractDependentResource<R, P extends HasMetadata>

protected final boolean creatable = this instanceof Creator;
protected final boolean updatable = this instanceof Updater;
protected final boolean bulk = this instanceof BulkDependentResource;

protected Creator<R, P> creator;
protected Updater<R, P> updater;
protected BulkDependentResource<R, P> bulkDependentResource;

private ResourceDiscriminator<R, P> resourceDiscriminator;
private final List<ResourceDiscriminator<R, P>> resourceDiscriminator = new ArrayList<>(1);

@SuppressWarnings("unchecked")
public AbstractDependentResource() {
creator = creatable ? (Creator<R, P>) this : null;
updater = updatable ? (Updater<R, P>) this : null;

bulkDependentResource = bulk ? (BulkDependentResource<R, P>) this : null;
}

@Override
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
Optional<R> maybeActual = getSecondaryResource(primary, context);
if (bulk) {
final var count = bulkDependentResource.count(primary, context);
deleteBulkResourcesIfRequired(count, lastKnownBulkSize(), primary, context);
adjustDiscriminators(count);
@SuppressWarnings("unchecked")
final ReconcileResult<R>[] results = new ReconcileResult[count];
for (int i = 0; i < count; i++) {
results[i] = reconcileIndexAware(primary, i, context);
}
return ReconcileResult.aggregatedResult(results);
} else {
return reconcileIndexAware(primary, 0, context);
}
}

protected void deleteBulkResourcesIfRequired(int targetCount, int actualCount, P primary,
Context<P> context) {
if (targetCount >= actualCount) {
return;
}
for (int i = targetCount; i < actualCount; i++) {
var resource = getSecondaryResourceIndexAware(primary, i, context);
var index = i;
resource.ifPresent(
r -> bulkDependentResource.deleteBulkResourceWithIndex(primary, r, index, context));
}
}

private void adjustDiscriminators(int count) {
if (resourceDiscriminator.size() == count) {
return;
}
if (resourceDiscriminator.size() < count) {
for (int i = resourceDiscriminator.size(); i < count; i++) {
resourceDiscriminator.add(bulkDependentResource.getResourceDiscriminator(i));
}
}
if (resourceDiscriminator.size() > count) {
resourceDiscriminator.subList(count, resourceDiscriminator.size()).clear();
}
}

protected ReconcileResult<R> reconcileIndexAware(P primary, int i, Context<P> context) {
Optional<R> maybeActual = bulk ? getSecondaryResourceIndexAware(primary, i, context)
: getSecondaryResource(primary, context);
if (creatable || updatable) {
if (maybeActual.isEmpty()) {
if (creatable) {
var desired = desired(primary, context);
var desired = desiredIndexAware(primary, i, context);
throwIfNull(desired, primary, "Desired");
logForOperation("Creating", primary, desired);
var createdResource = handleCreate(desired, primary, context);
Expand All @@ -47,9 +97,15 @@ public ReconcileResult<R> reconcile(P primary, Context<P> context) {
} else {
final var actual = maybeActual.get();
if (updatable) {
final var match = updater.match(actual, primary, context);
final Matcher.Result<R> match;
if (bulk) {
match = updater.match(actual, primary, i, context);
} else {
match = updater.match(actual, primary, context);
}
if (!match.matched()) {
final var desired = match.computedDesired().orElse(desired(primary, context));
final var desired =
match.computedDesired().orElse(desiredIndexAware(primary, i, context));
throwIfNull(desired, primary, "Desired");
logForOperation("Updating", primary, desired);
var updatedResource = handleUpdate(actual, desired, primary, context);
Expand All @@ -67,9 +123,18 @@ public ReconcileResult<R> reconcile(P primary, Context<P> context) {
return ReconcileResult.noOperation(maybeActual.orElse(null));
}

private R desiredIndexAware(P primary, int i, Context<P> context) {
return bulk ? desired(primary, i, context)
: desired(primary, context);
}

public Optional<R> getSecondaryResource(P primary, Context<P> context) {
return resourceDiscriminator == null ? context.getSecondaryResource(resourceType())
: resourceDiscriminator.distinguish(resourceType(), primary, context);
return resourceDiscriminator.isEmpty() ? context.getSecondaryResource(resourceType())
: resourceDiscriminator.get(0).distinguish(resourceType(), primary, context);
}

protected Optional<R> getSecondaryResourceIndexAware(P primary, int index, Context<P> context) {
return context.getSecondaryResource(resourceType(), resourceDiscriminator.get(index));
}

private void throwIfNull(R desired, P primary, String descriptor) {
Expand Down Expand Up @@ -97,7 +162,7 @@ protected R handleCreate(R desired, P primary, Context<P> context) {
}

/**
* Allows sub-classes to perform additional processing (e.g. caching) on the created resource if
* Allows subclasses to perform additional processing (e.g. caching) on the created resource if
* needed.
*
* @param primaryResourceId the {@link ResourceID} of the primary resource associated with the
Expand Down Expand Up @@ -129,12 +194,29 @@ protected R desired(P primary, Context<P> context) {
"desired method must be implemented if this DependentResource can be created and/or updated");
}

public void setResourceDiscriminator(
protected R desired(P primary, int index, Context<P> context) {
throw new IllegalStateException(
"Must be implemented for bulk DependentResource creation");
}

public AbstractDependentResource<R, P> setResourceDiscriminator(
ResourceDiscriminator<R, P> resourceDiscriminator) {
this.resourceDiscriminator = resourceDiscriminator;
if (resourceDiscriminator != null) {
this.resourceDiscriminator.add(resourceDiscriminator);
}
return this;
}

public ResourceDiscriminator<R, P> getResourceDiscriminator() {
return resourceDiscriminator;
if (this.resourceDiscriminator.isEmpty()) {
return null;
} else {
return this.resourceDiscriminator.get(0);
}
}

protected int lastKnownBulkSize() {
return resourceDiscriminator.size();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.javaoperatorsdk.operator.processing.dependent;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;

/**
* Manages dynamic number of resources created for a primary resource. Since the point of a bulk
* dependent resource is to manage the number of secondary resources dynamically it implement
* {@link Creator} and {@link Deleter} interfaces out of the box. A concrete dependent resource can
* implement additionally also {@link Updater}.
*/
public interface BulkDependentResource<R, P extends HasMetadata> extends Creator<R, P>, Deleter<P> {

/**
* @return number of resources to create
*/
int count(P primary, Context<P> context);

R desired(P primary, int index, Context<P> context);

/**
* Used to delete resource if the desired count is lower than the actual count of a resource.
*
* @param primary resource
* @param resource actual resource from the cache for the index
* @param i index of the resource
* @param context actual context
*/
void deleteBulkResourceWithIndex(P primary, R resource, int i, Context<P> context);

ResourceDiscriminator<R, P> getResourceDiscriminator(int index);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.javaoperatorsdk.operator.processing.dependent;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;

/**
* Helper for the Bulk Dependent Resources to make it more explicit that bulk needs to only
* implement the index aware match method.
*
* @param <R> secondary resource type
* @param <P> primary resource type
*/
public interface BulkUpdater<R, P extends HasMetadata> extends Updater<R, P> {

default Matcher.Result<R> match(R actualResource, P primary, Context<P> context) {
throw new IllegalStateException();
}

Matcher.Result<R> match(R actualResource, P primary, int index, Context<P> context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@ public Result<R> match(R actualResource, P primary, Context<P> context) {
var desired = abstractDependentResource.desired(primary, context);
return Result.computed(actualResource.equals(desired), desired);
}

@Override
public Result<R> match(R actualResource, P primary, int index, Context<P> context) {
var desired = abstractDependentResource.desired(primary, index, context);
return Result.computed(actualResource.equals(desired), desired);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,19 @@ public Optional<T> computedDesired() {
* {@link Result#computed(boolean, Object)})
*/
Result<R> match(R actualResource, P primary, Context<P> context);

/**
* Determines whether the specified secondary resource matches the desired state with target index
* of a bulk resource as defined from the specified primary resource, given the specified
* {@link Context}.
*
* @param actualResource the resource we want to determine whether it's matching the desired state
* @param primary the primary resource from which the desired state is inferred
* @param context the context in which the resource is being matched
* @return a {@link Result} encapsulating whether the resource matched its desired state and this
* associated state if it was computed as part of the matching process. Use the static
* convenience methods ({@link Result#nonComputed(boolean)} and
* {@link Result#computed(boolean, Object)})
*/
Result<R> match(R actualResource, P primary, int index, Context<P> context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ public interface Updater<R, P extends HasMetadata> {
R update(R actual, R desired, P primary, Context<P> context);

Result<R> match(R actualResource, P primary, Context<P> context);

default Result<R> match(R actualResource, P primary, int index, Context<P> context) {
throw new IllegalStateException("Implement this for bulk matching");
}
}
Loading