Skip to content

Commit fc46cf3

Browse files
committed
Merge pull request ethereum#1946 from fjl/xeth-oom
Fix for xeth OOM issue
2 parents fd27f07 + fbdb44d commit fc46cf3

File tree

9 files changed

+127
-157
lines changed

9 files changed

+127
-157
lines changed

cmd/utils/flags.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -627,17 +627,14 @@ func StartIPC(eth *eth.Ethereum, ctx *cli.Context) error {
627627
Endpoint: IpcSocketPath(ctx),
628628
}
629629

630-
initializer := func(conn net.Conn) (shared.EthereumApi, error) {
630+
initializer := func(conn net.Conn) (comms.Stopper, shared.EthereumApi, error) {
631631
fe := useragent.NewRemoteFrontend(conn, eth.AccountManager())
632632
xeth := xeth.New(eth, fe)
633-
codec := codec.JSON
634-
635-
apis, err := api.ParseApiString(ctx.GlobalString(IPCApiFlag.Name), codec, xeth, eth)
633+
apis, err := api.ParseApiString(ctx.GlobalString(IPCApiFlag.Name), codec.JSON, xeth, eth)
636634
if err != nil {
637-
return nil, err
635+
return nil, nil, err
638636
}
639-
640-
return api.Merge(apis...), nil
637+
return xeth, api.Merge(apis...), nil
641638
}
642639

643640
return comms.StartIpc(config, codec.JSON, initializer)

eth/filters/filter_system.go

Lines changed: 35 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,32 @@ import (
3131
// block, transaction and log events. The Filtering system can be used to listen
3232
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
3333
type FilterSystem struct {
34-
eventMux *event.TypeMux
35-
3634
filterMu sync.RWMutex
3735
filterId int
3836
filters map[int]*Filter
3937
created map[int]time.Time
40-
41-
quit chan struct{}
38+
sub event.Subscription
4239
}
4340

4441
// NewFilterSystem returns a newly allocated filter manager
4542
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
4643
fs := &FilterSystem{
47-
eventMux: mux,
48-
filters: make(map[int]*Filter),
49-
created: make(map[int]time.Time),
44+
filters: make(map[int]*Filter),
45+
created: make(map[int]time.Time),
5046
}
47+
fs.sub = mux.Subscribe(
48+
//core.PendingBlockEvent{},
49+
core.ChainEvent{},
50+
core.TxPreEvent{},
51+
vm.Logs(nil),
52+
)
5153
go fs.filterLoop()
5254
return fs
5355
}
5456

5557
// Stop quits the filter loop required for polling events
5658
func (fs *FilterSystem) Stop() {
57-
close(fs.quit)
59+
fs.sub.Unsubscribe()
5860
}
5961

6062
// Add adds a filter to the filter manager
@@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter {
8991
// filterLoop waits for specific events from ethereum and fires their handlers
9092
// when the filter matches the requirements.
9193
func (fs *FilterSystem) filterLoop() {
92-
// Subscribe to events
93-
eventCh := fs.eventMux.Subscribe(
94-
//core.PendingBlockEvent{},
95-
core.ChainEvent{},
96-
core.TxPreEvent{},
97-
vm.Logs(nil),
98-
).Chan()
99-
100-
out:
101-
for {
102-
select {
103-
case <-fs.quit:
104-
break out
105-
case event, ok := <-eventCh:
106-
if !ok {
107-
// Event subscription closed, set the channel to nil to stop spinning
108-
eventCh = nil
109-
continue
110-
}
111-
// A real event arrived, notify the registered filters
112-
switch ev := event.Data.(type) {
113-
case core.ChainEvent:
114-
fs.filterMu.RLock()
115-
for id, filter := range fs.filters {
116-
if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
117-
filter.BlockCallback(ev.Block, ev.Logs)
118-
}
94+
for event := range fs.sub.Chan() {
95+
switch ev := event.Data.(type) {
96+
case core.ChainEvent:
97+
fs.filterMu.RLock()
98+
for id, filter := range fs.filters {
99+
if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
100+
filter.BlockCallback(ev.Block, ev.Logs)
119101
}
120-
fs.filterMu.RUnlock()
102+
}
103+
fs.filterMu.RUnlock()
121104

122-
case core.TxPreEvent:
123-
fs.filterMu.RLock()
124-
for id, filter := range fs.filters {
125-
if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
126-
filter.TransactionCallback(ev.Tx)
127-
}
105+
case core.TxPreEvent:
106+
fs.filterMu.RLock()
107+
for id, filter := range fs.filters {
108+
if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
109+
filter.TransactionCallback(ev.Tx)
128110
}
129-
fs.filterMu.RUnlock()
130-
131-
case vm.Logs:
132-
fs.filterMu.RLock()
133-
for id, filter := range fs.filters {
134-
if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
135-
msgs := filter.FilterLogs(ev)
136-
if len(msgs) > 0 {
137-
filter.LogsCallback(msgs)
138-
}
111+
}
112+
fs.filterMu.RUnlock()
113+
114+
case vm.Logs:
115+
fs.filterMu.RLock()
116+
for id, filter := range fs.filters {
117+
if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
118+
msgs := filter.FilterLogs(ev)
119+
if len(msgs) > 0 {
120+
filter.LogsCallback(msgs)
139121
}
140122
}
141-
fs.filterMu.RUnlock()
142123
}
124+
fs.filterMu.RUnlock()
143125
}
144126
}
145127
}

miner/miner.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,13 @@ func (self *Miner) Register(agent Agent) {
133133
if self.Mining() {
134134
agent.Start()
135135
}
136-
137136
self.worker.register(agent)
138137
}
139138

139+
func (self *Miner) Unregister(agent Agent) {
140+
self.worker.unregister(agent)
141+
}
142+
140143
func (self *Miner) Mining() bool {
141144
return atomic.LoadInt32(&self.mining) > 0
142145
}
@@ -146,7 +149,7 @@ func (self *Miner) HashRate() (tot int64) {
146149
// do we care this might race? is it worth we're rewriting some
147150
// aspects of the worker/locking up agents so we can get an accurate
148151
// hashrate?
149-
for _, agent := range self.worker.agents {
152+
for agent := range self.worker.agents {
150153
tot += agent.GetHashRate()
151154
}
152155
return

miner/remote_agent.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ type RemoteAgent struct {
4848
}
4949

5050
func NewRemoteAgent() *RemoteAgent {
51-
agent := &RemoteAgent{work: make(map[common.Hash]*Work), hashrate: make(map[common.Hash]hashrate)}
52-
53-
return agent
51+
return &RemoteAgent{
52+
work: make(map[common.Hash]*Work),
53+
hashrate: make(map[common.Hash]hashrate),
54+
}
5455
}
5556

5657
func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) {
@@ -75,8 +76,12 @@ func (a *RemoteAgent) Start() {
7576
}
7677

7778
func (a *RemoteAgent) Stop() {
78-
close(a.quit)
79-
close(a.workCh)
79+
if a.quit != nil {
80+
close(a.quit)
81+
}
82+
if a.workCh != nil {
83+
close(a.workCh)
84+
}
8085
}
8186

8287
// GetHashRate returns the accumulated hashrate of all identifier combined

miner/worker.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ type Result struct {
9292
type worker struct {
9393
mu sync.Mutex
9494

95-
agents []Agent
95+
agents map[Agent]struct{}
9696
recv chan *Result
9797
mux *event.TypeMux
9898
quit chan struct{}
@@ -136,6 +136,7 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker {
136136
coinbase: coinbase,
137137
txQueue: make(map[common.Hash]*types.Transaction),
138138
quit: make(chan struct{}),
139+
agents: make(map[Agent]struct{}),
139140
fullValidation: false,
140141
}
141142
go worker.update()
@@ -180,7 +181,7 @@ func (self *worker) start() {
180181
atomic.StoreInt32(&self.mining, 1)
181182

182183
// spin up agents
183-
for _, agent := range self.agents {
184+
for agent := range self.agents {
184185
agent.Start()
185186
}
186187
}
@@ -190,16 +191,14 @@ func (self *worker) stop() {
190191
defer self.mu.Unlock()
191192

192193
if atomic.LoadInt32(&self.mining) == 1 {
193-
var keep []Agent
194-
// stop all agents
195-
for _, agent := range self.agents {
194+
// Stop all agents.
195+
for agent := range self.agents {
196196
agent.Stop()
197-
// keep all that's not a cpu agent
198-
if _, ok := agent.(*CpuAgent); !ok {
199-
keep = append(keep, agent)
197+
// Remove CPU agents.
198+
if _, ok := agent.(*CpuAgent); ok {
199+
delete(self.agents, agent)
200200
}
201201
}
202-
self.agents = keep
203202
}
204203

205204
atomic.StoreInt32(&self.mining, 0)
@@ -209,10 +208,17 @@ func (self *worker) stop() {
209208
func (self *worker) register(agent Agent) {
210209
self.mu.Lock()
211210
defer self.mu.Unlock()
212-
self.agents = append(self.agents, agent)
211+
self.agents[agent] = struct{}{}
213212
agent.SetReturnCh(self.recv)
214213
}
215214

215+
func (self *worker) unregister(agent Agent) {
216+
self.mu.Lock()
217+
defer self.mu.Unlock()
218+
delete(self.agents, agent)
219+
agent.Stop()
220+
}
221+
216222
func (self *worker) update() {
217223
eventSub := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
218224
defer eventSub.Unsubscribe()
@@ -341,11 +347,9 @@ func (self *worker) push(work *Work) {
341347
glog.Infoln("You turn back and abort mining")
342348
return
343349
}
344-
345350
// push new work to agents
346-
for _, agent := range self.agents {
351+
for agent := range self.agents {
347352
atomic.AddInt32(&self.atWork, 1)
348-
349353
if agent.Work() != nil {
350354
agent.Work() <- work
351355
}

rpc/comms/ipc.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,22 @@ import (
2020
"fmt"
2121
"math/rand"
2222
"net"
23+
"os"
2324

2425
"encoding/json"
2526

27+
"github.com/ethereum/go-ethereum/logger"
28+
"github.com/ethereum/go-ethereum/logger/glog"
2629
"github.com/ethereum/go-ethereum/rpc/codec"
2730
"github.com/ethereum/go-ethereum/rpc/shared"
2831
)
2932

33+
type Stopper interface {
34+
Stop()
35+
}
36+
37+
type InitFunc func(conn net.Conn) (Stopper, shared.EthereumApi, error)
38+
3039
type IpcConfig struct {
3140
Endpoint string
3241
}
@@ -90,8 +99,38 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) {
9099
}
91100

92101
// Start IPC server
93-
func StartIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error {
94-
return startIpc(cfg, codec, initializer)
102+
func StartIpc(cfg IpcConfig, codec codec.Codec, initializer InitFunc) error {
103+
l, err := ipcListen(cfg)
104+
if err != nil {
105+
return err
106+
}
107+
go ipcLoop(cfg, codec, initializer, l)
108+
return nil
109+
}
110+
111+
func ipcLoop(cfg IpcConfig, codec codec.Codec, initializer InitFunc, l net.Listener) {
112+
glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint)
113+
defer os.Remove(cfg.Endpoint)
114+
defer l.Close()
115+
for {
116+
conn, err := l.Accept()
117+
if err != nil {
118+
glog.V(logger.Debug).Infof("accept: %v", err)
119+
return
120+
}
121+
id := newIpcConnId()
122+
go func() {
123+
defer conn.Close()
124+
glog.V(logger.Debug).Infof("new connection with id %06d started", id)
125+
stopper, api, err := initializer(conn)
126+
if err != nil {
127+
glog.V(logger.Error).Infof("Unable to initialize IPC connection: %v", err)
128+
return
129+
}
130+
defer stopper.Stop()
131+
handle(id, conn, api, codec)
132+
}()
133+
}
95134
}
96135

97136
func newIpcConnId() int {

0 commit comments

Comments
 (0)