Skip to content

Commit b8b4cbb

Browse files
committed
add mapping metadata to cluster state metadata, will allow in the future to add specific mappings hints (since we don't have the parsed mapping on each node anymore)
1 parent 51c18cd commit b8b4cbb

File tree

11 files changed

+125
-56
lines changed

11 files changed

+125
-56
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,11 @@ public static State fromString(String state) {
9292

9393
private final Settings settings;
9494

95-
private final ImmutableMap<String, CompressedString> mappings;
95+
private final ImmutableMap<String, MappingMetaData> mappings;
9696

9797
private transient final int totalNumberOfShards;
9898

99-
private IndexMetaData(String index, State state, Settings settings, ImmutableMap<String, CompressedString> mappings) {
99+
private IndexMetaData(String index, State state, Settings settings, ImmutableMap<String, MappingMetaData> mappings) {
100100
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1) != -1, "must specify numberOfShards for index [" + index + "]");
101101
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1) != -1, "must specify numberOfReplicas for index [" + index + "]");
102102
this.index = index;
@@ -164,15 +164,15 @@ public ImmutableSet<String> getAliases() {
164164
return aliases();
165165
}
166166

167-
public ImmutableMap<String, CompressedString> mappings() {
167+
public ImmutableMap<String, MappingMetaData> mappings() {
168168
return mappings;
169169
}
170170

171-
public ImmutableMap<String, CompressedString> getMappings() {
171+
public ImmutableMap<String, MappingMetaData> getMappings() {
172172
return mappings();
173173
}
174174

175-
public CompressedString mapping(String mappingType) {
175+
public MappingMetaData mapping(String mappingType) {
176176
return mappings.get(mappingType);
177177
}
178178

@@ -192,7 +192,7 @@ public static class Builder {
192192

193193
private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
194194

195-
private MapBuilder<String, CompressedString> mappings = MapBuilder.newMapBuilder();
195+
private MapBuilder<String, MappingMetaData> mappings = MapBuilder.newMapBuilder();
196196

197197
public Builder(String index) {
198198
this.index = index;
@@ -242,14 +242,13 @@ public Builder removeMapping(String mappingType) {
242242
return this;
243243
}
244244

245-
public Builder putMapping(String mappingType, CompressedString mappingSource) {
246-
mappings.put(mappingType, mappingSource);
245+
public Builder putMapping(MappingMetaData mappingMd) {
246+
mappings.put(mappingMd.type(), mappingMd);
247247
return this;
248248
}
249249

250250
public Builder putMapping(String mappingType, String mappingSource) throws IOException {
251-
mappings.put(mappingType, new CompressedString(mappingSource));
252-
return this;
251+
return putMapping(new MappingMetaData(mappingType, new CompressedString(mappingSource)));
253252
}
254253

255254
public Builder state(State state) {
@@ -273,8 +272,8 @@ public static void toXContent(IndexMetaData indexMetaData, XContentBuilder build
273272
builder.endObject();
274273

275274
builder.startArray("mappings");
276-
for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
277-
byte[] data = entry.getValue().uncompressed();
275+
for (Map.Entry<String, MappingMetaData> entry : indexMetaData.mappings().entrySet()) {
276+
byte[] data = entry.getValue().source().uncompressed();
278277
XContentParser parser = XContentFactory.xContent(data).createParser(data);
279278
Map<String, Object> mapping = parser.map();
280279
parser.close();
@@ -307,12 +306,13 @@ public static IndexMetaData fromXContent(XContentParser parser, @Nullable Settin
307306
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
308307
Map<String, Object> mapping = parser.map();
309308
if (mapping.size() == 1) {
309+
String mappingType = mapping.keySet().iterator().next();
310310
String mappingSource = XContentFactory.jsonBuilder().map(mapping).string();
311311

312312
if (mappingSource == null) {
313313
// crap, no mapping source, warn?
314314
} else {
315-
builder.putMapping(mapping.keySet().iterator().next(), new CompressedString(mappingSource));
315+
builder.putMapping(new MappingMetaData(mappingType, new CompressedString(mappingSource)));
316316
}
317317
}
318318
}
@@ -332,7 +332,8 @@ public static IndexMetaData readFrom(StreamInput in, Settings globalSettings) th
332332
builder.settings(readSettingsFromStream(in, globalSettings));
333333
int mappingsSize = in.readVInt();
334334
for (int i = 0; i < mappingsSize; i++) {
335-
builder.putMapping(in.readUTF(), CompressedString.readCompressedString(in));
335+
MappingMetaData mappingMd = MappingMetaData.readFrom(in);
336+
builder.putMapping(mappingMd);
336337
}
337338
return builder.build();
338339
}
@@ -342,9 +343,8 @@ public static void writeTo(IndexMetaData indexMetaData, StreamOutput out) throws
342343
out.writeByte(indexMetaData.state().id());
343344
writeSettingsToStream(indexMetaData.settings(), out);
344345
out.writeVInt(indexMetaData.mappings().size());
345-
for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
346-
out.writeUTF(entry.getKey());
347-
entry.getValue().writeTo(out);
346+
for (MappingMetaData mappingMd : indexMetaData.mappings().values()) {
347+
MappingMetaData.writeTo(mappingMd, out);
348348
}
349349
}
350350
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster.metadata;
21+
22+
import org.elasticsearch.common.compress.CompressedString;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.index.mapper.DocumentMapper;
26+
27+
import java.io.IOException;
28+
29+
/**
30+
* @author kimchy (shay.banon)
31+
*/
32+
public class MappingMetaData {
33+
34+
private final String type;
35+
36+
private final CompressedString source;
37+
38+
public MappingMetaData(DocumentMapper docMapper) {
39+
this.type = docMapper.type();
40+
this.source = docMapper.mappingSource();
41+
}
42+
43+
public MappingMetaData(String type, CompressedString source) {
44+
this.type = type;
45+
this.source = source;
46+
}
47+
48+
public String type() {
49+
return this.type;
50+
}
51+
52+
public CompressedString source() {
53+
return this.source;
54+
}
55+
56+
public static void writeTo(MappingMetaData mappingMd, StreamOutput out) throws IOException {
57+
out.writeUTF(mappingMd.type());
58+
mappingMd.source().writeTo(out);
59+
}
60+
61+
public static MappingMetaData readFrom(StreamInput in) throws IOException {
62+
return new MappingMetaData(in.readUTF(), CompressedString.readCompressedString(in));
63+
}
64+
}

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,15 @@ public void createIndex(final Request request, final Listener listener) {
186186
}
187187
}
188188
// now, update the mappings with the actual source
189-
mappings.clear();
189+
Map<String, MappingMetaData> mappingsMetaData = Maps.newHashMap();
190190
for (DocumentMapper mapper : mapperService) {
191-
mappings.put(mapper.type(), mapper.mappingSource());
191+
MappingMetaData mappingMd = new MappingMetaData(mapper);
192+
mappingsMetaData.put(mapper.type(), mappingMd);
192193
}
193194

194195
final IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(request.index).settings(actualIndexSettings);
195-
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
196-
indexMetaDataBuilder.putMapping(entry.getKey(), entry.getValue());
196+
for (MappingMetaData mappingMd : mappingsMetaData.values()) {
197+
indexMetaDataBuilder.putMapping(mappingMd);
197198
}
198199
indexMetaDataBuilder.state(request.state);
199200
final IndexMetaData indexMetaData = indexMetaDataBuilder.build();
@@ -300,6 +301,13 @@ public Request mappings(Map<String, String> mappings) {
300301
return this;
301302
}
302303

304+
public Request mappingsMetaData(Map<String, MappingMetaData> mappings) throws IOException {
305+
for (Map.Entry<String, MappingMetaData> entry : mappings.entrySet()) {
306+
this.mappings.put(entry.getKey(), entry.getValue().source().string());
307+
}
308+
return this;
309+
}
310+
303311
public Request mappingsCompressed(Map<String, CompressedString> mappings) throws IOException {
304312
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
305313
this.mappings.put(entry.getKey(), entry.getValue().string());

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.cluster.ClusterState;
2424
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2525
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
26-
import org.elasticsearch.common.collect.Tuple;
2726
import org.elasticsearch.common.component.AbstractComponent;
2827
import org.elasticsearch.common.compress.CompressedString;
2928
import org.elasticsearch.common.inject.Inject;
@@ -77,7 +76,7 @@ public void updateMapping(final String index, final String type, final String ma
7776
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
7877
// only add the current relevant mapping (if exists)
7978
if (indexMetaData.mappings().containsKey(type)) {
80-
indexService.mapperService().add(type, indexMetaData.mappings().get(type).string());
79+
indexService.mapperService().add(type, indexMetaData.mappings().get(type).source().string());
8180
}
8281
}
8382
MapperService mapperService = indexService.mapperService();
@@ -104,7 +103,7 @@ public void updateMapping(final String index, final String type, final String ma
104103

105104
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
106105
IndexMetaData indexMetaData = currentState.metaData().index(index);
107-
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, existingMapper.mappingSource()));
106+
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(new MappingMetaData(existingMapper)));
108107
return newClusterStateBuilder().state(currentState).metaData(builder).build();
109108
} catch (Exception e) {
110109
logger.warn("failed to dynamically update the mapping in cluster_state from shard", e);
@@ -160,7 +159,7 @@ public void putMapping(final PutRequest request, final Listener listener) {
160159
IndexService indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
161160
// only add the current relevant mapping (if exists)
162161
if (indexMetaData.mappings().containsKey(request.mappingType)) {
163-
indexService.mapperService().add(request.mappingType, indexMetaData.mappings().get(request.mappingType).string());
162+
indexService.mapperService().add(request.mappingType, indexMetaData.mappings().get(request.mappingType).source().string());
164163
}
165164
}
166165

@@ -197,7 +196,7 @@ public void putMapping(final PutRequest request, final Listener listener) {
197196
throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
198197
}
199198

200-
final Map<String, Tuple<String, CompressedString>> mappings = newHashMap();
199+
final Map<String, MappingMetaData> mappings = newHashMap();
201200
for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
202201
String index = entry.getKey();
203202
// do the actual merge here on the master, and update the mapping source
@@ -214,7 +213,7 @@ public void putMapping(final PutRequest request, final Listener listener) {
214213
// same source, no changes, ignore it
215214
} else {
216215
// use the merged mapping source
217-
mappings.put(index, new Tuple<String, CompressedString>(existingMapper.type(), updatedSource));
216+
mappings.put(index, new MappingMetaData(existingMapper));
218217
if (logger.isDebugEnabled()) {
219218
logger.debug("[{}] update_mapping [{}] with source [{}]", index, existingMapper.type(), updatedSource);
220219
} else if (logger.isInfoEnabled()) {
@@ -223,7 +222,7 @@ public void putMapping(final PutRequest request, final Listener listener) {
223222
}
224223
} else {
225224
CompressedString newSource = newMapper.mappingSource();
226-
mappings.put(index, new Tuple<String, CompressedString>(newMapper.type(), newSource));
225+
mappings.put(index, new MappingMetaData(newMapper));
227226
if (logger.isDebugEnabled()) {
228227
logger.debug("[{}] create_mapping [{}] with source [{}]", index, newMapper.type(), newSource);
229228
} else if (logger.isInfoEnabled()) {
@@ -244,9 +243,9 @@ public void putMapping(final PutRequest request, final Listener listener) {
244243
if (indexMetaData == null) {
245244
throw new IndexMissingException(new Index(indexName));
246245
}
247-
Tuple<String, CompressedString> mapping = mappings.get(indexName);
248-
if (mapping != null) {
249-
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mapping.v1(), mapping.v2()));
246+
MappingMetaData mappingMd = mappings.get(indexName);
247+
if (mappingMd != null) {
248+
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mappingMd));
250249
}
251250
}
252251

modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public LocalGatewayStartedShards currentStartedShards() {
180180
try {
181181
createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index())
182182
.settings(indexMetaData.settings())
183-
.mappingsCompressed(indexMetaData.mappings())
183+
.mappingsMetaData(indexMetaData.mappings())
184184
.state(indexMetaData.state())
185185
.blocks(ImmutableSet.of(GatewayService.INDEX_NOT_RECOVERED_BLOCK))
186186
.timeout(timeValueSeconds(30)),

modules/elasticsearch/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private void updateClusterStateFromGateway(final MetaData fMetaData, final Gatew
149149
try {
150150
createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index())
151151
.settings(indexMetaData.settings())
152-
.mappingsCompressed(indexMetaData.mappings())
152+
.mappingsMetaData(indexMetaData.mappings())
153153
.state(indexMetaData.state())
154154
.blocks(ImmutableSet.of(GatewayService.INDEX_NOT_RECOVERED_BLOCK))
155155
.timeout(timeValueSeconds(30)),

modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
2929
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3030
import org.elasticsearch.cluster.metadata.IndexMetaData;
31+
import org.elasticsearch.cluster.metadata.MappingMetaData;
3132
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.cluster.node.DiscoveryNodes;
3334
import org.elasticsearch.cluster.routing.*;
34-
import org.elasticsearch.common.collect.ImmutableMap;
3535
import org.elasticsearch.common.collect.Tuple;
3636
import org.elasticsearch.common.component.AbstractLifecycleComponent;
3737
import org.elasticsearch.common.compress.CompressedString;
@@ -55,7 +55,6 @@
5555
import org.elasticsearch.indices.IndicesService;
5656
import org.elasticsearch.threadpool.ThreadPool;
5757

58-
import java.util.Map;
5958
import java.util.Set;
6059
import java.util.concurrent.ConcurrentMap;
6160

@@ -219,11 +218,10 @@ private void applyMappings(ClusterChangedEvent event) {
219218
String index = indexMetaData.index();
220219
IndexService indexService = indicesService.indexServiceSafe(index);
221220
MapperService mapperService = indexService.mapperService();
222-
ImmutableMap<String, CompressedString> mappings = indexMetaData.mappings();
223221
// go over and add the relevant mappings (or update them)
224-
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
225-
String mappingType = entry.getKey();
226-
CompressedString mappingSource = entry.getValue();
222+
for (MappingMetaData mappingMd : indexMetaData.mappings().values()) {
223+
String mappingType = mappingMd.type();
224+
CompressedString mappingSource = mappingMd.source();
227225
if (!seenMappings.containsKey(new Tuple<String, String>(index, mappingType))) {
228226
seenMappings.put(new Tuple<String, String>(index, mappingType), true);
229227
}
@@ -252,7 +250,7 @@ private void applyMappings(ClusterChangedEvent event) {
252250
}
253251
// go over and remove mappings
254252
for (DocumentMapper documentMapper : mapperService) {
255-
if (seenMappings.containsKey(new Tuple<String, String>(index, documentMapper.type())) && !mappings.containsKey(documentMapper.type())) {
253+
if (seenMappings.containsKey(new Tuple<String, String>(index, documentMapper.type())) && !indexMetaData.mappings().containsKey(documentMapper.type())) {
256254
// we have it in our mappings, but not in the metadata, and we have seen it in the cluster state, remove it
257255
mapperService.remove(documentMapper.type());
258256
seenMappings.remove(new Tuple<String, String>(index, documentMapper.type()));

0 commit comments

Comments
 (0)