26
26
import org .elasticsearch .cluster .action .index .NodeMappingCreatedAction ;
27
27
import org .elasticsearch .cluster .routing .IndexRoutingTable ;
28
28
import org .elasticsearch .common .collect .Lists ;
29
+ import org .elasticsearch .common .collect .Maps ;
30
+ import org .elasticsearch .common .collect .Sets ;
29
31
import org .elasticsearch .common .component .AbstractComponent ;
30
32
import org .elasticsearch .common .compress .CompressedString ;
31
33
import org .elasticsearch .common .inject .Inject ;
44
46
import java .util .Arrays ;
45
47
import java .util .List ;
46
48
import java .util .Map ;
49
+ import java .util .Set ;
47
50
import java .util .concurrent .atomic .AtomicBoolean ;
48
51
import java .util .concurrent .atomic .AtomicInteger ;
49
52
@@ -64,6 +67,8 @@ public class MetaDataMappingService extends AbstractComponent {
64
67
65
68
private final NodeMappingCreatedAction mappingCreatedAction ;
66
69
70
+ private final Map <String , Set <String >> indicesAndTypesToRefresh = Maps .newHashMap ();
71
+
67
72
@ Inject public MetaDataMappingService (Settings settings , ClusterService clusterService , IndicesService indicesService , NodeMappingCreatedAction mappingCreatedAction ) {
68
73
super (settings );
69
74
this .clusterService = clusterService ;
@@ -75,10 +80,27 @@ public class MetaDataMappingService extends AbstractComponent {
75
80
* Refreshes mappings if they are not the same between original and parsed version
76
81
*/
77
82
public void refreshMapping (final String index , final String ... types ) {
83
+ synchronized (indicesAndTypesToRefresh ) {
84
+ Set <String > sTypes = indicesAndTypesToRefresh .get (index );
85
+ if (sTypes == null ) {
86
+ sTypes = Sets .newHashSet ();
87
+ indicesAndTypesToRefresh .put (index , sTypes );
88
+ }
89
+ sTypes .addAll (Arrays .asList (types ));
90
+ }
78
91
clusterService .submitStateUpdateTask ("refresh-mapping [" + index + "][" + Arrays .toString (types ) + "]" , new ClusterStateUpdateTask () {
79
92
@ Override public ClusterState execute (ClusterState currentState ) {
80
93
boolean createdIndex = false ;
81
94
try {
95
+ Set <String > sTypes ;
96
+ synchronized (indicesAndTypesToRefresh ) {
97
+ sTypes = indicesAndTypesToRefresh .remove (index );
98
+ }
99
+ // we already processed those types...
100
+ if (sTypes == null || sTypes .isEmpty ()) {
101
+ return currentState ;
102
+ }
103
+
82
104
// first, check if it really needs to be updated
83
105
final IndexMetaData indexMetaData = currentState .metaData ().index (index );
84
106
if (indexMetaData == null ) {
@@ -91,7 +113,7 @@ public void refreshMapping(final String index, final String... types) {
91
113
// we need to create the index here, and add the current mapping to it, so we can merge
92
114
indexService = indicesService .createIndex (indexMetaData .index (), indexMetaData .settings (), currentState .nodes ().localNode ().id ());
93
115
createdIndex = true ;
94
- for (String type : types ) {
116
+ for (String type : sTypes ) {
95
117
// only add the current relevant mapping (if exists)
96
118
if (indexMetaData .mappings ().containsKey (type )) {
97
119
indexService .mapperService ().add (type , indexMetaData .mappings ().get (type ).source ().string ());
@@ -100,7 +122,7 @@ public void refreshMapping(final String index, final String... types) {
100
122
}
101
123
IndexMetaData .Builder indexMetaDataBuilder = newIndexMetaDataBuilder (indexMetaData );
102
124
List <String > updatedTypes = Lists .newArrayList ();
103
- for (String type : types ) {
125
+ for (String type : sTypes ) {
104
126
DocumentMapper mapper = indexService .mapperService ().documentMapper (type );
105
127
if (!mapper .mappingSource ().equals (indexMetaData .mappings ().get (type ).source ())) {
106
128
updatedTypes .add (type );
0 commit comments