1414#include < utility>
1515
1616#include " absl/cleanup/cleanup.h"
17+ #include " absl/debugging/stacktrace.h"
18+ #include " absl/flags/internal/flag.h"
1719#include " absl/strings/numbers.h"
1820#include " base/flags.h"
1921#include " base/logging.h"
@@ -40,6 +42,8 @@ ABSL_RETIRED_FLAG(uint32_t, allow_partial_sync_with_lsn_diff, 0,
4042ABSL_DECLARE_FLAG (bool , info_replication_valkey_compatible);
4143ABSL_DECLARE_FLAG (uint32_t , replication_timeout);
4244ABSL_DECLARE_FLAG (uint32_t , shard_repl_backlog_len);
45+ ABSL_FLAG (bool , experimental_force_takeover, false ,
46+ " Attempts to force takeover in case of stuck connections" );
4347
4448namespace dfly {
4549
@@ -466,6 +470,45 @@ std::optional<LSN> DflyCmd::ParseLsnVec(std::string_view last_master_lsn,
466470 return {lsn_vec[flow_id]};
467471}
468472
473+ void DflyCmd::ForceShutdownStuckConnections (uint64_t timeout) {
474+ vector<facade::Connection::WeakRef> conn_refs;
475+ auto cb = [&](unsigned thread_index, util::Connection* conn) {
476+ facade::Connection* dcon = static_cast <facade::Connection*>(conn);
477+ LOG (INFO) << dcon->DebugInfo ();
478+ // Kill Connection here
479+ facade::Connection* dfly_conn = static_cast <facade::Connection*>(conn);
480+ using Phase = facade::Connection::Phase;
481+ auto phase = dfly_conn->phase ();
482+ if (dfly_conn->cntx () && dfly_conn->cntx ()->replica_conn ) {
483+ return ;
484+ }
485+
486+ bool idle_read = phase == Phase::READ_SOCKET && dfly_conn->idle_time () > timeout;
487+
488+ bool stuck_sending = dfly_conn->IsSending () && dfly_conn->GetSendWaitTimeSec () > timeout;
489+
490+ if (idle_read || stuck_sending) {
491+ LOG (INFO) << " Connection check: " << dfly_conn->GetClientInfo ()
492+ << " , phase=" << static_cast <int >(phase) << " , idle_time=" << dfly_conn->idle_time ()
493+ << " , is_sending=" << dfly_conn->IsSending () << " , idle_read=" << idle_read
494+ << " , stuck_sending=" << stuck_sending;
495+ }
496+ conn_refs.push_back (dfly_conn->Borrow ());
497+ };
498+
499+ for (auto * listener : sf_->GetListeners ()) {
500+ listener->TraverseConnections (cb);
501+ }
502+
503+ VLOG (1 ) << " Found " << conn_refs.size () << " stucked connections " ;
504+ for (auto & ref : conn_refs) {
505+ facade::Connection* conn = ref.Get ();
506+ if (conn) {
507+ conn->ShutdownSelfBlocking ();
508+ }
509+ }
510+ }
511+
469512// DFLY TAKEOVER <timeout_sec> [SAVE] <sync_id>
470513// timeout_sec - number of seconds to wait for TAKEOVER to converge.
471514// SAVE option is used only by tests.
@@ -506,7 +549,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
506549 LOG (INFO) << " Takeover initiated, locking down the database." ;
507550 absl::Duration timeout_dur = absl::Seconds (timeout);
508551 absl::Time end_time = absl::Now () + timeout_dur;
509- AggregateStatus status;
552+ OpStatus status = OpStatus::OK ;
510553
511554 // We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
512555 // after this function exits but before the actual shutdown.
@@ -520,13 +563,22 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
520563 LOG (WARNING) << " Couldn't wait for commands to finish dispatching. " << timeout_dur;
521564 status = OpStatus::TIMED_OUT;
522565
523- auto cb = [&](unsigned thread_index, util::Connection* conn) {
524- facade::Connection* dcon = static_cast <facade::Connection*>(conn);
525- LOG (INFO) << dcon->DebugInfo ();
526- };
527-
528- for (auto * listener : sf_->GetListeners ()) {
529- listener->TraverseConnections (cb);
566+ // Force takeover on the same duration if flag is set
567+ if (absl::GetFlag (FLAGS_experimental_force_takeover)) {
568+ ForceShutdownStuckConnections (uint64_t (timeout));
569+
570+ // Safety net.
571+ facade::DispatchTracker tracker{sf_->GetNonPriviligedListeners (), cntx->conn (), false , false };
572+ shard_set->pool ()->AwaitFiberOnAll ([&](unsigned index, auto * pb) {
573+ sf_->CancelBlockingOnThread ();
574+ tracker.TrackOnThread ();
575+ });
576+
577+ status = OpStatus::OK;
578+ if (!tracker.Wait (timeout_dur)) {
579+ LOG (ERROR) << " Could not force execute takeover" ;
580+ status = OpStatus::TIMED_OUT;
581+ }
530582 }
531583 }
532584
@@ -540,10 +592,11 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
540592 });
541593
542594 atomic_bool catchup_success = true ;
543- if (* status == OpStatus::OK) {
595+ if (status == OpStatus::OK) {
544596 dfly::SharedLock lk{replica_ptr->shared_mu };
545- auto cb = [replica_ptr = replica_ptr, end_time, &catchup_success](EngineShard* shard) {
546- if (!WaitReplicaFlowToCatchup (end_time, replica_ptr.get (), shard)) {
597+ auto time = end_time + timeout_dur;
598+ auto cb = [replica_ptr = replica_ptr, time, &catchup_success](EngineShard* shard) {
599+ if (!WaitReplicaFlowToCatchup (time, replica_ptr.get (), shard)) {
547600 catchup_success.store (false );
548601 }
549602 };
@@ -552,8 +605,9 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
552605
553606 VLOG (1 ) << " WaitReplicaFlowToCatchup done" ;
554607
555- if (* status != OpStatus::OK || !catchup_success.load ()) {
608+ if (status != OpStatus::OK || !catchup_success.load ()) {
556609 sf_->service ().SwitchState (GlobalState::TAKEN_OVER, GlobalState::ACTIVE);
610+ LOG (INFO) << status << " " << catchup_success.load () << " " << &status;
557611 return rb->SendError (" Takeover failed!" );
558612 }
559613
0 commit comments