Skip to content

Commit b4b33bb

Browse files
ubonesskimchy
authored andcommitted
Local node master listener
* Fixed an issue where dynamic update to minimum_master_nodes settings would not take immediate effect * Added LocalNodeMasterListener support to the ClusterService. Enables listening to when the local node becomes/stopped being a master
1 parent 36ff6c9 commit b4b33bb

File tree

6 files changed

+393
-21
lines changed

6 files changed

+393
-21
lines changed

src/main/java/org/elasticsearch/cluster/ClusterService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
7272
*/
7373
void remove(ClusterStateListener listener);
7474

75+
/**
76+
* Add a listener for on/off local node master events
77+
*/
78+
void add(LocalNodeMasterListener listener);
79+
80+
/**
81+
* Remove the given listener for on/off local master events
82+
*/
83+
void remove(LocalNodeMasterListener listener);
84+
7585
/**
7686
* Adds a cluster state listener that will timeout after the provided timeout.
7787
*/
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. ElasticSearch licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster;
21+
22+
/**
23+
* Enables listening to master changes events of the local node (when the local node becomes the master, and when the local
24+
* node cease being a master).
25+
*/
26+
public interface LocalNodeMasterListener {
27+
28+
/**
29+
* Called when local node is elected to be the master
30+
*/
31+
void onMaster();
32+
33+
/**
34+
* Called when the local node used to be the master, a new master was elected and it's no longer the local node.
35+
*/
36+
void offMaster();
37+
38+
/**
39+
* The name of the executor that the implementation of the callbacks of this lister should be executed on. The thread
40+
* that is responsible for managing instances of this lister is the same thread handling the cluster state events. If
41+
* the work done is the callbacks above is inexpensive, this value may be {@link org.elasticsearch.threadpool.ThreadPool.Names#SAME SAME}
42+
* (indicating that the callbaks will run on the same thread as the cluster state events are fired with). On the other hand,
43+
* if the logic in the callbacks are heavier and take longer to process (or perhaps involve blocking due to IO operations),
44+
* prefer to execute them on a separte more appropriate executor (eg. {@link org.elasticsearch.threadpool.ThreadPool.Names#GENERIC GENERIC}
45+
* or {@link org.elasticsearch.threadpool.ThreadPool.Names#MANAGEMENT MANAGEMENT}).
46+
*
47+
* @return The name of the executor that will run the callbacks of this listener.
48+
*/
49+
String executorName();
50+
51+
}
52+

src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java

Lines changed: 87 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,7 @@
4343
import java.util.Iterator;
4444
import java.util.List;
4545
import java.util.Queue;
46-
import java.util.concurrent.CopyOnWriteArrayList;
47-
import java.util.concurrent.ExecutorService;
48-
import java.util.concurrent.ScheduledFuture;
49-
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.*;
5047

5148
import static java.util.concurrent.Executors.newSingleThreadExecutor;
5249
import static org.elasticsearch.cluster.ClusterState.Builder;
@@ -75,6 +72,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
7572
private final List<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
7673
private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
7774
private final List<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
75+
private final LocalNodeMasterListeners localNodeMasterListeners;
7876

7977
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
8078

@@ -97,6 +95,8 @@ public InternalClusterService(Settings settings, DiscoveryService discoveryServi
9795
this.nodeSettingsService.setClusterService(this);
9896

9997
this.reconnectInterval = componentSettings.getAsTime("reconnect_interval", TimeValue.timeValueSeconds(10));
98+
99+
localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
100100
}
101101

102102
public NodeSettingsService settingsService() {
@@ -112,6 +112,7 @@ public void addInitialStateBlock(ClusterBlock block) throws ElasticSearchIllegal
112112

113113
@Override
114114
protected void doStart() throws ElasticSearchException {
115+
add(localNodeMasterListeners);
115116
this.clusterState = newClusterStateBuilder().blocks(initialBlocks).build();
116117
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask"));
117118
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
@@ -130,6 +131,7 @@ protected void doStop() throws ElasticSearchException {
130131
} catch (InterruptedException e) {
131132
// ignore
132133
}
134+
remove(localNodeMasterListeners);
133135
}
134136

135137
@Override
@@ -175,6 +177,16 @@ public void remove(ClusterStateListener listener) {
175177
}
176178
}
177179

