Skip to content

Commit c100d81

Browse files
authored
[ISSUE apache#7328] Convergent thread pool creation (apache#7329)
* Convergence thread pool creation to facilitate subsequent iteration management * Convergence thread pool creation in ThreadPoolMonitor.java * fix unit test * Convergence ThreadPool constructor * Convergence ScheduledThreadPool constructor * remove unused import * Convergence ScheduledThreadPool constructor * remove unused import ---------
1 parent 6fd0073 commit c100d81

File tree

41 files changed

+215
-278
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+215
-278
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

+19-20
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
3535
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
3636
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
37-
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
3837
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
3938
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
4039
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
@@ -98,6 +97,7 @@
9897
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
9998
import org.apache.rocketmq.common.stats.MomentStatsItem;
10099
import org.apache.rocketmq.common.utils.ServiceProvider;
100+
import org.apache.rocketmq.common.utils.ThreadUtils;
101101
import org.apache.rocketmq.logging.org.slf4j.Logger;
102102
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
103103
import org.apache.rocketmq.remoting.Configuration;
@@ -160,7 +160,6 @@
160160
import java.util.concurrent.LinkedBlockingQueue;
161161
import java.util.concurrent.ScheduledExecutorService;
162162
import java.util.concurrent.ScheduledFuture;
163-
import java.util.concurrent.ScheduledThreadPoolExecutor;
164163
import java.util.concurrent.TimeUnit;
165164
import java.util.concurrent.locks.Lock;
166165
import java.util.concurrent.locks.ReentrantLock;
@@ -455,116 +454,116 @@ protected void initializeRemotingServer() throws CloneNotSupportedException {
455454
* Initialize resources including remoting server and thread executors.
456455
*/
457456
protected void initializeResources() {
458-
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
457+
this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
459458
new ThreadFactoryImpl("BrokerControllerScheduledThread", true, getBrokerIdentity()));
460459

461-
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
460+
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
462461
this.brokerConfig.getSendMessageThreadPoolNums(),
463462
this.brokerConfig.getSendMessageThreadPoolNums(),
464463
1000 * 60,
465464
TimeUnit.MILLISECONDS,
466465
this.sendThreadPoolQueue,
467466
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
468467

469-
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
468+
this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
470469
this.brokerConfig.getPullMessageThreadPoolNums(),
471470
this.brokerConfig.getPullMessageThreadPoolNums(),
472471
1000 * 60,
473472
TimeUnit.MILLISECONDS,
474473
this.pullThreadPoolQueue,
475474
new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity()));
476475

477-
this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor(
476+
this.litePullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
478477
this.brokerConfig.getLitePullMessageThreadPoolNums(),
479478
this.brokerConfig.getLitePullMessageThreadPoolNums(),
480479
1000 * 60,
481480
TimeUnit.MILLISECONDS,
482481
this.litePullThreadPoolQueue,
483482
new ThreadFactoryImpl("LitePullMessageThread_", getBrokerIdentity()));
484483

485-
this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
484+
this.putMessageFutureExecutor = ThreadUtils.newThreadPoolExecutor(
486485
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
487486
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
488487
1000 * 60,
489488
TimeUnit.MILLISECONDS,
490489
this.putThreadPoolQueue,
491490
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
492491

493-
this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
492+
this.ackMessageExecutor = ThreadUtils.newThreadPoolExecutor(
494493
this.brokerConfig.getAckMessageThreadPoolNums(),
495494
this.brokerConfig.getAckMessageThreadPoolNums(),
496495
1000 * 60,
497496
TimeUnit.MILLISECONDS,
498497
this.ackThreadPoolQueue,
499498
new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity()));
500499

501-
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
500+
this.queryMessageExecutor = ThreadUtils.newThreadPoolExecutor(
502501
this.brokerConfig.getQueryMessageThreadPoolNums(),
503502
this.brokerConfig.getQueryMessageThreadPoolNums(),
504503
1000 * 60,
505504
TimeUnit.MILLISECONDS,
506505
this.queryThreadPoolQueue,
507506
new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity()));
508507

509-
this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor(
508+
this.adminBrokerExecutor = ThreadUtils.newThreadPoolExecutor(
510509
this.brokerConfig.getAdminBrokerThreadPoolNums(),
511510
this.brokerConfig.getAdminBrokerThreadPoolNums(),
512511
1000 * 60,
513512
TimeUnit.MILLISECONDS,
514513
this.adminBrokerThreadPoolQueue,
515514
new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity()));
516515

517-
this.clientManageExecutor = new BrokerFixedThreadPoolExecutor(
516+
this.clientManageExecutor = ThreadUtils.newThreadPoolExecutor(
518517
this.brokerConfig.getClientManageThreadPoolNums(),
519518
this.brokerConfig.getClientManageThreadPoolNums(),
520519
1000 * 60,
521520
TimeUnit.MILLISECONDS,
522521
this.clientManagerThreadPoolQueue,
523522
new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity()));
524523

