Skip to content

Commit 0aa70fc

Browse files
committed
Fixed an error in calculating metrics in the PQ tablet (#29321)
1 parent 8cef58f commit 0aa70fc

File tree

5 files changed

+72
-37
lines changed

5 files changed

+72
-37
lines changed

ydb/core/persqueue/mirrorer.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ TMirrorer::TMirrorer(
4646
, Config(config)
4747
{
4848
Counters.Populate(counters);
49+
Counters.ResetCounters();
4950
}
5051

5152
void TMirrorer::Bootstrap(const TActorContext& ctx) {
@@ -251,6 +252,8 @@ void TMirrorer::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext&
251252
void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorContext& ctx) {
252253
ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters);
253254
ctx.Send(PartitionActor, new TEvPQ::TEvMirrorerCounters(Counters));
255+
Counters.Cumulative().ResetCounters();
256+
Counters.Percentile().ResetCounters();
254257

255258
if (ctx.Now() - LastStateLogTimestamp > LOG_STATE_INTERVAL) {
256259
LastStateLogTimestamp = ctx.Now();

ydb/core/persqueue/partition.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,11 @@ bool TPartition::LastOffsetHasBeenCommited(const TUserInfoBase& userInfo) const
171171
}
172172

173173
struct TMirrorerInfo {
174-
TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline)
174+
TMirrorerInfo(const TActorId& actor)
175175
: Actor(actor) {
176-
Baseline.Populate(baseline);
177176
}
178177

179178
TActorId Actor;
180-
TTabletCountersBase Baseline;
181179
};
182180

183181
const TString& TPartition::TopicName() const {
@@ -340,8 +338,9 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo
340338
, WriteLagMs(TDuration::Minutes(1), 100)
341339
, LastEmittedHeartbeat(TRowVersion::Min())
342340
, SamplingControl(samplingControl) {
343-
341+
{
344342
TabletCounters.Populate(Counters);
343+
TabletCounters.ResetCounters();
345344
}
346345

347346
void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) {
@@ -437,6 +436,8 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
437436

438437
ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup());
439438
ctx.Send(Tablet, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters));
439+
TabletCounters.Cumulative().ResetCounters();
440+
TabletCounters.Percentile().ResetCounters();
440441

441442
ui64 usedStorage = GetUsedStorage(now);
442443
if (usedStorage > 0) {
@@ -592,9 +593,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
592593

593594
void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& /*ctx*/) {
594595
if (Mirrorer) {
595-
auto diff = ev->Get()->Counters.MakeDiffForAggr(Mirrorer->Baseline);
596-
TabletCounters.Populate(*diff.Get());
597-
ev->Get()->Counters.RememberCurrentStateAsBaseline(Mirrorer->Baseline);
596+
TabletCounters.Populate(ev->Get()->Counters);
598597
}
599598
}
600599

@@ -4084,8 +4083,12 @@ size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) {
40844083

40854084
void TPartition::CreateMirrorerActor() {
40864085
Mirrorer = MakeHolder<TMirrorerInfo>(
4086+
<<<<<<< HEAD:ydb/core/persqueue/partition.cpp
40874087
Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)),
40884088
TabletCounters
4089+
=======
4090+
RegisterWithSameMailbox(CreateMirrorer(TabletId, TabletActorId, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, GetEndOffset(), Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters))
4091+
>>>>>>> 05ce178e7e3 (Fixed an error in calculating metrics in the PQ tablet (#29321)):ydb/core/persqueue/pqtablet/partition/partition.cpp
40894092
);
40904093
}
40914094

ydb/core/persqueue/pq_impl.cpp

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -911,8 +911,7 @@ void TPersQueue::CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& confi
911911
Partitions.emplace(std::piecewise_construct,
912912
std::forward_as_tuple(partitionId),
913913
std::forward_as_tuple(actorId,
914-
GetPartitionKeyRange(config, partition),
915-
*Counters));
914+
GetPartitionKeyRange(config, partition)));
916915
++OriginalPartitionsCount;
917916
}
918917