180+
@Override
181+
public void add(LocalNodeMasterListener listener) {
182+
localNodeMasterListeners.add(listener);
183+
}
184+
185+
@Override
186+
public void remove(LocalNodeMasterListener listener) {
187+
localNodeMasterListeners.remove(listener);
188+
}
189+
178190
public void add(TimeValue timeout, final TimeoutClusterStateListener listener) {
179191
if (lifecycle.stoppedOrClosed()) {
180192
listener.onClose();
@@ -398,4 +410,75 @@ public void run() {
398410
private boolean nodeRequiresConnection(DiscoveryNode node) {
399411
return localNode().shouldConnectTo(node);
400412
}
413+
414+
private static class LocalNodeMasterListeners implements ClusterStateListener {
415+
416+
private final List<LocalNodeMasterListener> listeners = new CopyOnWriteArrayList<LocalNodeMasterListener>();
417+
private final ThreadPool threadPool;
418+
private volatile boolean master = false;
419+
420+
private LocalNodeMasterListeners(ThreadPool threadPool) {
421+
this.threadPool = threadPool;
422+
}
423+
424+
@Override
425+
public void clusterChanged(ClusterChangedEvent event) {
426+
if (!master && event.localNodeMaster()) {
427+
master = true;
428+
for (LocalNodeMasterListener listener : listeners) {
429+
Executor executor = threadPool.executor(listener.executorName());
430+
executor.execute(new OnMasterRunnable(listener));
431+
}
432+
return;
433+
}
434+
435+
if (master && !event.localNodeMaster()) {
436+
master = false;
437+
for (LocalNodeMasterListener listener : listeners) {
438+
Executor executor = threadPool.executor(listener.executorName());
439+
executor.execute(new OffMasterRunnable(listener));
440+
}
441+
}
442+
}
443+
444+
private void add(LocalNodeMasterListener listener) {
445+
listeners.add(listener);
446+
}
447+
448+
private void remove(LocalNodeMasterListener listener) {
449+
listeners.remove(listener);
450+
}
451+
452+
private void clear() {
453+
listeners.clear();
454+
}
455+
}
456+
457+
private static class OnMasterRunnable implements Runnable {
458+
459+
private final LocalNodeMasterListener listener;
460+
461+
private OnMasterRunnable(LocalNodeMasterListener listener) {
462+
this.listener = listener;
463+
}
464+
465+
@Override
466+
public void run() {
467+
listener.onMaster();
468+
}
469+
}
470+
471+
private static class OffMasterRunnable implements Runnable {
472+
473+
private final LocalNodeMasterListener listener;
474+
475+
private OffMasterRunnable(LocalNodeMasterListener listener) {
476+
this.listener = listener;
477+
}
478+
479+
@Override
480+
public void run() {
481+
listener.offMaster();
482+
}
483+
}
401484
}

src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
142142

143143
logger.debug("using ping.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);
144144

145-
this.electMaster = new ElectMasterService(settings, nodeSettingsService);
145+
this.electMaster = new ElectMasterService(settings);
146+
nodeSettingsService.addListener(new ApplySettings());
146147

147148
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
148149
this.masterFD.addListener(new MasterNodeFailureListener());
@@ -410,6 +411,34 @@ public void clusterStateProcessed(ClusterState clusterState) {
410411
});
411412
}
412413

414+
private void handleMinimumMasterNodesChanged(final int minimumMasterNodes) {
415+
if (lifecycleState() != Lifecycle.State.STARTED) {
416+
// not started, ignore a node failure
417+
return;
418+
}
419+
if (!master) {
420+
// nothing to do here...
421+
return;
422+
}
423+
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", new ProcessedClusterStateUpdateTask() {
424+
@Override
425+
public ClusterState execute(ClusterState currentState) {
426+
final int prevMinimumMasterNode = ZenDiscovery.this.electMaster.minimumMasterNodes();
427+
ZenDiscovery.this.electMaster.minimumMasterNodes(minimumMasterNodes);
428+
// check if we have enough master nodes, if not, we need to move into joining the cluster again
429+
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
430+
return rejoin(currentState, "not enough master nodes on change of minimum_master_nodes from [" + prevMinimumMasterNode + "] to [" + minimumMasterNodes + "]");
431+
}
432+
return currentState;
433+
}
434+
435+
@Override
436+
public void clusterStateProcessed(ClusterState clusterState) {
437+
sendInitialStateEventIfNeeded();
438+
}
439+
});
440+
}
441+
413442
private void handleMasterGone(final DiscoveryNode masterNode, final String reason) {
414443
if (lifecycleState() != Lifecycle.State.STARTED) {
415444
// not started, ignore a master failure
@@ -754,12 +783,12 @@ static class RejoinClusterRequest implements Streamable {
754783

755784
@Override
756785
public void readFrom(StreamInput in) throws IOException {
757-
fromNodeId = in.readOptionalUTF();
786+
fromNodeId = in.readOptionalString();
758787
}
759788

760789
@Override
761790
public void writeTo(StreamOutput out) throws IOException {
762-
out.writeOptionalUTF(fromNodeId);
791+
out.writeOptionalString(fromNodeId);
763792
}
764793
}
765794

@@ -793,4 +822,15 @@ public String executor() {
793822
}
794823
}
795824

825+
class ApplySettings implements NodeSettingsService.Listener {
826+
@Override
827+
public void onRefreshSettings(Settings settings) {
828+
int minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", ZenDiscovery.this.electMaster.minimumMasterNodes());
829+
if (minimumMasterNodes != ZenDiscovery.this.electMaster.minimumMasterNodes()) {
830+
logger.info("updating discovery.zen.minimum_master_nodes from [{}] to [{}]", ZenDiscovery.this.electMaster.minimumMasterNodes(), minimumMasterNodes);
831+
handleMinimumMasterNodesChanged(minimumMasterNodes);
832+
}
833+
}
834+
}
835+
796836
}

