Skip to content

Commit df8ca5d

Browse files
committed
Merge pull request MyCATApache#1 from MyCATApache/master
2
2 parents 0fa1fae + cde521e commit df8ca5d

File tree

16 files changed

+1043
-158
lines changed

16 files changed

+1043
-158
lines changed

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,14 @@
168168
<artifactId>log4j-core</artifactId>
169169
<version>2.3</version>
170170
</dependency>
171+
172+
<!-- joda日期处理工具 -->
173+
<dependency>
174+
<groupId>joda-time</groupId>
175+
<artifactId>joda-time</artifactId>
176+
<version>2.8.2</version>
177+
</dependency>
178+
171179
</dependencies>
172180

173181

src/main/java/demo/ZkCreate.java

Lines changed: 0 additions & 142 deletions
This file was deleted.

src/main/java/io/mycat/net/Connection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,12 @@ public void onReadData(int got) throws IOException {
233233
// handle this package
234234
readBuffer.position(offset);
235235
handle(readBuffer, offset, length);
236+
237+
// maybe handle stmt_close
238+
if(isClosed()) {
239+
return ;
240+
}
241+
236242
// offset to next position
237243
offset += length;
238244
// reached end

src/main/java/io/mycat/server/FrontendPrepareHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public interface FrontendPrepareHandler {
3434

3535
void execute(byte[] data);
3636

37-
void close();
37+
void close(byte[] data);
3838

39+
void clear();
3940
}

src/main/java/io/mycat/server/MySQLFrontConnection.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.mycat.server.sqlhandler.SavepointHandler;
1515
import io.mycat.server.sqlhandler.SelectHandler;
1616
import io.mycat.server.sqlhandler.ServerLoadDataInfileHandler;
17+
import io.mycat.server.sqlhandler.ServerPrepareHandler;
1718
import io.mycat.server.sqlhandler.SetHandler;
1819
import io.mycat.server.sqlhandler.ShowHandler;
1920
import io.mycat.server.sqlhandler.StartHandler;
@@ -79,7 +80,7 @@ public MySQLFrontConnection(SocketChannel channel) throws IOException {
7980
this.port = localAddr.getPort();
8081
this.localPort = remoteAddr.getPort();
8182
loadDataInfileHandler = new ServerLoadDataInfileHandler(this);
82-
83+
prepareHandler = new ServerPrepareHandler(this);
8384
}
8485

8586
public void sendAuthPackge() throws IOException {
@@ -235,7 +236,12 @@ public void query(byte[] data) {
235236
"Unknown charset '" + charset + "'");
236237
return;
237238
}
239+
240+
query(sql);
238241

242+
}
243+
244+
public void query(String sql) {
239245
if (sql == null || sql.length() == 0) {
240246
writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL");
241247
return;
@@ -415,7 +421,7 @@ public void stmtExecute(byte[] data) {
415421

416422
public void stmtClose(byte[] data) {
417423
if (prepareHandler != null) {
418-
prepareHandler.close();
424+
prepareHandler.close(data);
419425
} else {
420426
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
421427
"Prepare unsupported!");
@@ -508,11 +514,18 @@ public void close(String reason) {
508514
if (getLoadDataInfileHandler() != null) {
509515
getLoadDataInfileHandler().clear();
510516
}
517+
if(getPrepareHandler() != null) {
518+
getPrepareHandler().clear();
519+
}
511520
}
512521

513522
public LoadDataInfileHandler getLoadDataInfileHandler() {
514523
return loadDataInfileHandler;
515524
}
525+
526+
public FrontendPrepareHandler getPrepareHandler() {
527+
return prepareHandler;
528+
}
516529

517530
public void ping() {
518531
write(OkPacket.OK);

src/main/java/io/mycat/server/NonBlockingSession.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class NonBlockingSession{
6161
private final MultiNodeCoordinator multiNodeCoordinator;
6262
private final CommitNodeHandler commitHandler;
6363
private volatile String xaTXID;
64+
65+
private boolean prepared;
6466

6567
public NonBlockingSession(MySQLFrontConnection source) {
6668
this.source = source;
@@ -117,6 +119,9 @@ public void execute(RouteResultset rrs, int type) {
117119

118120
if (nodes.length == 1) {
119121
singleNodeHandler = new SingleNodeHandler(rrs, this);
122+
if(this.isPrepared()) {
123+
singleNodeHandler.setPrepared(true);
124+
}
120125
try {
121126
singleNodeHandler.execute();
122127
} catch (Exception e) {
@@ -125,18 +130,25 @@ public void execute(RouteResultset rrs, int type) {
125130
}
126131
} else {
127132
boolean autocommit = source.isAutocommit();
128-
SystemConfig sysConfig = MycatServer.getInstance().getConfig()
129-
.getSystem();
133+
// SystemConfig sysConfig = MycatServer.getInstance().getConfig()
134+
// .getSystem();
130135
multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,
131136
this);
132-
137+
if(this.isPrepared()) {
138+
multiNodeHandler.setPrepared(true);
139+
}
133140
try {
134141
multiNodeHandler.execute();
135142
} catch (Exception e) {
136143
LOGGER.warn("{} {}", source, rrs, e);
137144
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
138145
}
139146
}
147+
148+
if(this.isPrepared()) {
149+
this.setPrepared(false);
150+
}
151+
140152
}
141153

142154
public void commit() {
@@ -374,4 +386,14 @@ public String getXaTXID() {
374386
return xaTXID;
375387
}
376388

389+
390+
public boolean isPrepared() {
391+
return prepared;
392+
}
393+
394+
395+
public void setPrepared(boolean prepared) {
396+
this.prepared = prepared;
397+
}
398+
377399
}

src/main/java/io/mycat/server/executors/MultiNodeQueryHandler.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.mycat.route.RouteResultsetNode;
3535
import io.mycat.server.MySQLFrontConnection;
3636
import io.mycat.server.NonBlockingSession;
37+
import io.mycat.server.packet.BinaryRowDataPacket;
3738
import io.mycat.server.config.node.MycatConfig;
3839
import io.mycat.server.packet.FieldPacket;
3940
import io.mycat.server.packet.OkPacket;
@@ -73,6 +74,8 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements
7374
private volatile boolean fieldsReturned;
7475
private int okCount;
7576
private final boolean isCallProcedure;
77+
private boolean prepared;
78+
private List<FieldPacket> fieldPackets = new ArrayList<FieldPacket>();
7679

7780
public MultiNodeQueryHandler(int sqlType, RouteResultset rrs,
7881
boolean autocommit, NonBlockingSession session) {
@@ -329,8 +332,15 @@ public void outputMergeResult(final MySQLFrontConnection source,
329332
// }
330333
for (int i = start; i < end; i++) {
331334
RowDataPacket row = results.get(i);
332-
row.packetId = ++packetId;
333-
row.write(bufferArray);
335+
if(prepared) {
336+
BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
337+
binRowDataPk.read(fieldPackets, row);
338+
binRowDataPk.packetId = ++packetId;
339+
binRowDataPk.write(bufferArray);
340+
} else {
341+
row.packetId = ++packetId;
342+
row.write(bufferArray);
343+
}
334344
}
335345

336346
eof[3] = ++packetId;
@@ -416,6 +426,7 @@ public void fieldEofResponse(byte[] header, List<byte[]> fields,
416426
if (needMerg) {
417427
FieldPacket fieldPkg = new FieldPacket();
418428
fieldPkg.read(field);
429+
fieldPackets.add(fieldPkg);
419430
String fieldName = new String(fieldPkg.name).toUpperCase();
420431
if (columToIndx != null
421432
&& !columToIndx.containsKey(fieldName)) {
@@ -429,7 +440,6 @@ public void fieldEofResponse(byte[] header, List<byte[]> fields,
429440
fieldPkg.packetId = ++packetId;
430441
shouldSkip = true;
431442
fieldPkg.write(bufferArray);
432-
433443
}
434444

435445
columToIndx.put(fieldName,
@@ -439,6 +449,7 @@ public void fieldEofResponse(byte[] header, List<byte[]> fields,
439449
// find primary key index
440450
FieldPacket fieldPkg = new FieldPacket();
441451
fieldPkg.read(field);
452+
fieldPackets.add(fieldPkg);
442453
String fieldName = new String(fieldPkg.name);
443454
if (primaryKey.equalsIgnoreCase(fieldName)) {
444455
primaryKeyIndex = i;
@@ -525,4 +536,12 @@ public void requestDataResponse(byte[] data, BackendConnection conn) {
525536
(MySQLBackendConnection) conn);
526537
}
527538

539+
public boolean isPrepared() {
540+
return prepared;
541+
}
542+
543+
public void setPrepared(boolean prepared) {
544+
this.prepared = prepared;
545+
}
546+
528547
}

0 commit comments

Comments
 (0)