@@ -767,17 +767,23 @@ def test_bad_argument(self):
767767 self .assertRaises (TypeError , self ._callFUT , None )
768768
769769 def test_signed_jwt_for_p12 (self ):
770+ import base64
770771 from oauth2client import client
771772 from gcloud ._testing import _Monkey
772773 from gcloud .storage import connection as MUT
773774
774775 scopes = []
776+ PRIVATE_KEY = 'dummy_private_key_text'
775777 credentials = client .SignedJwtAssertionCredentials (
776- 'dummy_service_account_name' , 'dummy_private_key_text' , scopes )
777- crypto = _Crypto ()
778+ 'dummy_service_account_name' , PRIVATE_KEY , scopes )
779+ crypt = _Crypt ()
778780 rsa = _RSA ()
779- with _Monkey (MUT , crypto = crypto , RSA = rsa ):
781+ with _Monkey (MUT , crypt = crypt , RSA = rsa ):
780782 result = self ._callFUT (credentials )
783+
784+ self .assertEqual (crypt ._private_key_text ,
785+ base64 .b64encode (PRIVATE_KEY ))
786+ self .assertEqual (crypt ._private_key_password , 'notasecret' )
781787 self .assertEqual (result , 'imported:__PEM__' )
782788
783789 def test_service_account_via_json_key (self ):
@@ -816,7 +822,6 @@ def test_wrong_type(self):
816822 from gcloud ._testing import _Monkey
817823 from gcloud .storage import connection as MUT
818824
819- crypto = _Crypto ()
820825 pkcs_v1_5 = _PKCS1_v1_5 ()
821826 rsa = _RSA ()
822827 sha256 = _SHA256 ()
@@ -827,7 +832,7 @@ def _get_pem_key(credentials):
827832 BAD_CREDENTIALS = None
828833 EXPIRATION = '100'
829834 SIGNATURE_STRING = 'dummy_signature'
830- with _Monkey (MUT , crypto = crypto , RSA = rsa , PKCS1_v1_5 = pkcs_v1_5 ,
835+ with _Monkey (MUT , RSA = rsa , PKCS1_v1_5 = pkcs_v1_5 ,
831836 SHA256 = sha256 , _get_pem_key = _get_pem_key ):
832837 self .assertRaises (NameError , self ._callFUT ,
833838 BAD_CREDENTIALS , EXPIRATION , SIGNATURE_STRING )
@@ -837,17 +842,21 @@ def _run_test_with_credentials(self, credentials, account_name):
837842 from gcloud ._testing import _Monkey
838843 from gcloud .storage import connection as MUT
839844
840- crypto = _Crypto ()
845+ crypt = _Crypt ()
841846 pkcs_v1_5 = _PKCS1_v1_5 ()
842847 rsa = _RSA ()
843848 sha256 = _SHA256 ()
844849
845850 EXPIRATION = '100'
846851 SIGNATURE_STRING = 'dummy_signature'
847- with _Monkey (MUT , crypto = crypto , RSA = rsa , PKCS1_v1_5 = pkcs_v1_5 ,
852+ with _Monkey (MUT , crypt = crypt , RSA = rsa , PKCS1_v1_5 = pkcs_v1_5 ,
848853 SHA256 = sha256 ):
849854 result = self ._callFUT (credentials , EXPIRATION , SIGNATURE_STRING )
850855
856+ if crypt ._pkcs12_key_as_pem_called :
857+ self .assertEqual (crypt ._private_key_text ,
858+ base64 .b64encode ('dummy_private_key_text' ))
859+ self .assertEqual (crypt ._private_key_password , 'notasecret' )
851860 self .assertEqual (sha256 ._signature_string , SIGNATURE_STRING )
852861 SIGNED = base64 .b64encode ('DEADBEEF' )
853862 expected_query = {
@@ -900,20 +909,14 @@ def request(self, **kw):
900909 return self ._response , self ._content
901910
902911
903- class _Crypto (object ):
904-
905- FILETYPE_PEM = 'pem'
906- _loaded = _dumped = None
907-
908- def load_pkcs12 (self , buffer , passphrase ):
909- self ._loaded = (buffer , passphrase )
910- return self
912+ class _Crypt (object ):
911913
912- def get_privatekey (self ):
913- return '__PKCS12__'
914+ _pkcs12_key_as_pem_called = False
914915
915- def dump_privatekey (self , type , pkey , cipher = None , passphrase = None ):
916- self ._dumped = (type , pkey , cipher , passphrase )
916+ def pkcs12_key_as_pem (self , private_key_text , private_key_password ):
917+ self ._pkcs12_key_as_pem_called = True
918+ self ._private_key_text = private_key_text
919+ self ._private_key_password = private_key_password
917920 return '__PEM__'
918921
919922
0 commit comments