File tree Expand file tree Collapse file tree 2 files changed +14
-4
lines changed
rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer Expand file tree Collapse file tree 2 files changed +14
-4
lines changed Original file line number Diff line number Diff line change 56
56
import com .alibaba .rocketmq .common .protocol .heartbeat .MessageModel ;
57
57
import com .alibaba .rocketmq .common .protocol .heartbeat .SubscriptionData ;
58
58
import com .alibaba .rocketmq .common .sysflag .PullSysFlag ;
59
+ import com .alibaba .rocketmq .remoting .RPCHook ;
59
60
import com .alibaba .rocketmq .remoting .exception .RemotingException ;
60
61
61
62
@@ -79,9 +80,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
79
80
// Consumer启动时间
80
81
private final long consumerStartTimestamp = System .currentTimeMillis ();
81
82
83
+ private final RPCHook rpcHook ;
82
84
83
- public DefaultMQPullConsumerImpl (final DefaultMQPullConsumer defaultMQPullConsumer ) {
85
+
86
+ public DefaultMQPullConsumerImpl (final DefaultMQPullConsumer defaultMQPullConsumer , final RPCHook rpcHook ) {
84
87
this .defaultMQPullConsumer = defaultMQPullConsumer ;
88
+ this .rpcHook = rpcHook ;
85
89
}
86
90
87
91
@@ -497,7 +501,8 @@ public void start() throws MQClientException {
497
501
}
498
502
499
503
this .mQClientFactory =
500
- MQClientManager .getInstance ().getAndCreateMQClientInstance (this .defaultMQPullConsumer );
504
+ MQClientManager .getInstance ().getAndCreateMQClientInstance (this .defaultMQPullConsumer ,
505
+ this .rpcHook );
501
506
502
507
// 初始化Rebalance变量
503
508
this .rebalanceImpl .setConsumerGroup (this .defaultMQPullConsumer .getConsumerGroup ());
Original file line number Diff line number Diff line change 68
68
import com .alibaba .rocketmq .common .protocol .heartbeat .MessageModel ;
69
69
import com .alibaba .rocketmq .common .protocol .heartbeat .SubscriptionData ;
70
70
import com .alibaba .rocketmq .common .sysflag .PullSysFlag ;
71
+ import com .alibaba .rocketmq .remoting .RPCHook ;
71
72
import com .alibaba .rocketmq .remoting .exception .RemotingException ;
72
73
73
74
@@ -123,9 +124,12 @@ public void registerFilterMessageHook(final FilterMessageHook hook) {
123
124
*/
124
125
private final ArrayList <ConsumeMessageHook > consumeMessageHookList = new ArrayList <ConsumeMessageHook >();
125
126
127
+ private final RPCHook rpcHook ;
126
128
127
- public DefaultMQPushConsumerImpl (DefaultMQPushConsumer defaultMQPushConsumer ) {
129
+
130
+ public DefaultMQPushConsumerImpl (DefaultMQPushConsumer defaultMQPushConsumer , RPCHook rpcHook ) {
128
131
this .defaultMQPushConsumer = defaultMQPushConsumer ;
132
+ this .rpcHook = rpcHook ;
129
133
}
130
134
131
135
@@ -658,7 +662,8 @@ public void start() throws MQClientException {
658
662
}
659
663
660
664
this .mQClientFactory =
661
- MQClientManager .getInstance ().getAndCreateMQClientInstance (this .defaultMQPushConsumer );
665
+ MQClientManager .getInstance ().getAndCreateMQClientInstance (this .defaultMQPushConsumer ,
666
+ this .rpcHook );
662
667
663
668
// 初始化Rebalance变量
664
669
this .rebalanceImpl .setConsumerGroup (this .defaultMQPushConsumer .getConsumerGroup ());
You can’t perform that action at this time.
0 commit comments