diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 01a12a71d..72f1546b1 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -471,18 +471,25 @@ def import_rsa_key_from_file(filename): return key -def parse_xmlsec_output(output): +def parse_xmlsec_verify_output(output, version=None): """Parse the output from xmlsec to try to find out if the command was successfull or not. :param output: The output from Popen :return: A boolean; True if the command was a success otherwise False """ - for line in output.splitlines(): - if line == "OK": - return True - elif line == "FAIL": - raise XmlsecError(output) + if version is None or version < (1, 3): + for line in output.splitlines(): + if line == "OK": + return True + elif line == "FAIL": + raise XmlsecError(output) + else: + for line in output.splitlines(): + if line == 'Verification status: OK': + return True + elif line == 'Verification status: FAILED': + raise XmlsecError(output) raise XmlsecError(output) @@ -593,9 +600,18 @@ def verify_redirect_signature(saml_msg, crypto, cert=None, sigkey=None): class CryptoBackend: + @property def version(self): raise NotImplementedError() + @property + def version_nums(self): + try: + vns = tuple(int(t) for t in self.version) + except ValueError: + vns = (0, 0, 0) + return vns + def encrypt(self, text, recv_key, template, key_type): raise NotImplementedError() @@ -634,6 +650,7 @@ def __init__(self, xmlsec_binary, delete_tmpfiles=True, **kwargs): except KeyError: pass + @property def version(self): com_list = [self.xmlsec, "--version"] pof = Popen(com_list, stderr=PIPE, stdout=PIPE) @@ -642,7 +659,7 @@ def version(self): try: return content.split(" ")[1] except IndexError: - return "" + return "0.0.0" def encrypt(self, text, recv_key, template, session_key_type, xpath=""): """ @@ -824,7 +841,7 @@ def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_i except XmlsecError as e: raise SignatureError(com_list) from e - return parse_xmlsec_output(stderr) + return parse_xmlsec_verify_output(stderr, self.version_nums) def _run_xmlsec(self, com_list, extra_args): """ @@ -836,6 +853,8 @@ def _run_xmlsec(self, com_list, extra_args): """ with NamedTemporaryFile(suffix=".xml") as ntf: com_list.extend(["--output", ntf.name]) + if self.version_nums >= (1, 3): + com_list.extend(['--lax-key-search']) com_list += extra_args logger.debug("xmlsec command: %s", " ".join(com_list)) @@ -870,10 +889,13 @@ class CryptoBackendXMLSecurity(CryptoBackend): def __init__(self): CryptoBackend.__init__(self) + @property def version(self): - # XXX if XMLSecurity.__init__ included a __version__, that would be - # better than static 0.0 here. - return "XMLSecurity 0.0" + try: + import xmlsec + return xmlsec.__version__ + except (ImportError, AttributeError): + return "0.0.0" def sign_statement(self, statement, node_name, key_file, node_id): """ diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py index f026fa393..0049e1711 100644 --- a/tests/test_40_sigver.py +++ b/tests/test_40_sigver.py @@ -192,7 +192,7 @@ def test_sign_assertion(self): assert sass.id == "id-11111" assert time_util.str_to_time(sass.issue_instant) - print(f"Crypto version : {self.sec.crypto.version()}") + print(f"Crypto version : {self.sec.crypto.version}") item = self.sec.check_signature(sass, class_name(sass), sign_ass) @@ -209,7 +209,7 @@ def test_multiple_signatures_assertion(self): assert sass.id == "id-11111" assert time_util.str_to_time(sass.issue_instant) - print(f"Crypto version : {self.sec.crypto.version()}") + print(f"Crypto version : {self.sec.crypto.version}") item = self.sec.check_signature(sass, class_name(sass), sign_ass, must=True) @@ -498,7 +498,7 @@ def test_sign_assertion(self): assert sass.id == "id-11111" assert time_util.str_to_time(sass.issue_instant) - print(f"Crypto version : {self.sec.crypto.version()}") + print(f"Crypto version : {self.sec.crypto.version}") item = self.sec.check_signature(sass, class_name(sass), sign_ass) @@ -515,7 +515,7 @@ def test_multiple_signatures_assertion(self): assert sass.id == "id-11111" assert time_util.str_to_time(sass.issue_instant) - print(f"Crypto version : {self.sec.crypto.version()}") + print(f"Crypto version : {self.sec.crypto.version}") item = self.sec.check_signature(sass, class_name(sass), sign_ass, must=True) @@ -1079,18 +1079,34 @@ def test_sha256_signing_non_ascii_ava(): def test_xmlsec_output_line_parsing(): output1 = "prefix\nOK\npostfix" - assert sigver.parse_xmlsec_output(output1) + assert sigver.parse_xmlsec_verify_output(output1) output2 = "prefix\nFAIL\npostfix" with raises(sigver.XmlsecError): - sigver.parse_xmlsec_output(output2) + sigver.parse_xmlsec_verify_output(output2) output3 = "prefix\r\nOK\r\npostfix" - assert sigver.parse_xmlsec_output(output3) + assert sigver.parse_xmlsec_verify_output(output3) output4 = "prefix\r\nFAIL\r\npostfix" with raises(sigver.XmlsecError): - sigver.parse_xmlsec_output(output4) + sigver.parse_xmlsec_verify_output(output4) + + +def test_xmlsec_v1_3_x_output_line_parsing(): + output1 = "prefix\nVerification status: OK\npostfix" + assert sigver.parse_xmlsec_verify_output(output1, version=(1, 3)) + + output2 = "prefix\nVerification status: FAILED\npostfix" + with raises(sigver.XmlsecError): + sigver.parse_xmlsec_verify_output(output2, version=(1, 3)) + + output3 = "prefix\r\nVerification status: OK\r\npostfix" + assert sigver.parse_xmlsec_verify_output(output3, version=(1, 3)) + + output4 = "prefix\r\nVerification status: FAILED\r\npostfix" + with raises(sigver.XmlsecError): + sigver.parse_xmlsec_verify_output(output4, version=(1, 3)) def test_cert_trailing_newlines_ignored():