|
6 | 6 | namespace NKikimr::NBridge { |
7 | 7 |
|
8 | 8 | TSyncerActor::TSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TGroupId sourceGroupId, TGroupId targetGroupId, |
9 | | - std::shared_ptr<TSyncerDataStats> syncerDataStats) |
| 9 | + std::shared_ptr<TSyncerDataStats> syncerDataStats, TReplQuoter::TPtr syncRateQuoter, |
| 10 | + TBlobStorageGroupType sourceGroupType) |
10 | 11 | : Info(std::move(info)) |
11 | 12 | , SourceGroupId(sourceGroupId) |
12 | 13 | , TargetGroupId(targetGroupId) |
13 | 14 | , SyncerDataStats(std::move(syncerDataStats)) |
| 15 | + , SyncRateQuoter(std::move(syncRateQuoter)) |
| 16 | + , SourceGroupType(sourceGroupType) |
14 | 17 | { |
15 | 18 | Y_ABORT_UNLESS(Info); |
16 | 19 | Y_ABORT_UNLESS(Info->IsBridged()); |
@@ -368,7 +371,8 @@ namespace NKikimr::NBridge { |
368 | 371 | return false; |
369 | 372 | } |
370 | 373 |
|
371 | | - void TSyncerActor::IssueQuery(bool toTargetGroup, std::unique_ptr<IEventBase> ev, TQueryPayload payload) { |
| 374 | + void TSyncerActor::IssueQuery(bool toTargetGroup, std::unique_ptr<IEventBase> ev, TQueryPayload payload, |
| 375 | + ui64 quoterBytes) { |
372 | 376 | switch (ev->Type()) { |
373 | 377 | #define MSG(TYPE) \ |
374 | 378 | case TEvBlobStorage::TYPE: { \ |
@@ -399,11 +403,21 @@ namespace NKikimr::NBridge { |
399 | 403 | std::unique_ptr<IEventHandle> handle(CreateEventForBSProxy(SelfId(), |
400 | 404 | toTargetGroup ? TargetGroupId : SourceGroupId, ev.release(), cookie)); |
401 | 405 |
|
| 406 | + const TMonotonic now = TActivationContext::Monotonic(); |
| 407 | + const TDuration timeout = SyncRateQuoter && quoterBytes |
| 408 | + ? SyncRateQuoter->Take(now, quoterBytes) |
| 409 | + : TDuration::Zero(); |
| 410 | + const TMonotonic timestamp = now + timeout; |
| 411 | + |
402 | 412 | if (QueriesInFlight < MaxQueriesInFlight) { |
403 | | - TActivationContext::Send(handle.release()); |
| 413 | + if (now < timestamp) { |
| 414 | + TActivationContext::Schedule(timestamp, handle.release()); |
| 415 | + } else { |
| 416 | + TActivationContext::Send(handle.release()); |
| 417 | + } |
404 | 418 | ++QueriesInFlight; |
405 | 419 | } else { |
406 | | - PendingQueries.push_back(std::move(handle)); |
| 420 | + PendingQueries.emplace_back(std::move(handle), timestamp); |
407 | 421 | } |
408 | 422 | } |
409 | 423 |
|
@@ -479,8 +493,9 @@ namespace NKikimr::NBridge { |
479 | 493 | ++SyncerDataStats->BlobsDone; |
480 | 494 | } else if (r.Status == NKikimrProto::NODATA) { |
481 | 495 | // we have to query this blob and do full rewrite -- there was no data for it |
| 496 | + const ui64 quoterBytes = r.Id.BlobSize() * SourceGroupType.TotalPartCount() / SourceGroupType.DataParts(); |
482 | 497 | IssueQuery(false, std::make_unique<TEvBlobStorage::TEvGet>(r.Id, 0, 0, TInstant::Max(), |
483 | | - NKikimrBlobStorage::FastRead)); |
| 498 | + NKikimrBlobStorage::FastRead), {}, quoterBytes); |
484 | 499 | } else if (r.Status == NKikimrProto::ERROR) { |
485 | 500 | SyncerDataStats->BytesError += r.Id.BlobSize(); |
486 | 501 | ++SyncerDataStats->BlobsError; |
@@ -512,7 +527,13 @@ namespace NKikimr::NBridge { |
512 | 527 | --QueriesInFlight; |
513 | 528 | } else { |
514 | 529 | Y_ABORT_UNLESS(QueriesInFlight == MaxQueriesInFlight); |
515 | | - TActivationContext::Send(PendingQueries.front().release()); |
| 530 | + TMonotonic now = TActivationContext::Monotonic(); |
| 531 | + auto& [handle, timestamp] = PendingQueries.front(); |
| 532 | + if (now < timestamp) { |
| 533 | + TActivationContext::Schedule(timestamp, handle.release()); |
| 534 | + } else { |
| 535 | + TActivationContext::Send(handle.release()); |
| 536 | + } |
516 | 537 | PendingQueries.pop_front(); |
517 | 538 | } |
518 | 539 |
|
@@ -601,8 +622,10 @@ namespace NKikimr::NBridge { |
601 | 622 | ) |
602 | 623 |
|
603 | 624 | IActor *CreateSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TGroupId sourceGroupId, TGroupId targetGroupId, |
604 | | - std::shared_ptr<TSyncerDataStats> syncerDataStats) { |
605 | | - return new TSyncerActor(std::move(info), sourceGroupId, targetGroupId, std::move(syncerDataStats)); |
| 625 | + std::shared_ptr<TSyncerDataStats> syncerDataStats, TReplQuoter::TPtr syncRateQuoter, |
| 626 | + TBlobStorageGroupType sourceGroupType) { |
| 627 | + return new TSyncerActor(std::move(info), sourceGroupId, targetGroupId, std::move(syncerDataStats), |
| 628 | + std::move(syncRateQuoter), sourceGroupType); |
606 | 629 | } |
607 | 630 |
|
608 | 631 | } // NKikimr::NBridge |
0 commit comments