@@ -19,16 +19,15 @@ use crate::lsps1::service::{LSPS1ServiceConfig, LSPS1ServiceHandler};
19
19
use crate :: lsps2:: client:: { LSPS2ClientConfig , LSPS2ClientHandler } ;
20
20
use crate :: lsps2:: msgs:: LSPS2Message ;
21
21
use crate :: lsps2:: service:: { LSPS2ServiceConfig , LSPS2ServiceHandler } ;
22
- use crate :: prelude:: { HashMap , ToString , Vec } ;
22
+ use crate :: prelude:: { HashMap , HashSet , ToString , Vec } ;
23
23
use crate :: sync:: { Arc , Mutex , RwLock } ;
24
24
25
25
use lightning:: chain:: { self , BestBlock , Confirm , Filter , Listen } ;
26
26
use lightning:: ln:: channelmanager:: { AChannelManager , ChainParameters } ;
27
27
use lightning:: ln:: features:: { InitFeatures , NodeFeatures } ;
28
- use lightning:: ln:: msgs:: { ErrorAction , ErrorMessage , LightningError } ;
28
+ use lightning:: ln:: msgs:: { ErrorAction , LightningError } ;
29
29
use lightning:: ln:: peer_handler:: CustomMessageHandler ;
30
30
use lightning:: ln:: wire:: CustomMessageReader ;
31
- use lightning:: ln:: ChannelId ;
32
31
use lightning:: sign:: EntropySource ;
33
32
use lightning:: util:: logger:: Level ;
34
33
use lightning:: util:: ser:: Readable ;
94
93
pending_messages : Arc < MessageQueue > ,
95
94
pending_events : Arc < EventQueue > ,
96
95
request_id_to_method_map : Mutex < HashMap < RequestId , LSPSMethod > > ,
96
+ // We ignore peers if they send us bogus data.
97
+ ignored_peers : RwLock < HashSet < PublicKey > > ,
97
98
lsps0_client_handler : LSPS0ClientHandler < ES > ,
98
99
lsps0_service_handler : Option < LSPS0ServiceHandler > ,
99
100
#[ cfg( lsps1) ]
@@ -126,6 +127,7 @@ where
126
127
where {
127
128
let pending_messages = Arc :: new ( MessageQueue :: new ( ) ) ;
128
129
let pending_events = Arc :: new ( EventQueue :: new ( ) ) ;
130
+ let ignored_peers = RwLock :: new ( HashSet :: new ( ) ) ;
129
131
130
132
let lsps0_client_handler = LSPS0ClientHandler :: new (
131
133
entropy_source. clone ( ) ,
@@ -192,6 +194,7 @@ where {
192
194
pending_messages,
193
195
pending_events,
194
196
request_id_to_method_map : Mutex :: new ( HashMap :: new ( ) ) ,
197
+ ignored_peers,
195
198
lsps0_client_handler,
196
199
lsps0_service_handler,
197
200
#[ cfg( lsps1) ]
@@ -480,41 +483,62 @@ where
480
483
fn handle_custom_message (
481
484
& self , msg : Self :: CustomMessage , sender_node_id : & PublicKey ,
482
485
) -> Result < ( ) , lightning:: ln:: msgs:: LightningError > {
486
+ {
487
+ if self . ignored_peers . read ( ) . unwrap ( ) . contains ( & sender_node_id) {
488
+ let err = format ! ( "Ignoring message from peer {}." , sender_node_id) ;
489
+ return Err ( LightningError {
490
+ err,
491
+ action : ErrorAction :: IgnoreAndLog ( Level :: Trace ) ,
492
+ } ) ;
493
+ }
494
+ }
495
+
483
496
let message = {
484
- let mut request_id_to_method_map = self . request_id_to_method_map . lock ( ) . unwrap ( ) ;
485
- LSPSMessage :: from_str_with_id_map ( & msg. payload , & mut request_id_to_method_map) . map_err (
486
- |_| {
487
- let error = ResponseError {
488
- code : JSONRPC_INVALID_MESSAGE_ERROR_CODE ,
489
- message : JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE . to_string ( ) ,
490
- data : None ,
491
- } ;
492
-
493
- self . pending_messages . enqueue ( sender_node_id, LSPSMessage :: Invalid ( error) ) ;
494
- let err = format ! ( "Failed to deserialize invalid LSPS message." ) ;
495
- let err_msg =
496
- Some ( ErrorMessage { channel_id : ChannelId ( [ 0 ; 32 ] ) , data : err. clone ( ) } ) ;
497
- LightningError { err, action : ErrorAction :: DisconnectPeer { msg : err_msg } }
498
- } ,
499
- ) ?
497
+ {
498
+ let mut request_id_to_method_map = self . request_id_to_method_map . lock ( ) . unwrap ( ) ;
499
+ LSPSMessage :: from_str_with_id_map ( & msg. payload , & mut request_id_to_method_map)
500
+ }
501
+ . map_err ( |_| {
502
+ let error = ResponseError {
503
+ code : JSONRPC_INVALID_MESSAGE_ERROR_CODE ,
504
+ message : JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE . to_string ( ) ,
505
+ data : None ,
506
+ } ;
507
+
508
+ self . pending_messages . enqueue ( sender_node_id, LSPSMessage :: Invalid ( error) ) ;
509
+ self . ignored_peers . write ( ) . unwrap ( ) . insert ( * sender_node_id) ;
510
+ let err = format ! (
511
+ "Failed to deserialize invalid LSPS message. Ignoring peer {} from now on." ,
512
+ sender_node_id
513
+ ) ;
514
+ LightningError { err, action : ErrorAction :: IgnoreAndLog ( Level :: Info ) }
515
+ } ) ?
500
516
} ;
501
517
502
518
self . handle_lsps_message ( message, sender_node_id)
503
519
}
504
520
505
521
fn get_and_clear_pending_msg ( & self ) -> Vec < ( PublicKey , Self :: CustomMessage ) > {
506
- let mut request_id_to_method_map = self . request_id_to_method_map . lock ( ) . unwrap ( ) ;
507
- self . pending_messages
508
- . get_and_clear_pending_msgs ( )
522
+ let pending_messages = self . pending_messages . get_and_clear_pending_msgs ( ) ;
523
+
524
+ let mut request_ids_and_methods = pending_messages
509
525
. iter ( )
510
- . map ( |( public_key, lsps_message) | {
511
- if let Some ( ( request_id, method) ) = lsps_message. get_request_id_and_method ( ) {
512
- request_id_to_method_map. insert ( request_id, method) ;
513
- }
514
- (
515
- * public_key,
516
- RawLSPSMessage { payload : serde_json:: to_string ( & lsps_message) . unwrap ( ) } ,
517
- )
526
+ . filter_map ( |( _, msg) | msg. get_request_id_and_method ( ) )
527
+ . peekable ( ) ;
528
+
529
+ if request_ids_and_methods. peek ( ) . is_some ( ) {
530
+ let mut request_id_to_method_map_lock = self . request_id_to_method_map . lock ( ) . unwrap ( ) ;
531
+ for ( request_id, method) in request_ids_and_methods {
532
+ request_id_to_method_map_lock. insert ( request_id, method) ;
533
+ }
534
+ }
535
+
536
+ pending_messages
537
+ . into_iter ( )
538
+ . filter_map ( |( public_key, msg) | {
539
+ serde_json:: to_string ( & msg)
540
+ . ok ( )
541
+ . and_then ( |payload| Some ( ( public_key, RawLSPSMessage { payload } ) ) )
518
542
} )
519
543
. collect ( )
520
544
}
0 commit comments