Skip to content

Commit a82d830

Browse files
committed
tmp
1 parent 083bc49 commit a82d830

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1273
-644
lines changed
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package flp
2+
3+
import (
4+
"fmt"
5+
"hash/fnv"
6+
"strconv"
7+
"strings"
8+
9+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
10+
"github.com/netobserv/network-observability-operator/controllers/constants"
11+
"github.com/netobserv/network-observability-operator/pkg/helper"
12+
"gopkg.in/yaml.v2"
13+
14+
corev1 "k8s.io/api/core/v1"
15+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16+
)
17+
18+
const (
19+
flpCacheName = constants.FLPName + "-cache"
20+
flpCacheConfigMap = "flp-cache-config"
21+
flpCacheTopic = "informers"
22+
)
23+
24+
func (b *builder) cachePodTemplate(annotations map[string]string) corev1.PodTemplateSpec {
25+
advancedConfig := helper.GetAdvancedProcessorConfig(b.desired.Processor.Advanced)
26+
var ports []corev1.ContainerPort
27+
28+
if advancedConfig.ProfilePort != nil {
29+
ports = append(ports, corev1.ContainerPort{
30+
Name: profilePortName,
31+
ContainerPort: *advancedConfig.ProfilePort,
32+
Protocol: corev1.ProtocolTCP,
33+
})
34+
}
35+
36+
volumeMounts := b.volumes.AppendMounts([]corev1.VolumeMount{{
37+
MountPath: configPath + "-cache",
38+
Name: configVolume + "-cache",
39+
}})
40+
volumes := b.volumes.AppendVolumes([]corev1.Volume{{
41+
Name: configVolume + "-cache",
42+
VolumeSource: corev1.VolumeSource{
43+
ConfigMap: &corev1.ConfigMapVolumeSource{
44+
LocalObjectReference: corev1.LocalObjectReference{
45+
Name: flpCacheConfigMap,
46+
},
47+
},
48+
},
49+
}})
50+
51+
var envs []corev1.EnvVar
52+
envs = append(envs, constants.EnvNoHTTP2)
53+
imageName := strings.Replace(b.info.Image, "flowlogs-pipeline", "flowlogs-pipeline-cache", 1)
54+
55+
container := corev1.Container{
56+
Name: flpCacheName,
57+
Image: imageName,
58+
ImagePullPolicy: corev1.PullPolicy(b.desired.Processor.ImagePullPolicy),
59+
Args: []string{fmt.Sprintf(`--config=%s/%s`, configPath+"-cache", configFile)},
60+
Resources: *b.desired.Processor.Resources.DeepCopy(),
61+
VolumeMounts: volumeMounts,
62+
Ports: ports,
63+
Env: envs,
64+
SecurityContext: helper.ContainerDefaultSecurityContext(),
65+
}
66+
return corev1.PodTemplateSpec{
67+
ObjectMeta: metav1.ObjectMeta{
68+
Labels: b.cacheLabels,
69+
Annotations: annotations,
70+
},
71+
Spec: corev1.PodSpec{
72+
Volumes: volumes,
73+
Containers: []corev1.Container{container},
74+
ServiceAccountName: b.name(),
75+
NodeSelector: advancedConfig.Scheduling.NodeSelector,
76+
Tolerations: advancedConfig.Scheduling.Tolerations,
77+
Affinity: advancedConfig.Scheduling.Affinity,
78+
PriorityClassName: advancedConfig.Scheduling.PriorityClassName,
79+
},
80+
}
81+
}
82+
83+
type CacheConfig struct {
84+
KubeConfigPath string `yaml:"kubeConfigPath"`
85+
KafkaConfig api.EncodeKafka `yaml:"kafkaConfig"`
86+
PProfPort int32 `yaml:"pprofPort"`
87+
LogLevel string `yaml:"logLevel"`
88+
}
89+
90+
// returns a configmap with a digest of its configuration contents, which will be used to
91+
// detect any configuration change
92+
func (b *builder) cacheConfigMap() (*corev1.ConfigMap, string, error) {
93+
// Re-use the initial stage (which should be Kafka ingester), with a different topic
94+
// TODO: that's ugly and deserves more refactoring
95+
params := b.pipeline.GetStageParams()[0]
96+
97+
kafkaSpec := b.desired.Kafka
98+
cc := CacheConfig{
99+
LogLevel: b.desired.Processor.LogLevel,
100+
KafkaConfig: api.EncodeKafka{
101+
Address: kafkaSpec.Address,
102+
Topic: flpCacheTopic,
103+
TLS: params.Ingest.Kafka.TLS,
104+
SASL: params.Ingest.Kafka.SASL,
105+
},
106+
}
107+
advancedConfig := helper.GetAdvancedProcessorConfig(b.desired.Processor.Advanced)
108+
if advancedConfig.ProfilePort != nil {
109+
cc.PProfPort = *advancedConfig.ProfilePort
110+
}
111+
112+
bs, err := yaml.Marshal(cc)
113+
if err != nil {
114+
return nil, "", err
115+
}
116+
117+
configMap := corev1.ConfigMap{
118+
ObjectMeta: metav1.ObjectMeta{
119+
Name: flpCacheConfigMap,
120+
Namespace: b.info.Namespace,
121+
Labels: b.cacheLabels,
122+
},
123+
Data: map[string]string{
124+
configFile: string(bs),
125+
},
126+
}
127+
hasher := fnv.New64a()
128+
_, _ = hasher.Write(bs)
129+
digest := strconv.FormatUint(hasher.Sum64(), 36)
130+
return &configMap, digest, nil
131+
}

