Skip to content

Commit 33f5bfa

Browse files
authored
Best effort to make CA termination exception-safe (#29073)
1 parent a708b34 commit 33f5bfa

File tree

1 file changed

+58
-44
lines changed

1 file changed

+58
-44
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 58 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -530,70 +530,84 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
530530
}
531531

532532
void Terminate(bool success, const TIssues& issues) {
533-
if (MemoryQuota) {
534-
MemoryQuota->TryReleaseQuota();
533+
// This method is exception-unsafe - prevent calling it twice
534+
if (Terminated) {
535+
CA_LOG_W("Compute Actor " << this->SelfId() << " for task " << Task.GetId() << " tried to call Terminate twice: "
536+
<< " success: " << success
537+
<< " issues: " << issues.ToOneLineString());
538+
return;
535539
}
536540

537-
if (Channels) {
538-
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Channels->SelfId(), this->SelfId(),
539-
new NActors::TEvents::TEvPoison);
540-
Channels->Receive(handle);
541-
}
541+
try {
542+
if (MemoryQuota) {
543+
MemoryQuota->TryReleaseQuota();
544+
}
542545

543-
if (Checkpoints) {
544-
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), this->SelfId(),
545-
new NActors::TEvents::TEvPoison);
546-
Checkpoints->Receive(handle);
547-
}
546+
if (Channels) {
547+
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Channels->SelfId(), this->SelfId(),
548+
new NActors::TEvents::TEvPoison);
549+
Channels->Receive(handle);
550+
}
548551

549-
{
550-
auto guard = BindAllocator(); // Source/Sink could destroy mkql values inside PassAway, which requires allocator to be bound
552+
if (Checkpoints) {
553+
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), this->SelfId(),
554+
new NActors::TEvents::TEvPoison);
555+
Checkpoints->Receive(handle);
556+
}
557+
558+
{
559+
auto guard = BindAllocator(); // Source/Sink could destroy mkql values inside PassAway, which requires allocator to be bound
551560

552-
for (auto& [_, source] : SourcesMap) {
553-
if (source.Actor) {
554-
source.AsyncInput->PassAway();
561+
for (auto& [_, source] : SourcesMap) {
562+
if (source.Actor) {
563+
source.AsyncInput->PassAway();
564+
}
555565
}
556-
}
557566

558-
for (auto& [_, transform] : InputTransformsMap) {
559-
if (transform.Actor) {
560-
transform.AsyncInput->PassAway();
567+
for (auto& [_, transform] : InputTransformsMap) {
568+
if (transform.Actor) {
569+
transform.AsyncInput->PassAway();
570+
}
561571
}
562-
}
563572

564-
for (auto& [_, sink] : SinksMap) {
565-
if (sink.Actor) {
566-
sink.AsyncOutput->PassAway();
573+
for (auto& [_, sink] : SinksMap) {
574+
if (sink.Actor) {
575+
sink.AsyncOutput->PassAway();
576+
}
567577
}
568-
}
569578

570-
for (auto& [_, transform] : OutputTransformsMap) {
571-
if (transform.Actor) {
572-
transform.AsyncOutput->PassAway();
579+
for (auto& [_, transform] : OutputTransformsMap) {
580+
if (transform.Actor) {
581+
transform.AsyncOutput->PassAway();
582+
}
573583
}
574-
}
575584

576-
if (OutputChannelSize) {
577-
OutputChannelSize->Sub(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize);
578-
}
585+
if (OutputChannelSize) {
586+
OutputChannelSize->Sub(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize);
587+
}
579588

580-
for (auto& [_, outputChannel] : OutputChannelsMap) {
581-
if (outputChannel.Channel) {
582-
outputChannel.Channel->Terminate();
589+
for (auto& [_, outputChannel] : OutputChannelsMap) {
590+
if (outputChannel.Channel) {
591+
outputChannel.Channel->Terminate();
592+
}
583593
}
594+
595+
// free MKQL memory then destroy TaskRunner and Allocator
596+
Free();
584597
}
585598

586-
// free MKQL memory then destroy TaskRunner and Allocator
587-
Free();
588-
}
599+
if (RuntimeSettings.TerminateHandler) {
600+
RuntimeSettings.TerminateHandler(success, issues);
601+
}
589602

590-
if (RuntimeSettings.TerminateHandler) {
591-
RuntimeSettings.TerminateHandler(success, issues);
603+
Terminated = true;
604+
this->PassAway();
605+
} catch (const std::exception&) {
606+
// Try to guarantee actor destruction to prevent recursive exception throwing - assume that basic PassAway doesn't throw.
607+
NActors::IActor::PassAway();
608+
throw;
592609
}
593610

594-
Terminated = true;
595-
this->PassAway();
596-
597611
DoTerminateImpl();
598612
}
599613

0 commit comments

Comments
 (0)