Skip to content

PYTHON-1419 Connection failure to SNI endpoint when first host is unavailable #1243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
23 changes: 17 additions & 6 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ def create(self, row):
class SniEndPoint(EndPoint):
"""SNI Proxy EndPoint implementation."""

def __init__(self, proxy_address, server_name, port=9042):
def __init__(self, proxy_address, server_name, port=9042, init_index=0):
self._proxy_address = proxy_address
self._index = 0
self._index = init_index
self._resolved_address = None # resolved address
self._port = port
self._server_name = server_name
Expand All @@ -267,8 +267,7 @@ def ssl_options(self):

def resolve(self):
try:
resolved_addresses = socket.getaddrinfo(self._proxy_address, self._port,
socket.AF_UNSPEC, socket.SOCK_STREAM)
resolved_addresses = self._resolve_proxy_addresses()
except socket.gaierror:
log.debug('Could not resolve sni proxy hostname "%s" '
'with port %d' % (self._proxy_address, self._port))
Expand All @@ -280,6 +279,10 @@ def resolve(self):

return self._resolved_address, self._port

def _resolve_proxy_addresses(self):
return socket.getaddrinfo(self._proxy_address, self._port,
socket.AF_UNSPEC, socket.SOCK_STREAM)

def __eq__(self, other):
return (isinstance(other, SniEndPoint) and
self.address == other.address and self.port == other.port and
Expand All @@ -305,16 +308,24 @@ class SniEndPointFactory(EndPointFactory):
def __init__(self, proxy_address, port):
self._proxy_address = proxy_address
self._port = port
# Initial lookup index to prevent all SNI endpoints to be resolved
# into the same starting IP address (which might not be available currently).
# If SNI resolves to 3 IPs, first endpoint will connect to first
# IP address, and subsequent resolutions to next IPs in round-robin
# fusion.
self._init_index = -1

def create(self, row):
host_id = row.get("host_id")
if host_id is None:
raise ValueError("No host_id to create the SniEndPoint")

return SniEndPoint(self._proxy_address, str(host_id), self._port)
self._init_index += 1
return SniEndPoint(self._proxy_address, str(host_id), self._port, self._init_index)

def create_from_sni(self, sni):
return SniEndPoint(self._proxy_address, sni, self._port)
self._init_index += 1
return SniEndPoint(self._proxy_address, sni, self._port, self._init_index)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worthwhile being clear on the consequence of this change. If our proxy hostname resolves to N IP address we're basically exchanging a 1-in-N chance of complete failure if the first node is unavailable (because all our endpoints will return a common IP address) for an essentially guaranteed failure that the connection for one of our nodes will fail (since with the code in this PR at least one of our nodes will return the failing IP address from resolve()).

I'm not saying it's a problem that we're making this exchange; it probably is better to have a failure with connections to one of our nodes rather than to fail completely at connect time. But it is worth pointing out that this isn't a zero-cost abstraction.



@total_ordering
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

import logging
import socket
import uuid

from unittest.mock import patch, Mock

from cassandra import ConsistencyLevel, DriverException, Timeout, Unavailable, RequestExecutionException, ReadTimeout, WriteTimeout, CoordinationFailure, ReadFailure, WriteFailure, FunctionFailure, AlreadyExists,\
InvalidRequest, Unauthorized, AuthenticationFailed, OperationTimedOut, UnsupportedOperation, RequestValidationException, ConfigurationException, ProtocolVersion
from cassandra.cluster import _Scheduler, Session, Cluster, default_lbp_factory, \
ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT
from cassandra.connection import SniEndPoint, SniEndPointFactory
from cassandra.pool import Host
from cassandra.policies import HostDistance, RetryPolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy, SimpleConvictionPolicy
from cassandra.query import SimpleStatement, named_tuple_factory, tuple_factory
Expand All @@ -31,6 +33,7 @@

log = logging.getLogger(__name__)


class ExceptionTypeTest(unittest.TestCase):

def test_exception_types(self):
Expand Down Expand Up @@ -85,6 +88,12 @@ def test_exception_types(self):
self.assertTrue(issubclass(UnsupportedOperation, DriverException))


class MockOrderedPolicy(RoundRobinPolicy):
all_hosts = set()

def make_query_plan(self, working_keyspace=None, query=None):
return sorted(self.all_hosts, key=lambda x: x.endpoint.ssl_options['server_hostname'])

class ClusterTest(unittest.TestCase):

def test_tuple_for_contact_points(self):
Expand Down Expand Up @@ -119,6 +128,26 @@ def test_requests_in_flight_threshold(self):
for n in (0, mn, 128):
self.assertRaises(ValueError, c.set_max_requests_per_connection, d, n)

# Validate that at least the default LBP can create a query plan with end points that resolve
# to different addresses initially. This may not be exactly how things play out in practice
# (the control connection will muck with this even if nothing else does) but it should be
# a pretty good approximation.
def test_query_plan_for_sni_contains_unique_addresses(self):
node_cnt = 5
def _mocked_proxy_dns_resolution(self):
return [(socket.AF_UNIX, socket.SOCK_STREAM, 0, None, ('127.0.0.%s' % (i,), 9042)) for i in range(node_cnt)]

c = Cluster()
lbp = c.load_balancing_policy
lbp.local_dc = "dc1"
factory = SniEndPointFactory("proxy.foo.bar", 9042)
for host in (Host(factory.create({"host_id": uuid.uuid4().hex, "dc": "dc1"}), SimpleConvictionPolicy) for _ in range(node_cnt)):
lbp.on_up(host)
with patch.object(SniEndPoint, '_resolve_proxy_addresses', _mocked_proxy_dns_resolution):
addrs = [host.endpoint.resolve() for host in lbp.make_query_plan()]
# single SNI endpoint should be resolved to multiple unique IP addresses
self.assertEqual(len(addrs), len(set(addrs)))


class SchedulerTest(unittest.TestCase):
# TODO: this suite could be expanded; for now just adding a test covering a ticket
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,15 @@ def test_endpoint_resolve(self):
for i in range(10):
(address, _) = endpoint.resolve()
self.assertEqual(address, next(it))

def test_sni_resolution_start_index(self):
factory = SniEndPointFactory("proxy.datastax.com", 9999)
initial_index = factory._init_index

endpoint1 = factory.create_from_sni('sni1')
self.assertEqual(factory._init_index, initial_index + 1)
self.assertEqual(endpoint1._index, factory._init_index)

endpoint2 = factory.create_from_sni('sni2')
self.assertEqual(factory._init_index, initial_index + 2)
self.assertEqual(endpoint2._index, factory._init_index)