1+ #include " StreamHandler.h"
2+
3+ #include < catch2/catch.hpp>
4+
5+ using namespace visor ;
6+
7+ namespace group {
8+ enum TestHandler : MetricGroupIntType {
9+ Test,
10+ };
11+ }
12+
13+ class HandlerBucket : public AbstractMetricsBucket
14+ {
15+ public:
16+ std::string type;
17+ void specialized_merge (const AbstractMetricsBucket &, Metric::Aggregate) override {};
18+ void to_json (json &) const override {};
19+ void to_prometheus (std::stringstream &,
20+ Metric::LabelMap) const override {};
21+ void to_opentelemetry (metrics::v1::ScopeMetrics &, timespec &, timespec &, Metric::LabelMap) const override {};
22+ void update_topn_metrics (size_t , uint64_t ) override {};
23+ };
24+
25+ class TestHandlerMetricsManager : public AbstractMetricsManager <HandlerBucket>
26+ {
27+ const Configurable _config;
28+ HandlerBucket _bucket;
29+
30+ public:
31+ TestHandlerMetricsManager (const Configurable *window_config)
32+ : AbstractMetricsManager<HandlerBucket>(window_config)
33+ , _config(*window_config){};
34+ auto current_periods () const
35+ {
36+ return _config.config_get <uint64_t >(" period" );
37+ }
38+ const HandlerBucket *bucket (uint64_t ) const
39+ {
40+ return &_bucket;
41+ }
42+ void window_single_json (json &j, const std::string &, uint64_t ) const
43+ {
44+ j[" window" ] = " single" ;
45+ }
46+ void window_single_prometheus (std::stringstream &out, uint64_t period, Metric::LabelMap) const
47+ {
48+ if (period) {
49+ out << " first_window" ;
50+ } else {
51+ out << " live_window" ;
52+ }
53+ }
54+ void window_single_opentelemetry (metrics::v1::ScopeMetrics &scope, uint64_t period, Metric::LabelMap) const
55+ {
56+ if (period) {
57+ scope.add_metrics ()->set_name (" first_window" );
58+ } else {
59+ scope.add_metrics ()->set_name (" live_window" );
60+ }
61+ }
62+ void window_external_opentelemetry (metrics::v1::ScopeMetrics &scope, AbstractMetricsBucket *, Metric::LabelMap) const
63+ {
64+ scope.add_metrics ()->set_name (" external_window" );
65+ }
66+ void window_external_prometheus (std::stringstream &out, AbstractMetricsBucket *, Metric::LabelMap) const
67+ {
68+ out << " external_window" ;
69+ }
70+ void window_external_json (json &j, const std::string &, AbstractMetricsBucket *) const
71+ {
72+ j[" window" ] = " external" ;
73+ }
74+ void window_merged_json (json &j, const std::string &, uint64_t ) const
75+ {
76+ j[" window" ] = " merged" ;
77+ }
78+ std::unique_ptr<AbstractMetricsBucket> simple_merge (AbstractMetricsBucket *, uint64_t )
79+ {
80+ auto result = std::make_unique<HandlerBucket>();
81+ result->type = " simple" ;
82+ return result;
83+ }
84+ std::unique_ptr<AbstractMetricsBucket> multiple_merge (AbstractMetricsBucket *, uint64_t )
85+ {
86+ auto result = std::make_unique<HandlerBucket>();
87+ result->type = " multiple" ;
88+ return result;
89+ }
90+ };
91+
92+ class TestStreamMetricsHandler : public StreamMetricsHandler <TestHandlerMetricsManager>
93+ {
94+ public:
95+ TestStreamMetricsHandler (const std::string &name, const Configurable *config)
96+ : StreamMetricsHandler<TestHandlerMetricsManager>(name, config){};
97+ void start () override {};
98+ void stop () override {};
99+ std::string schema_key () const override
100+ {
101+ return " test" ;
102+ }
103+ void test_common_info (json &j) const
104+ {
105+ common_info_json (j);
106+ }
107+ void test_process_groups ()
108+ {
109+ const StreamMetricsHandler::GroupDefType group_defs = {
110+ {" test" , group::TestHandler::Test},
111+ };
112+ process_groups (group_defs);
113+ }
114+ };
115+
116+ TEST_CASE (" StreamMetricsHandler tests" , " [metrics][handler]" )
117+ {
118+ Configurable config;
119+
120+ SECTION (" Common info" )
121+ {
122+ config.config_set <uint64_t >(" period" , 1 );
123+ TestStreamMetricsHandler handler (" my_handler" , &config);
124+ nlohmann::json j;
125+ handler.test_common_info (j);
126+ CHECK (j[" module" ][" name" ] == " my_handler" );
127+ CHECK (handler.metrics ()->current_periods () == 1 );
128+ }
129+
130+ SECTION (" Process groups" )
131+ {
132+ config.config_set <uint64_t >(" period" , 1 );
133+ TestStreamMetricsHandler handler (" my_handler" , &config);
134+ handler.config_set <Configurable::StringList>(" disable" , {" all" });
135+ handler.config_set <Configurable::StringList>(" enable" , {" all" });
136+ CHECK_NOTHROW (handler.test_process_groups ());
137+
138+ handler.config_set <Configurable::StringList>(" disable" , {" test" });
139+ handler.config_set <Configurable::StringList>(" enable" , {" test" });
140+ CHECK_NOTHROW (handler.test_process_groups ());
141+
142+ handler.config_set <Configurable::StringList>(" disable" , {" invalid" });
143+ CHECK_THROWS (handler.test_process_groups ());
144+
145+ handler.config_set <Configurable::StringList>(" disable" , {" test" });
146+ handler.config_set <Configurable::StringList>(" enable" , {" invalid" });
147+ CHECK_THROWS (handler.test_process_groups ());
148+ }
149+
150+ SECTION (" Period shift" )
151+ {
152+ config.config_set <uint64_t >(" period" , 0 );
153+ TestStreamMetricsHandler handler (" my_handler" , &config);
154+ timespec now;
155+ timespec_get (&now, TIME_UTC);
156+ CHECK_NOTHROW (handler.check_period_shift (now));
157+ }
158+
159+ SECTION (" JSON window" )
160+ {
161+ TestStreamMetricsHandler handler (" my_handler" , &config);
162+ json j;
163+ handler.window_json (j, 0 , false );
164+ CHECK (j[" window" ] == " single" );
165+ handler.window_json (j, 0 , true );
166+ CHECK (j[" window" ] == " merged" );
167+ handler.window_json (j, nullptr );
168+ CHECK (j[" window" ] == " external" );
169+ }
170+
171+ SECTION (" Prometheus window" )
172+ {
173+ std::string line;
174+ std::stringstream out;
175+ config.config_set <uint64_t >(" period" , 0 );
176+ auto handler = std::make_unique<TestStreamMetricsHandler>(" my_handler" , &config);
177+ handler->window_prometheus (out);
178+ std::getline (out, line);
179+ CHECK (line == " live_window" );
180+ out.clear ();
181+
182+ config.config_set <uint64_t >(" period" , 3 );
183+ handler = std::make_unique<TestStreamMetricsHandler>(" my_handler" , &config);
184+ handler->window_prometheus (out);
185+ std::getline (out, line);
186+ CHECK (line == " first_window" );
187+ out.clear ();
188+
189+ handler->window_prometheus (out, nullptr );
190+ std::getline (out, line);
191+ CHECK (line == " external_window" );
192+ out.clear ();
193+ }
194+
195+ SECTION (" Opentelemetry window" )
196+ {
197+ metrics::v1::ScopeMetrics scope;
198+ config.config_set <uint64_t >(" period" , 0 );
199+ auto handler = std::make_unique<TestStreamMetricsHandler>(" my_handler" , &config);
200+ handler->window_opentelemetry (scope);
201+ CHECK (scope.metrics ().at (0 ).name () == " live_window" );
202+ scope.Clear ();
203+
204+ config.config_set <uint64_t >(" period" , 3 );
205+ handler = std::make_unique<TestStreamMetricsHandler>(" my_handler" , &config);
206+ handler->window_opentelemetry (scope);
207+ CHECK (scope.metrics ().at (0 ).name () == " first_window" );
208+ scope.Clear ();
209+
210+ handler->window_opentelemetry (scope, nullptr );
211+ CHECK (scope.metrics ().at (0 ).name () == " external_window" );
212+ scope.Clear ();
213+ }
214+
215+ SECTION (" Merge" )
216+ {
217+ config.config_set <uint64_t >(" period" , 0 );
218+ auto handler = std::make_unique<TestStreamMetricsHandler>(" my_handler" , &config);
219+ auto result = handler->merge (nullptr , 0 , false , true );
220+ CHECK (dynamic_cast <HandlerBucket *>(result.get ())->type == " multiple" );
221+
222+ result = handler->merge (nullptr , 0 , true , false );
223+ CHECK (dynamic_cast <HandlerBucket *>(result.get ())->type == " simple" );
224+ }
225+ }
0 commit comments