diff --git a/ydb/core/tx/schemeshard/olap/store/store.h b/ydb/core/tx/schemeshard/olap/store/store.h index b5719f3aed91..6d60b9319122 100644 --- a/ydb/core/tx/schemeshard/olap/store/store.h +++ b/ydb/core/tx/schemeshard/olap/store/store.h @@ -147,11 +147,11 @@ struct TOlapStoreInfo { ILayoutPolicy::TPtr GetTablesLayoutPolicy() const; - void UpdateShardStats(TShardIdx shardIdx, const TPartitionStats& newStats) { + void UpdateShardStats(TDiskSpaceUsageDelta* diskSpaceUsageDelta, TShardIdx shardIdx, const TPartitionStats& newStats, TInstant now) { Stats.Aggregated.PartCount = ColumnShards.size(); Stats.PartitionStats[shardIdx]; // insert if none - Stats.UpdateShardStats(shardIdx, newStats); + Stats.UpdateShardStats(diskSpaceUsageDelta, shardIdx, newStats, now); } }; -} \ No newline at end of file +} diff --git a/ydb/core/tx/schemeshard/olap/table/table.h b/ydb/core/tx/schemeshard/olap/table/table.h index 8bb0549be91d..6948361cb692 100644 --- a/ydb/core/tx/schemeshard/olap/table/table.h +++ b/ydb/core/tx/schemeshard/olap/table/table.h @@ -103,15 +103,15 @@ struct TColumnTableInfo { return Stats; } - void UpdateShardStats(const TShardIdx shardIdx, const TPartitionStats& newStats) { + void UpdateShardStats(TDiskSpaceUsageDelta* diskSpaceUsageDelta, const TShardIdx shardIdx, const TPartitionStats& newStats, TInstant now) { Stats.Aggregated.PartCount = GetColumnShards().size(); Stats.PartitionStats[shardIdx]; // insert if none - Stats.UpdateShardStats(shardIdx, newStats); + Stats.UpdateShardStats(diskSpaceUsageDelta, shardIdx, newStats, now); } - void UpdateTableStats(const TShardIdx shardIdx, const TPathId& pathId, const TPartitionStats& newStats) { + void UpdateTableStats(TDiskSpaceUsageDelta* diskSpaceUsageDelta, const TShardIdx shardIdx, const TPathId& pathId, const TPartitionStats& newStats, TInstant now) { Stats.TableStats[pathId].Aggregated.PartCount = GetColumnShards().size(); - Stats.UpdateTableStats(shardIdx, pathId, newStats); + Stats.UpdateTableStats(diskSpaceUsageDelta, shardIdx, pathId, newStats, now); } TConclusion> BuildEntity(const TPathId& pathId, const NOlap::NAlter::TEntityInitializationContext& iContext) const; diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 1e520769fe54..4cb2e5ff16a9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -1845,6 +1845,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase { } } + tableInfo->IsExternalBlobsEnabled = PartitionConfigHasExternalBlobsEnabled(tableInfo->PartitionConfig()); + TString alterTabletFull = std::get<4>(rec); TString alterTabletDiff = std::get<5>(rec); if (alterTabletFull) { @@ -2384,6 +2386,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { TPathId prevTableId; + TInstant now = AppData()->TimeProvider->Now(); while (!rowSet.EndOfSet()) { const TPathId tableId = TPathId( rowSet.GetValue(), @@ -2460,7 +2463,6 @@ struct TSchemeShard::TTxInit : public TTransactionBase { stats.RangeReads = rowSet.GetValue(); stats.RangeReadRows = rowSet.GetValue(); - TInstant now = AppData(ctx)->TimeProvider->Now(); stats.SetCurrentRawCpuUsage(rowSet.GetValue(), now); stats.Memory = rowSet.GetValue(); stats.Network = rowSet.GetValue(); @@ -2478,7 +2480,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase { stats.LocksWholeShard = rowSet.GetValueOrDefault(); stats.LocksBroken = rowSet.GetValueOrDefault(); - tableInfo->UpdateShardStats(shardIdx, stats); + TDiskSpaceUsageDelta unusedDelta; + tableInfo->UpdateShardStats(&unusedDelta, shardIdx, stats, now); // note that we don't update shard metrics here, because we will always update // the shard metrics in TSchemeShard::SetPartitioning diff --git a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp index 921fb7b552ea..f4c3dadd58ea 100644 --- a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp @@ -22,12 +22,12 @@ class TTxStoreTopicStats: public TTxStoreStats::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override; + bool PersistSingleStats(const TPathId& pathId, const TStatsQueue::TItem& item, TInstant now, TTransactionContext& txc, const TActorContext& ctx) override; void ScheduleNextBatch(const TActorContext& ctx) override; }; -bool TTxStoreTopicStats::PersistSingleStats(const TPathId& pathId, const TStatsQueueItem& item, TTransactionContext& txc, const TActorContext& ctx) { +bool TTxStoreTopicStats::PersistSingleStats(const TPathId& pathId, const TStatsQueueItem& item, TInstant /*now*/, TTransactionContext& txc, const TActorContext& ctx) { const auto& rec = item.Ev->Get()->Record; TTopicStats newStats; diff --git a/ydb/core/tx/schemeshard/schemeshard__stats.h b/ydb/core/tx/schemeshard/schemeshard__stats.h index 58375da29339..e2cbfaef87c3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__stats.h +++ b/ydb/core/tx/schemeshard/schemeshard__stats.h @@ -124,7 +124,7 @@ class TTxStoreStats: public NTabletFlatExecutor::TTransactionBase bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; // returns true to continue batching - virtual bool PersistSingleStats(const TPathId& pathId, const TItem& item, TTransactionContext& txc, const TActorContext& ctx) = 0; + virtual bool PersistSingleStats(const TPathId& pathId, const TItem& item, TInstant now, TTransactionContext& txc, const TActorContext& ctx) = 0; virtual void ScheduleNextBatch(const TActorContext& ctx) = 0; }; diff --git a/ydb/core/tx/schemeshard/schemeshard__stats_impl.h b/ydb/core/tx/schemeshard/schemeshard__stats_impl.h index 25048a35af22..a9d35a001861 100644 --- a/ydb/core/tx/schemeshard/schemeshard__stats_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard__stats_impl.h @@ -118,13 +118,14 @@ bool TTxStoreStats::Execute(NTabletFlatExecutor::TTransactionContext& tx return true; } + TInstant now = AppData(ctx)->TimeProvider->Now(); TMonotonic start = TMonotonic::Now(); const ui32 maxBatchSize = Queue.MaxBatchSize(); ui32 batchSize = 0; for (; batchSize < maxBatchSize && !Queue.Empty(); ++batchSize) { auto item = Queue.Next(); Queue.WriteLatencyMetric(item); - if (!PersistSingleStats(item.PathId(), item, txc, ctx)) { + if (!PersistSingleStats(item.PathId(), item, now, txc, ctx)) { break; } diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index ef6c57d7fbde..bad52de1e6f0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -6,31 +6,6 @@ #include #include -namespace { - -THashMap MapChannelsToStoragePoolKinds(const NActors::TActorContext& ctx, - const NKikimr::TStoragePools& pools, - const NKikimr::TChannelsBindings& bindings -) { - THashMap nameToKindMap(pools.size()); - for (const auto& pool : pools) { - nameToKindMap.emplace(pool.GetName(), pool.GetKind()); - } - THashMap channelsMapping(bindings.size()); - for (ui32 channel = 0u; channel < bindings.size(); ++channel) { - if (const auto* poolKind = nameToKindMap.FindPtr(bindings[channel].GetStoragePoolName())) { - channelsMapping.emplace(channel, *poolKind); - } else { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "MapChannelsToStoragePoolKinds: the subdomain has no info about the storage pool named " - << bindings[channel].GetStoragePoolName() - ); - } - } - return channelsMapping; -} - -} namespace NKikimr { namespace NSchemeShard { @@ -124,11 +99,11 @@ class TTxStoreTableStats: public TTxStoreStats::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override; + bool PersistSingleStats(const TPathId& pathId, const TStatsQueue::TItem& item, TInstant now, TTransactionContext& txc, const TActorContext& ctx) override; void ScheduleNextBatch(const TActorContext& ctx) override; template - TPartitionStats PrepareStats(const TActorContext& ctx, const T& rec, const THashMap& channelsMapping = {}) const; + TPartitionStats PrepareStats(const T& rec, TInstant now, const NKikimr::TStoragePools& pools, const NKikimr::TChannelsBindings& bindings) const; }; @@ -160,9 +135,10 @@ THolder MergeRequest( } template -TPartitionStats TTxStoreTableStats::PrepareStats(const TActorContext& ctx, - const T& rec, - const THashMap& channelsMapping +TPartitionStats TTxStoreTableStats::PrepareStats(const T& rec, + TInstant now, + const NKikimr::TStoragePools& pools, + const NKikimr::TChannelsBindings& bindings ) const { const auto& tableStats = rec.GetTableStats(); const auto& tabletMetrics = rec.GetTabletMetrics(); @@ -176,17 +152,15 @@ TPartitionStats TTxStoreTableStats::PrepareStats(const TActorContext& ctx, newStats.ByKeyFilterSize = tableStats.GetByKeyFilterSize(); newStats.LastAccessTime = TInstant::MilliSeconds(tableStats.GetLastAccessTime()); newStats.LastUpdateTime = TInstant::MilliSeconds(tableStats.GetLastUpdateTime()); + + Y_UNUSED(pools); + for (const auto& channelStats : tableStats.GetChannels()) { - if (const auto* poolKind = channelsMapping.FindPtr(channelStats.GetChannel())) { - auto& [dataSize, indexSize] = newStats.StoragePoolsStats[*poolKind]; - dataSize += channelStats.GetDataSize(); - indexSize += channelStats.GetIndexSize(); - } else { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "PrepareStats: SchemeShard has no info on DataShard " - << rec.GetDatashardId() << " channel " << channelStats.GetChannel() << " binding" - ); - } + const auto& channelBind = bindings[channelStats.GetChannel()]; + const auto& poolKind = channelBind.GetStoragePoolKind(); + auto& [dataSize, indexSize] = newStats.StoragePoolsStats[poolKind]; + dataSize += channelStats.GetDataSize(); + indexSize += channelStats.GetIndexSize(); } newStats.ImmediateTxCompleted = tableStats.GetImmediateTxCompleted(); @@ -206,7 +180,6 @@ TPartitionStats TTxStoreTableStats::PrepareStats(const TActorContext& ctx, newStats.LocksWholeShard = tableStats.GetLocksWholeShard(); newStats.LocksBroken = tableStats.GetLocksBroken(); - TInstant now = AppData(ctx)->TimeProvider->Now(); newStats.SetCurrentRawCpuUsage(tabletMetrics.GetCPU(), now); newStats.Memory = tabletMetrics.GetMemory(); newStats.Network = tabletMetrics.GetNetwork(); @@ -238,6 +211,7 @@ TPartitionStats TTxStoreTableStats::PrepareStats(const TActorContext& ctx, bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, const TStatsQueueItem& item, + TInstant now, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) { const auto& rec = item.Ev->Get()->Record; const auto datashardId = TTabletId(rec.GetDatashardId()); @@ -247,21 +221,43 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, ui64 dataSize = tableStats.GetDataSize(); ui64 rowCount = tableStats.GetRowCount(); - const bool isDataShard = Self->Tables.contains(pathId); - const bool isOlapStore = Self->OlapStores.contains(pathId); - const bool isColumnTable = Self->ColumnTables.contains(pathId); + const auto pathElementIt = Self->PathsById.find(pathId); + if (pathElementIt == Self->PathsById.end()) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "PersistSingleStats for pathId " << pathId + << ", tabletId " << datashardId + << ", followerId " << followerId + << ": unknown pathId" + ); + return true; + } + const auto& pathElement = pathElementIt->second; + if (pathElement->Dropped()) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "PersistSingleStats for pathId " << pathId + << ", tabletId " << datashardId + << ", followerId " << followerId + << ": pathId is dropped" + ); + return true; + } + + const bool isDataShard = pathElement->IsTable(); + const bool isOlapStore = pathElement->IsOlapStore(); + const bool isColumnTable = pathElement->IsColumnTable(); if (!isDataShard && !isOlapStore && !isColumnTable) { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unexpected stats from shard " << datashardId); return true; } - if (!Self->TabletIdToShardIdx.contains(datashardId)) { + TShardIdx shardIdx = [this, &datashardId]() { + auto found = Self->TabletIdToShardIdx.find(datashardId); + return (found != Self->TabletIdToShardIdx.end()) ? found->second : InvalidShardIdx; + }(); + if (!shardIdx) { LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "No shardIdx for shard " << datashardId); return true; } - TShardIdx shardIdx = Self->TabletIdToShardIdx[datashardId]; LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "PersistSingleStats for pathId " << pathId.LocalPathId << " shard idx " << shardIdx << " data size " << dataSize << " row count " << rowCount ); @@ -273,13 +269,9 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, return true; } - auto subDomainInfo = Self->ResolveDomainInfo(pathId); - const auto channelsMapping = MapChannelsToStoragePoolKinds(ctx, - subDomainInfo->EffectiveStoragePools(), - shardInfo->BindedChannels); + auto subDomainInfo = Self->ResolveDomainInfo(pathElement); - const auto pathElement = Self->PathsById[pathId]; - const TPartitionStats newStats = PrepareStats(ctx, rec, channelsMapping); + const TPartitionStats newStats = PrepareStats(rec, now, subDomainInfo->EffectiveStoragePools(), shardInfo->BindedChannels); LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxStoreTableStats.PersistSingleStats: main stats from" @@ -294,8 +286,6 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, NIceDb::TNiceDb db(txc.DB); TTableInfo::TPtr table; - TPartitionStats oldAggrStats; - TPartitionStats newAggrStats; bool updateSubdomainInfo = false; TMaybe nodeId; @@ -316,14 +306,15 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, return true; } + TDiskSpaceUsageDelta diskSpaceUsageDelta; + if (isDataShard) { table = Self->Tables[pathId]; - oldAggrStats = table->GetStats().Aggregated; - table->UpdateShardStats(shardIdx, newStats); + table->UpdateShardStats(&diskSpaceUsageDelta, shardIdx, newStats, now); if (!table->IsBackup) { Self->UpdateBackgroundCompaction(shardIdx, newStats); - Self->UpdateShardMetrics(shardIdx, newStats); + Self->UpdateShardMetrics(shardIdx, newStats, now); } if (!newStats.HasBorrowedData) { @@ -335,23 +326,20 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, } if (!table->IsBackup && !table->IsShardsStatsDetached()) { - newAggrStats = table->GetStats().Aggregated; updateSubdomainInfo = true; } Self->PersistTablePartitionStats(db, pathId, shardIdx, table); } else if (isOlapStore) { TOlapStoreInfo::TPtr olapStore = Self->OlapStores[pathId]; - oldAggrStats = olapStore->GetStats().Aggregated; - olapStore->UpdateShardStats(shardIdx, newStats); - newAggrStats = olapStore->GetStats().Aggregated; + olapStore->UpdateShardStats(&diskSpaceUsageDelta, shardIdx, newStats, now); updateSubdomainInfo = true; const auto tables = rec.GetTables(); LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "OLAP store contains " << tables.size() << " tables."); for (const auto& table : tables) { - const TPartitionStats newTableStats = PrepareStats(ctx, table); + const TPartitionStats newTableStats = PrepareStats(table, now, {}, {}); const TPathId tablePathId = TPathId(TOwnerId(pathId.OwnerId), TLocalPathId(table.GetTableLocalId())); @@ -359,28 +347,38 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "add stats for exists table with pathId=" << tablePathId); - Self->ColumnTables.GetVerifiedPtr(tablePathId)->UpdateTableStats(shardIdx, tablePathId, newTableStats); + Self->ColumnTables.GetVerifiedPtr(tablePathId)->UpdateTableStats(&diskSpaceUsageDelta, shardIdx, tablePathId, newTableStats, now); } else { LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "failed add stats for table with pathId=" << tablePathId); } } + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Aggregated stats for pathId " << pathId.LocalPathId + << ": RowCount " << olapStore->Stats.Aggregated.RowCount + << ", DataSize " << olapStore->Stats.Aggregated.DataSize + ); + } else if (isColumnTable) { LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "PersistSingleStats: ColumnTable rec.GetColumnTables() size=" << rec.GetTables().size()); auto columnTable = Self->ColumnTables.GetVerifiedPtr(pathId); - oldAggrStats = columnTable->GetStats().Aggregated; - columnTable->UpdateShardStats(shardIdx, newStats); - newAggrStats = columnTable->GetStats().Aggregated; + columnTable->UpdateShardStats(&diskSpaceUsageDelta, shardIdx, newStats, now); updateSubdomainInfo = true; + + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Aggregated stats for pathId " << pathId.LocalPathId + << ": RowCount " << columnTable->Stats.Aggregated.RowCount + << ", DataSize " << columnTable->Stats.Aggregated.DataSize + ); } if (updateSubdomainInfo) { - auto subDomainId = Self->ResolvePathIdForDomain(pathId); - subDomainInfo->AggrDiskSpaceUsage(Self, newAggrStats, oldAggrStats); + subDomainInfo->AggrDiskSpaceUsage(Self, diskSpaceUsageDelta); if (subDomainInfo->CheckDiskSpaceQuotas(Self)) { + auto subDomainId = Self->ResolvePathIdForDomain(pathElement); Self->PersistSubDomainState(db, subDomainId, *subDomainInfo); // Publish is done in a separate transaction, so we may call this directly TDeque toPublish; @@ -390,9 +388,6 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, } if (isOlapStore || isColumnTable) { - LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Aggregated stats for pathId " << pathId.LocalPathId - << ": RowCount " << newAggrStats.RowCount << ", DataSize " << newAggrStats.DataSize); return true; } @@ -409,7 +404,6 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].DecrementFor(lag->Seconds()); } - const auto now = ctx.Now(); if (now >= shardInfo.LastCondErase) { lag = now - shardInfo.LastCondErase; } else { @@ -420,15 +414,15 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, } const TTableIndexInfo* index = Self->Indexes.Value(pathElement->ParentPathId, nullptr).Get(); - const TTableInfo* mainTableForIndex = Self->GetMainTableForIndex(pathId); + const TTableInfo* mainTableForIndex = (index ? Self->GetMainTableForIndex(pathId) : nullptr); TString errStr; const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings(); TVector shardsToMerge; TString mergeReason; if ((!index || index->State == NKikimrSchemeOp::EIndexStateReady) - && Self->CheckInFlightLimit(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions, errStr) - && table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, Self->ShardInfos[shardIdx].TabletID, shardsToMerge, mainTableForIndex, mergeReason) + && Self->CheckInFlightLimit(TTxState::ETxType::TxSplitTablePartition, errStr) + && table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, Self->ShardInfos[shardIdx].TabletID, shardsToMerge, mainTableForIndex, now, mergeReason) ) { TTxId txId = Self->GetCachedTxId(ctx); @@ -484,18 +478,32 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, return true; } - auto path = TPath::Init(pathId, Self); - auto checks = path.Check(); - constexpr ui64 deltaShards = 2; - checks - .PathShardsLimit(deltaShards) - .ShardsLimit(deltaShards); - if (!checks) { - LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Do not request full stats from datashard" - << ", datashard: " << datashardId - << ", reason: " << checks.GetError()); - return true; + //NOTE: intentionally avoid using TPath.Check().{PathShardsLimit,ShardsLimit}() here. + // PathShardsLimit() performs pedantic validation by recalculating shard count through + // iteration over entire ShardInfos, which is too slow for this hot spot. It also performs + // additional lookups we want to avoid. + { + constexpr ui64 deltaShards = 2; + if ((pathElement->GetShardsInside() + deltaShards) > subDomainInfo->GetSchemeLimits().MaxShardsInPath) { + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Do not request full stats from datashard " << datashardId + << ", reason: shards count limit exceeded (in path)" + << ", limit: " << subDomainInfo->GetSchemeLimits().MaxShardsInPath + << ", current: " << pathElement->GetShardsInside() + << ", delta: " << deltaShards + ); + return true; + } + const auto currentShards = (subDomainInfo->GetShardsInside() - subDomainInfo->GetBackupShards()); + if ((currentShards + deltaShards) > subDomainInfo->GetSchemeLimits().MaxShards) { + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Do not request full stats from datashard " << datashardId + << ", datashard: " << datashardId + << ", reason: shards count limit exceeded (in subdomain)" + << ", limit: " << subDomainInfo->GetSchemeLimits().MaxShards + << ", current: " << currentShards + << ", delta: " << deltaShards + ); + return true; + } } if (newStats.HasBorrowedData) { @@ -505,9 +513,11 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId, return true; } - if (path.IsLocked()) { + // path.IsLocked() and path.LockedBy() equivalent + if (const auto& found = Self->LockedPaths.find(pathId); found != Self->LockedPaths.end()) { + const auto txId = found->second; LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Postpone split tablet " << datashardId << " because it is locked by " << path.LockedBy()); + "Postpone split tablet " << datashardId << " because it is locked by " << txId); return true; } @@ -614,8 +624,9 @@ void TSchemeShard::ScheduleTableStatsBatch(const TActorContext& ctx) { void TSchemeShard::UpdateShardMetrics( const TShardIdx& shardIdx, - const TPartitionStats& newStats) -{ + const TPartitionStats& newStats, + TInstant now +) { if (newStats.HasBorrowedData) ShardsWithBorrowed.insert(shardIdx); else @@ -647,7 +658,6 @@ void TSchemeShard::UpdateShardMetrics( metrics.RowDeletes = newStats.RowDeletes; TabletCounters->Percentile()[COUNTER_SHARDS_WITH_ROW_DELETES].IncrementFor(metrics.RowDeletes); - auto now = AppData()->TimeProvider->Now(); auto compactionTime = TInstant::Seconds(newStats.FullCompactionTs); if (now >= compactionTime) metrics.HoursSinceFullCompaction = (now - compactionTime).Hours(); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 8a6879a87818..26715bc8f5c0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -7623,6 +7623,7 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T newPartitioningSet.reserve(newPartitioning.size()); const auto& oldPartitioning = tableInfo->GetPartitions(); + TInstant now = AppData()->TimeProvider->Now(); for (const auto& p: newPartitioning) { if (!oldPartitioning.empty()) newPartitioningSet.insert(p.ShardIdx); @@ -7631,7 +7632,7 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T auto it = partitionStats.find(p.ShardIdx); if (it != partitionStats.end()) { EnqueueBackgroundCompaction(p.ShardIdx, it->second); - UpdateShardMetrics(p.ShardIdx, it->second); + UpdateShardMetrics(p.ShardIdx, it->second, now); } } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 00ac1acf0489..226abf154171 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -68,6 +68,8 @@ #include +#include + namespace NKikimr::NSchemeShard::NBackground { struct TEvListRequest; } @@ -383,7 +385,7 @@ class TSchemeShard TDuration StatsMaxExecuteTime; TDuration StatsBatchTimeout; ui32 StatsMaxBatchSize = 0; - THashMap InFlightLimits; + absl::flat_hash_map InFlightLimits; // time when we opened the batch bool TableStatsBatchScheduled = false; @@ -995,7 +997,7 @@ class TSchemeShard void RemoveBackgroundCleaning(const TPathId& pathId); std::optional ResolveTempDirInfo(const TPathId& pathId); - void UpdateShardMetrics(const TShardIdx& shardIdx, const TPartitionStats& newStats); + void UpdateShardMetrics(const TShardIdx& shardIdx, const TPartitionStats& newStats, TInstant now); void RemoveShardMetrics(const TShardIdx& shardIdx); NOperationQueue::EStartStatus StartBackgroundCompaction(const TShardCompactionInfo& info); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index acdc4924587c..d3f41018e899 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -278,6 +278,31 @@ void TSubDomainInfo::AggrDiskSpaceUsage(IQuotaCounters* counters, const TPartiti } } +void TSubDomainInfo::AggrDiskSpaceUsage(IQuotaCounters* counters, const TDiskSpaceUsageDelta& diskSpaceUsageDelta) { + // see filling of diskSpaceUsageDelta in UpdateShardStats() + for (const auto& [poolKind, delta] : diskSpaceUsageDelta) { + if (poolKind.empty()) { + // total space + DiskSpaceUsage.Tables.DataSize += delta.DataSize; + counters->ChangeDiskSpaceTablesDataBytes(delta.DataSize); + + DiskSpaceUsage.Tables.IndexSize += delta.IndexSize; + counters->ChangeDiskSpaceTablesIndexBytes(delta.IndexSize); + + i64 oldTotalBytes = DiskSpaceUsage.Tables.TotalSize; + DiskSpaceUsage.Tables.TotalSize = DiskSpaceUsage.Tables.DataSize + DiskSpaceUsage.Tables.IndexSize; + i64 newTotalBytes = DiskSpaceUsage.Tables.TotalSize; + counters->ChangeDiskSpaceTablesTotalBytes(newTotalBytes - oldTotalBytes); + } else { + // space separated by storage pool kinds + auto& r = DiskSpaceUsage.StoragePoolsUsage[poolKind]; + r.DataSize += delta.DataSize; + r.IndexSize += delta.IndexSize; + counters->AddDiskSpaceTables(GetUserFacingStorageType(poolKind), delta.DataSize, delta.IndexSize); + } + } +} + void TSubDomainInfo::AggrDiskSpaceUsage(const TTopicStats& newAggr, const TTopicStats& oldAggr) { auto& topics = DiskSpaceUsage.Topics; topics.DataSize += (newAggr.DataSize - oldAggr.DataSize); @@ -1637,6 +1662,8 @@ void TTableInfo::FinishAlter() { partitionConfig.ClearShadowData(); } + IsExternalBlobsEnabled = PartitionConfigHasExternalBlobsEnabled(PartitionConfig()); + // Apply TTL params if (AlterData->TableDescriptionFull.Defined() && AlterData->TableDescriptionFull->HasTTLSettings()) { MutableTTLSettings().Swap(AlterData->TableDescriptionFull->MutableTTLSettings()); @@ -1755,16 +1782,18 @@ void TTableInfo::SetPartitioning(TVector&& newPartitioning) { } } -void TTableInfo::UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats) { - Stats.UpdateShardStats(datashardIdx, newStats); +void TTableInfo::UpdateShardStats(TDiskSpaceUsageDelta* diskSpaceUsageDelta, TShardIdx datashardIdx, const TPartitionStats& newStats, TInstant now) { + Stats.UpdateShardStats(diskSpaceUsageDelta, datashardIdx, newStats, now); } -void TTableAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats) { +void TTableAggregatedStats::UpdateShardStats(TDiskSpaceUsageDelta* diskSpaceUsageDelta, TShardIdx datashardIdx, const TPartitionStats& newStats, TInstant now) { + auto found = PartitionStats.find(datashardIdx); // Ignore stats from unknown datashard (it could have been split) - if (!PartitionStats.contains(datashardIdx)) + if (found == PartitionStats.end()) { return; + } - TPartitionStats& oldStats = PartitionStats[datashardIdx]; + TPartitionStats& oldStats = found->second; if (newStats.SeqNo <= oldStats.SeqNo) { // Ignore outdated message @@ -1784,29 +1813,52 @@ void TTableAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPart oldStats.RangeReadRows = 0; } - Aggregated.RowCount += (newStats.RowCount - oldStats.RowCount); - Aggregated.DataSize += (newStats.DataSize - oldStats.DataSize); - Aggregated.IndexSize += (newStats.IndexSize - oldStats.IndexSize); - Aggregated.ByKeyFilterSize += (newStats.ByKeyFilterSize - oldStats.ByKeyFilterSize); + // disk space related stuff + // first, total space aggregation + { + TStoragePoolStatsDelta delta{ + .DataSize = (static_cast(newStats.DataSize) - static_cast(oldStats.DataSize)), + .IndexSize = (static_cast(newStats.IndexSize) - static_cast(oldStats.IndexSize)), + }; + diskSpaceUsageDelta->emplace_back(TString(), delta); + Aggregated.DataSize += delta.DataSize; + Aggregated.IndexSize += delta.IndexSize; + } + // second, aggregation of space separated by storage pool kinds for (const auto& [poolKind, newStoragePoolStats] : newStats.StoragePoolsStats) { - auto& [dataSize, indexSize] = Aggregated.StoragePoolsStats[poolKind]; const auto* oldStoragePoolStats = oldStats.StoragePoolsStats.FindPtr(poolKind); // Missing old stats for a particular storage pool are interpreted as if this data // has just been written to the datashard and we need to increment the aggregate by the entire new stats' sizes. - dataSize += newStoragePoolStats.DataSize - (oldStoragePoolStats ? oldStoragePoolStats->DataSize : 0u); - indexSize += newStoragePoolStats.IndexSize - (oldStoragePoolStats ? oldStoragePoolStats->IndexSize : 0u); + TStoragePoolStatsDelta delta{ + .DataSize = (static_cast(newStoragePoolStats.DataSize) - (oldStoragePoolStats ? static_cast(oldStoragePoolStats->DataSize) : 0)), + .IndexSize = (static_cast(newStoragePoolStats.IndexSize) - (oldStoragePoolStats ? static_cast(oldStoragePoolStats->IndexSize) : 0)), + }; + diskSpaceUsageDelta->emplace_back(poolKind, delta); } for (const auto& [poolKind, oldStoragePoolStats] : oldStats.StoragePoolsStats) { if (const auto* newStoragePoolStats = newStats.StoragePoolsStats.FindPtr(poolKind); !newStoragePoolStats ) { - auto& [dataSize, indexSize] = Aggregated.StoragePoolsStats[poolKind]; // Missing new stats for a particular storage pool are interpreted as if this data // has been removed from the datashard and we need to subtract the old stats' sizes from the aggregate. - dataSize -= oldStoragePoolStats.DataSize; - indexSize -= oldStoragePoolStats.IndexSize; + TStoragePoolStatsDelta delta{ + .DataSize = -static_cast(oldStoragePoolStats.DataSize), + .IndexSize = -static_cast(oldStoragePoolStats.IndexSize), + }; + diskSpaceUsageDelta->emplace_back(poolKind, delta); } } + for (const auto& [poolKind, delta] : *diskSpaceUsageDelta) { + if (poolKind.empty()) { + continue; + } + auto& r = Aggregated.StoragePoolsStats[poolKind]; + r.DataSize += delta.DataSize; + r.IndexSize += delta.IndexSize; + } + + Aggregated.RowCount += (newStats.RowCount - oldStats.RowCount); + Aggregated.ByKeyFilterSize += (newStats.ByKeyFilterSize - oldStats.ByKeyFilterSize); Aggregated.LastAccessTime = Max(Aggregated.LastAccessTime, newStats.LastAccessTime); Aggregated.LastUpdateTime = Max(Aggregated.LastUpdateTime, newStats.LastUpdateTime); Aggregated.ImmediateTxCompleted += (newStats.ImmediateTxCompleted - oldStats.ImmediateTxCompleted); @@ -1824,7 +1876,6 @@ void TTableAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPart i64 cpuUsageDelta = newStats.GetCurrentRawCpuUsage() - oldStats.GetCurrentRawCpuUsage(); i64 prevCpuUsage = Aggregated.GetCurrentRawCpuUsage(); ui64 newAggregatedCpuUsage = std::max(0, prevCpuUsage + cpuUsageDelta); - TInstant now = AppData()->TimeProvider->Now(); Aggregated.SetCurrentRawCpuUsage(newAggregatedCpuUsage, now); Aggregated.Memory += (newStats.Memory - oldStats.Memory); Aggregated.Network += (newStats.Network - oldStats.Network); @@ -1859,10 +1910,10 @@ void TTableAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPart UpdatedStats.insert(datashardIdx); } -void TAggregatedStats::UpdateTableStats(TShardIdx shardIdx, const TPathId& pathId, const TPartitionStats& newStats) { +void TAggregatedStats::UpdateTableStats(TDiskSpaceUsageDelta* diskSpaceUsageDelta, TShardIdx shardIdx, const TPathId& pathId, const TPartitionStats& newStats, TInstant now) { auto& tableStats = TableStats[pathId]; tableStats.PartitionStats[shardIdx]; // insert if none - tableStats.UpdateShardStats(shardIdx, newStats); + tableStats.UpdateShardStats(diskSpaceUsageDelta, shardIdx, newStats, now); } void TTableInfo::RegisterSplitMergeOp(TOperationId opId, const TTxState& txState) { @@ -1931,6 +1982,7 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings, TShardIdx shardIdx, TVector& shardsToMerge, THashSet& partOwners, ui64& totalSize, float& totalLoad, float cpuUsageThreshold, const TTableInfo* mainTableForIndex, + TInstant now, TString& reason) const { if (ExpectedPartitionCount + 1 - shardsToMerge.size() <= GetMinPartitionsCount()) { @@ -1970,7 +2022,6 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings, } // Check if we can try merging by load - TInstant now = AppData()->TimeProvider->Now(); TDuration minUptime = TDuration::Seconds(splitSettings.MergeByLoadMinUptimeSec); if (!canMerge && IsMergeByLoadEnabled(mainTableForIndex) && stats->StartTime && stats->StartTime + minUptime < now) { reason = "merge by load"; @@ -2018,6 +2069,7 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, const TForceShardSplitSettings& forceShardSplitSettings, TShardIdx shardIdx, const TTabletId& tabletId, TVector& shardsToMerge, const TTableInfo* mainTableForIndex, + TInstant now, TString& reason) const { // Don't split/merge backup tables @@ -2057,7 +2109,7 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, TString shardMergeReason; // Make sure we can actually merge current shard first - if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, partOwners, totalSize, totalLoad, cpuMergeThreshold, mainTableForIndex, shardMergeReason)) { + if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, partOwners, totalSize, totalLoad, cpuMergeThreshold, mainTableForIndex, now, shardMergeReason)) { return false; } @@ -2065,7 +2117,7 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, << " " << shardMergeReason; for (i64 pi = partitionIdx - 1; pi >= 0; --pi) { - if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad, cpuMergeThreshold, mainTableForIndex, shardMergeReason)) { + if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad, cpuMergeThreshold, mainTableForIndex, now, shardMergeReason)) { break; } } @@ -2073,7 +2125,7 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings, Reverse(shardsToMerge.begin(), shardsToMerge.end()); for (ui64 pi = partitionIdx + 1; pi < GetPartitions().size(); ++pi) { - if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad, cpuMergeThreshold, mainTableForIndex, shardMergeReason)) { + if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad, cpuMergeThreshold, mainTableForIndex, now, shardMergeReason)) { break; } } @@ -2111,14 +2163,11 @@ bool TTableInfo::CheckSplitByLoad( } // Ignore stats from unknown datashard (it could have been split) - if (!Stats.PartitionStats.contains(shardIdx)) { + const auto* stats = Stats.PartitionStats.FindPtr(shardIdx); + if (!stats) { reason = "UnknownDataShard"; return false; } - if (!Shard2PartitionIdx.contains(shardIdx)) { - reason = "ShardNotInIndex"; - return false; - } if (!IsSplitByLoadEnabled(mainTableForIndex)) { reason = "SplitByLoadNotEnabledForTable"; @@ -2155,18 +2204,17 @@ bool TTableInfo::CheckSplitByLoad( // operations (which reduce the expected partition count). const ui64 effectiveShardCount = Max(ExpectedPartitionCount, Stats.PartitionStats.size()); - const auto& stats = *Stats.PartitionStats.FindPtr(shardIdx); if (rowCount < MIN_ROWS_FOR_SPLIT_BY_LOAD || dataSize < MIN_SIZE_FOR_SPLIT_BY_LOAD || effectiveShardCount >= maxShards || - stats.GetCurrentRawCpuUsage() < cpuUsageThreshold * 1000000) + stats->GetCurrentRawCpuUsage() < cpuUsageThreshold * 1000000) { reason = TStringBuilder() << "ConditionsNotMet" << " rowCount: " << rowCount << " minRowCount: " << MIN_ROWS_FOR_SPLIT_BY_LOAD << " shardSize: " << dataSize << " minShardSize: " << MIN_SIZE_FOR_SPLIT_BY_LOAD << " shardCount: " << Stats.PartitionStats.size() << " expectedShardCount: " << ExpectedPartitionCount << " maxShardCount: " << maxShards - << " cpuUsage: " << stats.GetCurrentRawCpuUsage() << " cpuUsageThreshold: " << cpuUsageThreshold * 1000000; + << " cpuUsage: " << stats->GetCurrentRawCpuUsage() << " cpuUsageThreshold: " << cpuUsageThreshold * 1000000; return false; } @@ -2178,7 +2226,7 @@ bool TTableInfo::CheckSplitByLoad( << "shardCount: " << Stats.PartitionStats.size() << ", " << "expectedShardCount: " << ExpectedPartitionCount << ", " << "maxShardCount: " << maxShards << ", " - << "cpuUsage: " << stats.GetCurrentRawCpuUsage() << ", " + << "cpuUsage: " << stats->GetCurrentRawCpuUsage() << ", " << "cpuUsageThreshold: " << cpuUsageThreshold * 1000000 << ")"; return true; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 8cc1cd3bf085..24d2887cc103 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -394,6 +394,12 @@ struct TPartitionStats { ui64 CPU = 0; }; +struct TStoragePoolStatsDelta { + i64 DataSize = 0; + i64 IndexSize = 0; +}; +using TDiskSpaceUsageDelta = TVector>; + struct TTableAggregatedStats { TPartitionStats Aggregated; THashMap PartitionStats; @@ -405,13 +411,13 @@ struct TTableAggregatedStats { return Aggregated.PartCount && UpdatedStats.size() == Aggregated.PartCount; } - void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats); + void UpdateShardStats(TDiskSpaceUsageDelta* diskSpaceUsageDelta, TShardIdx datashardIdx, const TPartitionStats& newStats, TInstant now); }; struct TAggregatedStats : public TTableAggregatedStats { THashMap TableStats; - void UpdateTableStats(TShardIdx datashardIdx, const TPathId& pathId, const TPartitionStats& newStats); + void UpdateTableStats(TDiskSpaceUsageDelta* diskSpaceUsageDelta, TShardIdx datashardIdx, const TPathId& pathId, const TPartitionStats& newStats, TInstant now); }; struct TSubDomainInfo; @@ -530,6 +536,8 @@ struct TTableInfo : public TSimpleRefCount { THashMap PerShardPartitionConfig; + bool IsExternalBlobsEnabled = false; + const NKikimrSchemeOp::TPartitionConfig& PartitionConfig() const { return TableDescription.GetPartitionConfig(); } NKikimrSchemeOp::TPartitionConfig& MutablePartitionConfig() { return *TableDescription.MutablePartitionConfig(); } @@ -638,6 +646,7 @@ struct TTableInfo : public TSimpleRefCount { , IsRestore(alterData.IsRestore) { TableDescription.Swap(alterData.TableDescriptionFull.Get()); + IsExternalBlobsEnabled = PartitionConfigHasExternalBlobsEnabled(TableDescription.GetPartitionConfig()); } static TTableInfo::TPtr DeepCopy(const TTableInfo& other) { @@ -731,7 +740,7 @@ struct TTableInfo : public TSimpleRefCount { ShardsStatsDetached = true; } - void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats); + void UpdateShardStats(TDiskSpaceUsageDelta* diskSpaceUsageDelta, TShardIdx datashardIdx, const TPartitionStats& newStats, TInstant now); void RegisterSplitMergeOp(TOperationId txId, const TTxState& txState); @@ -755,12 +764,12 @@ struct TTableInfo : public TSimpleRefCount { const TForceShardSplitSettings& forceShardSplitSettings, TShardIdx shardIdx, TVector& shardsToMerge, THashSet& partOwners, ui64& totalSize, float& totalLoad, - float cpuUsageThreshold, const TTableInfo* mainTableForIndex, TString& reason) const; + float cpuUsageThreshold, const TTableInfo* mainTableForIndex, TInstant now, TString& reason) const; bool CheckCanMergePartitions(const TSplitSettings& splitSettings, const TForceShardSplitSettings& forceShardSplitSettings, TShardIdx shardIdx, const TTabletId& tabletId, TVector& shardsToMerge, - const TTableInfo* mainTableForIndex, TString& reason) const; + const TTableInfo* mainTableForIndex, TInstant now, TString& reason) const; bool CheckSplitByLoad( const TSplitSettings& splitSettings, TShardIdx shardIdx, @@ -773,7 +782,7 @@ struct TTableInfo : public TSimpleRefCount { return false; } // Auto split is always enabled, unless table is using external blobs - return !PartitionConfigHasExternalBlobsEnabled(PartitionConfig()); + return (IsExternalBlobsEnabled == false); } bool IsMergeBySizeEnabled(const TForceShardSplitSettings& params) const { @@ -819,7 +828,7 @@ struct TTableInfo : public TSimpleRefCount { bool IsSplitByLoadEnabled(const TTableInfo* mainTableForIndex) const { // We cannot split when external blobs are enabled - if (PartitionConfigHasExternalBlobsEnabled(PartitionConfig())) { + if (IsExternalBlobsEnabled) { return false; } @@ -2027,6 +2036,7 @@ struct TSubDomainInfo: TSimpleRefCount { } void AggrDiskSpaceUsage(IQuotaCounters* counters, const TPartitionStats& newAggr, const TPartitionStats& oldAggr = {}); + void AggrDiskSpaceUsage(IQuotaCounters* counters, const TDiskSpaceUsageDelta& delta); void AggrDiskSpaceUsage(const TTopicStats& newAggr, const TTopicStats& oldAggr = {});