Skip to content

Commit 0041867

Browse files
Copilotalvarolopez
andcommitted
feat: add mapping configuration between record types and messengers
Co-authored-by: alvarolopez <[email protected]>
1 parent 741c448 commit 0041867

File tree

3 files changed

+320
-2
lines changed

3 files changed

+320
-2
lines changed

caso/messenger/__init__.py

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,78 @@
1717
"""Module containing the base class and manager for the cASO messengers."""
1818

1919
import abc
20+
import typing
2021

2122
from oslo_config import cfg
2223
from oslo_log import log
2324
import six
2425

26+
import caso.record
2527
from caso import loading
2628

2729
CONF = cfg.CONF
2830

2931
LOG = log.getLogger(__name__)
3032

33+
# Valid record types that can be configured
34+
VALID_RECORD_TYPES = frozenset(["cloud", "ip", "accelerator", "storage", "energy"])
35+
36+
# Default record types for SSM messenger (records from default extractors)
37+
# Default extractors are: nova, cinder, neutron
38+
# nova -> CloudRecord, AcceleratorRecord
39+
# cinder -> StorageRecord
40+
# neutron -> IPRecord
41+
DEFAULT_SSM_RECORD_TYPES = ["cloud", "ip", "accelerator", "storage"]
42+
43+
44+
def get_messenger_opts(messenger_name: str) -> typing.List[cfg.Opt]:
45+
"""Get the configuration options for a specific messenger.
46+
47+
:param messenger_name: Name of the messenger.
48+
:returns: List of configuration options for the messenger.
49+
"""
50+
# SSM messengers have a different default (only records from default extractors)
51+
if messenger_name in ("ssm", "ssmv4"):
52+
default_record_types = DEFAULT_SSM_RECORD_TYPES
53+
else:
54+
default_record_types = []
55+
56+
return [
57+
cfg.ListOpt(
58+
"record_types",
59+
default=default_record_types,
60+
help="List of record types to publish to this messenger. "
61+
"Valid values are: cloud, ip, accelerator, storage, energy. "
62+
"If empty, all record types will be published. "
63+
f"Default for {messenger_name}: "
64+
f"{default_record_types if default_record_types else 'all record types'}.",
65+
),
66+
]
67+
68+
69+
def register_messenger_opts(messenger_names: typing.Optional[typing.List[str]] = None):
70+
"""Register configuration options for the specified messengers.
71+
72+
:param messenger_names: List of messenger names to register options for.
73+
If None, registers options for all available messengers.
74+
"""
75+
if messenger_names is None:
76+
messenger_names = list(loading.get_available_messenger_names())
77+
78+
for messenger_name in messenger_names:
79+
group_name = f"messenger_{messenger_name}"
80+
CONF.register_opts(get_messenger_opts(messenger_name), group=group_name)
81+
82+
83+
# Mapping from record type names to record classes
84+
RECORD_TYPE_MAP: typing.Dict[str, type] = {
85+
"cloud": caso.record.CloudRecord,
86+
"ip": caso.record.IPRecord,
87+
"accelerator": caso.record.AcceleratorRecord,
88+
"storage": caso.record.StorageRecord,
89+
"energy": caso.record.EnergyRecord,
90+
}
91+
3192

3293
@six.add_metaclass(abc.ABCMeta)
3394
class BaseMessenger(object):
@@ -38,22 +99,77 @@ def push(self, records):
3899
"""Push the records."""
39100

40101

102+
def _filter_records(
103+
records: typing.List,
104+
record_types: typing.Optional[typing.List[str]],
105+
) -> typing.List:
106+
"""Filter records based on allowed record types.
107+
108+
:param records: List of records to filter.
109+
:param record_types: List of allowed record type names. If None or empty,
110+
all records are returned.
111+
:returns: Filtered list of records.
112+
"""
113+
if not record_types:
114+
return records
115+
116+
allowed_classes = tuple(
117+
RECORD_TYPE_MAP[rt] for rt in record_types if rt in RECORD_TYPE_MAP
118+
)
119+
if not allowed_classes:
120+
return records
121+
122+
return [r for r in records if isinstance(r, allowed_classes)]
123+
124+
41125
class Manager(object):
42126
"""Manager for all cASO messengers."""
43127

44128
def __init__(self):
45129
"""Init the manager with all the configured messengers."""
130+
# Register messenger options for all configured messengers
131+
register_messenger_opts(CONF.messengers)
132+
46133
try:
47134
self.mgr = loading.get_enabled_messengers(CONF.messengers)
48135
except Exception as e:
49136
# Capture exception so that we can continue working
50137
LOG.error(e)
51138
raise e
52139

