Skip to content

Commit 05ce178

Browse files
authored
Fixed an error in calculating metrics in the PQ tablet (#29321)
1 parent 5b5bb2e commit 05ce178

File tree

5 files changed

+68
-38
lines changed

5 files changed

+68
-38
lines changed

ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ TMirrorer::TMirrorer(
4141
, Config(config)
4242
{
4343
Counters.Populate(counters);
44+
Counters.ResetCounters();
4445
}
4546

4647
void TMirrorer::Bootstrap(const TActorContext& ctx) {
@@ -239,6 +240,8 @@ void TMirrorer::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext&
239240
void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorContext& ctx) {
240241
ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters);
241242
ctx.Send(PartitionActor, new TEvPQ::TEvMirrorerCounters(Counters));
243+
Counters.Cumulative().ResetCounters();
244+
Counters.Percentile().ResetCounters();
242245

243246
if (ctx.Now() - LastStateLogTimestamp > LOG_STATE_INTERVAL) {
244247
LastStateLogTimestamp = ctx.Now();

ydb/core/persqueue/pqtablet/partition/partition.cpp

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,11 @@ bool TPartition::LastOffsetHasBeenCommited(const TUserInfoBase& userInfo) const
174174
}
175175

176176
struct TMirrorerInfo {
177-
TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline)
177+
TMirrorerInfo(const TActorId& actor)
178178
: Actor(actor) {
179-
Baseline.Populate(baseline);
180179
}
181180

182181
TActorId Actor;
183-
TTabletCountersBase Baseline;
184182
};
185183

186184
const TString& TPartition::TopicName() const {
@@ -370,6 +368,7 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo
370368
, SamplingControl(samplingControl)
371369
{
372370
TabletCounters.Populate(*Counters);
371+
TabletCounters.ResetCounters();
373372
}
374373

375374
void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) {
@@ -467,6 +466,8 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
467466

468467
ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup());
469468
ctx.Send(TabletActorId, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters));
469+
TabletCounters.Cumulative().ResetCounters();
470+
TabletCounters.Percentile().ResetCounters();
470471

471472
ui64 usedStorage = GetUsedStorage(now);
472473
if (usedStorage > 0) {
@@ -619,9 +620,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
619620

620621
void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& /*ctx*/) {
621622
if (Mirrorer) {
622-
auto diff = ev->Get()->Counters.MakeDiffForAggr(Mirrorer->Baseline);
623-
TabletCounters.Populate(*diff.Get());
624-
ev->Get()->Counters.RememberCurrentStateAsBaseline(Mirrorer->Baseline);
623+
TabletCounters.Populate(ev->Get()->Counters);
625624
}
626625
}
627626

@@ -4232,8 +4231,7 @@ size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) {
42324231

42334232
void TPartition::CreateMirrorerActor() {
42344233
Mirrorer = MakeHolder<TMirrorerInfo>(
4235-
RegisterWithSameMailbox(CreateMirrorer(TabletId, TabletActorId, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, GetEndOffset(), Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)),
4236-
TabletCounters
4234+
RegisterWithSameMailbox(CreateMirrorer(TabletId, TabletActorId, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, GetEndOffset(), Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters))
42374235
);
42384236
}
42394237

ydb/core/persqueue/pqtablet/pq_impl.cpp

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -636,8 +636,7 @@ void TPersQueue::CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& confi
636636
Partitions.emplace(std::piecewise_construct,
637637
std::forward_as_tuple(partitionId),
638638
std::forward_as_tuple(actorId,
639-
GetPartitionKeyRange(config, partition),
640-
*Counters));
639+
GetPartitionKeyRange(config, partition)));
641640
++OriginalPartitionsCount;
642641
}
643642

@@ -674,8 +673,7 @@ void TPersQueue::AddSupportivePartition(const TPartitionId& partitionId)
674673
{
675674
Partitions.emplace(partitionId,
676675
TPartitionInfo(TActorId(),
677-
{},
678-
*Counters));
676+
{}));
679677
NewSupportivePartitions.insert(partitionId);
680678
}
681679

@@ -1077,11 +1075,12 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte
10771075
PQ_LOG_T("Handle TEvPQ::TEvPartitionCounters" <<
10781076
" PartitionId " << ev->Get()->Partition);
10791077

1080-
const auto& partitionId = ev->Get()->Partition;
1078+
auto& partitionId = ev->Get()->Partition;
10811079
auto& partition = GetPartitionInfo(partitionId);
1082-
auto diff = ev->Get()->Counters.MakeDiffForAggr(partition.Baseline);
1083-
ui64 cpuUsage = diff->Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get();
1084-
ui64 networkBytesUsage = diff->Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get();
1080+
1081+
auto& counters = ev->Get()->Counters;
1082+
ui64 cpuUsage = counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get();
1083+
ui64 networkBytesUsage = counters.Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get();
10851084
if (ResourceMetrics) {
10861085
if (cpuUsage > 0) {
10871086
ResourceMetrics->CPU.Increment(cpuUsage);
@@ -1093,17 +1092,15 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte
10931092
ResourceMetrics->TryUpdate(ctx);
10941093
}
10951094
}
1095+
Counters->Percentile().Populate(counters.Percentile());
1096+
Counters->Cumulative().Populate(counters.Cumulative());
10961097