@@ -949,8 +948,7 @@ void TPersQueue::AddSupportivePartition(const TPartitionId& partitionId)
949948
{
950949
Partitions.emplace(partitionId,
951950
TPartitionInfo(TActorId(),
952-
{},
953-
*Counters));
951+
{}));
954952
NewSupportivePartitions.insert(partitionId);
955953
}
956954

@@ -1359,11 +1357,12 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte
13591357
PQ_LOG_T("Handle TEvPQ::TEvPartitionCounters" <<
13601358
" PartitionId " << ev->Get()->Partition);
13611359

1362-
const auto& partitionId = ev->Get()->Partition;
1360+
auto& partitionId = ev->Get()->Partition;
13631361
auto& partition = GetPartitionInfo(partitionId);
1364-
auto diff = ev->Get()->Counters.MakeDiffForAggr(partition.Baseline);
1365-
ui64 cpuUsage = diff->Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get();
1366-
ui64 networkBytesUsage = diff->Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get();
1362+
1363+
auto& counters = ev->Get()->Counters;
1364+
ui64 cpuUsage = counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get();
1365+
ui64 networkBytesUsage = counters.Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get();
13671366
if (ResourceMetrics) {
13681367
if (cpuUsage > 0) {
13691368
ResourceMetrics->CPU.Increment(cpuUsage);
@@ -1375,17 +1374,15 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte
13751374
ResourceMetrics->TryUpdate(ctx);
13761375
}
13771376
}
1377+
Counters->Percentile().Populate(counters.Percentile());
1378+
Counters->Cumulative().Populate(counters.Cumulative());
13781379

1379-
Counters->Populate(*diff.Get());
1380-
ev->Get()->Counters.RememberCurrentStateAsBaseline(partition.Baseline);
1380+
partition.ReservedBytes = counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get();
13811381

13821382
// restore cache's simple counters cleaned by partition's counters
13831383
SetCacheCounters(CacheCounters);
1384-
ui64 reservedSize = 0;
1385-
for (auto& p : Partitions) {
1386-
if (p.second.Baseline.Simple().Size() > 0) //there could be no counters from this partition yet
1387-
reservedSize += p.second.Baseline.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get();
1388-
}
1384+
ui64 reservedSize = std::accumulate(Partitions.begin(), Partitions.end(), 0ul,
1385+
[](ui64 sum, const auto& p) { return sum + p.second.ReservedBytes; });
13891386
Counters->Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(reservedSize);
13901387

13911388
// Features of the implementation of SimpleCounters. It is necessary to restore the value of

