diff --git a/chainbase/src/main/java/org/tron/core/service/RewardViCalService.java b/chainbase/src/main/java/org/tron/core/service/RewardViCalService.java index b3ef76b091e..f88fd02c539 100644 --- a/chainbase/src/main/java/org/tron/core/service/RewardViCalService.java +++ b/chainbase/src/main/java/org/tron/core/service/RewardViCalService.java @@ -82,7 +82,7 @@ public void init() { // checkpoint is flushed to db, we can start rewardViCalService immediately lastBlockNumber = Long.MAX_VALUE; } - es.scheduleWithFixedDelay(this::maybeRun, 0, 3, TimeUnit.SECONDS); + ExecutorServiceManager.scheduleWithFixedDelay(es, this::maybeRun, 0, 3, TimeUnit.SECONDS); } private boolean enableNewRewardAlgorithm() { diff --git a/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java b/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java index 196d44ba722..779a8edf75d 100644 --- a/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java +++ b/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java @@ -4,10 +4,13 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.tron.common.exit.ExitManager; @Slf4j(topic = "common-executor") public class ExecutorServiceManager { @@ -80,4 +83,30 @@ public static void shutdownAndAwaitTermination(ExecutorService pool, String name } logger.info("Pool {} shutdown done", name); } + + public static Future submit(ExecutorService es, Runnable task) { + return es.submit(() -> { + try { + task.run(); + } catch (Throwable e) { + ExitManager.findTronError(e).ifPresent(ExitManager::logAndExit); + throw e; + } + }); + } + + public static ScheduledFuture scheduleWithFixedDelay(ScheduledExecutorService es, + Runnable command, + long initialDelay, + long delay, + TimeUnit unit) { + return es.scheduleWithFixedDelay(() -> { + try { + command.run(); + } catch (Throwable e) { + ExitManager.findTronError(e).ifPresent(ExitManager::logAndExit); + throw e; + } + }, initialDelay, delay, unit); + } } diff --git a/common/src/main/java/org/tron/common/exit/ExitManager.java b/common/src/main/java/org/tron/common/exit/ExitManager.java index ff3e0de734b..d80b7838c08 100644 --- a/common/src/main/java/org/tron/common/exit/ExitManager.java +++ b/common/src/main/java/org/tron/common/exit/ExitManager.java @@ -44,7 +44,7 @@ public static Optional findTronError(Throwable e) { return Optional.empty(); } - private static void logAndExit(TronError exit) { + public static void logAndExit(TronError exit) { final int code = exit.getErrCode().getCode(); logger.error("Shutting down with code: {}.", exit.getErrCode(), exit); Thread exitThread = exitThreadFactory.newThread(() -> System.exit(code)); diff --git a/consensus/src/main/java/org/tron/consensus/dpos/DposTask.java b/consensus/src/main/java/org/tron/consensus/dpos/DposTask.java index 8d5697cdc89..9e42552c80f 100644 --- a/consensus/src/main/java/org/tron/consensus/dpos/DposTask.java +++ b/consensus/src/main/java/org/tron/consensus/dpos/DposTask.java @@ -11,6 +11,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.tron.common.es.ExecutorServiceManager; +import org.tron.common.exit.ExitManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.utils.ByteArray; import org.tron.common.utils.Sha256Hash; @@ -68,10 +69,13 @@ public void init() { Thread.currentThread().interrupt(); } catch (Throwable throwable) { logger.error("Produce block error.", throwable); + ExitManager.findTronError(throwable).ifPresent(e -> { + throw e; + }); } } }; - produceExecutor.submit(runnable); + ExecutorServiceManager.submit(produceExecutor, runnable); logger.info("DPoS task started."); } diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index 993198d60f3..55c248b1c14 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -53,6 +53,7 @@ import org.tron.common.args.GenesisBlock; import org.tron.common.bloom.Bloom; import org.tron.common.es.ExecutorServiceManager; +import org.tron.common.exit.ExitManager; import org.tron.common.logsfilter.EventPluginLoader; import org.tron.common.logsfilter.FilterQuery; import org.tron.common.logsfilter.capsule.BlockFilterCapsule; @@ -293,6 +294,9 @@ public class Manager { Metrics.counterInc(MetricKeys.Counter.TXS, 1, MetricLabels.Counter.TXS_FAIL, MetricLabels.Counter.TXS_FAIL_ERROR); } + ExitManager.findTronError(ex).ifPresent(e -> { + throw e; + }); } finally { if (tx != null && getRePushTransactions().remove(tx)) { Metrics.gaugeInc(MetricKeys.Gauge.MANAGER_QUEUE, -1, @@ -550,18 +554,18 @@ public void init() { validateSignService = ExecutorServiceManager .newFixedThreadPool(validateSignName, Args.getInstance().getValidateSignThreadNum()); rePushEs = ExecutorServiceManager.newSingleThreadExecutor(rePushEsName, true); - rePushEs.submit(rePushLoop); + ExecutorServiceManager.submit(rePushEs, rePushLoop); // add contract event listener for subscribing if (Args.getInstance().isEventSubscribe()) { startEventSubscribing(); triggerEs = ExecutorServiceManager.newSingleThreadExecutor(triggerEsName, true); - triggerEs.submit(triggerCapsuleProcessLoop); + ExecutorServiceManager.submit(triggerEs, triggerCapsuleProcessLoop); } // start json rpc filter process if (CommonParameter.getInstance().isJsonRpcFilterEnabled()) { filterEs = ExecutorServiceManager.newSingleThreadExecutor(filterEsName); - filterEs.submit(filterProcessLoop); + ExecutorServiceManager.submit(filterEs, filterProcessLoop); } //initStoreFactory diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index 0ea0f24e7ae..5fab8bc6f33 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -83,7 +83,8 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep dropSmartContractCount++; } } else { - trxHandlePool.submit(() -> handleTransaction(peer, new TransactionMessage(trx))); + ExecutorServiceManager.submit( + trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); } } @@ -109,11 +110,12 @@ private void check(PeerConnection peer, TransactionsMessage msg) throws P2pExcep } private void handleSmartContract() { - smartContractExecutor.scheduleWithFixedDelay(() -> { + ExecutorServiceManager.scheduleWithFixedDelay(smartContractExecutor, () -> { try { while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE && smartContractQueue.size() > 0) { TrxEvent event = smartContractQueue.take(); - trxHandlePool.submit(() -> handleTransaction(event.getPeer(), event.getMsg())); + ExecutorServiceManager.submit( + trxHandlePool, () -> handleTransaction(event.getPeer(), event.getMsg())); } } catch (InterruptedException e) { logger.warn("Handle smart server interrupted"); diff --git a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java index 0bce4b8baa5..e387329c467 100644 --- a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java +++ b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java @@ -70,7 +70,7 @@ public class SyncService { private final long syncFetchBatchNum = Args.getInstance().getSyncFetchBatchNum(); public void init() { - fetchExecutor.scheduleWithFixedDelay(() -> { + ExecutorServiceManager.scheduleWithFixedDelay(fetchExecutor, () -> { try { if (fetchFlag) { fetchFlag = false; @@ -81,7 +81,7 @@ public void init() { } }, 10, 1, TimeUnit.SECONDS); - blockHandleExecutor.scheduleWithFixedDelay(() -> { + ExecutorServiceManager.scheduleWithFixedDelay(blockHandleExecutor, () -> { try { if (handleFlag) { handleFlag = false;