@@ -80,34 +80,43 @@ def run_test(self):
8080 self .log .debug ("Destroying ZMQ context" )
8181 self .ctx .destroy (linger = None )
8282
83+ # Restart node with the specified zmq notifications enabled, subscribe to
84+ # all of them and return the corresponding ZMQSubscriber objects.
85+ def setup_zmq_test (self , services , recv_timeout = 60 , connect_nodes = False ):
86+ subscribers = []
87+ for topic , address in services :
88+ socket = self .ctx .socket (zmq .SUB )
89+ socket .set (zmq .RCVTIMEO , recv_timeout * 1000 )
90+ subscribers .append (ZMQSubscriber (socket , topic .encode ()))
91+
92+ self .restart_node (0 , ["-zmqpub%s=%s" % (topic , address ) for topic , address in services ])
93+
94+ if connect_nodes :
95+ self .connect_nodes (0 , 1 )
96+
97+ for i , sub in enumerate (subscribers ):
98+ sub .socket .connect (services [i ][1 ])
99+
100+ # Relax so that the subscribers are ready before publishing zmq messages
101+ sleep (0.2 )
102+
103+ return subscribers
104+
83105 def test_basic (self ):
84106
85107 # Invalid zmq arguments don't take down the node, see #17185.
86108 self .restart_node (0 , ["-zmqpubrawtx=foo" , "-zmqpubhashtx=bar" ])
87109
88110 address = 'tcp://127.0.0.1:28332'
89- sockets = []
90- subs = []
91- services = [b"hashblock" , b"hashtx" , b"rawblock" , b"rawtx" ]
92- for service in services :
93- sockets .append (self .ctx .socket (zmq .SUB ))
94- sockets [- 1 ].set (zmq .RCVTIMEO , 60000 )
95- subs .append (ZMQSubscriber (sockets [- 1 ], service ))
96-
97- # Subscribe to all available topics.
111+ subs = self .setup_zmq_test (
112+ [(topic , address ) for topic in ["hashblock" , "hashtx" , "rawblock" , "rawtx" ]],
113+ connect_nodes = True )
114+
98115 hashblock = subs [0 ]
99116 hashtx = subs [1 ]
100117 rawblock = subs [2 ]
101118 rawtx = subs [3 ]
102119
103- self .restart_node (0 , ["-zmqpub%s=%s" % (sub .topic .decode (), address ) for sub in [hashblock , hashtx , rawblock , rawtx ]])
104- self .connect_nodes (0 , 1 )
105- for socket in sockets :
106- socket .connect (address )
107-
108- # Relax so that the subscriber is ready before publishing zmq messages
109- sleep (0.2 )
110-
111120 num_blocks = 5
112121 self .log .info ("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n" : num_blocks })
113122 genhashes = self .nodes [0 ].generatetoaddress (num_blocks , ADDRESS_BCRT1_UNSPENDABLE )
@@ -174,25 +183,10 @@ def test_reorg(self):
174183
175184 address = 'tcp://127.0.0.1:28333'
176185
177- services = [b"hashblock" , b"hashtx" ]
178- sockets = []
179- subs = []
180- for service in services :
181- sockets .append (self .ctx .socket (zmq .SUB ))
182- # 2 second timeout to check end of notifications
183- sockets [- 1 ].set (zmq .RCVTIMEO , 2000 )
184- subs .append (ZMQSubscriber (sockets [- 1 ], service ))
185-
186- # Subscribe to all available topics.
187- hashblock = subs [0 ]
188- hashtx = subs [1 ]
189-
190186 # Should only notify the tip if a reorg occurs
191- self .restart_node (0 , ["-zmqpub%s=%s" % (sub .topic .decode (), address ) for sub in [hashblock , hashtx ]])
192- for socket in sockets :
193- socket .connect (address )
194- # Relax so that the subscriber is ready before publishing zmq messages
195- sleep (0.2 )
187+ hashblock , hashtx = self .setup_zmq_test (
188+ [(topic , address ) for topic in ["hashblock" , "hashtx" ]],
189+ recv_timeout = 2 ) # 2 second timeout to check end of notifications
196190
197191 # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
198192 payment_txid = self .nodes [0 ].sendtoaddress (self .nodes [0 ].getnewaddress (), 1.0 )
@@ -240,15 +234,7 @@ def test_sequence(self):
240234 <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
241235 """
242236 self .log .info ("Testing 'sequence' publisher" )
243- address = 'tcp://127.0.0.1:28333'
244- socket = self .ctx .socket (zmq .SUB )
245- socket .set (zmq .RCVTIMEO , 60000 )
246- seq = ZMQSubscriber (socket , b'sequence' )
247-
248- self .restart_node (0 , ['-zmqpub%s=%s' % (seq .topic .decode (), address )])
249- socket .connect (address )
250- # Relax so that the subscriber is ready before publishing zmq messages
251- sleep (0.2 )
237+ [seq ] = self .setup_zmq_test ([("sequence" , "tcp://127.0.0.1:28333" )])
252238
253239 # Mempool sequence number starts at 1
254240 seq_num = 1
@@ -399,16 +385,7 @@ def test_mempool_sync(self):
399385 return
400386
401387 self .log .info ("Testing 'mempool sync' usage of sequence notifier" )
402- address = 'tcp://127.0.0.1:28333'
403- socket = self .ctx .socket (zmq .SUB )
404- socket .set (zmq .RCVTIMEO , 60000 )
405- seq = ZMQSubscriber (socket , b'sequence' )
406-
407- self .restart_node (0 , ['-zmqpub%s=%s' % (seq .topic .decode (), address )])
408- self .connect_nodes (0 , 1 )
409- socket .connect (address )
410- # Relax so that the subscriber is ready before publishing zmq messages
411- sleep (0.2 )
388+ [seq ] = self .setup_zmq_test ([("sequence" , "tcp://127.0.0.1:28333" )], connect_nodes = True )
412389
413390 # In-memory counter, should always start at 1
414391 next_mempool_seq = self .nodes [0 ].getrawmempool (mempool_sequence = True )["mempool_sequence" ]
@@ -508,26 +485,17 @@ def test_mempool_sync(self):
508485
509486 def test_multiple_interfaces (self ):
510487 # Set up two subscribers with different addresses
511- subscribers = []
512- for i in range (2 ):
513- address = 'tcp://127.0.0.1:%d' % (28334 + i )
514- socket = self .ctx .socket (zmq .SUB )
515- socket .set (zmq .RCVTIMEO , 60000 )
516- hashblock = ZMQSubscriber (socket , b"hashblock" )
517- socket .connect (address )
518- subscribers .append ({'address' : address , 'hashblock' : hashblock })
519-
520- self .restart_node (0 , ['-zmqpub%s=%s' % (subscriber ['hashblock' ].topic .decode (), subscriber ['address' ]) for subscriber in subscribers ])
521-
522- # Relax so that the subscriber is ready before publishing zmq messages
523- sleep (0.2 )
488+ subscribers = self .setup_zmq_test ([
489+ ("hashblock" , "tcp://127.0.0.1:28334" ),
490+ ("hashblock" , "tcp://127.0.0.1:28335" ),
491+ ])
524492
525493 # Generate 1 block in nodes[0] and receive all notifications
526494 self .nodes [0 ].generatetoaddress (1 , ADDRESS_BCRT1_UNSPENDABLE )
527495
528496 # Should receive the same block hash on both subscribers
529- assert_equal (self .nodes [0 ].getbestblockhash (), subscribers [0 ][ 'hashblock' ] .receive ().hex ())
530- assert_equal (self .nodes [0 ].getbestblockhash (), subscribers [1 ][ 'hashblock' ] .receive ().hex ())
497+ assert_equal (self .nodes [0 ].getbestblockhash (), subscribers [0 ].receive ().hex ())
498+ assert_equal (self .nodes [0 ].getbestblockhash (), subscribers [1 ].receive ().hex ())
531499
532500if __name__ == '__main__' :
533501 ZMQTest ().main ()
0 commit comments