controllers/flp/flp_common_objects.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ type Builder struct {
5353
info *reconcilers.Instance
5454
labels map[string]string
5555
selector map[string]string
56+
cacheLabels map[string]string
57+
cacheSelector map[string]string
5658
desired *flowslatest.FlowCollectorSpec
5759
flowMetrics *metricslatest.FlowMetricList
5860
detectedSubnets []flowslatest.SubnetLabel
@@ -95,6 +97,13 @@ func NewBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSp
9597
selector: map[string]string{
9698
"app": name,
9799
},
100+
cacheLabels: map[string]string{
101+
"app": flpCacheName,
102+
"version": helper.MaxLabelLength(version),
103+
},
104+
cacheSelector: map[string]string{
105+
"app": flpCacheName,
106+
},
98107
desired: desired,
99108
flowMetrics: flowMetrics,
100109
detectedSubnets: detectedSubnets,
@@ -192,7 +201,6 @@ func (b *builder) podTemplate(hasHostPort, hostNetwork bool, annotations map[str
192201
}})
193202

194203
var envs []corev1.EnvVar
195-
advancedConfig = helper.GetAdvancedProcessorConfig(b.desired.Processor.Advanced)
196204
// we need to sort env map to keep idempotency,
197205
// as equal maps could be iterated in different order
198206
for _, pair := range helper.KeySorted(advancedConfig.Env) {

controllers/flp/flp_pipeline_builder.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,18 @@ func (b *PipelineBuilder) AddProcessorStages() error {
126126
}...)
127127
}
128128

129+
// enrichment using Kafka cache
130+
var kafkaCache *api.IngestKafka
131+
if helper.UseKafka(b.desired) {
132+
// Re-use the initial stage (which should be Kafka ingester), with a different topic
133+
// TODO: that's ugly and deserves more refactoring
134+
params := b.GetStageParams()[0]
135+
kc := *params.Ingest.Kafka
136+
kc.Topic = flpCacheTopic
137+
kc.GroupID = ""
138+
kafkaCache = &kc
139+
}
140+
129141
// enrich stage (transform) configuration
130142
enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
131143
Rules: rules,
@@ -135,7 +147,8 @@ func (b *PipelineBuilder) AddProcessorStages() error {
135147
DstHostField: "DstK8S_HostIP",
136148
FlowDirectionField: "FlowDirection",
137149
},
138-
SubnetLabels: flpLabels,
150+
SubnetLabels: flpLabels,
151+
KafkaCacheConfig: kafkaCache,
139152
})
140153

141154
// loki stage (write) configuration

controllers/flp/flp_transfo_objects.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
corev1 "k8s.io/api/core/v1"
77
rbacv1 "k8s.io/api/rbac/v1"
88
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/utils/ptr"
910

1011
flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
1112
metricslatest "github.com/netobserv/network-observability-operator/apis/flowmetrics/v1alpha1"
@@ -41,6 +42,24 @@ func (b *transfoBuilder) deployment(annotations map[string]string) *appsv1.Deplo
4142
}
4243
}
4344