525-
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
524+
this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
526525
this.brokerConfig.getHeartbeatThreadPoolNums(),
527526
this.brokerConfig.getHeartbeatThreadPoolNums(),
528527
1000 * 60,
529528
TimeUnit.MILLISECONDS,
530529
this.heartbeatThreadPoolQueue,
531530
new ThreadFactoryImpl("HeartbeatThread_", true, getBrokerIdentity()));
532531

533-
this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor(
532+
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
534533
this.brokerConfig.getConsumerManageThreadPoolNums(),
535534
this.brokerConfig.getConsumerManageThreadPoolNums(),
536535
1000 * 60,
537536
TimeUnit.MILLISECONDS,
538537
this.consumerManagerThreadPoolQueue,
539538
new ThreadFactoryImpl("ConsumerManageThread_", true, getBrokerIdentity()));
540539

541-
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
540+
this.replyMessageExecutor = ThreadUtils.newThreadPoolExecutor(
542541
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
543542
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
544543
1000 * 60,
545544
TimeUnit.MILLISECONDS,
546545
this.replyThreadPoolQueue,
547546
new ThreadFactoryImpl("ProcessReplyMessageThread_", getBrokerIdentity()));
548547

549-
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
548+
this.endTransactionExecutor = ThreadUtils.newThreadPoolExecutor(
550549
this.brokerConfig.getEndTransactionThreadPoolNums(),
551550
this.brokerConfig.getEndTransactionThreadPoolNums(),
552551
1000 * 60,
553552
TimeUnit.MILLISECONDS,
554553
this.endTransactionThreadPoolQueue,
555554
new ThreadFactoryImpl("EndTransactionThread_", getBrokerIdentity()));
556555

557-
this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor(
556+
this.loadBalanceExecutor = ThreadUtils.newThreadPoolExecutor(
558557
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
559558
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
560559
1000 * 60,
561560
TimeUnit.MILLISECONDS,
562561
this.loadBalanceThreadPoolQueue,
563562
new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));
564563

565-
this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
564+
this.syncBrokerMemberGroupExecutorService = ThreadUtils.newScheduledThreadPool(1,
566565
new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
567-
this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
566+
this.brokerHeartbeatExecutorService = ThreadUtils.newScheduledThreadPool(1,
568567
new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread", getBrokerIdentity()));
569568

570569
this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
@@ -828,8 +827,6 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {
828827

829828
initializeResources();
830829

831-
registerProcessor();
832-
833830
initializeScheduledTasks();
834831

835832
initialTransaction();
@@ -1690,6 +1687,8 @@ public void run() {
16901687
}
16911688
}
16921689
}, 10, 5, TimeUnit.SECONDS);
1690+
1691+
registerProcessor();
16931692
}
16941693

16951694
protected void scheduleSendHeartbeat() {

broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
import io.netty.channel.Channel;
2020
import java.util.concurrent.ScheduledExecutorService;
21-
import java.util.concurrent.ScheduledThreadPoolExecutor;
2221
import java.util.concurrent.TimeUnit;
2322
import org.apache.rocketmq.broker.BrokerController;
2423
import org.apache.rocketmq.common.ThreadFactoryImpl;
2524
import org.apache.rocketmq.common.constant.LoggerName;
25+
import org.apache.rocketmq.common.utils.ThreadUtils;
2626
import org.apache.rocketmq.logging.org.slf4j.Logger;
2727
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2828
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -35,7 +35,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
3535

3636
public ClientHousekeepingService(final BrokerController brokerController) {
3737
this.brokerController = brokerController;
38-
scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
38+
scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
3939
new ThreadFactoryImpl("ClientHousekeepingScheduledThread", brokerController.getBrokerIdentity()));
4040
}
4141

broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Map;
2323
import java.util.concurrent.ConcurrentHashMap;
2424
import java.util.concurrent.ScheduledExecutorService;
25-
import java.util.concurrent.ScheduledThreadPoolExecutor;
2625
import java.util.concurrent.TimeUnit;
2726
import org.apache.rocketmq.broker.BrokerController;
2827
import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -37,7 +36,7 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
3736
private final BrokerController brokerController;
3837
private final int cacheSize = 8096;
3938

40-
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
39+
private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
4140
ThreadUtils.newGenericThreadFactory("DefaultConsumerIdsChangeListener", true));
4241

4342
private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new ConcurrentHashMap<>(cacheSize);

broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,8 @@
2727
import java.util.concurrent.ConcurrentHashMap;
2828
import java.util.concurrent.ConcurrentMap;
2929
import java.util.concurrent.ExecutorService;
30-
import java.util.concurrent.Executors;
3130
import java.util.concurrent.ScheduledExecutorService;
3231
import java.util.concurrent.ScheduledFuture;
33-
import java.util.concurrent.ThreadPoolExecutor;
3432
import java.util.concurrent.TimeUnit;
3533

3634
import org.apache.commons.lang3.StringUtils;
@@ -42,6 +40,7 @@
4240
import org.apache.rocketmq.common.Pair;
4341
import org.apache.rocketmq.common.ThreadFactoryImpl;
4442
import org.apache.rocketmq.common.constant.LoggerName;
43+
import org.apache.rocketmq.common.utils.ThreadUtils;
4544
import org.apache.rocketmq.logging.org.slf4j.Logger;
4645
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
4746
import org.apache.rocketmq.remoting.protocol.EpochEntry;
@@ -107,9 +106,9 @@ public class ReplicasManager {
107106
public ReplicasManager(final BrokerController brokerController) {
108107
this.brokerController = brokerController;
109108
this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
110-
this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
111-
this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
112-
this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
109+
this.scheduledService = ThreadUtils.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
110+
this.executorService = ThreadUtils.newThreadPoolExecutor(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
111+
this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
113112
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity()));
114113
this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
115114
this.brokerConfig = brokerController.getBrokerConfig();

broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import io.openmessaging.storage.dledger.MemberState;
2222
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
2323
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
2524
import java.util.concurrent.Future;
2625
import java.util.concurrent.TimeUnit;
2726
import org.apache.rocketmq.broker.BrokerController;
2827
import org.apache.rocketmq.common.ThreadFactoryImpl;
2928
import org.apache.rocketmq.common.constant.LoggerName;
29+
import org.apache.rocketmq.common.utils.ThreadUtils;
3030
import org.apache.rocketmq.logging.org.slf4j.Logger;
3131
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3232
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -49,7 +49,7 @@ public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessag
4949
this.messageStore = messageStore;
5050
this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
5151
this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
52-
this.executorService = Executors.newSingleThreadExecutor(
52+
this.executorService = ThreadUtils.newSingleThreadExecutor(
5353
new ThreadFactoryImpl("DLegerRoleChangeHandler_", brokerController.getBrokerIdentity()));
5454
}
5555

broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.ExecutorService;
2626
import java.util.concurrent.LinkedBlockingQueue;
27-
import java.util.concurrent.ThreadPoolExecutor;
2827
import java.util.concurrent.TimeUnit;
2928
import org.apache.commons.lang3.StringUtils;
3029
import org.apache.rocketmq.broker.BrokerController;
@@ -43,6 +42,7 @@
4342
import org.apache.rocketmq.common.message.MessageExt;
4443
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
4544
import org.apache.rocketmq.common.message.MessageQueue;
45+
import org.apache.rocketmq.common.utils.ThreadUtils;
4646
import org.apache.rocketmq.logging.org.slf4j.Logger;
4747
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
4848
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -72,7 +72,7 @@ public EscapeBridge(BrokerController brokerController) {
7272
public void start() throws Exception {
7373
if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() && brokerController.getBrokerConfig().isEnableRemoteEscape()) {
7474
final BlockingQueue<Runnable> asyncSenderThreadPoolQueue = new LinkedBlockingQueue<>(50000);
75-
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
75+
this.defaultAsyncSenderExecutor = ThreadUtils.newThreadPoolExecutor(
7676
Runtime.getRuntime().availableProcessors(),
7777
Runtime.getRuntime().availableProcessors(),
7878
1000 * 60,

broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
import java.util.concurrent.BlockingQueue;
2020
import java.util.concurrent.ScheduledExecutorService;
21-
import java.util.concurrent.ScheduledThreadPoolExecutor;
2221
import java.util.concurrent.TimeUnit;
2322
import org.apache.rocketmq.broker.BrokerController;
2423
import org.apache.rocketmq.common.AbstractBrokerRunnable;
2524
import org.apache.rocketmq.common.ThreadFactoryImpl;
2625
import org.apache.rocketmq.common.UtilAll;
2726
import org.apache.rocketmq.common.constant.LoggerName;
27+
import org.apache.rocketmq.common.future.FutureTaskExt;
28+
import org.apache.rocketmq.common.utils.ThreadUtils;
2829
import org.apache.rocketmq.logging.org.slf4j.Logger;
2930
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3031
import org.apache.rocketmq.remoting.netty.RequestTask;
@@ -43,7 +44,7 @@ public class BrokerFastFailure {
4344

4445
public BrokerFastFailure(final BrokerController brokerController) {
4546
this.brokerController = brokerController;
46-
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
47+
this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
4748
new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
4849
brokerController == null ? null : brokerController.getBrokerConfig()));
4950
}

broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java

-57
This file was deleted.

0 commit comments

Comments
 (0)