38
38
import org .elasticsearch .common .inject .Inject ;
39
39
import org .elasticsearch .common .settings .Settings ;
40
40
import org .elasticsearch .common .unit .TimeValue ;
41
- import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
42
41
import org .elasticsearch .index .Index ;
43
42
import org .elasticsearch .index .mapper .DocumentMapper ;
44
43
import org .elasticsearch .index .mapper .MapperService ;
49
48
import org .elasticsearch .indices .InvalidTypeNameException ;
50
49
import org .elasticsearch .indices .TypeMissingException ;
51
50
import org .elasticsearch .percolator .PercolatorService ;
51
+ import org .elasticsearch .threadpool .ThreadPool ;
52
52
53
53
import java .util .*;
54
- import java .util .concurrent .BlockingQueue ;
55
54
56
55
import static com .google .common .collect .Maps .newHashMap ;
57
56
import static org .elasticsearch .index .mapper .DocumentMapper .MergeFlags .mergeFlags ;
61
60
*/
62
61
public class MetaDataMappingService extends AbstractComponent {
63
62
63
+ private final ThreadPool threadPool ;
64
64
private final ClusterService clusterService ;
65
-
66
65
private final IndicesService indicesService ;
67
66
68
- private final BlockingQueue <MappingTask > refreshOrUpdateQueue = ConcurrentCollections .newBlockingQueue ();
67
+ // the mutex protect all the refreshOrUpdate variables!
68
+ private final Object refreshOrUpdateMutex = new Object ();
69
+ private final List <MappingTask > refreshOrUpdateQueue = new ArrayList <MappingTask >();
70
+ private long refreshOrUpdateInsertOrder ;
71
+ private long refreshOrUpdateProcessedInsertOrder ;
69
72
70
73
@ Inject
71
- public MetaDataMappingService (Settings settings , ClusterService clusterService , IndicesService indicesService ) {
74
+ public MetaDataMappingService (Settings settings , ThreadPool threadPool , ClusterService clusterService , IndicesService indicesService ) {
72
75
super (settings );
76
+ this .threadPool = threadPool ;
73
77
this .clusterService = clusterService ;
74
78
this .indicesService = indicesService ;
75
79
}
@@ -96,12 +100,16 @@ static class RefreshTask extends MappingTask {
96
100
static class UpdateTask extends MappingTask {
97
101
final String type ;
98
102
final CompressedString mappingSource ;
103
+ final long order ; // -1 for unknown
104
+ final String nodeId ; // null fr unknown
99
105
final ClusterStateUpdateListener listener ;
100
106
101
- UpdateTask (String index , String indexUUID , String type , CompressedString mappingSource , ClusterStateUpdateListener listener ) {
107
+ UpdateTask (String index , String indexUUID , String type , CompressedString mappingSource , long order , String nodeId , ClusterStateUpdateListener listener ) {
102
108
super (index , indexUUID );
103
109
this .type = type ;
104
110
this .mappingSource = mappingSource ;
111
+ this .order = order ;
112
+ this .nodeId = nodeId ;
105
113
this .listener = listener ;
106
114
}
107
115
}
@@ -111,9 +119,26 @@ static class UpdateTask extends MappingTask {
111
119
* as possible so we won't create the same index all the time for example for the updates on the same mapping
112
120
* and generate a single cluster change event out of all of those.
113
121
*/
114
- ClusterState executeRefreshOrUpdate (final ClusterState currentState ) throws Exception {
115
- List <MappingTask > allTasks = new ArrayList <MappingTask >();
116
- refreshOrUpdateQueue .drainTo (allTasks );
122
+ ClusterState executeRefreshOrUpdate (final ClusterState currentState , final long insertionOrder ) throws Exception {
123
+ final List <MappingTask > allTasks = new ArrayList <MappingTask >();
124
+
125
+ synchronized (refreshOrUpdateMutex ) {
126
+ if (refreshOrUpdateQueue .isEmpty ()) {
127
+ return currentState ;
128
+ }
129
+
130
+ // we already processed this task in a bulk manner in a previous cluster event, simply ignore
131
+ // it so we will let other tasks get in and processed ones, we will handle the queued ones
132
+ // later on in a subsequent cluster state event
133
+ if (insertionOrder < refreshOrUpdateProcessedInsertOrder ) {
134
+ return currentState ;
135
+ }
136
+
137
+ allTasks .addAll (refreshOrUpdateQueue );
138
+ refreshOrUpdateQueue .clear ();
139
+
140
+ refreshOrUpdateProcessedInsertOrder = refreshOrUpdateInsertOrder ;
141
+ }
117
142
118
143
if (allTasks .isEmpty ()) {
119
144
return currentState ;
@@ -132,32 +157,61 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exce
132
157
tasksPerIndex .put (task .index , indexTasks );
133
158
}
134
159
indexTasks .add (task );
135
-
136
160
}
137
161
138
162
boolean dirty = false ;
139
163
MetaData .Builder mdBuilder = MetaData .builder (currentState .metaData ());
140
164
for (Map .Entry <String , List <MappingTask >> entry : tasksPerIndex .entrySet ()) {
141
165
String index = entry .getKey ();
142
- List <MappingTask > tasks = entry .getValue ();
166
+ final IndexMetaData indexMetaData = mdBuilder .get (index );
167
+ if (indexMetaData == null ) {
168
+ // index got deleted on us, ignore...
169
+ logger .debug ("[{}] ignoring tasks - index meta data doesn't exist" , index );
170
+ continue ;
171
+ }
172
+ // the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep
173
+ // the latest (based on order) update mapping one per node
174
+ List <MappingTask > allIndexTasks = entry .getValue ();
175
+ List <MappingTask > tasks = new ArrayList <MappingTask >();
176
+ for (MappingTask task : allIndexTasks ) {
177
+ if (!indexMetaData .isSameUUID (task .indexUUID )) {
178
+ logger .debug ("[{}] ignoring task [{}] - index meta data doesn't match task uuid" , index , task );
179
+ continue ;
180
+ }
181
+ boolean add = true ;
182
+ // if its an update task, make sure we only process the latest ordered one per node
183
+ if (task instanceof UpdateTask ) {
184
+ UpdateTask uTask = (UpdateTask ) task ;
185
+ // we can only do something to compare if we have the order && node
186
+ if (uTask .order != -1 && uTask .nodeId != null ) {
187
+ for (int i = 0 ; i < tasks .size (); i ++) {
188
+ MappingTask existing = tasks .get (i );
189
+ if (existing instanceof UpdateTask ) {
190
+ UpdateTask eTask = (UpdateTask ) existing ;
191
+ // if we have the order, and the node id, then we can compare, and replace if applicable
192
+ if (eTask .order != -1 && eTask .nodeId != null ) {
193
+ if (eTask .nodeId .equals (uTask .nodeId ) && uTask .order > eTask .order ) {
194
+ // a newer update task, we can replace so we execute it one!
195
+ tasks .set (i , uTask );
196
+ add = false ;
197
+ break ;
198
+ }
199
+ }
200
+ }
201
+ }
202
+ }
203
+ }
204
+
205
+ if (add ) {
206
+ tasks .add (task );
207
+ }
208
+ }
209
+
143
210
boolean removeIndex = false ;
144
211
// keep track of what we already refreshed, no need to refresh it again...
145
212
Set <String > processedRefreshes = Sets .newHashSet ();
146
213
try {
147
214
for (MappingTask task : tasks ) {
148
- final IndexMetaData indexMetaData = mdBuilder .get (index );
149
- if (indexMetaData == null ) {
150
- // index got deleted on us, ignore...
151
- logger .debug ("[{}] ignoring task [{}] - index meta data doesn't exist" , index , task );
152
- continue ;
153
- }
154
-
155
- if (!indexMetaData .isSameUUID (task .indexUUID )) {
156
- // index got deleted on us, ignore...
157
- logger .debug ("[{}] ignoring task [{}] - index meta data doesn't match task uuid" , index , task );
158
- continue ;
159
- }
160
-
161
215
if (task instanceof RefreshTask ) {
162
216
RefreshTask refreshTask = (RefreshTask ) task ;
163
217
try {
@@ -249,13 +303,24 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exce
249
303
if (removeIndex ) {
250
304
indicesService .removeIndex (index , "created for mapping processing" );
251
305
}
252
- for (Object task : tasks ) {
306
+ }
307
+ }
308
+
309
+ // fork sending back updates, so we won't wait to send them back on the cluster state, there
310
+ // might be a few of those...
311
+ threadPool .generic ().execute (new Runnable () {
312
+ @ Override
313
+ public void run () {
314
+ for (Object task : allTasks ) {
253
315
if (task instanceof UpdateTask ) {
254
- ((UpdateTask ) task ).listener .onResponse (new ClusterStateUpdateResponse (true ));
316
+ UpdateTask uTask = (UpdateTask ) task ;
317
+ ClusterStateUpdateResponse response = new ClusterStateUpdateResponse (true );
318
+ uTask .listener .onResponse (response );
255
319
}
256
320
}
257
321
}
258
- }
322
+ });
323
+
259
324
260
325
if (!dirty ) {
261
326
return currentState ;
@@ -267,7 +332,11 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exce
267
332
* Refreshes mappings if they are not the same between original and parsed version
268
333
*/
269
334
public void refreshMapping (final String index , final String indexUUID , final String ... types ) {
270
- refreshOrUpdateQueue .add (new RefreshTask (index , indexUUID , types ));
335
+ final long insertOrder ;
336
+ synchronized (refreshOrUpdateMutex ) {
337
+ insertOrder = ++refreshOrUpdateInsertOrder ;
338
+ refreshOrUpdateQueue .add (new RefreshTask (index , indexUUID , types ));
339
+ }
271
340
clusterService .submitStateUpdateTask ("refresh-mapping [" + index + "][" + Arrays .toString (types ) + "]" , Priority .HIGH , new ClusterStateUpdateTask () {
272
341
@ Override
273
342
public void onFailure (String source , Throwable t ) {
@@ -276,13 +345,17 @@ public void onFailure(String source, Throwable t) {
276
345
277
346
@ Override
278
347
public ClusterState execute (ClusterState currentState ) throws Exception {
279
- return executeRefreshOrUpdate (currentState );
348
+ return executeRefreshOrUpdate (currentState , insertOrder );
280
349
}
281
350
});
282
351
}
283
352
284
- public void updateMapping (final String index , final String indexUUID , final String type , final CompressedString mappingSource , final ClusterStateUpdateListener listener ) {
285
- refreshOrUpdateQueue .add (new UpdateTask (index , indexUUID , type , mappingSource , listener ));
353
+ public void updateMapping (final String index , final String indexUUID , final String type , final CompressedString mappingSource , final long order , final String nodeId , final ClusterStateUpdateListener listener ) {
354
+ final long insertOrder ;
355
+ synchronized (refreshOrUpdateMutex ) {
356
+ insertOrder = ++refreshOrUpdateInsertOrder ;
357
+ refreshOrUpdateQueue .add (new UpdateTask (index , indexUUID , type , mappingSource , order , nodeId , listener ));
358
+ }
286
359
clusterService .submitStateUpdateTask ("update-mapping [" + index + "][" + type + "]" , Priority .HIGH , new ClusterStateUpdateTask () {
287
360
@ Override
288
361
public void onFailure (String source , Throwable t ) {
@@ -291,7 +364,7 @@ public void onFailure(String source, Throwable t) {
291
364
292
365
@ Override
293
366
public ClusterState execute (final ClusterState currentState ) throws Exception {
294
- return executeRefreshOrUpdate (currentState );
367
+ return executeRefreshOrUpdate (currentState , insertOrder );
295
368
}
296
369
});
297
370
}
0 commit comments