ydb/core/persqueue/pq_impl_types.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@ namespace NKikimr::NPQ {
66

77
struct TPartitionInfo {
88
TPartitionInfo(const TActorId& actor,
9-
TMaybe<TPartitionKeyRange>&& keyRange,
10-
const TTabletCountersBase& baseline)
9+
TMaybe<TPartitionKeyRange>&& keyRange)
1110
: Actor(actor)
1211
, KeyRange(std::move(keyRange))
1312
, InitDone(false)
1413
{
15-
Baseline.Populate(baseline);
1614
}
1715

1816
TPartitionInfo(const TPartitionInfo& info)
@@ -21,14 +19,13 @@ struct TPartitionInfo {
2119
, InitDone(info.InitDone)
2220
, PendingRequests(info.PendingRequests)
2321
{
24-
Baseline.Populate(info.Baseline);
2522
}
2623

2724
TActorId Actor;
2825
TMaybe<TPartitionKeyRange> KeyRange;
2926
bool InitDone;
30-
TTabletCountersBase Baseline;
3127
THashMap<TString, TTabletLabeledCountersBase> LabeledCounters;
28+
size_t ReservedBytes = 0;
3229

3330
struct TPendingRequest {
3431
TPendingRequest(ui64 cookie,

ydb/core/tablet/tablet_counters.h

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,16 @@ class TTabletCumulativeCounter : public TTabletSimpleCounterBase{
129129
void Initialize(const TTabletCumulativeCounter& rp) {
130130
SetTo(rp);
131131
}
132+
133+
// Overwrites the stored counter value with `value`.
void Set(ui64 value) {
134+
Value = value;
135+
}
136+
132137
void AdjustToBaseLine(const TTabletCumulativeCounter& baseLine) {
133138
Y_DEBUG_ABORT_UNLESS(Value >= baseLine.Value);
134139
Value -= baseLine.Value;
135140
}
141+
136142
void SetTo(const TTabletCumulativeCounter& rp) {
137143
Value = rp.Value;
138144
}
@@ -375,6 +381,18 @@ class TCountersArray : TNonCopyable {
375381
return CountersQnt;
376382
}
377383

384+
// Populates this array from `rp`.
// When the two arrays hold a different number of counters, the work is delegated
// to Reset(rp); otherwise each counter is populated from the counter at the same
// index in `rp`.
void Populate(const TCountersArray<T>& rp) {
385+
if (CountersQnt != rp.CountersQnt) {
386+
Reset(rp);
387+
} else {
388+
for (ui32 i = 0, e = CountersQnt; i < e; ++i) {
389+
Counters[i].Populate(rp.Counters[i]);
390+
}
391+
}
392+
}
393+
394+
void ResetCounters();
395+
378396
private:
379397
//
380398
void Reset(const TCountersArray<T>& rp) {
@@ -408,22 +426,33 @@ class TCountersArray : TNonCopyable {
408426
}
409427
}
410428

411-
void Populate(const TCountersArray<T>& rp) {
412-
if (CountersQnt != rp.CountersQnt) {
413-
Reset(rp);
414-
} else {
415-
for (ui32 i = 0, e = CountersQnt; i < e; ++i) {
416-
Counters[i].Populate(rp.Counters[i]);
417-
}
418-
}
419-
}
420-
421429
//
422430
ui32 CountersQnt;
423431
TCountersHolder CountersHolder;
424432
T* Counters;
425433
};
426434

435+
// Specialization for simple counters: sets each of the CountersQnt counters to 0.
template <>
436+
inline void TCountersArray<TTabletSimpleCounter>::ResetCounters() {
437+
for (ui32 i = 0; i < CountersQnt; ++i) {
438+
Counters[i].Set(0);
439+
}
440+
}
441+
442+
// Specialization for cumulative counters: sets each of the CountersQnt counters to 0.
template <>
443+
inline void TCountersArray<TTabletCumulativeCounter>::ResetCounters() {
444+
for (ui32 i = 0; i < CountersQnt; ++i) {
445+
Counters[i].Set(0);
446+
}
447+
}
448+
449+
// Specialization for percentile counters: calls Clear() on each of the
// CountersQnt counters (percentile counters are reset via Clear(), not Set(0)).
template <>
450+
inline void TCountersArray<TTabletPercentileCounter>::ResetCounters() {
451+
for (ui32 i = 0; i < CountersQnt; ++i) {
452+
Counters[i].Clear();
453+
}
454+
}
455+
427456
////////////////////////////////////////////
428457
/// The TTabletCountersBase class
429458
////////////////////////////////////////////
@@ -538,6 +567,12 @@ class TTabletCountersBase {
538567
}
539568
}
540569

570+
// Resets all three counter groups held by this object: simple, cumulative and
// percentile, by calling ResetCounters() on each group's array.
void ResetCounters() {
571+
SimpleCounters.ResetCounters();
572+
CumulativeCounters.ResetCounters();
573+
PercentileCounters.ResetCounters();
574+
}
575+
541576
private:
542577
//
543578
TTabletCountersBase(const TTabletCountersBase&);

0 commit comments

Comments
 (0)