Skip to content

Commit 6034b82

Browse files
authored
[db] improve indexer write speed using common batch (iotexproject#2159)
1 parent 1906ae8 commit 6034b82

File tree

7 files changed

+66
-17
lines changed

7 files changed

+66
-17
lines changed

action/protocol/staking/protocol.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ func (p *Protocol) handle(ctx context.Context, act action.Action, csm CandidateS
265265
}
266266

267267
if receiptErr, ok := err.(ReceiptError); ok {
268-
log.L().Info("Non-critical error when processing staking action", zap.Error(err))
268+
log.L().Debug("Non-critical error when processing staking action", zap.Error(err))
269269
return p.settleAction(ctx, csm, receiptErr.ReceiptStatus(), rLog.Build(ctx, err))
270270
}
271271
return nil, err

blockindex/indexer.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type (
6666
genesisHash hash.Hash256
6767
kvStore db.KVStoreWithRange
6868
batch batch.KVStoreBatch
69+
cBatch batch.KVStoreBatch
6970
dirtyAddr addrIndex
7071
tbk db.CountingIndex
7172
tac db.CountingIndex
@@ -84,6 +85,7 @@ func NewIndexer(kv db.KVStore, genesisHash hash.Hash256) (Indexer, error) {
8485
x := blockIndexer{
8586
kvStore: kvRange,
8687
batch: batch.NewBatch(),
88+
cBatch: batch.NewBatch(),
8789
dirtyAddr: make(addrIndex),
8890
genesisHash: genesisHash,
8991
}
@@ -172,7 +174,7 @@ func (x *blockIndexer) DeleteTipBlock(blk *block.Block) error {
172174
if err := x.tac.Revert(uint64(len(blk.Actions))); err != nil {
173175
return err
174176
}
175-
return x.commit()
177+
return x.kvStore.WriteBatch(x.batch)
176178
}
177179

178180
// Height return the blockchain height
@@ -293,21 +295,29 @@ func (x *blockIndexer) putBlock(blk *block.Block) error {
293295
if height != x.tbk.Size() {
294296
return errors.Wrapf(db.ErrInvalid, "wrong block height %d, expecting %d", height, x.tbk.Size())
295297
}
298+
296299
// index hash --> height
297300
hash := blk.HashBlock()
298301
x.batch.Put(blockHashToHeightNS, hash[hashOffset:], byteutil.Uint64ToBytesBigEndian(height), "failed to put hash -> height mapping")
302+
299303
// index height --> block hash, number of actions, and total transfer amount
300304
bd := &blockIndex{
301305
hash: hash[:],
302306
numAction: uint32(len(blk.Actions)),
303307
tsfAmount: blk.CalculateTransferAmount()}
308+
if err := x.tbk.UseBatch(x.cBatch); err != nil {
309+
return err
310+
}
304311
if err := x.tbk.Add(bd.Serialize(), true); err != nil {
305312
return errors.Wrapf(err, "failed to put block %d index", height)
306313
}
307314

308315
// store height of the block, so getReceiptByActionHash() can use height to directly pull receipts
309316
ad := (&actionIndex{
310317
blkHeight: blk.Height()}).Serialize()
318+
if err := x.tac.UseBatch(x.cBatch); err != nil {
319+
return err
320+
}
311321
// index actions in the block
312322
for _, selp := range blk.Actions {
313323
actHash := selp.Hash()
@@ -328,7 +338,7 @@ func (x *blockIndexer) commit() error {
328338
var commitErr error
329339
for k, v := range x.dirtyAddr {
330340
if commitErr == nil {
331-
if err := v.Commit(); err != nil {
341+
if err := v.AddTotalSize(); err != nil {
332342
commitErr = err
333343
}
334344
}
@@ -338,10 +348,13 @@ func (x *blockIndexer) commit() error {
338348
return commitErr
339349
}
340350
// total block and total action index
341-
if err := x.tbk.Commit(); err != nil {
351+
if err := x.tbk.AddTotalSize(); err != nil {
352+
return err
353+
}
354+
if err := x.tac.AddTotalSize(); err != nil {
342355
return err
343356
}
344-
if err := x.tac.Commit(); err != nil {
357+
if err := db.CommitWithFillPercent(x.kvStore, x.cBatch, 1.0); err != nil {
345358
return err
346359
}
347360
return x.kvStore.WriteBatch(x.batch)
@@ -362,6 +375,9 @@ func (x *blockIndexer) getIndexerForAddr(addr []byte, batch bool) (db.CountingIn
362375
if err != nil {
363376
return nil, err
364377
}
378+
if err := indexer.UseBatch(x.cBatch); err != nil {
379+
return nil, err
380+
}
365381
x.dirtyAddr[address] = indexer
366382
}
367383
return indexer, nil

db/counting_index.go

+23-9
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ type (
4141
Close()
4242
// Commit commits the batch
4343
Commit() error
44+
// UseBatch
45+
UseBatch(batch.KVStoreBatch) error
46+
// AddTotalSize
47+
AddTotalSize() error
4448
}
4549

4650
// countingIndex is CountingIndex implementation based on KVStore
@@ -117,7 +121,7 @@ func (c *countingIndex) Add(value []byte, inBatch bool) error {
117121
b := batch.NewBatch()
118122
b.Put(c.bucket, byteutil.Uint64ToBytesBigEndian(c.size), value, "failed to add %d-th item", c.size+1)
119123
b.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(c.size+1), "failed to update size = %d", c.size+1)
120-
if err := c.commit(b); err != nil {
124+
if err := CommitWithFillPercent(c.kvStore, b, 1.0); err != nil {
121125
return err
122126
}
123127
c.size++
@@ -164,7 +168,7 @@ func (c *countingIndex) Revert(count uint64) error {
164168
b.Delete(c.bucket, byteutil.Uint64ToBytesBigEndian(start+i), "failed to delete %d-th item", start+i)
165169
}
166170
b.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(start), "failed to update size = %d", start)
167-
if err := c.commit(b); err != nil {
171+
if err := CommitWithFillPercent(c.kvStore, b, 1.0); err != nil {
168172
return err
169173
}
170174
c.size = start
@@ -184,18 +188,28 @@ func (c *countingIndex) Commit() error {
184188
return nil
185189
}
186190
c.batch.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(c.size), "failed to update size = %d", c.size)
187-
if err := c.commit(c.batch); err != nil {
191+
if err := CommitWithFillPercent(c.kvStore, c.batch, 1.0); err != nil {
188192
return err
189193
}
190194
c.batch = nil
191195
return nil
192196
}
193197

194-
func (c *countingIndex) commit(b batch.KVStoreBatch) error {
195-
if kvFillPercent, ok := c.kvStore.(KVStoreWithBucketFillPercent); ok {
196-
// set an aggressive fill percent
197-
// b/c counting index only appends, further inserts to the bucket would never split the page
198-
return kvFillPercent.WriteBatchWithFillPercent(b, 1.0)
198+
// UseBatch sets a (usually common) batch for the counting index to use
199+
func (c *countingIndex) UseBatch(b batch.KVStoreBatch) error {
200+
if b == nil {
201+
return ErrInvalid
199202
}
200-
return c.kvStore.WriteBatch(b)
203+
c.batch = b
204+
return nil
205+
}
206+
207+
// AddTotalSize updates the total size before committing the (usually common) batch
208+
func (c *countingIndex) AddTotalSize() error {
209+
if c.batch == nil {
210+
return ErrInvalid
211+
}
212+
c.batch.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(c.size), "failed to update size = %d", c.size)
213+
c.batch = nil
214+
return nil
201215
}

db/counting_index_test.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
"github.com/iotexproject/go-pkgs/hash"
1919

20+
"github.com/iotexproject/iotex-core/db/batch"
2021
"github.com/iotexproject/iotex-core/pkg/log"
2122
"github.com/iotexproject/iotex-core/testutil"
2223
)
@@ -71,10 +72,18 @@ func TestCountingIndex(t *testing.T) {
7172
index, err = GetCountingIndex(kv, bucket)
7273
require.NoError(err)
7374
// write another 100 entries
74-
for i := 200; i < 300; i++ {
75+
for i := 200; i < 250; i++ {
7576
h := hash.Hash160b([]byte(strconv.Itoa(i)))
7677
require.NoError(index.Add(h[:], false))
7778
}
79+
require.EqualValues(250, index.Size())
80+
// use external batch
81+
require.NoError(index.UseBatch(batch.NewBatch()))
82+
for i := 250; i < 300; i++ {
83+
h := hash.Hash160b([]byte(strconv.Itoa(i)))
84+
require.NoError(index.Add(h[:], true))
85+
}
86+
require.NoError(index.Commit())
7887
require.EqualValues(300, index.Size())
7988

8089
_, err = index.Range(248, 0)

db/kvstore_impl.go

+10
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,13 @@ func (m *memKVStore) GetBucketByPrefix(namespace []byte) ([][]byte, error) {
144144
func (m *memKVStore) GetKeyByPrefix(namespace, prefix []byte) ([][]byte, error) {
145145
return nil, nil
146146
}
147+
148+
// CommitWithFillPercent commits the batch using fill percent, if underlying KVStore is capable
149+
func CommitWithFillPercent(kvstore KVStore, b batch.KVStoreBatch, percent float64) error {
150+
if kvFillPercent, ok := kvstore.(KVStoreWithBucketFillPercent); ok {
151+
// set an aggressive fill percent
152+
// b/c counting index only appends, further inserts to the bucket would never split the page
153+
return kvFillPercent.WriteBatchWithFillPercent(b, percent)
154+
}
155+
return kvstore.WriteBatch(b)
156+
}

state/factory/factory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
518518
// regenerate workingset
519519
_, err = ws.Process(ctx, blk.RunnableActions().Actions())
520520
if err != nil {
521-
log.L().Panic("Failed to update state.", zap.Error(err))
521+
log.L().Error("Failed to update state.", zap.Error(err))
522522
return err
523523
}
524524
}

state/factory/statedb.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error {
356356
if !isExist {
357357
_, err = ws.Process(ctx, blk.RunnableActions().Actions())
358358
if err != nil {
359-
log.L().Panic("Failed to update state.", zap.Error(err))
359+
log.L().Error("Failed to update state.", zap.Error(err))
360360
return err
361361
}
362362
}

0 commit comments

Comments
 (0)