78 changes: 66 additions & 12 deletions src/server/dflycmd.cc
@@ -40,6 +40,8 @@ ABSL_RETIRED_FLAG(uint32_t, allow_partial_sync_with_lsn_diff, 0,
ABSL_DECLARE_FLAG(bool, info_replication_valkey_compatible);
ABSL_DECLARE_FLAG(uint32_t, replication_timeout);
ABSL_DECLARE_FLAG(uint32_t, shard_repl_backlog_len);
ABSL_FLAG(bool, experimental_force_takeover, false,
          "Attempts to force takeover in case of stuck connections");

Contributor Author: @romange @dranikpg: maybe instead introduce a subcommand to TAKEOVER, like FORCE, instead of a flag? That way, if we have a failed takeover, we can follow up with a forced one.

Collaborator: I prefer that we keep the flag and improve TAKEOVER rather than adding those micro-behaviors.

namespace dfly {

@@ -466,6 +468,49 @@ std::optional<LSN> DflyCmd::ParseLsnVec(std::string_view last_master_lsn,
return {lsn_vec[flow_id]};
}

void DflyCmd::ForceShutdownStuckConnections(uint64_t timeout) {
// per proactor map
vector<facade::Connection::WeakRef> conn_refs;

auto cb = [&](unsigned thread_index, util::Connection* conn) {
facade::Connection* dcon = static_cast<facade::Connection*>(conn);
LOG(INFO) << dcon->DebugInfo();
Collaborator: Maybe use VLOG here, and a more meaningful message.

// Kill Connection here
facade::Connection* dfly_conn = static_cast<facade::Connection*>(conn);
using Phase = facade::Connection::Phase;
auto phase = dfly_conn->phase();
if (dfly_conn->cntx() && dfly_conn->cntx()->replica_conn) {
return;
}

bool idle_read = phase == Phase::READ_SOCKET && dfly_conn->idle_time() > timeout;
Contributor: Is idle_read really an interesting state?

Contributor: I see: being blocked on pipeline overflow.

Contributor Author: Yes, but that's only for the logs; we kill all the connections.

Collaborator: I actually don't understand. How can an idle read happen for stuck connections?

Contributor Author: @romange Because in DispatchSingle we block when the pipeline is over its limits, but we set last_interaction_ = time(nullptr); only after the block, so the idle_time check reflects that. Either way, I kill all the connections unconditionally; I added the logs because I wanted us to be aware of those stuck cases.
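A purely illustrative sketch of the ordering described in this thread; the helper names are made up and this is not the actual DispatchSingle code. Only the last_interaction_ assignment is taken from the comment above:

  // Hypothetical illustration with made-up helpers, not the real DispatchSingle.
  void Connection::DispatchSingle(/* ... */) {
    if (PipelineOverLimit())        // assumed backpressure check
      WaitForPipelineToDrain();     // can block for a long time
    last_interaction_ = time(nullptr);  // updated only after the block returns,
                                        // so idle_time() keeps growing meanwhile
    // ... dispatch the command ...
  }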


bool stuck_sending = dfly_conn->IsSending() && dfly_conn->GetSendWaitTimeSec() > timeout;

if (idle_read || stuck_sending) {
LOG(INFO) << "Connection check: " << dfly_conn->GetClientInfo()
<< ", phase=" << static_cast<int>(phase) << ", idle_time=" << dfly_conn->idle_time()
<< ", is_sending=" << dfly_conn->IsSending() << ", idle_read=" << idle_read
<< ", stuck_sending=" << stuck_sending;
}
conn_refs.push_back(dfly_conn->Borrow());
Contributor Author: I shut the connections down anyway, regardless of whether they are stuck on send or not. The rationale is that a connection can become active at any point if the client starts reading from the socket; if that happens, we would fail again because the connection won't show up as stuck. To mitigate this, I force-shutdown all connections that were not closed previously.

};

for (auto* listener : sf_->GetListeners()) {
if (listener->IsMainInterface()) {
listener->TraverseConnectionsOnThread(cb, UINT32_MAX, nullptr);
}
}

VLOG(1) << "Found " << conn_refs.size() << " stucked connections ";
for (auto& ref : conn_refs) {
facade::Connection* conn = ref.Get();
if (conn) {
conn->ShutdownSelfBlocking();
Collaborator: I think you should do it from the connection thread.

Contributor: Yes, it's unsafe to do so from foreign threads; WeakRef has it in the comments.

Contributor Author: +1. I will also send a separate PR, as we have this bug in another place as well.

}
}
}
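
To make the thread-ownership concern above concrete, here is a minimal sketch of a per-thread variant. It reuses only calls that already appear in this diff (Borrow, WeakRef::Get, ShutdownSelfBlocking, TraverseConnectionsOnThread, AwaitFiberOnAll); the per-thread grouping, the use of pool()->size(), and the assumption that the traversal's thread_index matches the index passed by AwaitFiberOnAll are mine. It also assumes the helper runs once from the coordinator rather than from inside the AwaitFiberOnAll at the call site below.

  // Sketch only, not the PR's code: collect weak refs per proactor index,
  // then hop onto each proactor so ShutdownSelfBlocking() runs on the
  // connection's own thread.
  std::vector<std::vector<facade::Connection::WeakRef>> refs_per_thread(
      shard_set->pool()->size());  // assumption: one slot per proactor thread

  auto collect = [&](unsigned thread_index, util::Connection* conn) {
    auto* dfly_conn = static_cast<facade::Connection*>(conn);
    // the same stuck-connection checks as above would go here
    refs_per_thread[thread_index].push_back(dfly_conn->Borrow());
  };

  for (auto* listener : sf_->GetListeners()) {
    if (listener->IsMainInterface())
      listener->TraverseConnectionsOnThread(collect, UINT32_MAX, nullptr);
  }

  shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto* pb) {
    for (auto& ref : refs_per_thread[index]) {
      if (facade::Connection* conn = ref.Get())
        conn->ShutdownSelfBlocking();  // now invoked from the owning thread
    }
  });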

// DFLY TAKEOVER <timeout_sec> [SAVE] <sync_id>
// timeout_sec - number of seconds to wait for TAKEOVER to converge.
// SAVE option is used only by tests.
@@ -506,7 +551,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
LOG(INFO) << "Takeover initiated, locking down the database.";
absl::Duration timeout_dur = absl::Seconds(timeout);
absl::Time end_time = absl::Now() + timeout_dur;
AggregateStatus status;
OpStatus status = OpStatus::OK;

// We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
// after this function exits but before the actual shutdown.
@@ -520,13 +565,20 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur;
status = OpStatus::TIMED_OUT;

auto cb = [&](unsigned thread_index, util::Connection* conn) {
facade::Connection* dcon = static_cast<facade::Connection*>(conn);
LOG(INFO) << dcon->DebugInfo();
};

for (auto* listener : sf_->GetListeners()) {
listener->TraverseConnections(cb);
// Force takeover on the same duration if flag is set
if (absl::GetFlag(FLAGS_experimental_force_takeover)) {
facade::DispatchTracker tracker{sf_->GetNonPriviligedListeners(), cntx->conn(), false, false};
Contributor Author: Not really needed to wait again, tbh. We forcefully kill connections, and the ones that were in a good state were already shut down (because of the previous dispatch tracker).

Contributor: Connections stuck on socket writes are not all the kinds of bad connections.

Contributor Author: +1, but I kill them all anyway. Even for connections stuck on send, it could be that the client reads, so the send gets unstuck. We should shut down the connection regardless, as we can't know when the client will read.

shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto* pb) {
ForceShutdownStuckConnections(uint64_t(timeout));
sf_->CancelBlockingOnThread();
tracker.TrackOnThread();
});

status = OpStatus::OK;
if (!tracker.Wait(timeout_dur)) {
LOG(ERROR) << "Could not force execute takeover";
status = OpStatus::TIMED_OUT;
}
}
}

@@ -540,10 +592,11 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
});

atomic_bool catchup_success = true;
if (*status == OpStatus::OK) {
if (status == OpStatus::OK) {
dfly::SharedLock lk{replica_ptr->shared_mu};
auto cb = [replica_ptr = replica_ptr, end_time, &catchup_success](EngineShard* shard) {
if (!WaitReplicaFlowToCatchup(end_time, replica_ptr.get(), shard)) {
auto time = end_time + timeout_dur;
auto cb = [replica_ptr = replica_ptr, time, &catchup_success](EngineShard* shard) {
if (!WaitReplicaFlowToCatchup(time, replica_ptr.get(), shard)) {
catchup_success.store(false);
}
};
@@ -552,8 +605,9 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext

VLOG(1) << "WaitReplicaFlowToCatchup done";

if (*status != OpStatus::OK || !catchup_success.load()) {
if (status != OpStatus::OK || !catchup_success.load()) {
sf_->service().SwitchState(GlobalState::TAKEN_OVER, GlobalState::ACTIVE);
LOG(INFO) << status << " " << catchup_success.load() << " " << &status;
return rb->SendError("Takeover failed!");
}

5 changes: 5 additions & 0 deletions src/server/dflycmd.h
@@ -183,6 +186,11 @@ class DflyCmd {
// Switch to stable state replication.
void StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* rb);

// Helper for the takeover flow. Sometimes connections get stuck on send (because of pipelines)
// and this causes the takeover flow to fail because checkpoint messages are not processed.
// This function force-shuts down those connections and allows the node to complete the takeover.
void ForceShutdownStuckConnections(uint64_t timeout);
Comment on lines +186 to +189

Contributor: It isn't necessarily a pipeline; it can also be a connection stuck on a large send.


// TAKEOVER <syncid>
// Shut this master down atomically with replica promotion.
void TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cntx);
58 changes: 58 additions & 0 deletions tests/dragonfly/replication_test.py
@@ -3666,3 +3666,61 @@ async def test_replica_of_self(async_client):

with pytest.raises(redis.exceptions.ResponseError):
await async_client.execute_command(f"replicaof 127.0.0.1 {port}")


@dfly_args({"proactor_threads": 2, "experimental_force_takeover": True})
async def test_takeover_with_stuck_connections(df_factory: DflyInstanceFactory):
master = df_factory.create()
master.start()

async_client = master.client()
await async_client.execute_command("debug populate 2000")

reader, writer = await asyncio.open_connection("127.0.0.1", master.port)
writer.write(f"client setname writer_test\n".encode())
await writer.drain()
assert "OK" in (await reader.readline()).decode()
size = 1024 * 1024
writer.write(f"SET a {'v'*size}\n".encode())
await writer.drain()

async def get_task():
while True:
# Will get killed by the takeover because it's stuck
try:
writer.write(f"GET a\n".encode())
await writer.drain()
except:
return

await asyncio.sleep(0.1)

get = asyncio.create_task(get_task())

@assert_eventually(times=600)
async def wait_for_stuck_on_send():
clients = await async_client.client_list()
logging.info("wait_for_stuck_on_send clients: %s", clients)
phase = next(
(client["phase"] for client in clients if client["name"] == "writer_test"), None
)
assert phase == "send"

await wait_for_stuck_on_send()

replica = df_factory.create()
replica.start()

replica_cl = replica.client()

res = await replica_cl.execute_command(f"replicaof localhost {master.port}")
assert res == "OK"

# Wait for all replicas to transition into stable sync
async with async_timeout.timeout(240):
await wait_for_replicas_state(replica_cl)

res = await replica_cl.execute_command("REPLTAKEOVER 5")
assert res == "OK"

await get