Skip to content

Commit 0dbd077

Browse files
authored
[ISSUE apache#7326] Split the request to register to the nameserver (apache#7325)
Signed-off-by: Ziy1-Tan <[email protected]>
1 parent dad6b4d commit 0dbd077

File tree

4 files changed

+99
-18
lines changed

4 files changed

+99
-18
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

+23-18
Original file line numberDiff line numberDiff line change
@@ -1765,29 +1765,34 @@ public synchronized void registerIncrementBrokerData(List<TopicConfig> topicConf
17651765
}
17661766

17671767
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
1768+
ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable();
1769+
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
17681770

1769-
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
1770-
1771-
topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
1772-
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
1773-
1774-
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
1775-
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
1776-
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
1777-
1778-
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
1779-
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
1780-
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
1781-
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
1782-
TopicConfig tmp =
1771+
for (TopicConfig topicConfig : topicConfigMap.values()) {
1772+
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
1773+
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
1774+
topicConfigTable.put(topicConfig.getTopicName(),
17831775
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
1784-
topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
1785-
topicConfigTable.put(topicConfig.getTopicName(), tmp);
1776+
topicConfig.getPerm() & getBrokerConfig().getBrokerPermission()));
1777+
} else {
1778+
topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
1779+
}
1780+
1781+
if (this.brokerConfig.isEnableSplitRegistration()
1782+
&& topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) {
1783+
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable);
1784+
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
1785+
topicConfigTable.clear();
17861786
}
1787-
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
17881787
}
17891788

1790-
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
1789+
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
1790+
.map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())))
1791+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1792+
1793+
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().
1794+
buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap);
1795+
if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
17911796
this.getBrokerAddr(),
17921797
this.brokerConfig.getBrokerName(),
17931798
this.brokerConfig.getBrokerId(),

broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java

+21
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import com.google.common.collect.ImmutableMap;
3131

32+
import com.google.common.collect.Maps;
3233
import org.apache.commons.lang3.StringUtils;
3334
import org.apache.rocketmq.broker.BrokerController;
3435
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -47,7 +48,9 @@
4748
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
4849
import org.apache.rocketmq.remoting.protocol.DataVersion;
4950
import org.apache.rocketmq.remoting.protocol.body.KVTable;
51+
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
5052
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
53+
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;
5154

5255
import static com.google.common.base.Preconditions.checkNotNull;
5356

@@ -609,6 +612,24 @@ public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
609612
return topicConfigSerializeWrapper;
610613
}
611614

615+
public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final ConcurrentMap<String, TopicConfig> topicConfigTable) {
616+
return buildSerializeWrapper(topicConfigTable, Maps.newHashMap());
617+
}
618+
619+
public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(
620+
final ConcurrentMap<String, TopicConfig> topicConfigTable,
621+
final Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap
622+
) {
623+
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
624+
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
625+
topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
626+
topicConfigWrapper.setDataVersion(this.getDataVersion());
627+
if (this.brokerController.getBrokerConfig().isEnableSplitRegistration()) {
628+
this.getDataVersion().nextVersion();
629+
}
630+
return topicConfigWrapper;
631+
}
632+
612633
@Override
613634
public String encode() {
614635
return encode(false);

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

+24
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,14 @@ public class BrokerConfig extends BrokerIdentity {
396396

397397
private boolean enableMixedMessageType = false;
398398

399+
/**
400+
* This flag and deleteTopicWithBrokerRegistration flag in the NameServer cannot be set to true at the same time,
401+
* otherwise there will be a loss of routing
402+
*/
403+
private boolean enableSplitRegistration = false;
404+
405+
private int splitRegistrationSize = 800;
406+
399407
public long getMaxPopPollingSize() {
400408
return maxPopPollingSize;
401409
}
@@ -1731,4 +1739,20 @@ public boolean isEnableMixedMessageType() {
17311739
public void setEnableMixedMessageType(boolean enableMixedMessageType) {
17321740
this.enableMixedMessageType = enableMixedMessageType;
17331741
}
1742+
1743+
public boolean isEnableSplitRegistration() {
1744+
return enableSplitRegistration;
1745+
}
1746+
1747+
public void setEnableSplitRegistration(boolean enableSplitRegistration) {
1748+
this.enableSplitRegistration = enableSplitRegistration;
1749+
}
1750+
1751+
public int getSplitRegistrationSize() {
1752+
return splitRegistrationSize;
1753+
}
1754+
1755+
public void setSplitRegistrationSize(int splitRegistrationSize) {
1756+
this.splitRegistrationSize = splitRegistrationSize;
1757+
}
17341758
}

test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java

+31
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.rocketmq.test.route;
1919

20+
import org.apache.rocketmq.common.TopicConfig;
2021
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
2122
import org.apache.rocketmq.test.base.BaseConf;
2223
import org.apache.rocketmq.test.util.MQAdminTestUtils;
@@ -111,4 +112,34 @@ public void testStaticTopicNotAffected() throws Exception {
111112
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
112113
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
113114
}
115+
116+
@Test
117+
public void testCreateOrUpdateTopic_EnableSplitRegistration() {
118+
brokerController1.getBrokerConfig().setEnableSplitRegistration(true);
119+
brokerController2.getBrokerConfig().setEnableSplitRegistration(true);
120+
brokerController3.getBrokerConfig().setEnableSplitRegistration(true);
121+
122+
String testTopic = "test-topic-";
123+
124+
for (int i = 0; i < 1000; i++) {
125+
TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8);
126+
brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig);
127+
brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig);
128+
brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig);
129+
}
130+
131+
brokerController1.registerBrokerAll(false, true, true);
132+
brokerController2.registerBrokerAll(false, true, true);
133+
brokerController3.registerBrokerAll(false, true, true);
134+
135+
for (int i = 0; i < 1000; i++) {
136+
TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i);
137+
assertThat(route.getBrokerDatas()).hasSize(3);
138+
assertThat(route.getQueueDatas()).hasSize(3);
139+
}
140+
141+
brokerController1.getBrokerConfig().setEnableSplitRegistration(false);
142+
brokerController2.getBrokerConfig().setEnableSplitRegistration(false);
143+
brokerController3.getBrokerConfig().setEnableSplitRegistration(false);
144+
}
114145
}

0 commit comments

Comments
 (0)