140+
# Build mapping of messenger name to allowed record types
141+
self.messenger_record_types: typing.Dict[
142+
str, typing.Optional[typing.List[str]]
143+
] = {}
144+
for messenger_name in CONF.messengers:
145+
group_name = f"messenger_{messenger_name}"
146+
if hasattr(CONF, group_name):
147+
group = getattr(CONF, group_name)
148+
if hasattr(group, "record_types"):
149+
record_types = group.record_types
150+
if record_types:
151+
self.messenger_record_types[messenger_name] = list(record_types)
152+
else:
153+
self.messenger_record_types[messenger_name] = None
154+
else:
155+
self.messenger_record_types[messenger_name] = None
156+
else:
157+
self.messenger_record_types[messenger_name] = None
158+
53159
def push_to_all(self, records):
54160
"""Push records to all the configured messengers."""
55161
try:
56-
self.mgr.map_method("push", records)
162+
for ext in self.mgr:
163+
messenger_name = ext.name
164+
record_types = self.messenger_record_types.get(messenger_name)
165+
filtered_records = _filter_records(records, record_types)
166+
if filtered_records:
167+
ext.obj.push(filtered_records)
168+
else:
169+
LOG.debug(
170+
f"No records to push to messenger {messenger_name} "
171+
f"after filtering for record types: {record_types}"
172+
)
57173
except Exception as e:
58174
# Capture exception so that we can continue working
59175
LOG.error("Something happeneded when pushing records.")

caso/opts.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,16 @@
2323
import caso.extract.openstack.nova
2424
import caso.extract.prometheus
2525
import caso.keystone_client
26+
import caso.loading
2627
import caso.manager
28+
import caso.messenger
2729
import caso.messenger.logstash
2830
import caso.messenger.ssm
2931

3032

