1
1
/*
2
- * Licensed to Elastic Search and Shay Banon under one
2
+ * Licensed to ElasticSearch and Shay Banon under one
3
3
* or more contributor license agreements. See the NOTICE file
4
4
* distributed with this work for additional information
5
- * regarding copyright ownership. Elastic Search licenses this
5
+ * regarding copyright ownership. ElasticSearch licenses this
6
6
* file to you under the Apache License, Version 2.0 (the
7
7
* "License"); you may not use this file except in compliance
8
8
* with the License. You may obtain a copy of the License at
32
32
import org .elasticsearch .gateway .GatewayException ;
33
33
import org .elasticsearch .threadpool .ThreadPool ;
34
34
35
+ import java .util .concurrent .ExecutorService ;
36
+ import java .util .concurrent .TimeUnit ;
37
+
38
+ import static java .util .concurrent .Executors .*;
39
+ import static org .elasticsearch .common .util .concurrent .EsExecutors .*;
40
+
35
41
/**
36
- * @author kimchy (shay.banon)
42
+ *
37
43
*/
38
44
public abstract class SharedStorageGateway extends AbstractLifecycleComponent <Gateway > implements Gateway , ClusterStateListener {
39
45
40
46
private final ClusterService clusterService ;
41
47
42
48
private final ThreadPool threadPool ;
43
49
50
+ private ExecutorService writeStateExecutor ;
51
+
44
52
public SharedStorageGateway (Settings settings , ThreadPool threadPool , ClusterService clusterService ) {
45
53
super (settings );
46
54
this .threadPool = threadPool ;
47
55
this .clusterService = clusterService ;
48
56
}
49
57
50
- @ Override protected void doStart () throws ElasticSearchException {
58
+ @ Override
59
+ protected void doStart () throws ElasticSearchException {
51
60
clusterService .add (this );
61
+ this .writeStateExecutor = newSingleThreadExecutor (daemonThreadFactory (settings , "gateway#writeMetaData" ));
52
62
}
53
63
54
- @ Override protected void doStop () throws ElasticSearchException {
64
+ @ Override
65
+ protected void doStop () throws ElasticSearchException {
55
66
clusterService .remove (this );
67
+ writeStateExecutor .shutdown ();
68
+ try {
69
+ writeStateExecutor .awaitTermination (10 , TimeUnit .SECONDS );
70
+ } catch (InterruptedException e ) {
71
+ // ignore
72
+ }
56
73
}
57
74
58
- @ Override protected void doClose () throws ElasticSearchException {
75
+ @ Override
76
+ protected void doClose () throws ElasticSearchException {
59
77
}
60
78
61
- @ Override public void performStateRecovery (final GatewayStateRecoveredListener listener ) throws GatewayException {
79
+ @ Override
80
+ public void performStateRecovery (final GatewayStateRecoveredListener listener ) throws GatewayException {
62
81
threadPool .cached ().execute (new Runnable () {
63
- @ Override public void run () {
82
+ @ Override
83
+ public void run () {
64
84
logger .debug ("reading state from gateway {} ..." , this );
65
85
StopWatch stopWatch = new StopWatch ().start ();
66
86
MetaData metaData ;
@@ -81,7 +101,8 @@ public SharedStorageGateway(Settings settings, ThreadPool threadPool, ClusterSer
81
101
});
82
102
}
83
103
84
- @ Override public void clusterChanged (final ClusterChangedEvent event ) {
104
+ @ Override
105
+ public void clusterChanged (final ClusterChangedEvent event ) {
85
106
if (!lifecycle .started ()) {
86
107
return ;
87
108
}
@@ -95,8 +116,9 @@ public SharedStorageGateway(Settings settings, ThreadPool threadPool, ClusterSer
95
116
if (!event .metaDataChanged ()) {
96
117
return ;
97
118
}
98
- threadPool .cached ().execute (new Runnable () {
99
- @ Override public void run () {
119
+ writeStateExecutor .execute (new Runnable () {
120
+ @ Override
121
+ public void run () {
100
122
logger .debug ("writing to gateway {} ..." , this );
101
123
StopWatch stopWatch = new StopWatch ().start ();
102
124
try {
0 commit comments