Skip to content

Commit 10c880a

Browse files
authored
[batch] correctly apply FillPercent at bucket level (iotexproject#2418)
1 parent a671c90 commit 10c880a

12 files changed

+218
-248
lines changed

blockindex/indexer.go

+6-11
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ type (
6666
genesisHash hash.Hash256
6767
kvStore db.KVStoreWithRange
6868
batch batch.KVStoreBatch
69-
cBatch batch.KVStoreBatch
7069
dirtyAddr addrIndex
7170
tbk db.CountingIndex
7271
tac db.CountingIndex
@@ -85,7 +84,6 @@ func NewIndexer(kv db.KVStore, genesisHash hash.Hash256) (Indexer, error) {
8584
x := blockIndexer{
8685
kvStore: kvRange,
8786
batch: batch.NewBatch(),
88-
cBatch: batch.NewBatch(),
8987
dirtyAddr: make(addrIndex),
9088
genesisHash: genesisHash,
9189
}
@@ -305,7 +303,7 @@ func (x *blockIndexer) putBlock(blk *block.Block) error {
305303
hash: hash[:],
306304
numAction: uint32(len(blk.Actions)),
307305
tsfAmount: blk.CalculateTransferAmount()}
308-
if err := x.tbk.UseBatch(x.cBatch); err != nil {
306+
if err := x.tbk.UseBatch(x.batch); err != nil {
309307
return err
310308
}
311309
if err := x.tbk.Add(bd.Serialize(), true); err != nil {
@@ -315,7 +313,7 @@ func (x *blockIndexer) putBlock(blk *block.Block) error {
315313
// store height of the block, so getReceiptByActionHash() can use height to directly pull receipts
316314
ad := (&actionIndex{
317315
blkHeight: blk.Height()}).Serialize()
318-
if err := x.tac.UseBatch(x.cBatch); err != nil {
316+
if err := x.tac.UseBatch(x.batch); err != nil {
319317
return err
320318
}
321319
// index actions in the block
@@ -338,7 +336,7 @@ func (x *blockIndexer) commit() error {
338336
var commitErr error
339337
for k, v := range x.dirtyAddr {
340338
if commitErr == nil {
341-
if err := v.AddTotalSize(); err != nil {
339+
if err := v.Finalize(); err != nil {
342340
commitErr = err
343341
}
344342
}
@@ -348,13 +346,10 @@ func (x *blockIndexer) commit() error {
348346
return commitErr
349347
}
350348
// total block and total action index
351-
if err := x.tbk.AddTotalSize(); err != nil {
349+
if err := x.tbk.Finalize(); err != nil {
352350
return err
353351
}
354-
if err := x.tac.AddTotalSize(); err != nil {
355-
return err
356-
}
357-
if err := db.CommitWithFillPercent(x.kvStore, x.cBatch, 1.0); err != nil {
352+
if err := x.tac.Finalize(); err != nil {
358353
return err
359354
}
360355
return x.kvStore.WriteBatch(x.batch)
@@ -375,7 +370,7 @@ func (x *blockIndexer) getIndexerForAddr(addr []byte, batch bool) (db.CountingIn
375370
if err != nil {
376371
return nil, err
377372
}
378-
if err := indexer.UseBatch(x.cBatch); err != nil {
373+
if err := indexer.UseBatch(x.batch); err != nil {
379374
return nil, err
380375
}
381376
x.dirtyAddr[address] = indexer

db/batch/batch.go

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ type (
5151
Clear()
5252
// Translate clones the batch
5353
Translate(WriteInfoTranslate) KVStoreBatch
54+
// CheckFillPercent
55+
CheckFillPercent(string) (float64, bool)
56+
// AddFillPercent
57+
AddFillPercent(string, float64)
5458
}
5559

5660
// CachedBatch derives from Batch interface

db/batch/batch_impl.go

+38-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ type (
1717
// baseKVStoreBatch is the base implementation of KVStoreBatch
1818
baseKVStoreBatch struct {
1919
mutex sync.RWMutex
20+
fillLock sync.RWMutex
2021
writeQueue []*WriteInfo
22+
fill map[string]float64
2123
}
2224

2325
// cachedBatch implements the CachedBatch interface
@@ -32,7 +34,9 @@ type (
3234
)
3335

3436
func newBaseKVStoreBatch() *baseKVStoreBatch {
35-
return &baseKVStoreBatch{}
37+
return &baseKVStoreBatch{
38+
fill: make(map[string]float64),
39+
}
3640
}
3741

3842
// NewBatch returns a batch
@@ -54,6 +58,12 @@ func (b *baseKVStoreBatch) Unlock() {
5458
func (b *baseKVStoreBatch) ClearAndUnlock() {
5559
defer b.mutex.Unlock()
5660
b.writeQueue = nil
61+
62+
b.fillLock.Lock()
63+
defer b.fillLock.Unlock()
64+
for k := range b.fill {
65+
delete(b.fill, k)
66+
}
5767
}
5868

5969
// Put inserts a <key, value> record
@@ -107,6 +117,12 @@ func (b *baseKVStoreBatch) Clear() {
107117
b.mutex.Lock()
108118
defer b.mutex.Unlock()
109119
b.writeQueue = nil
120+
121+
b.fillLock.Lock()
122+
defer b.fillLock.Unlock()
123+
for k := range b.fill {
124+
delete(b.fill, k)
125+
}
110126
}
111127

112128
func (b *baseKVStoreBatch) Translate(wit WriteInfoTranslate) KVStoreBatch {
@@ -133,6 +149,19 @@ func (b *baseKVStoreBatch) Translate(wit WriteInfoTranslate) KVStoreBatch {
133149
return c
134150
}
135151

152+
func (b *baseKVStoreBatch) CheckFillPercent(ns string) (float64, bool) {
153+
b.fillLock.RLock()
154+
defer b.fillLock.RUnlock()
155+
p, ok := b.fill[ns]
156+
return p, ok
157+
}
158+
159+
func (b *baseKVStoreBatch) AddFillPercent(ns string, percent float64) {
160+
b.fillLock.Lock()
161+
defer b.fillLock.Unlock()
162+
b.fill[ns] = percent
163+
}
164+
136165
// batch puts an entry into the write queue
137166
func (b *baseKVStoreBatch) batch(op WriteType, namespace string, key, value []byte, errorFormat string, errorArgs ...interface{}) {
138167
b.writeQueue = append(
@@ -273,6 +302,14 @@ func (cb *cachedBatch) Revert(snapshot int) error {
273302
return nil
274303
}
275304

305+
func (cb *cachedBatch) CheckFillPercent(ns string) (float64, bool) {
306+
return cb.kvStoreBatch.CheckFillPercent(ns)
307+
}
308+
309+
func (cb *cachedBatch) AddFillPercent(ns string, percent float64) {
310+
cb.kvStoreBatch.AddFillPercent(ns, percent)
311+
}
312+
276313
func (cb *cachedBatch) hash(namespace string, key []byte) hash.Hash160 {
277314
return hash.Hash160b(append([]byte(namespace), key...))
278315
}

db/batch/batch_impl_test.go

+23-13
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,32 @@ var (
2727
)
2828

2929
func TestBaseKVStoreBatch(t *testing.T) {
30+
require := require.New(t)
31+
3032
b := NewBatch()
31-
require.Equal(t, 0, b.Size())
33+
require.Equal(0, b.Size())
3234
b.Put("ns", []byte{}, []byte{}, "")
33-
require.Equal(t, 1, b.Size())
35+
require.Equal(1, b.Size())
3436
_, err := b.Entry(1)
35-
require.Error(t, err)
37+
require.Error(err)
3638
b.Delete("ns", []byte{}, "")
37-
require.Equal(t, 2, b.Size())
39+
require.Equal(2, b.Size())
3840
wi, err := b.Entry(1)
39-
require.NoError(t, err)
40-
require.Equal(t, Delete, wi.WriteType())
41-
require.True(t, bytes.Equal([]byte{0, 110, 115, 1, 110, 115}, b.SerializeQueue(nil, nil)))
42-
require.True(t, bytes.Equal([]byte{}, b.SerializeQueue(nil, func(wi *WriteInfo) bool {
41+
require.NoError(err)
42+
require.Equal(Delete, wi.WriteType())
43+
b.AddFillPercent("test", 0.5)
44+
p, ok := b.CheckFillPercent("ns")
45+
require.False(ok)
46+
p, ok = b.CheckFillPercent("test")
47+
require.True(ok)
48+
require.Equal(0.5, p)
49+
50+
// test serialize/translate
51+
require.True(bytes.Equal([]byte{0, 110, 115, 1, 110, 115}, b.SerializeQueue(nil, nil)))
52+
require.True(bytes.Equal([]byte{}, b.SerializeQueue(nil, func(wi *WriteInfo) bool {
4353
return wi.Namespace() == "ns"
4454
})))
45-
require.True(t, bytes.Equal([]byte{110, 115, 110, 115}, b.SerializeQueue(func(wi *WriteInfo) []byte {
55+
require.True(bytes.Equal([]byte{110, 115, 110, 115}, b.SerializeQueue(func(wi *WriteInfo) []byte {
4656
return wi.SerializeWithoutWriteType()
4757
}, nil)))
4858
newb := b.Translate(func(wi *WriteInfo) *WriteInfo {
@@ -59,11 +69,11 @@ func TestBaseKVStoreBatch(t *testing.T) {
5969
return wi
6070
})
6171
newEntry1, err := newb.Entry(1)
62-
require.NoError(t, err)
63-
require.Equal(t, "to_delete_ns", newEntry1.Namespace())
64-
require.Equal(t, Put, newEntry1.WriteType())
72+
require.NoError(err)
73+
require.Equal("to_delete_ns", newEntry1.Namespace())
74+
require.Equal(Put, newEntry1.WriteType())
6575
b.Clear()
66-
require.Equal(t, 0, b.Size())
76+
require.Equal(0, b.Size())
6777
}
6878

6979
func TestCachedBatch(t *testing.T) {

db/counting_index.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ type (
4343
Commit() error
4444
// UseBatch
4545
UseBatch(batch.KVStoreBatch) error
46-
// AddTotalSize
47-
AddTotalSize() error
46+
// Finalize
47+
Finalize() error
4848
}
4949

5050
// countingIndex is CountingIndex implementation based on KVStore
@@ -121,7 +121,8 @@ func (c *countingIndex) Add(value []byte, inBatch bool) error {
121121
b := batch.NewBatch()
122122
b.Put(c.bucket, byteutil.Uint64ToBytesBigEndian(c.size), value, "failed to add %d-th item", c.size+1)
123123
b.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(c.size+1), "failed to update size = %d", c.size+1)
124-
if err := CommitWithFillPercent(c.kvStore, b, 1.0); err != nil {
124+
b.AddFillPercent(c.bucket, 1.0)
125+
if err := c.kvStore.WriteBatch(b); err != nil {
125126
return err
126127
}
127128
c.size++
@@ -168,7 +169,8 @@ func (c *countingIndex) Revert(count uint64) error {
168169
b.Delete(c.bucket, byteutil.Uint64ToBytesBigEndian(start+i), "failed to delete %d-th item", start+i)
169170
}
170171
b.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(start), "failed to update size = %d", start)
171-
if err := CommitWithFillPercent(c.kvStore, b, 1.0); err != nil {
172+
b.AddFillPercent(c.bucket, 1.0)
173+
if err := c.kvStore.WriteBatch(b); err != nil {
172174
return err
173175
}
174176
c.size = start
@@ -188,7 +190,8 @@ func (c *countingIndex) Commit() error {
188190
return nil
189191
}
190192
c.batch.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(c.size), "failed to update size = %d", c.size)
191-
if err := CommitWithFillPercent(c.kvStore, c.batch, 1.0); err != nil {
193+
c.batch.AddFillPercent(c.bucket, 1.0)
194+
if err := c.kvStore.WriteBatch(c.batch); err != nil {
192195
return err
193196
}
194197
c.batch = nil
@@ -204,12 +207,13 @@ func (c *countingIndex) UseBatch(b batch.KVStoreBatch) error {
204207
return nil
205208
}
206209

207-
// AddTotalSize updates the total size before committing the (usually common) batch
208-
func (c *countingIndex) AddTotalSize() error {
210+
// Finalize updates the total size before committing the (usually common) batch
211+
func (c *countingIndex) Finalize() error {
209212
if c.batch == nil {
210213
return ErrInvalid
211214
}
212215
c.batch.Put(c.bucket, CountKey, byteutil.Uint64ToBytesBigEndian(c.size), "failed to update size = %d", c.size)
216+
c.batch.AddFillPercent(c.bucket, 1.0)
213217
c.batch = nil
214218
return nil
215219
}

db/counting_index_test.go

+24-12
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
"github.com/iotexproject/go-pkgs/hash"
1919

20+
"github.com/iotexproject/iotex-core/config"
2021
"github.com/iotexproject/iotex-core/db/batch"
2122
"github.com/iotexproject/iotex-core/pkg/log"
2223
"github.com/iotexproject/iotex-core/testutil"
@@ -77,13 +78,19 @@ func TestCountingIndex(t *testing.T) {
7778
require.NoError(index.Add(h[:], false))
7879
}
7980
require.EqualValues(250, index.Size())
81+
8082
// use external batch
81-
require.NoError(index.UseBatch(batch.NewBatch()))
83+
require.Equal(ErrInvalid, index.Finalize())
84+
b := batch.NewBatch()
85+
require.NoError(index.UseBatch(b))
8286
for i := 250; i < 300; i++ {
8387
h := hash.Hash160b([]byte(strconv.Itoa(i)))
8488
require.NoError(index.Add(h[:], true))
8589
}
86-
require.NoError(index.Commit())
90+
require.NoError(index.Finalize())
91+
cIndex, ok := index.(*countingIndex)
92+
require.True(ok)
93+
require.NoError(cIndex.kvStore.WriteBatch(b))
8794
require.EqualValues(300, index.Size())
8895

8996
_, err = index.Range(248, 0)
@@ -142,19 +149,22 @@ func TestCountingIndex(t *testing.T) {
142149
}
143150
}
144151

145-
t.Run("in-mem KVStore", func(t *testing.T) {
146-
testFunc(NewMemKVStore(), t)
147-
})
148-
149-
path := "test-iterate.bolt"
152+
path := "test-counting.bolt"
150153
testPath, err := testutil.PathOfTempFile(path)
151154
require.NoError(t, err)
155+
testutil.CleanupPath(t, testPath)
156+
defer testutil.CleanupPath(t, testPath)
157+
cfg := config.Default.DB
152158
cfg.DbPath = testPath
153-
t.Run("Bolt DB", func(t *testing.T) {
154-
testutil.CleanupPath(t, testPath)
155-
defer testutil.CleanupPath(t, testPath)
156-
testFunc(NewBoltDB(cfg), t)
157-
})
159+
160+
for _, v := range []KVStore{
161+
NewMemKVStore(),
162+
NewBoltDB(cfg),
163+
} {
164+
t.Run("test counting index", func(t *testing.T) {
165+
testFunc(v, t)
166+
})
167+
}
158168
}
159169

160170
const (
@@ -191,6 +201,7 @@ func TestBulk(t *testing.T) {
191201
}
192202
}
193203

204+
cfg := config.Default.DB
194205
cfg.DbPath = "test-bulk.dat"
195206
t.Run("Bolt DB", func(t *testing.T) {
196207
testutil.CleanupPath(t, cfg.DbPath)
@@ -230,6 +241,7 @@ func TestCheckBulk(t *testing.T) {
230241
}
231242
}
232243

244+
cfg := config.Default.DB
233245
cfg.DbPath = "test-bulk.dat"
234246
t.Run("Bolt DB", func(t *testing.T) {
235247
defer testutil.CleanupPath(t, cfg.DbPath)

db/db_bolt.go

+3-12
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type boltDB struct {
2828
}
2929

3030
// NewBoltDB instantiates an BoltDB with implements KVStore
31-
func NewBoltDB(cfg config.DB) KVStoreWithBucketFillPercent {
31+
func NewBoltDB(cfg config.DB) KVStore {
3232
return &boltDB{
3333
db: nil,
3434
path: cfg.DbPath,
@@ -251,15 +251,6 @@ func (b *boltDB) Delete(namespace string, key []byte) (err error) {
251251

252252
// WriteBatch commits a batch
253253
func (b *boltDB) WriteBatch(kvsb batch.KVStoreBatch) (err error) {
254-
return b.writeBatch(kvsb, 0.0)
255-
}
256-
257-
// WriteBatchWithFillPercent commits a batch with specified fill percent
258-
func (b *boltDB) WriteBatchWithFillPercent(kvsb batch.KVStoreBatch, percent float64) (err error) {
259-
return b.writeBatch(kvsb, percent)
260-
}
261-
262-
func (b *boltDB) writeBatch(kvsb batch.KVStoreBatch, percent float64) (err error) {
263254
succeed := true
264255
kvsb.Lock()
265256
defer func() {
@@ -286,8 +277,8 @@ func (b *boltDB) writeBatch(kvsb batch.KVStoreBatch, percent float64) (err error
286277
if e != nil {
287278
return errors.Wrapf(e, errFmt, errArgs)
288279
}
289-
if percent != 0.0 {
290-
bucket.FillPercent = percent
280+
if p, ok := kvsb.CheckFillPercent(ns); ok {
281+
bucket.FillPercent = p
291282
}
292283
if e := bucket.Put(write.Key(), write.Value()); e != nil {
293284
return errors.Wrapf(e, errFmt, errArgs)

0 commit comments

Comments
 (0)