3133
def list_opts():
3234
"""Get the list of all configured options."""
33-
return [
35+
opts = [
3436
(
3537
"DEFAULT",
3638
itertools.chain(
@@ -47,3 +49,10 @@ def list_opts():
4749
("prometheus", caso.extract.prometheus.opts),
4850
("ssm", caso.messenger.ssm.opts),
4951
]
52+
53+
# Add messenger-specific record_types options for all available messengers
54+
for messenger_name in caso.loading.get_available_messenger_names():
55+
group_name = f"messenger_{messenger_name}"
56+
opts.append((group_name, caso.messenger.get_messenger_opts(messenger_name)))
57+
58+
return opts

caso/tests/test_messenger.py

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
# -*- coding: utf-8 -*-
2+
3+
# Copyright 2014 Spanish National Research Council (CSIC)
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License. You may obtain
7+
# a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
# License for the specific language governing permissions and limitations
15+
# under the License.
16+
17+
"""Tests for messenger module."""
18+
19+
import caso.messenger
20+
import caso.record
21+
22+
23+
class TestFilterRecords:
24+
"""Test cases for the _filter_records function."""
25+
26+
def test_filter_records_returns_all_when_no_filter(
27+
self, cloud_record, ip_record, storage_record
28+
):
29+
"""Test that all records are returned when no filter is specified."""
30+
records = [cloud_record, ip_record, storage_record]
31+
result = caso.messenger._filter_records(records, None)
32+
assert result == records
33+
34+
def test_filter_records_returns_all_when_empty_filter(
35+
self, cloud_record, ip_record, storage_record
36+
):
37+
"""Test that all records are returned when filter list is empty."""
38+
records = [cloud_record, ip_record, storage_record]
39+
result = caso.messenger._filter_records(records, [])
40+
assert result == records
41+
42+
def test_filter_records_filters_by_cloud(
43+
self, cloud_record, ip_record, storage_record
44+
):
45+
"""Test filtering for cloud records only."""
46+
records = [cloud_record, ip_record, storage_record]
47+
result = caso.messenger._filter_records(records, ["cloud"])
48+
assert len(result) == 1
49+
assert isinstance(result[0], caso.record.CloudRecord)
50+
51+
def test_filter_records_filters_by_ip(
52+
self, cloud_record, ip_record, storage_record
53+
):
54+
"""Test filtering for IP records only."""
55+
records = [cloud_record, ip_record, storage_record]
56+
result = caso.messenger._filter_records(records, ["ip"])
57+
assert len(result) == 1
58+
assert isinstance(result[0], caso.record.IPRecord)
59+
60+
def test_filter_records_filters_by_storage(
61+
self, cloud_record, ip_record, storage_record
62+
):
63+
"""Test filtering for storage records only."""
64+
records = [cloud_record, ip_record, storage_record]
65+
result = caso.messenger._filter_records(records, ["storage"])
66+
assert len(result) == 1
67+
assert isinstance(result[0], caso.record.StorageRecord)
68+
69+
def test_filter_records_filters_by_multiple_types(
70+
self, cloud_record, ip_record, storage_record
71+
):
72+
"""Test filtering for multiple record types."""
73+
records = [cloud_record, ip_record, storage_record]
74+
result = caso.messenger._filter_records(records, ["cloud", "ip"])
75+
assert len(result) == 2
76+
assert any(isinstance(r, caso.record.CloudRecord) for r in result)
77+
assert any(isinstance(r, caso.record.IPRecord) for r in result)
78+
79+
def test_filter_records_handles_accelerator(self, accelerator_record):
80+
"""Test filtering for accelerator records."""
81+
records = [accelerator_record]
82+
result = caso.messenger._filter_records(records, ["accelerator"])
83+
assert len(result) == 1
84+
assert isinstance(result[0], caso.record.AcceleratorRecord)
85+
86+
def test_filter_records_handles_energy(self, energy_record):
87+
"""Test filtering for energy records."""
88+
records = [energy_record]
89+
result = caso.messenger._filter_records(records, ["energy"])
90+
assert len(result) == 1
91+
assert isinstance(result[0], caso.record.EnergyRecord)
92+
93+
def test_filter_records_returns_empty_when_no_match(
94+
self, cloud_record, ip_record
95+
):
96+
"""Test that empty list is returned when no records match filter."""
97+
records = [cloud_record, ip_record]
98+
result = caso.messenger._filter_records(records, ["storage", "energy"])
99+
assert result == []
100+
101+
def test_filter_records_ignores_invalid_types(
102+
self, cloud_record, ip_record
103+
):
104+
"""Test that invalid record types are ignored in the filter."""
105+
records = [cloud_record, ip_record]
106+
result = caso.messenger._filter_records(records, ["invalid_type"])
107+
# When all filter types are invalid, return all records
108+
assert result == records
109+
110+
def test_filter_records_mixed_valid_invalid_types(
111+
self, cloud_record, ip_record, storage_record
112+
):
113+
"""Test filtering with mix of valid and invalid types."""
114+
records = [cloud_record, ip_record, storage_record]
115+
result = caso.messenger._filter_records(
116+
records, ["cloud", "invalid_type"]
117+
)
118+
assert len(result) == 1
119+
assert isinstance(result[0], caso.record.CloudRecord)
120+
121+
122+
class TestGetMessengerOpts:
123+
"""Test cases for the get_messenger_opts function."""
124+
125+
def test_ssm_messenger_has_default_record_types(self):
126+
"""Test that SSM messenger has default record types."""
127+
opts = caso.messenger.get_messenger_opts("ssm")
128+
assert len(opts) == 1
129+
opt = opts[0]
130+
assert opt.name == "record_types"
131+
assert opt.default == caso.messenger.DEFAULT_SSM_RECORD_TYPES
132+
133+
def test_ssmv4_messenger_has_default_record_types(self):
134+
"""Test that SSMv4 messenger has default record types."""
135+
opts = caso.messenger.get_messenger_opts("ssmv4")
136+
assert len(opts) == 1
137+
opt = opts[0]
138+
assert opt.name == "record_types"
139+
assert opt.default == caso.messenger.DEFAULT_SSM_RECORD_TYPES
140+
141+
def test_noop_messenger_has_empty_default(self):
142+
"""Test that noop messenger has empty default (all record types)."""
143+
opts = caso.messenger.get_messenger_opts("noop")
144+
assert len(opts) == 1
145+
opt = opts[0]
146+
assert opt.name == "record_types"
147+
assert opt.default == []
148+
149+
def test_logstash_messenger_has_empty_default(self):
150+
"""Test that logstash messenger has empty default (all record types)."""
151+
opts = caso.messenger.get_messenger_opts("logstash")
152+
assert len(opts) == 1
153+
opt = opts[0]
154+
assert opt.name == "record_types"
155+
assert opt.default == []
156+
157+
158+
class TestDefaultSsmRecordTypes:
159+
"""Test cases for default SSM record types."""
160+
161+
def test_default_ssm_record_types_includes_cloud(self):
162+
"""Test that default SSM record types includes cloud."""
163+
assert "cloud" in caso.messenger.DEFAULT_SSM_RECORD_TYPES
164+
165+
def test_default_ssm_record_types_includes_ip(self):
166+
"""Test that default SSM record types includes ip."""
167+
assert "ip" in caso.messenger.DEFAULT_SSM_RECORD_TYPES
168+
169+
def test_default_ssm_record_types_includes_accelerator(self):
170+
"""Test that default SSM record types includes accelerator."""
171+
assert "accelerator" in caso.messenger.DEFAULT_SSM_RECORD_TYPES
172+
173+
def test_default_ssm_record_types_includes_storage(self):
174+
"""Test that default SSM record types includes storage."""
175+
assert "storage" in caso.messenger.DEFAULT_SSM_RECORD_TYPES
176+
177+
def test_default_ssm_record_types_excludes_energy(self):
178+
"""Test that default SSM record types excludes energy."""
179+
assert "energy" not in caso.messenger.DEFAULT_SSM_RECORD_TYPES
180+
181+
182+
class TestValidRecordTypes:
183+
"""Test cases for VALID_RECORD_TYPES constant."""
184+
185+
def test_valid_record_types_contains_all_types(self):
186+
"""Test that VALID_RECORD_TYPES contains all expected types."""
187+
expected = {"cloud", "ip", "accelerator", "storage", "energy"}
188+
assert caso.messenger.VALID_RECORD_TYPES == expected
189+
190+
def test_record_type_map_matches_valid_record_types(self):
191+
"""Test that RECORD_TYPE_MAP keys match VALID_RECORD_TYPES."""
192+
assert set(caso.messenger.RECORD_TYPE_MAP.keys()) == \
193+
caso.messenger.VALID_RECORD_TYPES

0 commit comments

Comments
 (0)