@@ -7,7 +7,10 @@ use crate::{build_network_worker, Announcement};
77use anyhow:: { format_err, Result } ;
88use bytes:: Bytes ;
99use futures:: future:: { abortable, AbortHandle } ;
10- use futures:: FutureExt ;
10+ use futures:: {
11+ stream:: { self , StreamExt } ,
12+ FutureExt ,
13+ } ;
1114use log:: { debug, error, info, trace} ;
1215use lru:: LruCache ;
1316use network_api:: messages:: {
@@ -183,15 +186,55 @@ impl EventHandler<Self, NotificationMessage> for NetworkActorService {
183186 fn handle_event (
184187 & mut self ,
185188 msg : NotificationMessage ,
186- _ctx : & mut ServiceContext < NetworkActorService > ,
189+ ctx : & mut ServiceContext < NetworkActorService > ,
187190 ) {
188- self . inner . broadcast ( msg) ;
191+ let prepared_to_broadcast = self . inner . prepare_broadcast ( msg) ;
192+ let network_service = self . network_service ( ) ;
193+ let fut = stream:: iter ( prepared_to_broadcast) . for_each_concurrent (
194+ Some ( 5 ) ,
195+ move |( protocol, peer_id, data) | {
196+ let network_service = network_service. clone ( ) ;
197+ async move {
198+ if network_service
199+ . write_notification_async ( peer_id. clone ( ) . into ( ) , protocol. clone ( ) , data)
200+ . await
201+ . is_err ( )
202+ {
203+ error ! (
204+ "[network] write notification failed on {}, {}" ,
205+ peer_id, protocol
206+ ) ;
207+ }
208+ }
209+ } ,
210+ ) ;
211+ ctx. spawn ( fut)
189212 }
190213}
191214
192215impl EventHandler < Self , PeerMessage > for NetworkActorService {
193- fn handle_event ( & mut self , msg : PeerMessage , _ctx : & mut ServiceContext < NetworkActorService > ) {
194- self . inner . send_peer_message ( msg. peer_id , msg. notification ) ;
216+ fn handle_event ( & mut self , msg : PeerMessage , ctx : & mut ServiceContext < NetworkActorService > ) {
217+ let network_service = self . network_service ( ) ;
218+ let peer_id = msg. peer_id ;
219+ let notification = msg. notification ;
220+ if let Some ( ( protocol, data) ) = self
221+ . inner
222+ . prepare_send_peer_message ( peer_id. clone ( ) , notification)
223+ {
224+ let fut = async move {
225+ if network_service
226+ . write_notification_async ( peer_id. clone ( ) . into ( ) , protocol. clone ( ) , data)
227+ . await
228+ . is_err ( )
229+ {
230+ error ! (
231+ "[network] write notification failed on {}, {}" ,
232+ peer_id, protocol
233+ ) ;
234+ }
235+ } ;
236+ ctx. spawn ( fut)
237+ }
195238 }
196239}
197240
@@ -200,17 +243,38 @@ impl EventHandler<Self, PropagateTransactions> for NetworkActorService {
200243 fn handle_event (
201244 & mut self ,
202245 msg : PropagateTransactions ,
203- _ctx : & mut ServiceContext < NetworkActorService > ,
246+ ctx : & mut ServiceContext < NetworkActorService > ,
204247 ) {
205248 let txns = msg. transaction_to_propagate ( ) ;
206249 if txns. is_empty ( ) {
207250 return ;
208251 }
209252 debug ! ( "prepare to propagate txns, len: {}" , txns. len( ) ) ;
210- self . inner
211- . broadcast ( NotificationMessage :: Transactions ( TransactionsMessage :: new (
212- txns,
213- ) ) ) ;
253+ let prepared_to_broadcast =
254+ self . inner
255+ . prepare_broadcast ( NotificationMessage :: Transactions ( TransactionsMessage :: new (
256+ txns,
257+ ) ) ) ;
258+ let network_service = self . network_service ( ) ;
259+ let fut = stream:: iter ( prepared_to_broadcast) . for_each_concurrent (
260+ Some ( 5 ) ,
261+ move |( protocol, peer_id, data) | {
262+ let network_service = network_service. clone ( ) ;
263+ async move {
264+ if network_service
265+ . write_notification_async ( peer_id. clone ( ) . into ( ) , protocol. clone ( ) , data)
266+ . await
267+ . is_err ( )
268+ {
269+ error ! (
270+ "[network] write notification failed on {}, {}" ,
271+ peer_id, protocol
272+ ) ;
273+ }
274+ }
275+ } ,
276+ ) ;
277+ ctx. spawn ( fut)
214278 }
215279}
216280
@@ -510,7 +574,11 @@ impl Inner {
510574 self . peers . remove ( & peer_id) ;
511575 }
512576
513- pub ( crate ) fn send_peer_message ( & mut self , peer_id : PeerId , notification : NotificationMessage ) {
577+ pub ( crate ) fn prepare_send_peer_message (
578+ & mut self ,
579+ peer_id : PeerId ,
580+ notification : NotificationMessage ,
581+ ) -> Option < ( Cow < ' static , str > , Vec < u8 > ) > {
514582 let ( protocol_name, data) = notification
515583 . encode_notification ( )
516584 . expect ( "Encode notification message should ok" ) ;
@@ -519,7 +587,7 @@ impl Inner {
519587 "[network]protocol {:?} not supported by peer {:?}" ,
520588 protocol_name, peer_id
521589 ) ;
522- return ;
590+ return None ;
523591 }
524592 match notification {
525593 NotificationMessage :: Transactions ( txn_message) => {
@@ -540,18 +608,20 @@ impl Inner {
540608 }
541609 }
542610 } ;
543- self . network_service
544- . write_notification ( peer_id. into ( ) , protocol_name, data) ;
611+ Some ( ( protocol_name, data) )
545612 }
546613
547- pub ( crate ) fn broadcast ( & mut self , notification : NotificationMessage ) {
614+ pub ( crate ) fn prepare_broadcast (
615+ & mut self ,
616+ notification : NotificationMessage ,
617+ ) -> Vec < ( Cow < ' static , str > , PeerId , Vec < u8 > ) > {
548618 let _timer = self . metrics . as_ref ( ) . map ( |metrics| {
549619 metrics
550620 . broadcast_duration
551621 . with_label_values ( & [ notification. protocol_name ( ) . as_ref ( ) ] )
552622 . start_timer ( )
553623 } ) ;
554-
624+ let mut prepare_to_broadcast = vec ! [ ] ;
555625 match & notification {
556626 NotificationMessage :: CompactBlock ( msg) => {
557627 let id = msg. compact_block . header . id ( ) ;
@@ -605,15 +675,14 @@ impl Inner {
605675 filtered_peer_ids. iter ( ) ,
606676 ) ;
607677 let peers_send_message = selected_peers. len ( ) ;
608- for peer_id in selected_peers {
609- let peer = self . peers . get_mut ( & peer_id) . expect ( "peer should exists" ) ;
678+ for peer_id in & selected_peers {
679+ let peer = self . peers . get_mut ( peer_id) . expect ( "peer should exists" ) ;
610680 peer. known_blocks . put ( id, ( ) ) ;
611-
612- self . network_service . write_notification (
613- peer_id. into ( ) ,
681+ prepare_to_broadcast. push ( (
614682 protocol_name. clone ( ) ,
683+ peer_id. clone ( ) ,
615684 message. clone ( ) ,
616- )
685+ ) ) ;
617686 }
618687 debug ! (
619688 "[network] broadcast new compact block message {:?} to {} peers, total_peers: {}, peers_after_known_hash_filter: {}, peers_after_protocol_filter: {}" ,
@@ -693,12 +762,8 @@ impl Inner {
693762 ) ;
694763 continue ;
695764 }
696- self . network_service . write_notification (
697- peer_id. into ( ) ,
698- real_protocol_name,
699- data,
700- ) ;
701765 send_peer_count = send_peer_count. saturating_add ( 1 ) ;
766+ prepare_to_broadcast. push ( ( real_protocol_name, peer_id, data) ) ;
702767 }
703768 debug ! (
704769 "[network] broadcast new {} transactions to {} peers" ,
@@ -710,6 +775,7 @@ impl Inner {
710775 error ! ( "[network] can not broadcast announcement message directly." ) ;
711776 }
712777 }
778+ prepare_to_broadcast
713779 }
714780}
715781
0 commit comments