1097-
Counters->Populate(*diff.Get());
1098-
ev->Get()->Counters.RememberCurrentStateAsBaseline(partition.Baseline);
1098+
partition.ReservedBytes = counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get();
10991099

11001100
// restore cache's simple counters cleaned by partition's counters
11011101
SetCacheCounters(CacheCounters);
1102-
ui64 reservedSize = 0;
1103-
for (auto& p : Partitions) {
1104-
if (p.second.Baseline.Simple().Size() > 0) //there could be no counters from this partition yet
1105-
reservedSize += p.second.Baseline.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get();
1106-
}
1102+
ui64 reservedSize = std::accumulate(Partitions.begin(), Partitions.end(), 0ul,
1103+
[](ui64 sum, const auto& p) { return sum + p.second.ReservedBytes; });
11071104
Counters->Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(reservedSize);
11081105

11091106
// Features of the implementation of SimpleCounters. It is necessary to restore the value of

ydb/core/persqueue/pqtablet/pq_impl_types.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@ namespace NKikimr::NPQ {
66

77
struct TPartitionInfo {
88
TPartitionInfo(const TActorId& actor,
9-
TMaybe<TPartitionKeyRange>&& keyRange,
10-
const TTabletCountersBase& baseline)
9+
TMaybe<TPartitionKeyRange>&& keyRange)
1110
: Actor(actor)
1211
, KeyRange(std::move(keyRange))
1312
, InitDone(false)
1413
{
15-
Baseline.Populate(baseline);
1614
}
1715

1816
TPartitionInfo(const TPartitionInfo& info)
@@ -21,14 +19,13 @@ struct TPartitionInfo {
2119
, InitDone(info.InitDone)
2220
, PendingRequests(info.PendingRequests)
2321
{
24-
Baseline.Populate(info.Baseline);
2522
}
2623

2724
TActorId Actor;
2825
TMaybe<TPartitionKeyRange> KeyRange;
2926
bool InitDone;
30-
TTabletCountersBase Baseline;
3127
THashMap<TString, TTabletLabeledCountersBase> LabeledCounters;
28+
size_t ReservedBytes = 0;
3229

3330
struct TPendingRequest {
3431
TPendingRequest(ui64 cookie,

ydb/core/tablet/tablet_counters.h

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,16 @@ class TTabletCumulativeCounter : public TTabletSimpleCounterBase{
129129
void Initialize(const TTabletCumulativeCounter& rp) {
130130
SetTo(rp);
131131
}
132+
133+
void Set(ui64 value) {
134+
Value = value;
135+
}
136+
132137
void AdjustToBaseLine(const TTabletCumulativeCounter& baseLine) {
133138
Y_DEBUG_ABORT_UNLESS(Value >= baseLine.Value);
134139
Value -= baseLine.Value;
135140
}
141+
136142
void SetTo(const TTabletCumulativeCounter& rp) {
137143
Value = rp.Value;
138144
}
@@ -375,6 +381,18 @@ class TCountersArray : TNonCopyable {
375381
return CountersQnt;
376382
}
377383

384+
void Populate(const TCountersArray<T>& rp) {
385+
if (CountersQnt != rp.CountersQnt) {
386+
Reset(rp);
387+
} else {
388+
for (ui32 i = 0, e = CountersQnt; i < e; ++i) {
389+
Counters[i].Populate(rp.Counters[i]);
390+
}
391+
}
392+
}
393+
394+
void ResetCounters();
395+
378396
private:
379397
//
380398
void Reset(const TCountersArray<T>& rp) {
@@ -408,22 +426,33 @@ class TCountersArray : TNonCopyable {
408426
}
409427
}
410428

411-
void Populate(const TCountersArray<T>& rp) {
412-
if (CountersQnt != rp.CountersQnt) {
413-
Reset(rp);
414-
} else {
415-
for (ui32 i = 0, e = CountersQnt; i < e; ++i) {
416-
Counters[i].Populate(rp.Counters[i]);
417-
}
418-
}
419-
}
420-
421429
//
422430
ui32 CountersQnt;
423431
TCountersHolder CountersHolder;
424432
T* Counters;
425433
};
426434

435+
template <>
436+
inline void TCountersArray<TTabletSimpleCounter>::ResetCounters() {
437+
for (ui32 i = 0; i < CountersQnt; ++i) {
438+
Counters[i].Set(0);
439+
}
440+
}
441+
442+
template <>
443+
inline void TCountersArray<TTabletCumulativeCounter>::ResetCounters() {
444+
for (ui32 i = 0; i < CountersQnt; ++i) {
445+
Counters[i].Set(0);
446+
}
447+
}
448+
449+
template <>
450+
inline void TCountersArray<TTabletPercentileCounter>::ResetCounters() {
451+
for (ui32 i = 0; i < CountersQnt; ++i) {
452+
Counters[i].Clear();
453+
}
454+
}
455+
427456
////////////////////////////////////////////
428457
/// The TTabletCountersBase class
429458
////////////////////////////////////////////
@@ -539,6 +568,12 @@ class TTabletCountersBase {
539568
}
540569
}
541570

571+
void ResetCounters() {
572+
SimpleCounters.ResetCounters();
573+
CumulativeCounters.ResetCounters();
574+
PercentileCounters.ResetCounters();
575+
}
576+
542577
private:
543578
//
544579
TTabletCountersBase(const TTabletCountersBase&);

0 commit comments

Comments
 (0)