@@ -558,19 +558,27 @@ class SocketPairTest(unittest.TestCase, ThreadableTest):
558
558
def __init__ (self , methodName = 'runTest' ):
559
559
unittest .TestCase .__init__ (self , methodName = methodName )
560
560
ThreadableTest .__init__ (self )
561
+ self .cli = None
562
+ self .serv = None
563
+
564
+ def socketpair (self ):
565
+ # To be overridden by some child classes.
566
+ return socket .socketpair ()
561
567
562
568
def setUp (self ):
563
- self .serv , self .cli = socket .socketpair ()
569
+ self .serv , self .cli = self .socketpair ()
564
570
565
571
def tearDown (self ):
566
- self .serv .close ()
572
+ if self .serv :
573
+ self .serv .close ()
567
574
self .serv = None
568
575
569
576
def clientSetUp (self ):
570
577
pass
571
578
572
579
def clientTearDown (self ):
573
- self .cli .close ()
580
+ if self .cli :
581
+ self .cli .close ()
574
582
self .cli = None
575
583
ThreadableTest .clientTearDown (self )
576
584
@@ -4786,6 +4794,120 @@ def _testSend(self):
4786
4794
self .assertEqual (msg , MSG )
4787
4795
4788
4796
4797
+ class PurePythonSocketPairTest (SocketPairTest ):
4798
+
4799
+ # Explicitly use socketpair AF_INET or AF_INET6 to ensure that is the
4800
+ # code path we're using regardless platform is the pure python one where
4801
+ # `_socket.socketpair` does not exist. (AF_INET does not work with
4802
+ # _socket.socketpair on many platforms).
4803
+ def socketpair (self ):
4804
+ # called by super().setUp().
4805
+ try :
4806
+ return socket .socketpair (socket .AF_INET6 )
4807
+ except OSError :
4808
+ return socket .socketpair (socket .AF_INET )
4809
+
4810
+ # Local imports in this class make for easy security fix backporting.
4811
+
4812
+ def setUp (self ):
4813
+ import _socket
4814
+ self ._orig_sp = getattr (_socket , 'socketpair' , None )
4815
+ if self ._orig_sp is not None :
4816
+ # This forces the version using the non-OS provided socketpair
4817
+ # emulation via an AF_INET socket in Lib/socket.py.
4818
+ del _socket .socketpair
4819
+ import importlib
4820
+ global socket
4821
+ socket = importlib .reload (socket )
4822
+ else :
4823
+ pass # This platform already uses the non-OS provided version.
4824
+ super ().setUp ()
4825
+
4826
+ def tearDown (self ):
4827
+ super ().tearDown ()
4828
+ import _socket
4829
+ if self ._orig_sp is not None :
4830
+ # Restore the default socket.socketpair definition.
4831
+ _socket .socketpair = self ._orig_sp
4832
+ import importlib
4833
+ global socket
4834
+ socket = importlib .reload (socket )
4835
+
4836
+ def test_recv (self ):
4837
+ msg = self .serv .recv (1024 )
4838
+ self .assertEqual (msg , MSG )
4839
+
4840
+ def _test_recv (self ):
4841
+ self .cli .send (MSG )
4842
+
4843
+ def test_send (self ):
4844
+ self .serv .send (MSG )
4845
+
4846
+ def _test_send (self ):
4847
+ msg = self .cli .recv (1024 )
4848
+ self .assertEqual (msg , MSG )
4849
+
4850
+ def test_ipv4 (self ):
4851
+ cli , srv = socket .socketpair (socket .AF_INET )
4852
+ cli .close ()
4853
+ srv .close ()
4854
+
4855
+ def _test_ipv4 (self ):
4856
+ pass
4857
+
4858
+ @unittest .skipIf (not hasattr (_socket , 'IPPROTO_IPV6' ) or
4859
+ not hasattr (_socket , 'IPV6_V6ONLY' ),
4860
+ "IPV6_V6ONLY option not supported" )
4861
+ @unittest .skipUnless (socket_helper .IPV6_ENABLED , 'IPv6 required for this test' )
4862
+ def test_ipv6 (self ):
4863
+ cli , srv = socket .socketpair (socket .AF_INET6 )
4864
+ cli .close ()
4865
+ srv .close ()
4866
+
4867
+ def _test_ipv6 (self ):
4868
+ pass
4869
+
4870
+ def test_injected_authentication_failure (self ):
4871
+ orig_getsockname = socket .socket .getsockname
4872
+ inject_sock = None
4873
+
4874
+ def inject_getsocketname (self ):
4875
+ nonlocal inject_sock
4876
+ sockname = orig_getsockname (self )
4877
+ # Connect to the listening socket ahead of the
4878
+ # client socket.
4879
+ if inject_sock is None :
4880
+ inject_sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
4881
+ inject_sock .setblocking (False )
4882
+ try :
4883
+ inject_sock .connect (sockname [:2 ])
4884
+ except (BlockingIOError , InterruptedError ):
4885
+ pass
4886
+ inject_sock .setblocking (True )
4887
+ return sockname
4888
+
4889
+ sock1 = sock2 = None
4890
+ try :
4891
+ socket .socket .getsockname = inject_getsocketname
4892
+ with self .assertRaises (OSError ):
4893
+ sock1 , sock2 = socket .socketpair ()
4894
+ finally :
4895
+ socket .socket .getsockname = orig_getsockname
4896
+ if inject_sock :
4897
+ inject_sock .close ()
4898
+ if sock1 : # This cleanup isn't needed on a successful test.
4899
+ sock1 .close ()
4900
+ if sock2 :
4901
+ sock2 .close ()
4902
+
4903
+ def _test_injected_authentication_failure (self ):
4904
+ # No-op. Exists for base class threading infrastructure to call.
4905
+ # We could refactor this test into its own lesser class along with the
4906
+ # setUp and tearDown code to construct an ideal; it is simpler to keep
4907
+ # it here and live with extra overhead one this _one_ failure test.
4908
+ pass
4909
+
4910
+
4789
4911
class NonBlockingTCPTests (ThreadedTCPSocketTest ):
4790
4912
4791
4913
def __init__ (self , methodName = 'runTest' ):
0 commit comments