Skip to content
This repository was archived by the owner on Jan 15, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions lib/moped/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,23 @@ def write(operations)
#
# @since 1.2.9
def read_data(socket, length)
data = socket.read(length)
unless data
raise Errors::ConnectionFailure.new(
"Attempted to read #{length} bytes from the socket but nothing was returned."
)
# Block on data to read for op_timeout seconds
# using the suggested implementation of http://www.ruby-doc.org/core-2.1.3/Kernel.html#method-i-select
# to work with SSL connections
time_left = op_timeout = @options[:op_timeout] || timeout
begin
raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.") if (time_left -= 0.1) <= 0
data = socket.read_nonblock(length)
rescue IO::WaitReadable
Kernel::select([socket], nil, [socket], 0.1)
retry
rescue IO::WaitWritable
Kernel::select(nil, [socket], [socket], 0.1)
retry
rescue SystemCallError, IOError => e
raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happend #{e.message}.")
end

if data.length < length
data << read_data(socket, length - data.length)
end
Expand Down
3 changes: 3 additions & 0 deletions lib/moped/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class PoolTimeout < RuntimeError; end
# Generic error class for exceptions related to connection failures.
class ConnectionFailure < StandardError; end

# Generic error class for exceptions related to read timeout failures.
class OperationTimeout < StandardError; end

# Raised when a database name is invalid.
class InvalidDatabaseName < StandardError; end

Expand Down
3 changes: 2 additions & 1 deletion lib/moped/failover.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ module Failover
Errors::ConnectionFailure => Retry,
Errors::CursorNotFound => Ignore,
Errors::OperationFailure => Reconfigure,
Errors::QueryFailure => Reconfigure
Errors::QueryFailure => Reconfigure,
Errors::PoolTimeout => Retry
}.freeze

# Get the appropriate failover handler given the provided exception.
Expand Down
6 changes: 4 additions & 2 deletions lib/moped/failover/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Failover
module Retry
extend self

# Executes the failover strategy. In the case of retyr, we disconnect and
# Executes the failover strategy. In the case of retry, we disconnect and
# reconnect, then try the operation one more time.
#
# @example Execute the retry strategy.
Expand All @@ -24,11 +24,13 @@ module Retry
#
# @since 2.0.0
def execute(exception, node)
node.disconnect
node.disconnect unless exception.is_a?(Errors::PoolTimeout)
begin
node.connection do |conn|
yield(conn) if block_given?
end
rescue Errors::PoolTimeout => e
raise Errors::ConnectionFailure.new e
rescue Exception => e
node.down!
raise(e)
Expand Down
16 changes: 14 additions & 2 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,14 @@ def connected?
#
# @since 2.0.0
def connection
pool.with do |conn|
yield(conn)
connection_acquired = false
begin
pool.with do |conn|
connection_acquired = true
yield(conn)
end
rescue Timeout::Error => e
raise connection_acquired ? e : Errors::PoolTimeout.new(e)
end
end

Expand Down Expand Up @@ -156,6 +162,12 @@ def down!
Connection::Manager.shutdown(self)
end

def flush_connection_credentials
connection do |conn|
conn.credentials.clear
end
end

# Yields the block if a connection can be established, retrying when a
# connection error is raised.
#
Expand Down
5 changes: 3 additions & 2 deletions lib/moped/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
raise e if e.is_a?(Errors::PotentialReconfiguration) &&
! (e.message.include?("not master") || e.message.include?("Not primary"))
authentication_error = e.is_a?(Errors::PotentialReconfiguration) && e.message.match(/not (master|primary|authorized)/i)
raise e if e.is_a?(Errors::PotentialReconfiguration) && !authentication_error

if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.nodes.each { |node| node.flush_connection_credentials } if authentication_error
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
Expand Down
5 changes: 5 additions & 0 deletions lib/moped/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ def logout
# @since 2.0.0
option(:timeout).allow(Optionable.any(Numeric))

# Setup validation of allowed timeout options. (Any numeric)
#
# @since 2.0.0
option(:op_timeout).allow(Optionable.any(Numeric))

# Pass an object that responds to instrument as an instrumenter.
#
# @since 2.0.0
Expand Down
43 changes: 43 additions & 0 deletions spec/moped/node_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -486,4 +486,47 @@
end
end
end

describe "#connection" do
let(:node) do
described_class.new("127.0.0.1:27017", pool_size: 1, pool_timeout: 0.1)
end

context "when take a long time to get a connection from pool" do
it "raise a Errors::PoolTimeout error" do
expect {

exception = nil
100.times.map do |i|
Thread.new do
begin
node.connection do |conn|
conn.apply_credentials({})
node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
end
rescue => e
exception = e if exception.nil?
end
end
end.each {|t| t.join }
raise exception unless exception.nil?

}.to raise_error(Moped::Errors::PoolTimeout)
end
end

context "when the timeout happens after get a connection from pool" do
it "raise a Timeout::Error" do
expect {
node.connection do |conn|
Timeout::timeout(0.01) do
conn.apply_credentials({})
node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
sleep(0.1) # just to simulate a long block which raise a timeout
end
end
}.to raise_error(Timeout::Error)
end
end
end
end
36 changes: 36 additions & 0 deletions spec/moped/query_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,42 @@
end
end

context "with test commands enabled" do

let(:session) do
Moped::Session.new([ "127.0.0.1:#{port}" ], database: "moped_test")
end

let(:users) do
session.with(safe: true)[:users]
end

describe "when a query take too long" do
let(:port) { 31104 }

before do
start_mongo_server(port, "--setParameter enableTestCommands=1")
Process.detach(spawn("echo 'db.adminCommand({sleep: 1, w: true, secs: 10})' | mongo localhost:#{port} 2>&1 > /dev/null"))
sleep 1 # to sleep command on mongodb begins work
end

after do
stop_mongo_server(port)
end

it "raises a operation timeout exception" do
time = Benchmark.realtime do
expect {
Timeout::timeout(7) do
users.find("age" => { "$gte" => 65 }).first
end
}.to raise_exception("Took more than 5 seconds to receive data.")
end
expect(time).to be < 5.5
end
end
end

context "with a remote connection", mongohq: :auth do

before(:all) do
Expand Down
20 changes: 20 additions & 0 deletions spec/moped/session_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,24 @@
nodes.last.should be_down
end
end

context "when connections on pool are busy" do
let(:session) do
Moped::Session.new([ "127.0.0.1:27017" ], database: "moped_test", pool_size: 1, pool_timeout: 0.2, max_retries: 30, retry_interval: 1)
end

it "should retry the operation" do
session[:test].find({ name: "test_counter" }).update({'$set' => {'cnt' => 1}}, {upsert: true})

results = []

300.times.map do |i|
Thread.new do
results.push session[:test].find({ name: "test_counter" }).first["cnt"]
end
end.each {|t| t.join }

expect(results.count).to eql(300)
end
end
end