Skip to content

Commit 3e10010

Browse files
authored
[ISSUE apache#7277] Enhance rocksDBConfigToJson to support metadata counting (apache#7276)
1 parent b4f73e2 commit 3e10010

File tree

6 files changed

+173
-139
lines changed

6 files changed

+173
-139
lines changed

common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -385,8 +385,10 @@ public synchronized boolean shutdown() {
385385
this.options.close();
386386
}
387387
//4. close db.
388-
if (db != null) {
388+
if (db != null && !this.readOnly) {
389389
this.db.syncWal();
390+
}
391+
if (db != null) {
390392
this.db.closeE();
391393
}
392394
//5. help gc.

common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java

+6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ public ConfigRocksDBStorage(final String dbPath) {
6060
this.readOnly = false;
6161
}
6262

63+
public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
64+
super();
65+
this.dbPath = dbPath;
66+
this.readOnly = readOnly;
67+
}
68+
6369
private void initOptions() {
6470
this.options = createConfigDBOptions();
6571

tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
8181
import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
8282
import org.apache.rocketmq.tools.command.message.SendMessageCommand;
83-
import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand;
83+
import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand;
8484
import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand;
8585
import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
8686
import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
@@ -212,7 +212,6 @@ public static void initCommand() {
212212

213213
initCommand(new ClusterListSubCommand());
214214
initCommand(new TopicListSubCommand());
215-
initCommand(new RocksDBConfigToJsonCommand());
216215

217216
initCommand(new UpdateKvConfigCommand());
218217
initCommand(new DeleteKvConfigCommand());
@@ -257,6 +256,7 @@ public static void initCommand() {
257256
initCommand(new ExportMetadataCommand());
258257
initCommand(new ExportConfigsCommand());
259258
initCommand(new ExportMetricsCommand());
259+
initCommand(new ExportMetadataInRocksDBCommand());
260260

261261
initCommand(new HAStatusSubCommand());
262262

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.tools.command.export;
18+
19+
import com.alibaba.fastjson.JSONObject;
20+
import org.apache.commons.cli.CommandLine;
21+
import org.apache.commons.cli.Option;
22+
import org.apache.commons.cli.Options;
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.rocketmq.common.UtilAll;
25+
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
26+
import org.apache.rocketmq.common.utils.DataConverter;
27+
import org.apache.rocketmq.remoting.RPCHook;
28+
import org.apache.rocketmq.tools.command.SubCommand;
29+
import org.apache.rocketmq.tools.command.SubCommandException;
30+
import org.rocksdb.RocksIterator;
31+
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
import java.util.concurrent.atomic.AtomicLong;
35+
import java.util.function.BiConsumer;
36+
37+
public class ExportMetadataInRocksDBCommand implements SubCommand {
38+
private static final String TOPICS_JSON_CONFIG = "topics";
39+
private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups";
40+
41+
@Override
42+
public String commandName() {
43+
return "exportMetadataInRocksDB";
44+
}
45+
46+
@Override
47+
public String commandDesc() {
48+
return "export RocksDB kv config (topics/subscriptionGroups)";
49+
}
50+
51+
@Override
52+
public Options buildCommandlineOptions(Options options) {
53+
Option pathOption = new Option("p", "path", true,
54+
"Absolute path for the metadata directory");
55+
pathOption.setRequired(true);
56+
options.addOption(pathOption);
57+
58+
Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
59+
"topics/subscriptionGroups");
60+
configTypeOption.setRequired(true);
61+
options.addOption(configTypeOption);
62+
63+
Option jsonEnableOption = new Option("j", "jsonEnable", true,
64+
"Json format enable, Default: false");
65+
jsonEnableOption.setRequired(false);
66+
options.addOption(jsonEnableOption);
67+
68+
return options;
69+
}
70+
71+
@Override
72+
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
73+
String path = commandLine.getOptionValue("path").trim();
74+
if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) {
75+
System.out.print("RocksDB path is invalid.\n");
76+
return;
77+
}
78+
79+
String configType = commandLine.getOptionValue("configType").trim().toLowerCase();
80+
81+
boolean jsonEnable = false;
82+
if (commandLine.hasOption("jsonEnable")) {
83+
jsonEnable = Boolean.parseBoolean(commandLine.getOptionValue("jsonEnable").trim());
84+
}
85+
86+
87+
ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */);
88+
if (!kvStore.start()) {
89+
System.out.print("RocksDB load error, path=" + path + "\n");
90+
return;
91+
}
92+
93+
try {
94+
if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType) || SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) {
95+
handleExportMetadata(kvStore, configType, jsonEnable);
96+
} else {
97+
System.out.printf("Invalid config type=%s, Options: topics,subscriptionGroups\n", configType);
98+
}
99+
} finally {
100+
kvStore.shutdown();
101+
}
102+
}
103+
104+
private static void handleExportMetadata(ConfigRocksDBStorage kvStore, String configType, boolean jsonEnable) {
105+
if (jsonEnable) {
106+
final Map<String, JSONObject> jsonConfig = new HashMap<>();
107+
final Map<String, JSONObject> configTable = new HashMap<>();
108+
iterateKvStore(kvStore, (key, value) -> {
109+
final String configKey = new String(key, DataConverter.charset);
110+
final String configValue = new String(value, DataConverter.charset);
111+
final JSONObject jsonObject = JSONObject.parseObject(configValue);
112+
configTable.put(configKey, jsonObject);
113+
}
114+
);
115+
116+
jsonConfig.put(configType.equalsIgnoreCase(TOPICS_JSON_CONFIG) ? "topicConfigTable" : "subscriptionGroupTable",
117+
(JSONObject) JSONObject.toJSON(configTable));
118+
final String jsonConfigStr = JSONObject.toJSONString(jsonConfig, true);
119+
System.out.print(jsonConfigStr + "\n");
120+
} else {
121+
AtomicLong count = new AtomicLong(0);
122+
iterateKvStore(kvStore, (key, value) -> {
123+
final String configKey = new String(key, DataConverter.charset);
124+
final String configValue = new String(value, DataConverter.charset);
125+
System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), configKey, configValue);
126+
});
127+
}
128+
}
129+
130+
private static void iterateKvStore(ConfigRocksDBStorage kvStore, BiConsumer<byte[], byte[]> biConsumer) {
131+
try (RocksIterator iterator = kvStore.iterator()) {
132+
iterator.seekToFirst();
133+
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
134+
biConsumer.accept(iterator.key(), iterator.value());
135+
}
136+
}
137+
}
138+
}

tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java

-122
This file was deleted.

tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java renamed to tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java

+24-14
Original file line numberDiff line numberDiff line change
@@ -21,43 +21,53 @@
2121
import org.apache.commons.cli.Options;
2222
import org.apache.rocketmq.srvutil.ServerUtil;
2323
import org.apache.rocketmq.tools.command.SubCommandException;
24+
import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand;
2425
import org.junit.Test;
2526

2627
import java.io.File;
2728

2829
import static org.assertj.core.api.Assertions.assertThat;
2930

30-
public class KvConfigToJsonCommandTest {
31+
public class ExportMetadataInRocksDBCommandTest {
3132
private static final String BASE_PATH = System.getProperty("user.home") + File.separator + "store/config/";
3233

3334
@Test
3435
public void testExecute() throws SubCommandException {
3536
{
36-
String[] cases = new String[]{"topics", "subscriptionGroups"};
37-
for (String c : cases) {
38-
RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand();
37+
String[][] cases = new String[][] {
38+
{"topics", "false"},
39+
{"topics", "false1"},
40+
{"topics", "true"},
41+
{"subscriptionGroups", "false"},
42+
{"subscriptionGroups", "false2"},
43+
{"subscriptionGroups", "true"}
44+
};
45+
46+
for (String[] c : cases) {
47+
ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand();
3948
Options options = ServerUtil.buildCommandlineOptions(new Options());
40-
String[] subargs = new String[]{"-p " + BASE_PATH + c, "-t " + c};
49+
String[] subargs = new String[] {"-p " + BASE_PATH + c[0], "-t " + c[0], "-j " + c[1]};
4150
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
42-
cmd.buildCommandlineOptions(options), new DefaultParser());
51+
cmd.buildCommandlineOptions(options), new DefaultParser());
4352
cmd.execute(commandLine, options, null);
44-
assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c);
45-
assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c);
53+
assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c[0]);
54+
assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c[0]);
55+
assertThat(commandLine.getOptionValue("j").trim()).isEqualTo(c[1]);
4656
}
4757
}
4858
// invalid cases
4959
{
50-
String[][] cases = new String[][]{
51-
{"-p " + BASE_PATH + "tmpPath", "-t topics"},
52-
{"-p ", "-t topics"},
53-
{"-p " + BASE_PATH + "topics", "-t invalid_type"}
60+
String[][] cases = new String[][] {
61+
{"-p " + BASE_PATH + "tmpPath", "-t topics", "-j true"},
62+
{"-p ", "-t topics", "-j true"},
63+
{"-p " + BASE_PATH + "topics", "-t invalid_type", "-j true"}
5464
};
5565

5666
for (String[] c : cases) {
57-
RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand();
67+
ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand();
5868
Options options = ServerUtil.buildCommandlineOptions(new Options());
5969
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), c,
60-
cmd.buildCommandlineOptions(options), new DefaultParser());
70+
cmd.buildCommandlineOptions(options), new DefaultParser());
6171
cmd.execute(commandLine, options, null);
6272
}
6373
}

0 commit comments

Comments
 (0)