@@ -40,6 +40,8 @@ ABSL_RETIRED_FLAG(uint32_t, allow_partial_sync_with_lsn_diff, 0,
 ABSL_DECLARE_FLAG(bool, info_replication_valkey_compatible);
 ABSL_DECLARE_FLAG(uint32_t, replication_timeout);
 ABSL_DECLARE_FLAG(uint32_t, shard_repl_backlog_len);
+ABSL_FLAG(bool, experimental_force_takeover, false,
+          "Attempts to force takeover in case of stuck connections");
 
 namespace dfly {
 
@@ -466,6 +468,45 @@ std::optional<LSN> DflyCmd::ParseLsnVec(std::string_view last_master_lsn,
   return {lsn_vec[flow_id]};
 }
 
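+// Helper for the experimental force-takeover path: traverses all listener
+// connections and shuts down those that look stuck, i.e. non-replica connections
+// that have been idle in the READ_SOCKET phase or blocked in a send for longer
+// than the given timeout.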
+void DflyCmd::ForceShutdownStuckConnections(uint64_t timeout) {
+  vector<facade::Connection::WeakRef> conn_refs;
+  auto cb = [&](unsigned thread_index, util::Connection* conn) {
+    facade::Connection* dfly_conn = static_cast<facade::Connection*>(conn);
+    using Phase = facade::Connection::Phase;
+    auto phase = dfly_conn->phase();
+    // Replica connections are handled by the takeover flow itself, so skip them.
+    if (dfly_conn->cntx() && dfly_conn->cntx()->replica_conn) {
+      return;
+    }
+
+    // A connection is considered stuck if it sat idle in the read phase or waited
+    // on a send for longer than the takeover timeout.
+    bool idle_read = phase == Phase::READ_SOCKET && dfly_conn->idle_time() > timeout;
+    bool stuck_sending = dfly_conn->IsSending() && dfly_conn->GetSendWaitTimeSec() > timeout;
+
+    if (idle_read || stuck_sending) {
+      LOG(INFO) << "Connection check: " << dfly_conn->GetClientInfo()
+                << ", phase=" << static_cast<int>(phase)
+                << ", idle_time=" << dfly_conn->idle_time()
+                << ", is_sending=" << dfly_conn->IsSending() << ", idle_read=" << idle_read
+                << ", stuck_sending=" << stuck_sending;
+      conn_refs.push_back(dfly_conn->Borrow());
+    }
+  };
+
+  for (auto* listener : sf_->GetListeners()) {
+    listener->TraverseConnections(cb);
+  }
+
+  VLOG(1) << "Found " << conn_refs.size() << " stuck connections";
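+  // Shut down the collected connections; the null check skips refs that no longer
+  // resolve to a live connection.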
+  for (auto& ref : conn_refs) {
+    facade::Connection* conn = ref.Get();
+    if (conn) {
+      conn->ShutdownSelfBlocking();
+    }
+  }
+}
+
 // DFLY TAKEOVER <timeout_sec> [SAVE] <sync_id>
 // timeout_sec - number of seconds to wait for TAKEOVER to converge.
 // SAVE option is used only by tests.
@@ -506,7 +547,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
   LOG(INFO) << "Takeover initiated, locking down the database.";
   absl::Duration timeout_dur = absl::Seconds(timeout);
   absl::Time end_time = absl::Now() + timeout_dur;
-  AggregateStatus status;
+  OpStatus status = OpStatus::OK;
 
   // We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
   // after this function exits but before the actual shutdown.
@@ -520,13 +561,22 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
     LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur;
     status = OpStatus::TIMED_OUT;
 
-    auto cb = [&](unsigned thread_index, util::Connection* conn) {
-      facade::Connection* dcon = static_cast<facade::Connection*>(conn);
-      LOG(INFO) << dcon->DebugInfo();
-    };
-
-    for (auto* listener : sf_->GetListeners()) {
-      listener->TraverseConnections(cb);
+    // Force takeover within the same timeout window if the flag is set.
+    if (absl::GetFlag(FLAGS_experimental_force_takeover)) {
+      ForceShutdownStuckConnections(uint64_t(timeout));
+
+      // Safety net: track and await the remaining dispatches once more.
+      facade::DispatchTracker tracker{sf_->GetNonPriviligedListeners(), cntx->conn(), false, false};
+      shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto* pb) {
+        sf_->CancelBlockingOnThread();
+        tracker.TrackOnThread();
+      });
+
+      status = OpStatus::OK;
+      if (!tracker.Wait(timeout_dur)) {
+        LOG(ERROR) << "Could not force-execute takeover";
+        status = OpStatus::TIMED_OUT;
+      }
     }
   }
 
@@ -540,10 +590,11 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
   });
 
   atomic_bool catchup_success = true;
-  if (*status == OpStatus::OK) {
+  if (status == OpStatus::OK) {
     dfly::SharedLock lk{replica_ptr->shared_mu};
-    auto cb = [replica_ptr = replica_ptr, end_time, &catchup_success](EngineShard* shard) {
-      if (!WaitReplicaFlowToCatchup(end_time, replica_ptr.get(), shard)) {
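+    // Allow one extra timeout window beyond end_time for the replica flows to catch up.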
+    auto time = end_time + timeout_dur;
+    auto cb = [replica_ptr = replica_ptr, time, &catchup_success](EngineShard* shard) {
+      if (!WaitReplicaFlowToCatchup(time, replica_ptr.get(), shard)) {
         catchup_success.store(false);
       }
     };
@@ -552,8 +603,9 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
 
   VLOG(1) << "WaitReplicaFlowToCatchup done";
 
-  if (*status != OpStatus::OK || !catchup_success.load()) {
+  if (status != OpStatus::OK || !catchup_success.load()) {
     sf_->service().SwitchState(GlobalState::TAKEN_OVER, GlobalState::ACTIVE);
+    LOG(INFO) << "Takeover failed: status=" << status
+              << ", catchup_success=" << catchup_success.load();
     return rb->SendError("Takeover failed!");
   }
 