@@ -532,8 +532,10 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
532
532
self .logMu .Lock ()
533
533
defer self .logMu .Unlock ()
534
534
535
- var id int
536
535
filter := core .NewFilter (self .backend )
536
+ id := self .filterManager .InstallFilter (filter )
537
+ self .logQueue [id ] = & logQueue {timeout : time .Now ()}
538
+
537
539
filter .SetEarliestBlock (earliest )
538
540
filter .SetLatestBlock (latest )
539
541
filter .SetSkip (skip )
@@ -544,10 +546,10 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
544
546
self .logMu .Lock ()
545
547
defer self .logMu .Unlock ()
546
548
547
- self .logQueue [id ].add (logs ... )
549
+ if queue := self .logQueue [id ]; queue != nil {
550
+ queue .add (logs ... )
551
+ }
548
552
}
549
- id = self .filterManager .InstallFilter (filter )
550
- self .logQueue [id ] = & logQueue {timeout : time .Now ()}
551
553
552
554
return id
553
555
}
@@ -556,33 +558,37 @@ func (self *XEth) NewTransactionFilter() int {
556
558
self .transactionMu .Lock ()
557
559
defer self .transactionMu .Unlock ()
558
560
559
- var id int
560
561
filter := core .NewFilter (self .backend )
562
+ id := self .filterManager .InstallFilter (filter )
563
+ self .transactionQueue [id ] = & hashQueue {timeout : time .Now ()}
564
+
561
565
filter .TransactionCallback = func (tx * types.Transaction ) {
562
566
self .transactionMu .Lock ()
563
567
defer self .transactionMu .Unlock ()
564
568
565
- self .transactionQueue [id ].add (tx .Hash ())
569
+ if queue := self .transactionQueue [id ]; queue != nil {
570
+ queue .add (tx .Hash ())
571
+ }
566
572
}
567
- id = self .filterManager .InstallFilter (filter )
568
- self .transactionQueue [id ] = & hashQueue {timeout : time .Now ()}
569
573
return id
570
574
}
571
575
572
576
func (self * XEth ) NewBlockFilter () int {
573
577
self .blockMu .Lock ()
574
578
defer self .blockMu .Unlock ()
575
579
576
- var id int
577
580
filter := core .NewFilter (self .backend )
581
+ id := self .filterManager .InstallFilter (filter )
582
+ self .blockQueue [id ] = & hashQueue {timeout : time .Now ()}
583
+
578
584
filter .BlockCallback = func (block * types.Block , logs state.Logs ) {
579
585
self .blockMu .Lock ()
580
586
defer self .blockMu .Unlock ()
581
587
582
- self .blockQueue [id ].add (block .Hash ())
588
+ if queue := self .blockQueue [id ]; queue != nil {
589
+ queue .add (block .Hash ())
590
+ }
583
591
}
584
- id = self .filterManager .InstallFilter (filter )
585
- self .blockQueue [id ] = & hashQueue {timeout : time .Now ()}
586
592
return id
587
593
}
588
594
@@ -1022,33 +1028,49 @@ func (m callmsg) Value() *big.Int { return m.value }
1022
1028
func (m callmsg ) Data () []byte { return m .data }
1023
1029
1024
1030
type logQueue struct {
1031
+ mu sync.Mutex
1032
+
1025
1033
logs state.Logs
1026
1034
timeout time.Time
1027
1035
id int
1028
1036
}
1029
1037
1030
1038
func (l * logQueue ) add (logs ... * state.Log ) {
1039
+ l .mu .Lock ()
1040
+ defer l .mu .Unlock ()
1041
+
1031
1042
l .logs = append (l .logs , logs ... )
1032
1043
}
1033
1044
1034
1045
func (l * logQueue ) get () state.Logs {
1046
+ l .mu .Lock ()
1047
+ defer l .mu .Unlock ()
1048
+
1035
1049
l .timeout = time .Now ()
1036
1050
tmp := l .logs
1037
1051
l .logs = nil
1038
1052
return tmp
1039
1053
}
1040
1054
1041
1055
type hashQueue struct {
1056
+ mu sync.Mutex
1057
+
1042
1058
hashes []common.Hash
1043
1059
timeout time.Time
1044
1060
id int
1045
1061
}
1046
1062
1047
1063
func (l * hashQueue ) add (hashes ... common.Hash ) {
1064
+ l .mu .Lock ()
1065
+ defer l .mu .Unlock ()
1066
+
1048
1067
l .hashes = append (l .hashes , hashes ... )
1049
1068
}
1050
1069
1051
1070
func (l * hashQueue ) get () []common.Hash {
1071
+ l .mu .Lock ()
1072
+ defer l .mu .Unlock ()
1073
+
1052
1074
l .timeout = time .Now ()
1053
1075
tmp := l .hashes
1054
1076
l .hashes = nil
0 commit comments