Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 58 additions & 44 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,70 +529,84 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

void Terminate(bool success, const TIssues& issues) {
if (MemoryQuota) {
MemoryQuota->TryReleaseQuota();
// This method is exception-unsafe - prevent calling it twice
if (Terminated) {
CA_LOG_W("Compute Actor " << this->SelfId() << " for task " << Task.GetId() << " tried to call Terminate twice: "
<< " success: " << success
<< " issues: " << issues.ToOneLineString());
return;
}

if (Channels) {
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Channels->SelfId(), this->SelfId(),
new NActors::TEvents::TEvPoison);
Channels->Receive(handle);
}
try {
if (MemoryQuota) {
MemoryQuota->TryReleaseQuota();
}

if (Checkpoints) {
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), this->SelfId(),
new NActors::TEvents::TEvPoison);
Checkpoints->Receive(handle);
}
if (Channels) {
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Channels->SelfId(), this->SelfId(),
new NActors::TEvents::TEvPoison);
Channels->Receive(handle);
}

{
auto guard = BindAllocator(); // Source/Sink could destroy mkql values inside PassAway, which requires allocator to be bound
if (Checkpoints) {
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), this->SelfId(),
new NActors::TEvents::TEvPoison);
Checkpoints->Receive(handle);
}

{
auto guard = BindAllocator(); // Source/Sink could destroy mkql values inside PassAway, which requires allocator to be bound

for (auto& [_, source] : SourcesMap) {
if (source.Actor) {
source.AsyncInput->PassAway();
for (auto& [_, source] : SourcesMap) {
if (source.Actor) {
source.AsyncInput->PassAway();
}
}
}

for (auto& [_, transform] : InputTransformsMap) {
if (transform.Actor) {
transform.AsyncInput->PassAway();
for (auto& [_, transform] : InputTransformsMap) {
if (transform.Actor) {
transform.AsyncInput->PassAway();
}
}
}

for (auto& [_, sink] : SinksMap) {
if (sink.Actor) {
sink.AsyncOutput->PassAway();
for (auto& [_, sink] : SinksMap) {
if (sink.Actor) {
sink.AsyncOutput->PassAway();
}
}
}

for (auto& [_, transform] : OutputTransformsMap) {
if (transform.Actor) {
transform.AsyncOutput->PassAway();
for (auto& [_, transform] : OutputTransformsMap) {
if (transform.Actor) {
transform.AsyncOutput->PassAway();
}
}
}

if (OutputChannelSize) {
OutputChannelSize->Sub(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize);
}
if (OutputChannelSize) {
OutputChannelSize->Sub(OutputChannelsMap.size() * MemoryLimits.ChannelBufferSize);
}

for (auto& [_, outputChannel] : OutputChannelsMap) {
if (outputChannel.Channel) {
outputChannel.Channel->Terminate();
for (auto& [_, outputChannel] : OutputChannelsMap) {
if (outputChannel.Channel) {
outputChannel.Channel->Terminate();
}
}

// free MKQL memory then destroy TaskRunner and Allocator
Free();
}

// free MKQL memory then destroy TaskRunner and Allocator
Free();
}
if (RuntimeSettings.TerminateHandler) {
RuntimeSettings.TerminateHandler(success, issues);
}

if (RuntimeSettings.TerminateHandler) {
RuntimeSettings.TerminateHandler(success, issues);
Terminated = true;
this->PassAway();
} catch (const std::exception&) {
// Try to guarantee actor destruction to prevent recursive exception throwing - assume that basic PassAway doesn't throw.
NActors::IActor::PassAway();
throw;
}

Terminated = true;
this->PassAway();

DoTerminateImpl();
}

Expand Down
Loading