45+
func (b *transfoBuilder) cacheDeployment(annotations map[string]string) *appsv1.Deployment {
46+
pod := b.generic.cachePodTemplate(annotations)
47+
return &appsv1.Deployment{
48+
ObjectMeta: metav1.ObjectMeta{
49+
Name: flpCacheName,
50+
Namespace: b.generic.info.Namespace,
51+
Labels: b.generic.cacheLabels,
52+
},
53+
Spec: appsv1.DeploymentSpec{
54+
Replicas: ptr.To(int32(1)),
55+
Selector: &metav1.LabelSelector{
56+
MatchLabels: b.generic.cacheSelector,
57+
},
58+
Template: pod,
59+
},
60+
}
61+
}
62+
4463
func (b *transfoBuilder) staticConfigMap() (*corev1.ConfigMap, string, error) {
4564
pipeline := b.generic.NewKafkaPipeline()
4665
err := pipeline.AddProcessorStages()

controllers/flp/flp_transfo_reconciler.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ import (
2222
type transformerReconciler struct {
2323
*reconcilers.Instance
2424
deployment *appsv1.Deployment
25+
cacheDeployment *appsv1.Deployment
2526
promService *corev1.Service
2627
hpa *ascv2.HorizontalPodAutoscaler
2728
serviceAccount *corev1.ServiceAccount
2829
staticConfigMap *corev1.ConfigMap
2930
dynamicConfigMap *corev1.ConfigMap
31+
cacheConfigMap *corev1.ConfigMap
3032
roleBinding *rbacv1.ClusterRoleBinding
3133
serviceMonitor *monitoringv1.ServiceMonitor
3234
prometheusRule *monitoringv1.PrometheusRule
@@ -37,11 +39,13 @@ func newTransformerReconciler(cmn *reconcilers.Instance) *transformerReconciler
3739
rec := transformerReconciler{
3840
Instance: cmn,
3941
deployment: cmn.Managed.NewDeployment(name),
42+
cacheDeployment: cmn.Managed.NewDeployment(flpCacheName),
4043
promService: cmn.Managed.NewService(promServiceName(ConfKafkaTransformer)),
4144
hpa: cmn.Managed.NewHPA(name),
4245
serviceAccount: cmn.Managed.NewServiceAccount(name),
4346
staticConfigMap: cmn.Managed.NewConfigMap(staticConfigMapName(ConfKafkaTransformer)),
4447
dynamicConfigMap: cmn.Managed.NewConfigMap(dynamicConfigMapName(ConfKafkaTransformer)),
48+
cacheConfigMap: cmn.Managed.NewConfigMap(flpCacheConfigMap),
4549
roleBinding: cmn.Managed.NewCRB(RoleBindingName(ConfKafkaTransformer)),
4650
}
4751
if cmn.AvailableAPIs.HasSvcMonitor() {
@@ -86,13 +90,13 @@ func (r *transformerReconciler) reconcile(ctx context.Context, desired *flowslat
8690
if err != nil {
8791
return err
8892
}
93+
94+
// Main, static config map
8995
newSCM, configDigest, err := builder.staticConfigMap()
9096
if err != nil {
9197
return err
9298
}
93-
annotations := map[string]string{
94-
constants.PodConfigurationDigest: configDigest,
95-
}
99+
annotations := map[string]string{constants.PodConfigurationDigest: configDigest}
96100
if !r.Managed.Exists(r.staticConfigMap) {
97101
if err := r.CreateOwned(ctx, newSCM); err != nil {
98102
return err
@@ -103,6 +107,23 @@ func (r *transformerReconciler) reconcile(ctx context.Context, desired *flowslat
103107
}
104108
}
105109

110+
// Cache config map
111+
// TODO: factorize with main static CM code
112+
newCCM, configDigest, err := builder.generic.cacheConfigMap()
113+
if err != nil {
114+
return err
115+
}
116+
cacheAnnotations := map[string]string{constants.PodConfigurationDigest: configDigest}
117+
if !r.Managed.Exists(r.cacheConfigMap) {
118+
if err := r.CreateOwned(ctx, newCCM); err != nil {
119+
return err
120+
}
121+
} else if !equality.Semantic.DeepDerivative(newCCM.Data, r.cacheConfigMap.Data) {
122+
if err := r.UpdateIfOwned(ctx, r.cacheConfigMap, newCCM); err != nil {
123+
return err
124+
}
125+
}
126+
106127
if err := r.reconcileDynamicConfigMap(ctx, &builder); err != nil {
107128
return err
108129
}
@@ -125,6 +146,7 @@ func (r *transformerReconciler) reconcile(ctx context.Context, desired *flowslat
125146
}
126147

127148
// Watch for Kafka certificate if necessary; need to restart pods in case of cert rotation
149+
// TODO: cacheAnnotations
128150
if err = annotateKafkaCerts(ctx, r.Common, &desired.Spec.Kafka, "kafka", annotations); err != nil {
129151
return err
130152
}
@@ -137,7 +159,7 @@ func (r *transformerReconciler) reconcile(ctx context.Context, desired *flowslat
137159
return err
138160
}
139161

140-
if err = r.reconcileDeployment(ctx, &desired.Spec.Processor, &builder, annotations); err != nil {
162+
if err = r.reconcileDeployment(ctx, &desired.Spec.Processor, &builder, annotations, cacheAnnotations); err != nil {
141163
return err
142164
}
143165

@@ -161,11 +183,11 @@ func (r *transformerReconciler) reconcileDynamicConfigMap(ctx context.Context, b
161183
return nil
162184
}
163185

164-
func (r *transformerReconciler) reconcileDeployment(ctx context.Context, desiredFLP *flowslatest.FlowCollectorFLP, builder *transfoBuilder, annotations map[string]string) error {
186+
func (r *transformerReconciler) reconcileDeployment(ctx context.Context, desiredFLP *flowslatest.FlowCollectorFLP, builder *transfoBuilder, annotations, cacheAnnots map[string]string) error {
165187
report := helper.NewChangeReport("FLP Deployment")
166188
defer report.LogIfNeeded(ctx)
167189

168-
return reconcilers.ReconcileDeployment(
190+
if err := reconcilers.ReconcileDeployment(
169191
ctx,
170192
r.Instance,
171193
r.deployment,
@@ -174,7 +196,10 @@ func (r *transformerReconciler) reconcileDeployment(ctx context.Context, desired
174196
helper.PtrInt32(desiredFLP.KafkaConsumerReplicas),
175197
&desiredFLP.KafkaConsumerAutoscaler,
176198
&report,
177-
)
199+
); err != nil {
200+
return err
201+
}
202+
return reconcilers.ReconcileDeployment(ctx, r.Instance, r.cacheDeployment, builder.cacheDeployment(cacheAnnots), flpCacheName, 1, nil, &report)
178203
}
179204

180205
func (r *transformerReconciler) reconcileHPA(ctx context.Context, desiredFLP *flowslatest.FlowCollectorFLP, builder *transfoBuilder) error {

go.mod

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ require (
5757
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
5858
github.com/pkg/errors v0.9.1 // indirect
5959
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
60-
github.com/prometheus/client_golang v1.19.0 // indirect
60+
github.com/prometheus/client_golang v1.19.1 // indirect
6161
github.com/prometheus/client_model v0.5.0 // indirect
6262
github.com/prometheus/procfs v0.12.0 // indirect
6363
github.com/sirupsen/logrus v1.9.3 // indirect
@@ -66,15 +66,15 @@ require (
6666
go.uber.org/multierr v1.11.0 // indirect
6767
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
6868
golang.org/x/net v0.25.0 // indirect
69-
golang.org/x/oauth2 v0.17.0 // indirect
69+
golang.org/x/oauth2 v0.18.0 // indirect
7070
golang.org/x/sys v0.20.0 // indirect
7171
golang.org/x/term v0.20.0 // indirect
7272
golang.org/x/text v0.15.0 // indirect
7373
golang.org/x/time v0.5.0 // indirect
7474
golang.org/x/tools v0.21.0 // indirect
7575
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
7676
google.golang.org/appengine v1.6.8 // indirect
77-
google.golang.org/protobuf v1.33.0 // indirect
77+
google.golang.org/protobuf v1.34.1 // indirect
7878
gopkg.in/inf.v0 v0.9.1 // indirect
7979
gopkg.in/yaml.v3 v3.0.1 // indirect
8080
k8s.io/klog/v2 v2.120.1 // indirect
@@ -84,3 +84,5 @@ require (
8484
)
8585

8686
replace github.com/prometheus/common v0.48.0 => github.com/netobserv/prometheus-common v0.48.0-netobserv
87+
88+
replace github.com/netobserv/flowlogs-pipeline => ../flowlogs-pipeline

0 commit comments

Comments
 (0)