diff --git a/neo4j/_conf.py b/neo4j/_conf.py index d40528ff1..116f69bcd 100644 --- a/neo4j/_conf.py +++ b/neo4j/_conf.py @@ -423,9 +423,8 @@ def get_ssl_context(self): ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) # For recommended security options see - # https://docs.python.org/3.7/library/ssl.html#protocol-versions - ssl_context.options |= ssl.OP_NO_TLSv1 # Python 3.2 - ssl_context.options |= ssl.OP_NO_TLSv1_1 # Python 3.4 + # https://docs.python.org/3.10/library/ssl.html#protocol-versions + ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2 if isinstance(self.trusted_certificates, TrustAll): # trust any certificate diff --git a/testkit/Dockerfile b/testkit/Dockerfile index e3322d281..be32548e5 100644 --- a/testkit/Dockerfile +++ b/testkit/Dockerfile @@ -41,17 +41,20 @@ RUN git clone https://github.com/pyenv/pyenv.git .pyenv ENV PYENV_ROOT /.pyenv ENV PATH $PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH -# Set minimum supported Python version -RUN pyenv install 3.7:latest -RUN pyenv install 3.8:latest -RUN pyenv install 3.9:latest -RUN pyenv install 3.10:latest +# Setup python version +ENV PYTHON_VERSIONS 3.7 3.8 3.9 3.10 + +RUN for version in $PYTHON_VERSIONS; do \ + pyenv install $version:latest; \ + done RUN pyenv rehash RUN pyenv global $(pyenv versions --bare --skip-aliases) -# Install Latest pip for each environment +# Install Latest pip and setuptools for each environment +# + tox and tools for starting the tests # https://pip.pypa.io/en/stable/news/ -RUN python -m pip install --upgrade pip - -# Install Python Testing Tools -RUN python -m pip install coverage tox tox-factor +RUN for version in 3.7 3.8 3.9 3.10; do \ + python$version -m pip install -U pip && \ + python$version -m pip install -U setuptools && \ + python$version -m pip install -U coverage tox tox-factor; \ + done diff --git a/testkit/_common.py b/testkit/_common.py new file mode 100644 index 000000000..ff285ad5d --- /dev/null +++ b/testkit/_common.py @@ -0,0 +1,17 @@ +import os +import subprocess +import sys + + +TEST_BACKEND_VERSION = os.getenv("TEST_BACKEND_VERSION", "python") + + +def run(args, env=None): + return subprocess.run( + args, universal_newlines=True, stdout=sys.stdout, stderr=sys.stderr, + check=True, env=env + ) + + +def run_python(args, env=None): + run([TEST_BACKEND_VERSION, "-W", "error", *args], env=env) diff --git a/testkit/backend.py b/testkit/backend.py index b0c3c09f8..702ea8a5b 100644 --- a/testkit/backend.py +++ b/testkit/backend.py @@ -19,12 +19,12 @@ import os -import subprocess -import sys + +from _common import run_python if __name__ == "__main__": - cmd = ["python", "-W", "error", "-m", "testkitbackend"] + cmd = ["-m", "testkitbackend"] if "TEST_BACKEND_SERVER" in os.environ: cmd.append(os.environ["TEST_BACKEND_SERVER"]) - subprocess.check_call(cmd, stdout=sys.stdout, stderr=sys.stderr) + run_python(cmd) diff --git a/testkit/build.py b/testkit/build.py index 2206d709f..6bad785ba 100644 --- a/testkit/build.py +++ b/testkit/build.py @@ -24,17 +24,11 @@ """ -import subprocess -import sys - - -def run(args, env=None): - subprocess.run(args, universal_newlines=True, stdout=sys.stdout, - stderr=sys.stderr, check=True, env=env) +from _common import run_python if __name__ == "__main__": - run(["python", "setup.py", "build"]) - run(["python", "-m", "pip", "install", "-U", "pip"]) - run(["python", "-m", "pip", "install", "-Ur", - "testkitbackend/requirements.txt"]) + run_python(["setup.py", "build"]) + run_python(["-m", "pip", "install", "-U", "pip"]) + run_python(["-m", "pip", "install", "-Ur", + "testkitbackend/requirements.txt"]) diff --git a/testkit/integration.py b/testkit/integration.py index 81b864f0c..edb08bd79 100644 --- a/testkit/integration.py +++ b/testkit/integration.py @@ -18,13 +18,8 @@ # limitations under the License. -import subprocess - - -def run(args): - subprocess.run( - args, universal_newlines=True, stderr=subprocess.STDOUT, check=True) +from _common import run_python if __name__ == "__main__": - run(["python", "-W", "error", "-m", "tox", "-f", "integration"]) + run_python(["-m", "tox", "-f", "integration"]) diff --git a/testkit/unittests.py b/testkit/unittests.py index 3cea1355c..0b714d3d1 100644 --- a/testkit/unittests.py +++ b/testkit/unittests.py @@ -18,13 +18,8 @@ # limitations under the License. -import subprocess - - -def run(args): - subprocess.run( - args, universal_newlines=True, stderr=subprocess.STDOUT, check=True) +from _common import run_python if __name__ == "__main__": - run(["python", "-W", "error", "-m", "tox", "-f", "unit"]) + run_python(["-m", "tox", "-f", "unit"]) diff --git a/tests/unit/common/test_conf.py b/tests/unit/common/test_conf.py index 9b895a69a..9e6ef2324 100644 --- a/tests/unit/common/test_conf.py +++ b/tests/unit/common/test_conf.py @@ -16,6 +16,8 @@ # limitations under the License. +import ssl + import pytest from neo4j import ( @@ -266,3 +268,150 @@ def test_init_session_config_with_not_valid_key(): _ = SessionConfig.consume(test_config_b) assert session_config.connection_acquisition_timeout == 333 + + +@pytest.mark.parametrize("config", ( + {}, + {"encrypted": False}, + {"trusted_certificates": TrustSystemCAs()}, + {"trusted_certificates": TrustAll()}, + {"trusted_certificates": TrustCustomCAs("foo", "bar")}, +)) +def test_no_ssl_mock(config, mocker): + ssl_context_mock = mocker.patch("ssl.SSLContext", autospec=True) + pool_config = PoolConfig.consume(config) + assert pool_config.encrypted is False + assert pool_config.get_ssl_context() is None + ssl_context_mock.assert_not_called() + + +@pytest.mark.parametrize("config", ( + {"encrypted": True}, + {"encrypted": True, "trusted_certificates": TrustSystemCAs()}, +)) +def test_trust_system_cas_mock(config, mocker): + ssl_context_mock = mocker.patch("ssl.SSLContext", autospec=True) + pool_config = PoolConfig.consume(config) + assert pool_config.encrypted is True + ssl_context = pool_config.get_ssl_context() + _assert_mock_tls_1_2(ssl_context_mock) + assert ssl_context.minimum_version == ssl.TLSVersion.TLSv1_2 + ssl_context_mock.return_value.load_default_certs.assert_called_once_with() + ssl_context_mock.return_value.load_verify_locations.assert_not_called() + assert ssl_context.check_hostname is True + assert ssl_context.verify_mode == ssl.CERT_REQUIRED + + +@pytest.mark.parametrize("config", ( + {"encrypted": True, "trusted_certificates": TrustCustomCAs("foo", "bar")}, + {"encrypted": True, "trusted_certificates": TrustCustomCAs()}, +)) +def test_trust_custom_cas_mock(config, mocker): + ssl_context_mock = mocker.patch("ssl.SSLContext", autospec=True) + certs = config["trusted_certificates"].certs + pool_config = PoolConfig.consume(config) + assert pool_config.encrypted is True + ssl_context = pool_config.get_ssl_context() + _assert_mock_tls_1_2(ssl_context_mock) + assert ssl_context.minimum_version == ssl.TLSVersion.TLSv1_2 + ssl_context_mock.return_value.load_default_certs.assert_not_called() + assert ( + ssl_context_mock.return_value.load_verify_locations.call_args_list + == [((cert,), {}) for cert in certs] + ) + assert ssl_context.check_hostname is True + assert ssl_context.verify_mode == ssl.CERT_REQUIRED + + +@pytest.mark.parametrize("config", ( + {"encrypted": True, "trusted_certificates": TrustAll()}, +)) +def test_trust_all_mock(config, mocker): + ssl_context_mock = mocker.patch("ssl.SSLContext", autospec=True) + pool_config = PoolConfig.consume(config) + assert pool_config.encrypted is True + ssl_context = pool_config.get_ssl_context() + _assert_mock_tls_1_2(ssl_context_mock) + assert ssl_context.minimum_version == ssl.TLSVersion.TLSv1_2 + ssl_context_mock.return_value.load_default_certs.assert_not_called() + ssl_context_mock.return_value.load_verify_locations.assert_not_called() + assert ssl_context.check_hostname is False + assert ssl_context.verify_mode is ssl.CERT_NONE + + +def _assert_mock_tls_1_2(mock): + mock.assert_called_once_with(ssl.PROTOCOL_TLS_CLIENT) + assert mock.return_value.minimum_version == ssl.TLSVersion.TLSv1_2 + + +@pytest.mark.parametrize("config", ( + {}, + {"encrypted": False}, + {"trusted_certificates": TrustSystemCAs()}, + {"trusted_certificates": TrustAll()}, + {"trusted_certificates": TrustCustomCAs("foo", "bar")}, +)) +def test_no_ssl(config): + pool_config = PoolConfig.consume(config) + assert pool_config.encrypted is False + assert pool_config.get_ssl_context() is None + + +@pytest.mark.parametrize("config", ( + {"encrypted": True}, + {"encrypted": True, "trusted_certificates": TrustSystemCAs()}, +)) +def test_trust_system_cas(config): + pool_config = PoolConfig.consume(config) + assert pool_config.encrypted is True + ssl_context = pool_config.get_ssl_context() + assert isinstance(ssl_context, ssl.SSLContext) + _assert_context_tls_1_2(ssl_context) + assert ssl_context.check_hostname is True + assert ssl_context.verify_mode == ssl.CERT_REQUIRED + + +@pytest.mark.parametrize("config", ( + {"encrypted": True, "trusted_certificates": TrustCustomCAs()}, +)) +def test_trust_custom_cas(config): + pool_config = PoolConfig.consume(config) + assert pool_config.encrypted is True + ssl_context = pool_config.get_ssl_context() + assert isinstance(ssl_context, ssl.SSLContext) + _assert_context_tls_1_2(ssl_context) + assert ssl_context.check_hostname is True + assert ssl_context.verify_mode == ssl.CERT_REQUIRED + + +@pytest.mark.parametrize("config", ( + {"encrypted": True, "trusted_certificates": TrustAll()}, +)) +def test_trust_all(config): + pool_config = PoolConfig.consume(config) + assert pool_config.encrypted is True + ssl_context = pool_config.get_ssl_context() + assert isinstance(ssl_context, ssl.SSLContext) + _assert_context_tls_1_2(ssl_context) + assert ssl_context.check_hostname is False + assert ssl_context.verify_mode is ssl.CERT_NONE + + +def _assert_context_tls_1_2(ctx): + assert ctx.protocol == ssl.PROTOCOL_TLS_CLIENT + assert ctx.minimum_version == ssl.TLSVersion.TLSv1_2 + + +@pytest.mark.parametrize("encrypted", (True, False)) +@pytest.mark.parametrize("trusted_certificates", ( + TrustSystemCAs(), TrustAll(), TrustCustomCAs() +)) +def test_custom_ssl_context(encrypted, trusted_certificates): + custom_ssl_context = object() + pool_config = PoolConfig.consume({ + "encrypted": encrypted, + "trusted_certificates": trusted_certificates, + "ssl_context": custom_ssl_context, + }) + assert pool_config.encrypted is encrypted + assert pool_config.get_ssl_context() is custom_ssl_context