3
3
import com .google .common .hash .BloomFilter ;
4
4
import com .google .common .hash .Funnels ;
5
5
import com .google .common .primitives .Longs ;
6
+ import java .io .BufferedInputStream ;
7
+ import java .io .BufferedOutputStream ;
8
+ import java .io .InputStream ;
9
+ import java .io .InputStreamReader ;
10
+ import java .io .OutputStream ;
11
+ import java .io .Reader ;
12
+ import java .io .Writer ;
13
+ import java .nio .charset .StandardCharsets ;
14
+ import java .nio .file .Files ;
15
+ import java .nio .file .Path ;
6
16
import java .nio .file .Paths ;
17
+ import java .nio .file .StandardOpenOption ;
7
18
import java .util .Iterator ;
8
19
import java .util .Map ;
9
20
import java .util .Map .Entry ;
21
+ import java .util .Properties ;
22
+ import java .util .concurrent .CompletableFuture ;
23
+ import java .util .concurrent .atomic .AtomicBoolean ;
24
+ import lombok .Getter ;
25
+ import lombok .Setter ;
10
26
import lombok .extern .slf4j .Slf4j ;
11
27
import org .apache .commons .lang3 .ArrayUtils ;
12
28
import org .bouncycastle .util .encoders .Hex ;
17
33
import org .tron .common .storage .leveldb .LevelDbDataSourceImpl ;
18
34
import org .tron .common .storage .rocksdb .RocksDbDataSourceImpl ;
19
35
import org .tron .common .utils .ByteArray ;
36
+ import org .tron .common .utils .FileUtil ;
20
37
import org .tron .common .utils .JsonUtil ;
21
38
import org .tron .common .utils .StorageUtils ;
22
39
import org .tron .core .capsule .BytesCapsule ;
@@ -42,6 +59,7 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
42
59
private BloomFilter <byte []>[] bloomFilters = new BloomFilter [2 ];
43
60
// filterStartBlock record the start block of the active filter
44
61
private volatile long filterStartBlock = INVALID_BLOCK ;
62
+ private volatile long currentBlockNum = INVALID_BLOCK ;
45
63
// currentFilterIndex records the index of the active filter
46
64
private volatile int currentFilterIndex = 0 ;
47
65
@@ -57,6 +75,16 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
57
75
// replace persistentStore and optimizes startup performance
58
76
private RecentTransactionStore recentTransactionStore ;
59
77
78
+ private final Path cacheFile0 ;
79
+ private final Path cacheFile1 ;
80
+ private final Path cacheProperties ;
81
+ private final Path cacheDir ;
82
+ private AtomicBoolean isValid = new AtomicBoolean (false );
83
+
84
+ @ Getter
85
+ @ Setter
86
+ private volatile boolean alive ;
87
+
60
88
public TxCacheDB (String name , RecentTransactionStore recentTransactionStore ) {
61
89
this .name = name ;
62
90
this .TRANSACTION_COUNT =
@@ -85,6 +113,10 @@ public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) {
85
113
MAX_BLOCK_SIZE * TRANSACTION_COUNT );
86
114
this .bloomFilters [1 ] = BloomFilter .create (Funnels .byteArrayFunnel (),
87
115
MAX_BLOCK_SIZE * TRANSACTION_COUNT );
116
+ cacheDir = Paths .get (CommonParameter .getInstance ().getOutputDirectory (), ".cache" );
117
+ this .cacheFile0 = Paths .get (cacheDir .toString (), "bloomFilters_0" );
118
+ this .cacheFile1 = Paths .get (cacheDir .toString (), "bloomFilters_1" );
119
+ this .cacheProperties = Paths .get (cacheDir .toString (), "txCache.properties" );
88
120
89
121
}
90
122
@@ -110,6 +142,11 @@ private void initCache() {
110
142
}
111
143
112
144
public void init () {
145
+ if (recovery ()) {
146
+ isValid .set (true );
147
+ setAlive (true );
148
+ return ;
149
+ }
113
150
long size = recentTransactionStore .size ();
114
151
if (size != MAX_BLOCK_SIZE ) {
115
152
// 0. load from persistentStore
@@ -129,6 +166,8 @@ public void init() {
129
166
logger .info ("Load cache from recentTransactionStore, filter: {}, filter-fpp: {}, cost: {} ms." ,
130
167
bloomFilters [1 ].approximateElementCount (), bloomFilters [1 ].expectedFpp (),
131
168
System .currentTimeMillis () - start );
169
+ isValid .set (true );
170
+ setAlive (true );
132
171
}
133
172
134
173
@ Override
@@ -172,7 +211,7 @@ public void put(byte[] key, byte[] value) {
172
211
MAX_BLOCK_SIZE * TRANSACTION_COUNT );
173
212
}
174
213
bloomFilters [currentFilterIndex ].put (key );
175
-
214
+ currentBlockNum = blockNum ;
176
215
if (lastMetricBlock != blockNum ) {
177
216
lastMetricBlock = blockNum ;
178
217
Metrics .gaugeSet (MetricKeys .Gauge .TX_CACHE ,
@@ -208,22 +247,138 @@ public Iterator<Entry<byte[], byte[]>> iterator() {
208
247
}
209
248
210
249
@ Override
211
- public void flush (Map <WrappedByteArray , WrappedByteArray > batch ) {
250
+ public synchronized void flush (Map <WrappedByteArray , WrappedByteArray > batch ) {
251
+ isValid .set (false );
212
252
batch .forEach ((k , v ) -> this .put (k .getBytes (), v .getBytes ()));
253
+ isValid .set (true );
213
254
}
214
255
215
256
@ Override
216
257
public void close () {
217
- reset ();
258
+ if (!isAlive ()) {
259
+ return ;
260
+ }
261
+ dump ();
218
262
bloomFilters [0 ] = null ;
219
263
bloomFilters [1 ] = null ;
220
264
persistentStore .close ();
265
+ setAlive (false );
221
266
}
222
267
223
268
@ Override
224
269
public void reset () {
225
270
}
226
271
272
+ private boolean recovery () {
273
+ FileUtil .createDirIfNotExists (this .cacheDir .toString ());
274
+ logger .info ("recovery bloomFilters start." );
275
+ CompletableFuture <Boolean > loadProperties = CompletableFuture .supplyAsync (this ::loadProperties );
276
+ CompletableFuture <Boolean > tk0 = loadProperties .thenApplyAsync (
277
+ v -> recovery (0 , this .cacheFile0 ));
278
+ CompletableFuture <Boolean > tk1 = loadProperties .thenApplyAsync (
279
+ v -> recovery (1 , this .cacheFile1 ));
280
+
281
+ return CompletableFuture .allOf (tk0 , tk1 ).thenApply (v -> {
282
+ logger .info ("recovery bloomFilters success." );
283
+ return true ;
284
+ }).exceptionally (this ::handleException ).join ();
285
+ }
286
+
287
+ private boolean recovery (int index , Path file ) {
288
+ try (InputStream in = new BufferedInputStream (Files .newInputStream (file ,
289
+ StandardOpenOption .READ , StandardOpenOption .DELETE_ON_CLOSE ))) {
290
+ logger .info ("recovery bloomFilter[{}] from file." , index );
291
+ long start = System .currentTimeMillis ();
292
+ bloomFilters [index ] = BloomFilter .readFrom (in , Funnels .byteArrayFunnel ());
293
+ logger .info ("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms." ,
294
+ index , bloomFilters [index ].approximateElementCount (), bloomFilters [index ].expectedFpp (),
295
+ System .currentTimeMillis () - start );
296
+ return true ;
297
+ } catch (Exception e ) {
298
+ throw new RuntimeException (e );
299
+ }
300
+ }
301
+
302
+ private boolean handleException (Throwable e ) {
303
+ bloomFilters [0 ] = BloomFilter .create (Funnels .byteArrayFunnel (),
304
+ MAX_BLOCK_SIZE * TRANSACTION_COUNT );
305
+ bloomFilters [1 ] = BloomFilter .create (Funnels .byteArrayFunnel (),
306
+ MAX_BLOCK_SIZE * TRANSACTION_COUNT );
307
+ try {
308
+ Files .deleteIfExists (this .cacheFile0 );
309
+ Files .deleteIfExists (this .cacheFile1 );
310
+ } catch (Exception ignored ) {
311
+
312
+ }
313
+ logger .info ("recovery bloomFilters failed. {}" , e .getMessage ());
314
+ logger .info ("rollback to previous mode." );
315
+ return false ;
316
+ }
317
+
318
+ private void dump () {
319
+ if (!isValid .get ()) {
320
+ logger .info ("bloomFilters is not valid." );
321
+ }
322
+ FileUtil .createDirIfNotExists (this .cacheDir .toString ());
323
+ logger .info ("dump bloomFilters start." );
324
+ CompletableFuture <Void > task0 = CompletableFuture .runAsync (
325
+ () -> dump (0 , this .cacheFile0 ));
326
+ CompletableFuture <Void > task1 = CompletableFuture .runAsync (
327
+ () -> dump (1 , this .cacheFile1 ));
328
+ CompletableFuture .allOf (task0 , task1 ).thenRun (() -> {
329
+ writeProperties ();
330
+ logger .info ("dump bloomFilters done." );
331
+
332
+ }).exceptionally (e -> {
333
+ logger .info ("dump bloomFilters to file failed. {}" , e .getMessage ());
334
+ return null ;
335
+ }).join ();
336
+ }
337
+
338
+ private void dump (int index , Path file ) {
339
+ try (OutputStream out = new BufferedOutputStream (Files .newOutputStream (file ))) {
340
+ logger .info ("dump bloomFilters[{}] to file." , index );
341
+ long start = System .currentTimeMillis ();
342
+ bloomFilters [index ].writeTo (out );
343
+ logger .info ("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms." ,
344
+ index , bloomFilters [index ].approximateElementCount (), bloomFilters [index ].expectedFpp (),
345
+ System .currentTimeMillis () - start );
346
+ } catch (Exception e ) {
347
+ throw new RuntimeException (e );
348
+ }
349
+ }
350
+
351
+ private boolean loadProperties () {
352
+ try (Reader r = new InputStreamReader (new BufferedInputStream (Files .newInputStream (
353
+ this .cacheProperties , StandardOpenOption .READ , StandardOpenOption .DELETE_ON_CLOSE )),
354
+ StandardCharsets .UTF_8 )) {
355
+ Properties properties = new Properties ();
356
+ properties .load (r );
357
+ filterStartBlock = Long .parseLong (properties .getProperty ("filterStartBlock" ));
358
+ currentBlockNum = Long .parseLong (properties .getProperty ("currentBlockNum" ));
359
+ currentFilterIndex = Integer .parseInt (properties .getProperty ("currentFilterIndex" ));
360
+ logger .info ("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done." ,
361
+ filterStartBlock , currentBlockNum , currentFilterIndex );
362
+ return true ;
363
+ } catch (Exception e ) {
364
+ throw new RuntimeException (e );
365
+ }
366
+ }
367
+
368
+ private void writeProperties () {
369
+ try (Writer w = Files .newBufferedWriter (this .cacheProperties , StandardCharsets .UTF_8 )) {
370
+ Properties properties = new Properties ();
371
+ properties .setProperty ("filterStartBlock" , String .valueOf (filterStartBlock ));
372
+ properties .setProperty ("currentBlockNum" , String .valueOf (currentBlockNum ));
373
+ properties .setProperty ("currentFilterIndex" , String .valueOf (currentFilterIndex ));
374
+ properties .store (w , "Generated by the application. PLEASE DO NOT EDIT! " );
375
+ logger .info ("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done." ,
376
+ filterStartBlock , currentBlockNum , currentFilterIndex );
377
+ } catch (Exception e ) {
378
+ throw new RuntimeException (e );
379
+ }
380
+ }
381
+
227
382
@ Override
228
383
public TxCacheDB newInstance () {
229
384
return new TxCacheDB (name , recentTransactionStore );
0 commit comments