Skip to content

Commit cbdfe4c

Browse files
committed
Close alibaba#75
添加独立线程解决update和commit/rollback互相争夺线程池资源以及数据库锁资源问题
1 parent 03c2d08 commit cbdfe4c

File tree

7 files changed

+30
-11
lines changed

7 files changed

+30
-11
lines changed

server/src/main/config/com/alibaba/cobar/config/model/SystemConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public final class SystemConfig {
4646
private int processors;
4747
private int processorHandler;
4848
private int processorExecutor;
49+
private int processorCommitter;
4950
private int initExecutor;
5051
private int timerExecutor;
5152
private int managerExecutor;
@@ -69,6 +70,7 @@ public SystemConfig() {
6970
this.processors = DEFAULT_PROCESSORS;
7071
this.processorHandler = DEFAULT_PROCESSORS;
7172
this.processorExecutor = DEFAULT_PROCESSORS;
73+
this.processorCommitter = DEFAULT_PROCESSORS;
7274
this.managerExecutor = DEFAULT_PROCESSORS;
7375
this.timerExecutor = DEFAULT_PROCESSORS;
7476
this.initExecutor = DEFAULT_PROCESSORS;
@@ -133,8 +135,16 @@ public int getProcessorExecutor() {
133135
public void setProcessorExecutor(int processorExecutor) {
134136
this.processorExecutor = processorExecutor;
135137
}
138+
139+
public int getProcessorCommitter() {
140+
return processorCommitter;
141+
}
136142

137-
public int getManagerExecutor() {
143+
public void setProcessorCommitter(int processorCommitter) {
144+
this.processorCommitter = processorCommitter;
145+
}
146+
147+
public int getManagerExecutor() {
138148
return managerExecutor;
139149
}
140150

server/src/main/net/com/alibaba/cobar/net/NIOProcessor.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,26 +39,28 @@ public final class NIOProcessor {
3939
private final BufferPool bufferPool;
4040
private final NameableExecutor handler;
4141
private final NameableExecutor executor;
42+
private final NameableExecutor committer;
4243
private final ConcurrentMap<Long, FrontendConnection> frontends;
4344
private final ConcurrentMap<Long, BackendConnection> backends;
4445
private final CommandCount commands;
4546
private long netInBytes;
4647
private long netOutBytes;
4748

4849
public NIOProcessor(String name) throws IOException {
49-
this(name, DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_CHUNK_SIZE, AVAILABLE_PROCESSORS, AVAILABLE_PROCESSORS);
50+
this(name, DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_CHUNK_SIZE, AVAILABLE_PROCESSORS, AVAILABLE_PROCESSORS, AVAILABLE_PROCESSORS);
5051
}
5152

52-
public NIOProcessor(String name, int handler, int executor) throws IOException {
53-
this(name, DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_CHUNK_SIZE, handler, executor);
53+
public NIOProcessor(String name, int handler, int executor, int committer) throws IOException {
54+
this(name, DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_CHUNK_SIZE, handler, executor, committer);
5455
}
5556

56-
public NIOProcessor(String name, int buffer, int chunk, int handler, int executor) throws IOException {
57+
public NIOProcessor(String name, int buffer, int chunk, int handler, int executor, int committer) throws IOException {
5758
this.name = name;
5859
this.reactor = new NIOReactor(name);
5960
this.bufferPool = new BufferPool(buffer, chunk);
6061
this.handler = (handler > 0) ? ExecutorUtil.create(name + "-H", handler) : null;
6162
this.executor = (executor > 0) ? ExecutorUtil.create(name + "-E", executor) : null;
63+
this.committer = (committer > 0) ? ExecutorUtil.create(name + "-C", committer) : null;
6264
this.frontends = new ConcurrentHashMap<Long, FrontendConnection>();
6365
this.backends = new ConcurrentHashMap<Long, BackendConnection>();
6466
this.commands = new CommandCount();
@@ -87,8 +89,12 @@ public NameableExecutor getHandler() {
8789
public NameableExecutor getExecutor() {
8890
return executor;
8991
}
92+
93+
public NameableExecutor getCommitter() {
94+
return committer;
95+
}
9096

91-
public void startup() {
97+
public void startup() {
9298
reactor.startup();
9399
}
94100

server/src/main/server/com/alibaba/cobar/CobarServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,10 @@ public void startup() throws IOException {
104104
LOGGER.info("Startup processors ...");
105105
int handler = system.getProcessorHandler();
106106
int executor = system.getProcessorExecutor();
107+
int committer = system.getProcessorCommitter();
107108
processors = new NIOProcessor[system.getProcessors()];
108109
for (int i = 0; i < processors.length; i++) {
109-
processors[i] = new NIOProcessor("Processor" + i, handler, executor);
110+
processors[i] = new NIOProcessor("Processor" + i, handler, executor, committer);
110111
processors[i].startup();
111112
}
112113
timer.schedule(processorCheck(), 0L, system.getProcessorCheckPeriod());

server/src/main/server/com/alibaba/cobar/manager/response/ShowThreadPool.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ private static List<NameableExecutor> getExecutors() {
125125
for (NIOProcessor p : server.getProcessors()) {
126126
list.add(p.getHandler());
127127
list.add(p.getExecutor());
128+
list.add(p.getCommitter());
128129
}
129130
return list;
130131
}

server/src/main/server/com/alibaba/cobar/mysql/bio/executor/DefaultCommitExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void commit(final OkPacket packet, final BlockingSession session, final i
121121

122122
// 执行
123123
final ConcurrentMap<RouteResultsetNode, Channel> target = session.getTarget();
124-
Executor executor = session.getSource().getProcessor().getExecutor();
124+
Executor committer = session.getSource().getProcessor().getCommitter();
125125
int started = 0;
126126
for (RouteResultsetNode rrn : target.keySet()) {
127127
if (rrn == null) {
@@ -136,7 +136,7 @@ public void commit(final OkPacket packet, final BlockingSession session, final i
136136
final MySQLChannel mc = (MySQLChannel) target.get(rrn);
137137
if (mc != null) {
138138
mc.setRunning(true);
139-
executor.execute(new Runnable() {
139+
committer.execute(new Runnable() {
140140
@Override
141141
public void run() {
142142
_commit(mc, session);

server/src/main/server/com/alibaba/cobar/mysql/bio/executor/RollbackExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,14 @@ public void rollback(final BlockingSession session) {
118118
}
119119

120120
// 执行
121-
Executor exec = source.getProcessor().getExecutor();
121+
Executor committer = source.getProcessor().getCommitter();
122122

123123
int started = 0;
124124
for (RouteResultsetNode rrn : target.keySet()) {
125125
final MySQLChannel mc = (MySQLChannel) target.get(rrn);
126126
if (mc != null) {
127127
mc.setRunning(true);
128-
exec.execute(new Runnable() {
128+
committer.execute(new Runnable() {
129129
@Override
130130
public void run() {
131131
_rollback(mc, session);

server/src/test/resources/server.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
<property name="processors">4</property>
2929
<property name="processorHandler">8</property>
3030
<property name="processorExecutor">8</property>
31+
<property name="processorCommitter">8</property>
3132
<property name="clusterHeartbeatUser">_HEARTBEAT_USER_</property>
3233
<property name="clusterHeartbeatPass">_HEARTBEAT_PASS_</property>
3334
</system>

0 commit comments

Comments
 (0)