|
| 1 | +#include "hive_impl.h" |
| 2 | +#include "hive_log.h" |
| 3 | + |
| 4 | +namespace NKikimr { |
| 5 | +namespace NHive { |
| 6 | + |
| 7 | +class TTxProcessMetrics : public TTransactionBase<THive> { |
| 8 | + TSideEffects SideEffects; |
| 9 | + |
| 10 | + static constexpr size_t MAX_UPDATES_PROCESSED = 200; |
| 11 | +public: |
| 12 | + TTxProcessMetrics(THive* hive) |
| 13 | + : TBase(hive) |
| 14 | + {} |
| 15 | + |
| 16 | + TTxType GetTxType() const override { return NHive::TXTYPE_PROCESS_METRICS; } |
| 17 | + |
| 18 | + bool Execute(TTransactionContext& txc, const TActorContext&) override { |
| 19 | + BLOG_D("TTxProcessMetrics::Execute()"); |
| 20 | + NIceDb::TNiceDb db(txc.DB); |
| 21 | + SideEffects.Reset(Self->SelfId()); |
| 22 | + for (size_t i = 0; !Self->ProcessMetricsQueue.empty() && i < MAX_UPDATES_PROCESSED; ++i) { |
| 23 | + auto tabletId = Self->ProcessMetricsQueue.front(); |
| 24 | + Self->ProcessMetricsQueue.pop(); |
| 25 | + auto* tablet = Self->FindTablet(tabletId); |
| 26 | + if (tablet == nullptr) { |
| 27 | + continue; |
| 28 | + } |
| 29 | + tablet->UpdateMetricsEnqueued = false; |
| 30 | + NKikimrTabletBase::TMetrics protoMetrics; |
| 31 | + tablet->GetResourceValues().ToProto(&protoMetrics); |
| 32 | + db.Table<Schema::Metrics>().Key(tabletId).Update<Schema::Metrics::ProtoMetrics>(protoMetrics); |
| 33 | + } |
| 34 | + if (Self->ProcessMetricsQueue.empty()) { |
| 35 | + Self->ProcessMetricsScheduled = false; |
| 36 | + } else { |
| 37 | + SideEffects.Send(Self->SelfId(), new TEvPrivate::TEvProcessMetrics); |
| 38 | + } |
| 39 | + return true; |
| 40 | + } |
| 41 | + |
| 42 | + void Complete(const TActorContext& ctx) override { |
| 43 | + SideEffects.Complete(ctx); |
| 44 | + } |
| 45 | +}; |
| 46 | + |
| 47 | +ITransaction* THive::CreateProcessMetrics() { |
| 48 | + return new TTxProcessMetrics(this); |
| 49 | +} |
| 50 | + |
| 51 | +} // NHive |
| 52 | +} // NKikimr |
0 commit comments