@@ -100,19 +100,20 @@ type IotxDispatcher struct {
100
100
started int32
101
101
shutdown int32
102
102
eventChan chan interface {}
103
+ syncChan chan * blockSyncMsg
103
104
eventAudit map [iotexrpc.MessageType ]int
104
105
eventAuditLock sync.RWMutex
105
106
wg sync.WaitGroup
106
107
quit chan struct {}
107
-
108
- subscribers map [uint32 ]Subscriber
109
- subscribersMU sync.RWMutex
108
+ subscribers map [uint32 ]Subscriber
109
+ subscribersMU sync.RWMutex
110
110
}
111
111
112
112
// NewDispatcher creates a new Dispatcher
113
113
func NewDispatcher (cfg config.Config ) (Dispatcher , error ) {
114
114
d := & IotxDispatcher {
115
115
eventChan : make (chan interface {}, cfg .Dispatcher .EventChanSize ),
116
+ syncChan : make (chan * blockSyncMsg , cfg .Dispatcher .EventChanSize ),
116
117
eventAudit : make (map [iotexrpc.MessageType ]int ),
117
118
quit : make (chan struct {}),
118
119
subscribers : make (map [uint32 ]Subscriber ),
@@ -136,8 +137,10 @@ func (d *IotxDispatcher) Start(ctx context.Context) error {
136
137
return errors .New ("Dispatcher already started" )
137
138
}
138
139
log .L ().Info ("Starting dispatcher." )
139
- d .wg .Add (1 )
140
+ d .wg .Add (2 )
140
141
go d .newsHandler ()
142
+ go d .syncHandler ()
143
+
141
144
return nil
142
145
}
143
146
@@ -153,9 +156,11 @@ func (d *IotxDispatcher) Stop(ctx context.Context) error {
153
156
return nil
154
157
}
155
158
156
- // EventChan returns the event chan
157
- func (d * IotxDispatcher ) EventChan () * chan interface {} {
158
- return & d .eventChan
159
+ // EventQueueSize returns the event queue size
160
+ func (d * IotxDispatcher ) EventQueueSize () int {
161
+ d .eventAuditLock .RLock ()
162
+ defer d .eventAuditLock .RUnlock ()
163
+ return len (d .eventChan ) + len (d .syncChan )
159
164
}
160
165
161
166
// EventAudit returns the event audit map
@@ -180,26 +185,43 @@ loop:
180
185
d .handleActionMsg (msg )
181
186
case * blockMsg :
182
187
d .handleBlockMsg (msg )
183
- case * blockSyncMsg :
184
- d .handleBlockSyncMsg (msg )
185
-
186
188
default :
187
189
log .L ().Warn ("Invalid message type in block handler." , zap .Any ("msg" , msg ))
188
190
}
191
+ case <- d .quit :
192
+ break loop
193
+ }
194
+ }
195
+
196
+ d .wg .Done ()
197
+ log .L ().Info ("news handler done." )
198
+ }
189
199
200
+ // syncHandler handles incoming block sync requests
201
+ func (d * IotxDispatcher ) syncHandler () {
202
+ loop:
203
+ for {
204
+ select {
205
+ case m := <- d .syncChan :
206
+ d .handleBlockSyncMsg (m )
190
207
case <- d .quit :
191
208
break loop
192
209
}
193
210
}
194
211
195
212
d .wg .Done ()
196
- log .L ().Info ("News handler done." )
213
+ log .L ().Info ("block sync handler done." )
197
214
}
198
215
199
216
// handleActionMsg handles actionMsg from all peers.
200
217
func (d * IotxDispatcher ) handleActionMsg (m * actionMsg ) {
201
- d .updateEventAudit (iotexrpc .MessageType_ACTION )
202
- if subscriber , ok := d .subscribers [m .ChainID ()]; ok {
218
+ log .L ().Debug ("receive actionMsg." )
219
+
220
+ d .subscribersMU .RLock ()
221
+ subscriber , ok := d .subscribers [m .ChainID ()]
222
+ d .subscribersMU .RUnlock ()
223
+ if ok {
224
+ d .updateEventAudit (iotexrpc .MessageType_ACTION )
203
225
if err := subscriber .HandleAction (m .ctx , m .action ); err != nil {
204
226
requestMtc .WithLabelValues ("AddAction" , "false" ).Inc ()
205
227
log .L ().Debug ("Handle action request error." , zap .Error (err ))
@@ -214,8 +236,9 @@ func (d *IotxDispatcher) handleBlockMsg(m *blockMsg) {
214
236
log .L ().Debug ("receive blockMsg." , zap .Uint64 ("height" , m .block .GetHeader ().GetCore ().GetHeight ()))
215
237
216
238
d .subscribersMU .RLock ()
217
- defer d .subscribersMU .RUnlock ()
218
- if subscriber , ok := d .subscribers [m .ChainID ()]; ok {
239
+ subscriber , ok := d .subscribers [m .ChainID ()]
240
+ d .subscribersMU .RUnlock ()
241
+ if ok {
219
242
d .updateEventAudit (iotexrpc .MessageType_BLOCK )
220
243
if err := subscriber .HandleBlock (m .ctx , m .block ); err != nil {
221
244
log .L ().Error ("Fail to handle the block." , zap .Error (err ))
@@ -232,8 +255,11 @@ func (d *IotxDispatcher) handleBlockSyncMsg(m *blockSyncMsg) {
232
255
zap .Uint64 ("start" , m .sync .Start ),
233
256
zap .Uint64 ("end" , m .sync .End ))
234
257
235
- d .updateEventAudit (iotexrpc .MessageType_BLOCK_REQUEST )
236
- if subscriber , ok := d .subscribers [m .ChainID ()]; ok {
258
+ d .subscribersMU .RLock ()
259
+ subscriber , ok := d .subscribers [m .ChainID ()]
260
+ d .subscribersMU .RUnlock ()
261
+ if ok {
262
+ d .updateEventAudit (iotexrpc .MessageType_BLOCK_REQUEST )
237
263
// dispatch to block sync
238
264
if err := subscriber .HandleSyncRequest (m .ctx , m .peer , m .sync ); err != nil {
239
265
log .L ().Error ("Failed to handle sync request." , zap .Error (err ))
@@ -272,12 +298,17 @@ func (d *IotxDispatcher) dispatchBlockSyncReq(ctx context.Context, chainID uint3
272
298
if atomic .LoadInt32 (& d .shutdown ) != 0 {
273
299
return
274
300
}
275
- d .enqueueEvent (& blockSyncMsg {
301
+
302
+ if len (d .syncChan ) == cap (d .syncChan ) {
303
+ log .L ().Warn ("dispatcher sync chan is full, drop an event." )
304
+ return
305
+ }
306
+ d .syncChan <- & blockSyncMsg {
276
307
ctx : ctx ,
277
308
chainID : chainID ,
278
309
peer : peer ,
279
310
sync : (msg ).(* iotexrpc.BlockSync ),
280
- })
311
+ }
281
312
}
282
313
283
314
// HandleBroadcast handles incoming broadcast message
@@ -288,12 +319,11 @@ func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, me
288
319
}
289
320
d .subscribersMU .RLock ()
290
321
subscriber , ok := d .subscribers [chainID ]
322
+ d .subscribersMU .RUnlock ()
291
323
if ! ok {
292
324
log .L ().Warn ("chainID has not been registered in dispatcher." , zap .Uint32 ("chainID" , chainID ))
293
- d .subscribersMU .RUnlock ()
294
325
return
295
326
}
296
- d .subscribersMU .RUnlock ()
297
327
298
328
switch msgType {
299
329
case iotexrpc .MessageType_CONSENSUS :
@@ -326,13 +356,11 @@ func (d *IotxDispatcher) HandleTell(ctx context.Context, chainID uint32, peer pe
326
356
}
327
357
328
358
func (d * IotxDispatcher ) enqueueEvent (event interface {}) {
329
- go func () {
330
- if len (d .eventChan ) == cap (d .eventChan ) {
331
- log .L ().Debug ("dispatcher event chan is full, drop an event." )
332
- return
333
- }
334
- d .eventChan <- event
335
- }()
359
+ if len (d .eventChan ) == cap (d .eventChan ) {
360
+ log .L ().Warn ("dispatcher event chan is full, drop an event." )
361
+ return
362
+ }
363
+ d .eventChan <- event
336
364
}
337
365
338
366
func (d * IotxDispatcher ) updateEventAudit (t iotexrpc.MessageType ) {
0 commit comments