src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.cluster.node.DiscoveryNode;
2525
import org.elasticsearch.common.component.AbstractComponent;
2626
import org.elasticsearch.common.settings.Settings;
27-
import org.elasticsearch.node.settings.NodeSettingsService;
2827

2928
import java.util.Collections;
3029
import java.util.Comparator;
@@ -44,11 +43,18 @@ public class ElectMasterService extends AbstractComponent {
4443

4544
private volatile int minimumMasterNodes;
4645

47-
public ElectMasterService(Settings settings, NodeSettingsService nodeSettingsService) {
46+
public ElectMasterService(Settings settings) {
4847
super(settings);
4948
this.minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
5049
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
51-
nodeSettingsService.addListener(new ApplySettings());
50+
}
51+
52+
public void minimumMasterNodes(int minimumMasterNodes) {
53+
this.minimumMasterNodes = minimumMasterNodes;
54+
}
55+
56+
public int minimumMasterNodes() {
57+
return minimumMasterNodes;
5258
}
5359

5460
public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
@@ -111,17 +117,6 @@ private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
111117
return possibleNodes;
112118
}
113119

114-
class ApplySettings implements NodeSettingsService.Listener {
115-
@Override
116-
public void onRefreshSettings(Settings settings) {
117-
int minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", ElectMasterService.this.minimumMasterNodes);
118-
if (minimumMasterNodes != ElectMasterService.this.minimumMasterNodes) {
119-
logger.info("updating [discovery.zen.minimum_master_nodes] from [{}] to [{}]", ElectMasterService.this.minimumMasterNodes, minimumMasterNodes);
120-
ElectMasterService.this.minimumMasterNodes = minimumMasterNodes;
121-
}
122-
}
123-
}
124-
125120
private static class NodeComparator implements Comparator<DiscoveryNode> {
126121

127122
@Override

0 commit comments

Comments
 (0)