1313 * See the License for the specific language governing permissions and
1414 * limitations under the License.
1515 */
16- package com .alibaba .dubbo .rpc .cluster .loadbalance ;
17-
18- import java .util .ArrayList ;
16+ package com .alibaba .dubbo .rpc .cluster .loadbalance ;
17+
18+ import java .util .LinkedHashMap ;
1919import java .util .List ;
20+ import java .util .Map ;
2021import java .util .concurrent .ConcurrentHashMap ;
2122import java .util .concurrent .ConcurrentMap ;
2223
2324import com .alibaba .dubbo .common .URL ;
2425import com .alibaba .dubbo .common .utils .AtomicPositiveInteger ;
2526import com .alibaba .dubbo .rpc .Invocation ;
2627import com .alibaba .dubbo .rpc .Invoker ;
27-
28- /**
29- * Round robin load balance.
30- *
31- * @author qian.lei
32- * @author william.liangf
33- */
28+
29+ /**
30+ * Round robin load balance.
31+ *
32+ * @author qian.lei
33+ * @author william.liangf
34+ */
3435public class RoundRobinLoadBalance extends AbstractLoadBalance {
35-
36- public static final String NAME = "roundrobin" ;
37-
38- private final ConcurrentMap <String , AtomicPositiveInteger > sequences = new ConcurrentHashMap <String , AtomicPositiveInteger >();
3936
40- private final ConcurrentMap <String , AtomicPositiveInteger > weightSequences = new ConcurrentHashMap <String , AtomicPositiveInteger >();
41-
42- protected <T > Invoker <T > doSelect (List <Invoker <T >> invokers , URL url , Invocation invocation ) {
43- String key = invokers .get (0 ).getUrl ().getServiceKey () + "." + invocation .getMethodName ();
44- int length = invokers .size (); // 总个数
45- int maxWeight = 0 ; // 最大权重
46- int minWeight = Integer .MAX_VALUE ; // 最小权重
47- for (int i = 0 ; i < length ; i ++) {
48- int weight = getWeight (invokers .get (i ), invocation );
49- maxWeight = Math .max (maxWeight , weight ); // 累计最大权重
50- minWeight = Math .min (minWeight , weight ); // 累计最小权重
51- }
52- if (maxWeight > 0 && minWeight < maxWeight ) { // 权重不一样
53- AtomicPositiveInteger weightSequence = weightSequences .get (key );
54- if (weightSequence == null ) {
55- weightSequences .putIfAbsent (key , new AtomicPositiveInteger ());
56- weightSequence = weightSequences .get (key );
57- }
58- int currentWeight = weightSequence .getAndIncrement () % maxWeight ;
59- List <Invoker <T >> weightInvokers = new ArrayList <Invoker <T >>();
60- for (Invoker <T > invoker : invokers ) { // 筛选权重大于当前权重基数的Invoker
61- if (getWeight (invoker , invocation ) > currentWeight ) {
62- weightInvokers .add (invoker );
63- }
64- }
65- int weightLength = weightInvokers .size ();
66- if (weightLength == 1 ) {
67- return weightInvokers .get (0 );
68- } else if (weightLength > 1 ) {
69- invokers = weightInvokers ;
70- length = invokers .size ();
71- }
72- }
73- AtomicPositiveInteger sequence = sequences .get (key );
74- if (sequence == null ) {
75- sequences .putIfAbsent (key , new AtomicPositiveInteger ());
76- sequence = sequences .get (key );
77- }
78- // 取模轮循
79- return invokers .get (sequence .getAndIncrement () % length );
80- }
81-
37+ public static final String NAME = "roundrobin" ;
38+
39+ private final ConcurrentMap <String , AtomicPositiveInteger > sequences = new ConcurrentHashMap <String , AtomicPositiveInteger >();
40+
41+ private static final class IntegerWrapper {
42+ public IntegerWrapper (int value ) {
43+ this .value = value ;
44+ }
45+
46+ private int value ;
47+
48+ public int getValue () {
49+ return value ;
50+ }
51+
52+ public void setValue (int value ) {
53+ this .value = value ;
54+ }
55+
56+ public void decrement () {
57+ this .value --;
58+ }
59+ }
60+
61+ protected <T > Invoker <T > doSelect (List <Invoker <T >> invokers , URL url , Invocation invocation ) {
62+ String key = invokers .get (0 ).getUrl ().getServiceKey () + "." + invocation .getMethodName ();
63+ int length = invokers .size (); // 总个数
64+ int maxWeight = 0 ; // 最大权重
65+ int minWeight = Integer .MAX_VALUE ; // 最小权重
66+ final LinkedHashMap <Invoker <T >, IntegerWrapper > invokerToWeightMap = new LinkedHashMap <Invoker <T >, IntegerWrapper >();
67+ int weightSum = 0 ;
68+ for (int i = 0 ; i < length ; i ++) {
69+ int weight = getWeight (invokers .get (i ), invocation );
70+ maxWeight = Math .max (maxWeight , weight ); // 累计最大权重
71+ minWeight = Math .min (minWeight , weight ); // 累计最小权重
72+ if (weight > 0 ) {
73+ invokerToWeightMap .put (invokers .get (i ), new IntegerWrapper (weight ));
74+ weightSum += weight ;
75+ }
76+ }
77+ AtomicPositiveInteger sequence = sequences .get (key );
78+ if (sequence == null ) {
79+ sequences .putIfAbsent (key , new AtomicPositiveInteger ());
80+ sequence = sequences .get (key );
81+ }
82+ int currentSequence = sequence .getAndIncrement ();
83+ if (maxWeight > 0 && minWeight < maxWeight ) { // 权重不一样
84+ int mod = currentSequence % weightSum ;
85+ for (int i = 0 ; i < maxWeight ; i ++) {
86+ for (Map .Entry <Invoker <T >, IntegerWrapper > each : invokerToWeightMap .entrySet ()) {
87+ final Invoker <T > k = each .getKey ();
88+ final IntegerWrapper v = each .getValue ();
89+ if (mod == 0 && v .getValue () > 0 ) {
90+ return k ;
91+ }
92+ if (v .getValue () > 0 ) {
93+ v .decrement ();
94+ mod --;
95+ }
96+ }
97+ }
98+ }
99+ // 取模轮循
100+ return invokers .get (currentSequence % length );
101+ }
102+
82103}
0 commit comments