Skip to content
Merged

Fix/740 #1067

Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@
<plugin>
<groupId>io.spring.javaformat</groupId>
<artifactId>spring-javaformat-maven-plugin</artifactId>
<version>0.0.34</version>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,21 @@ public class KubernetesClientEventBasedConfigMapChangeDetector extends Configura
private final ResourceEventHandler<V1ConfigMap> handler = new ResourceEventHandler<>() {

@Override
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just renames here - nothing else

public void onAdd(V1ConfigMap obj) {
LOG.debug(() -> "ConfigMap " + obj.getMetadata().getName() + " was added.");
onEvent(obj);
public void onAdd(V1ConfigMap configMap) {
LOG.debug(() -> "ConfigMap " + configMap.getMetadata().getName() + " was added.");
onEvent(configMap);
}

@Override
public void onUpdate(V1ConfigMap oldObj, V1ConfigMap newObj) {
LOG.debug(() -> "ConfigMap " + newObj.getMetadata().getName() + " was updated.");
onEvent(newObj);
public void onUpdate(V1ConfigMap oldConfigMap, V1ConfigMap newConfigMap) {
LOG.debug(() -> "ConfigMap " + newConfigMap.getMetadata().getName() + " was updated.");
onEvent(newConfigMap);
}

@Override
public void onDelete(V1ConfigMap obj, boolean deletedFinalStateUnknown) {
LOG.debug(() -> "ConfigMap " + obj.getMetadata() + " was deleted.");
onEvent(obj);
public void onDelete(V1ConfigMap configMap, boolean deletedFinalStateUnknown) {
LOG.debug(() -> "ConfigMap " + configMap.getMetadata().getName() + " was deleted.");
onEvent(configMap);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,21 @@ public class KubernetesClientEventBasedSecretsChangeDetector extends Configurati
private final ResourceEventHandler<V1Secret> handler = new ResourceEventHandler<>() {

@Override
public void onAdd(V1Secret obj) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renames only

LOG.debug(() -> "Secret " + obj.getMetadata().getName() + " was added.");
onEvent(obj);
public void onAdd(V1Secret secret) {
LOG.debug(() -> "Secret " + secret.getMetadata().getName() + " was added.");
onEvent(secret);
}

@Override
public void onUpdate(V1Secret oldObj, V1Secret newObj) {
LOG.debug(() -> "Secret " + newObj.getMetadata().getName() + " was updated.");
onEvent(newObj);
public void onUpdate(V1Secret oldSecret, V1Secret newSecret) {
LOG.debug(() -> "Secret " + newSecret.getMetadata().getName() + " was updated.");
onEvent(newSecret);
}

@Override
public void onDelete(V1Secret obj, boolean deletedFinalStateUnknown) {
LOG.debug(() -> "Secret " + obj.getMetadata() + " was deleted.");
onEvent(obj);
public void onDelete(V1Secret secret, boolean deletedFinalStateUnknown) {
LOG.debug(() -> "Secret " + secret.getMetadata().getName() + " was deleted.");
onEvent(secret);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ final class BusEventBasedConfigMapWatcherChangeDetector extends ConfigMapWatcher
}

@Override
public Mono<Void> triggerRefresh(KubernetesObject configMap) {
return busRefreshTrigger.triggerRefresh(configMap);
public Mono<Void> triggerRefresh(KubernetesObject configMap, String appName) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can have multiple apps based in a configMap/secret now

return busRefreshTrigger.triggerRefresh(configMap, appName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ final class BusEventBasedSecretsWatcherChangeDetector extends SecretsWatcherChan
}

@Override
public Mono<Void> triggerRefresh(KubernetesObject secret) {
return busRefreshTrigger.triggerRefresh(secret);
public Mono<Void> triggerRefresh(KubernetesObject secret, String appName) {
return busRefreshTrigger.triggerRefresh(secret, appName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ final class BusRefreshTrigger implements RefreshTrigger {
}

@Override
public Mono<Void> triggerRefresh(KubernetesObject configMap) {
public Mono<Void> triggerRefresh(KubernetesObject configMap, String appName) {
applicationEventPublisher.publishEvent(new RefreshRemoteApplicationEvent(configMap, busId,
new PathDestinationFactory().getDestination(configMap.getMetadata().getName())));
new PathDestinationFactory().getDestination(appName)));
return Mono.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.CONFIG_MAP_APPS_ANNOTATION;
import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.CONFIG_MAP_LABEL;

/**
* @author Ryan Baxter
* @author Kris Iyer
*/
abstract class ConfigMapWatcherChangeDetector extends KubernetesClientEventBasedConfigMapChangeDetector
implements RefreshTrigger {
abstract sealed class ConfigMapWatcherChangeDetector extends KubernetesClientEventBasedConfigMapChangeDetector
implements
RefreshTrigger permits BusEventBasedConfigMapWatcherChangeDetector, HttpBasedConfigMapWatchChangeDetector {

private final ScheduledExecutorService executorService;

Expand All @@ -56,8 +60,8 @@ abstract class ConfigMapWatcherChangeDetector extends KubernetesClientEventBased
@Override
protected final void onEvent(KubernetesObject configMap) {
// this::refreshTrigger is coming from BusEventBasedConfigMapWatcherChangeDetector
WatcherUtil.onEvent(configMap, ConfigurationWatcherConfigurationProperties.CONFIG_LABEL, refreshDelay,
executorService, "config-map", this::triggerRefresh);
WatcherUtil.onEvent(configMap, CONFIG_MAP_LABEL, CONFIG_MAP_APPS_ANNOTATION, refreshDelay, executorService,
"config-map", this::triggerRefresh);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,23 @@ public class ConfigurationWatcherConfigurationProperties {
/**
* label to enable refresh/restart when using configmaps.
*/
public static final String CONFIG_LABEL = "spring.cloud.kubernetes.config";
public static final String CONFIG_MAP_LABEL = "spring.cloud.kubernetes.config";

/**
* label to enable refresh/restart when using secrets.
*/
public static final String SECRET_LABEL = "spring.cloud.kubernetes.secret";

/**
* annotation name to enable refresh/restart for specific apps when using configmaps.
*/
public static final String CONFIG_MAP_APPS_ANNOTATION = "spring.cloud.kubernetes.configmap.apps";

/**
* annotation name to enable refresh/restart for specific apps when using secrets.
*/
public static final String SECRET_APPS_ANNOTATION = "spring.cloud.kubernetes.secret.apps";

/**
* Annotation key for actuator port and path.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ final class HttpBasedConfigMapWatchChangeDetector extends ConfigMapWatcherChange
}

@Override
public Mono<Void> triggerRefresh(KubernetesObject configMap) {
return httpRefreshTrigger.triggerRefresh(configMap);
public Mono<Void> triggerRefresh(KubernetesObject configMap, String appName) {
return httpRefreshTrigger.triggerRefresh(configMap, appName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ final class HttpBasedSecretsWatchChangeDetector extends SecretsWatcherChangeDete
}

@Override
public Mono<Void> triggerRefresh(KubernetesObject secret) {
return httpRefreshTrigger.triggerRefresh(secret);
public Mono<Void> triggerRefresh(KubernetesObject secret, String appName) {
return httpRefreshTrigger.triggerRefresh(secret, appName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,14 @@ final class HttpRefreshTrigger implements RefreshTrigger {
}

@Override
public Mono<Void> triggerRefresh(KubernetesObject kubernetesObject) {
public Mono<Void> triggerRefresh(KubernetesObject kubernetesObject, String appName) {

String name = kubernetesObject.getMetadata().getName();

return kubernetesReactiveDiscoveryClient.getInstances(name).flatMap(si -> {
return kubernetesReactiveDiscoveryClient.getInstances(appName).flatMap(si -> {
URI actuatorUri = getActuatorUri(si, k8SConfigurationProperties.getActuatorPath(),
k8SConfigurationProperties.getActuatorPort());
LOG.debug(() -> "Sending refresh request for " + name + " to URI " + actuatorUri);
LOG.debug(() -> "Sending refresh request for " + appName + " to URI " + actuatorUri);
return webClient.post().uri(actuatorUri).retrieve().toBodilessEntity()
.doOnSuccess(onSuccess(name, actuatorUri)).doOnError(onError(name));
.doOnSuccess(onSuccess(appName, actuatorUri)).doOnError(onError(appName));
}).then();
}

Expand Down Expand Up @@ -102,8 +100,7 @@ private void setActuatorUriFromAnnotation(UriComponentsBuilder actuatorUriBuilde

// The URI may not contain a host so if that is the case the port in the URI will
// be -1. The authority of the URI will be :<port> for example :9090, we just need
// the
// 9090 in this case
// the 9090 in this case
if (annotationUri.getPort() < 0) {
if (annotationUri.getAuthority() != null) {
actuatorUriBuilder.port(annotationUri.getAuthority().replaceFirst(":", ""));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
*
* @author wind57
*/
interface RefreshTrigger {
sealed interface RefreshTrigger permits BusRefreshTrigger, ConfigMapWatcherChangeDetector, HttpRefreshTrigger, SecretsWatcherChangeDetector {

/**
* @param kubernetesObject either a config-map or secret at the moment.
* @param appName which is not necessarily equal to
* kubernetesObject.getMetadata().getName()
*/
Mono<Void> triggerRefresh(KubernetesObject kubernetesObject);
Mono<Void> triggerRefresh(KubernetesObject kubernetesObject, String appName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.SECRET_APPS_ANNOTATION;
import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.SECRET_LABEL;

/**
* @author Ryan Baxter
* @author Kris Iyer
*/
abstract class SecretsWatcherChangeDetector extends KubernetesClientEventBasedSecretsChangeDetector
implements RefreshTrigger {
abstract sealed class SecretsWatcherChangeDetector extends KubernetesClientEventBasedSecretsChangeDetector implements
RefreshTrigger permits BusEventBasedSecretsWatcherChangeDetector, HttpBasedSecretsWatchChangeDetector {

private final ScheduledExecutorService executorService;

Expand All @@ -56,8 +59,8 @@ abstract class SecretsWatcherChangeDetector extends KubernetesClientEventBasedSe
@Override
protected final void onEvent(KubernetesObject secret) {
// this::refreshTrigger is coming from BusEventBasedSecretsWatcherChangeDetector
WatcherUtil.onEvent(secret, ConfigurationWatcherConfigurationProperties.SECRET_LABEL, refreshDelay,
executorService, "secret", this::triggerRefresh);
WatcherUtil.onEvent(secret, SECRET_LABEL, SECRET_APPS_ANNOTATION, refreshDelay, executorService, "secret",
this::triggerRefresh);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,25 @@

package org.springframework.cloud.kubernetes.configuration.watcher;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main changes are in this class.

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.core.log.LogAccessor;

import static java.util.Collections.emptySet;

/**
* A common place where 'onEvent' code delegates to.
*
Expand All @@ -40,37 +47,93 @@ final class WatcherUtil {
private WatcherUtil() {
}

static void onEvent(KubernetesObject kubernetesObject, String label, long refreshDelay,
static void onEvent(KubernetesObject kubernetesObject, String label, String annotationName, long refreshDelay,
ScheduledExecutorService executorService, String type,
Function<KubernetesObject, Mono<Void>> triggerRefresh) {
BiFunction<KubernetesObject, String, Mono<Void>> triggerRefresh) {

String name = kubernetesObject.getMetadata().getName();
boolean isSpringCloudKubernetes = isSpringCloudKubernetes(kubernetesObject, label);

if (isSpringCloudKubernetes) {

LOG.debug(() -> "Scheduling remote refresh event to be published for " + type + ": " + name
+ " to be published in " + refreshDelay + " milliseconds");
executorService.schedule(() -> {
try {
triggerRefresh.apply(kubernetesObject).subscribe();
}
catch (Throwable t) {
LOG.warn(t, "Error when refreshing ConfigMap " + name);
}
}, refreshDelay, TimeUnit.MILLISECONDS);
Set<String> apps = apps(kubernetesObject, annotationName);

if (!apps.isEmpty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the crox of the change is here: if the annotation that says we should restart multiple apps is present - try to get apps from it, and send events based on those names

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could simplify this a bit if we add the name to the set if its empty

LOG.info(() -> "will schedule remote refresh based on apps : " + apps);
apps.forEach(appName -> schedule(type, appName, refreshDelay, executorService, triggerRefresh,
kubernetesObject));
}
else {
LOG.info(() -> "will schedule remote refresh based on name : " + name);
schedule(type, name, refreshDelay, executorService, triggerRefresh, kubernetesObject);
}

}
else {
LOG.debug(() -> "Not publishing event." + type + ": + name + does not contain the label " + label);
LOG.debug(() -> "Not publishing event." + type + ": " + name + " does not contain the label " + label);
}
}

private static boolean isSpringCloudKubernetes(KubernetesObject kubernetesObject, String label) {
static boolean isSpringCloudKubernetes(KubernetesObject kubernetesObject, String label) {
if (kubernetesObject.getMetadata() == null) {
return false;
}
return Boolean.parseBoolean(Optional.ofNullable(kubernetesObject.getMetadata().getLabels())
.orElse(Collections.emptyMap()).getOrDefault(label, "false"));
return Boolean.parseBoolean(labels(kubernetesObject).getOrDefault(label, "false"));
}

static Set<String> apps(KubernetesObject kubernetesObject, String annotationName) {

Map<String, String> annotations = annotations(kubernetesObject);

if (annotations.isEmpty()) {
LOG.debug(() -> annotationName + " not present (empty data)");
return emptySet();
}

String appsValue = annotations.get(annotationName);

if (appsValue == null) {
LOG.debug(() -> annotationName + " not present (missing in annotations)");
return emptySet();
}

if (appsValue.isBlank()) {
LOG.debug(() -> appsValue + " not present (blanks only)");
return emptySet();
}

return Arrays.stream(appsValue.split(",")).map(String::trim).collect(Collectors.toSet());
}

static Map<String, String> labels(KubernetesObject kubernetesObject) {
V1ObjectMeta metadata = kubernetesObject.getMetadata();
if (metadata == null) {
return Map.of();
}
return Optional.ofNullable(metadata.getLabels()).orElse(Map.of());
}

static Map<String, String> annotations(KubernetesObject kubernetesObject) {
V1ObjectMeta metadata = kubernetesObject.getMetadata();
if (metadata == null) {
return Map.of();
}
return Optional.ofNullable(metadata.getAnnotations()).orElse(Collections.emptyMap());
}

private static void schedule(String type, String appName, long refreshDelay,
ScheduledExecutorService executorService, BiFunction<KubernetesObject, String, Mono<Void>> triggerRefresh,
KubernetesObject kubernetesObject) {
LOG.debug(() -> "Scheduling remote refresh event to be published for " + type + ": with appName : " + appName
+ " to be published in " + refreshDelay + " milliseconds");
executorService.schedule(() -> {
try {
triggerRefresh.apply(kubernetesObject, appName).subscribe();
}
catch (Throwable t) {
LOG.warn(t, "Error when refreshing appName " + appName);
}
}, refreshDelay, TimeUnit.MILLISECONDS);
}

}
Loading