Skip to content

Commit 08ecd9d

Browse files
committed
refactor all Queue and BlockingQueue creations into a single factory method
1 parent e3a9271 commit 08ecd9d

14 files changed

+63
-59
lines changed

src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.action.search.type;
2121

22-
import jsr166y.LinkedTransferQueue;
2322
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2423
import org.elasticsearch.search.SearchShardTarget;
2524
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -36,19 +35,20 @@
3635
*/
3736
public class TransportSearchCache {
3837

39-
private final Queue<Collection<DfsSearchResult>> cacheDfsResults = new LinkedTransferQueue<Collection<DfsSearchResult>>();
38+
private final Queue<Collection<DfsSearchResult>> cacheDfsResults = ConcurrentCollections.newQueue();
4039

41-
private final Queue<Map<SearchShardTarget, QuerySearchResultProvider>> cacheQueryResults = new LinkedTransferQueue<Map<SearchShardTarget, QuerySearchResultProvider>>();
40+
private final Queue<Map<SearchShardTarget, QuerySearchResultProvider>> cacheQueryResults = ConcurrentCollections.newQueue();
4241

43-
private final Queue<Map<SearchShardTarget, FetchSearchResult>> cacheFetchResults = new LinkedTransferQueue<Map<SearchShardTarget, FetchSearchResult>>();
42+
private final Queue<Map<SearchShardTarget, FetchSearchResult>> cacheFetchResults = ConcurrentCollections.newQueue();
4443

45-
private final Queue<Map<SearchShardTarget, QueryFetchSearchResult>> cacheQueryFetchResults = new LinkedTransferQueue<Map<SearchShardTarget, QueryFetchSearchResult>>();
44+
private final Queue<Map<SearchShardTarget, QueryFetchSearchResult>> cacheQueryFetchResults = ConcurrentCollections.newQueue();
4645

4746

4847
public Collection<DfsSearchResult> obtainDfsResults() {
4948
Collection<DfsSearchResult> dfsSearchResults;
5049
while ((dfsSearchResults = cacheDfsResults.poll()) == null) {
51-
cacheDfsResults.offer(new LinkedTransferQueue<DfsSearchResult>());
50+
Queue<DfsSearchResult> offer = ConcurrentCollections.newQueue();
51+
cacheDfsResults.offer(offer);
5252
}
5353
return dfsSearchResults;
5454
}

src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.action.search.type;
2121

22-
import jsr166y.LinkedTransferQueue;
2322
import org.elasticsearch.action.ActionListener;
2423
import org.elasticsearch.action.search.*;
2524
import org.elasticsearch.cluster.ClusterService;
@@ -29,6 +28,7 @@
2928
import org.elasticsearch.common.component.AbstractComponent;
3029
import org.elasticsearch.common.inject.Inject;
3130
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3232
import org.elasticsearch.search.SearchShardTarget;
3333
import org.elasticsearch.search.action.SearchServiceListener;
3434
import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.threadpool.ThreadPool;
4040

4141
import java.util.Map;
42+
import java.util.Queue;
4243
import java.util.concurrent.atomic.AtomicInteger;
4344

4445
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
@@ -84,7 +85,7 @@ private class AsyncAction {
8485

8586
private final DiscoveryNodes nodes;
8687

87-
private volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
88+
private volatile Queue<ShardSearchFailure> shardFailures;
8889

8990
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
9091

@@ -104,7 +105,7 @@ private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, Action
104105
}
105106

106107
protected final ShardSearchFailure[] buildShardFailures() {
107-
LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
108+
Queue<ShardSearchFailure> localFailures = shardFailures;
108109
if (localFailures == null) {
109110
return ShardSearchFailure.EMPTY_ARRAY;
110111
}
@@ -115,7 +116,7 @@ protected final ShardSearchFailure[] buildShardFailures() {
115116
// we simply try and return as much as possible
116117
protected final void addShardFailure(ShardSearchFailure failure) {
117118
if (shardFailures == null) {
118-
shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
119+
shardFailures = ConcurrentCollections.newQueue();
119120
}
120121
shardFailures.add(failure);
121122
}

src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.action.search.type;
2121

22-
import jsr166y.LinkedTransferQueue;
2322
import org.elasticsearch.action.ActionListener;
2423
import org.elasticsearch.action.search.*;
2524
import org.elasticsearch.cluster.ClusterService;
@@ -30,6 +29,7 @@
3029
import org.elasticsearch.common.inject.Inject;
3130
import org.elasticsearch.common.settings.Settings;
3231
import org.elasticsearch.common.trove.ExtTIntArrayList;
32+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3333
import org.elasticsearch.search.SearchShardTarget;
3434
import org.elasticsearch.search.action.SearchServiceListener;
3535
import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.threadpool.ThreadPool;
4444

4545
import java.util.Map;
46+
import java.util.Queue;
4647
import java.util.concurrent.atomic.AtomicInteger;
4748

4849
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
@@ -88,7 +89,7 @@ private class AsyncAction {
8889

8990
private final DiscoveryNodes nodes;
9091

91-
protected volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
92+
protected volatile Queue<ShardSearchFailure> shardFailures;
9293

9394
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
9495

@@ -109,7 +110,7 @@ private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, Action
109110
}
110111

111112
protected final ShardSearchFailure[] buildShardFailures() {
112-
LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
113+
Queue<ShardSearchFailure> localFailures = shardFailures;
113114
if (localFailures == null) {
114115
return ShardSearchFailure.EMPTY_ARRAY;
115116
}
@@ -120,7 +121,7 @@ protected final ShardSearchFailure[] buildShardFailures() {
120121
// we simply try and return as much as possible
121122
protected final void addShardFailure(ShardSearchFailure failure) {
122123
if (shardFailures == null) {
123-
shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
124+
shardFailures = ConcurrentCollections.newQueue();
124125
}
125126
shardFailures.add(failure);
126127
}

src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.action.search.type;
2121

22-
import jsr166y.LinkedTransferQueue;
2322
import org.apache.lucene.search.ScoreDoc;
2423
import org.elasticsearch.action.ActionListener;
2524
import org.elasticsearch.action.search.*;
@@ -30,6 +29,7 @@
3029
import org.elasticsearch.common.component.AbstractComponent;
3130
import org.elasticsearch.common.inject.Inject;
3231
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3333
import org.elasticsearch.search.SearchShardTarget;
3434
import org.elasticsearch.search.action.SearchServiceListener;
3535
import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -43,6 +43,7 @@
4343

4444
import java.io.IOException;
4545
import java.util.Map;
46+
import java.util.Queue;
4647
import java.util.concurrent.atomic.AtomicInteger;
4748

4849
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
@@ -88,7 +89,7 @@ private class AsyncAction {
8889

8990
private final DiscoveryNodes nodes;
9091

91-
protected volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
92+
protected volatile Queue<ShardSearchFailure> shardFailures;
9293

9394
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
9495

@@ -108,7 +109,7 @@ private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, Action
108109
}
109110

110111
protected final ShardSearchFailure[] buildShardFailures() {
111-
LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
112+
Queue<ShardSearchFailure> localFailures = shardFailures;
112113
if (localFailures == null) {
113114
return ShardSearchFailure.EMPTY_ARRAY;
114115
}
@@ -119,7 +120,7 @@ protected final ShardSearchFailure[] buildShardFailures() {
119120
// we simply try and return as much as possible
120121
protected final void addShardFailure(ShardSearchFailure failure) {
121122
if (shardFailures == null) {
122-
shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
123+
shardFailures = ConcurrentCollections.newQueue();
123124
}
124125
shardFailures.add(failure);
125126
}

src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.action.search.type;
2121

22-
import jsr166y.LinkedTransferQueue;
2322
import org.elasticsearch.action.ActionListener;
2423
import org.elasticsearch.action.search.*;
2524
import org.elasticsearch.action.support.TransportAction;
@@ -34,6 +33,7 @@
3433
import org.elasticsearch.common.Nullable;
3534
import org.elasticsearch.common.settings.Settings;
3635
import org.elasticsearch.common.trove.ExtTIntArrayList;
36+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3737
import org.elasticsearch.search.SearchPhaseResult;
3838
import org.elasticsearch.search.SearchShardTarget;
3939
import org.elasticsearch.search.action.SearchServiceListener;
@@ -46,6 +46,7 @@
4646

4747
import java.util.Arrays;
4848
import java.util.Map;
49+
import java.util.Queue;
4950
import java.util.Set;
5051
import java.util.concurrent.atomic.AtomicInteger;
5152

@@ -92,7 +93,7 @@ protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult>
9293

9394
private final AtomicInteger totalOps = new AtomicInteger();
9495

95-
private volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
96+
private volatile Queue<ShardSearchFailure> shardFailures;
9697

9798
protected volatile ShardDoc[] sortedShardList;
9899

@@ -308,7 +309,7 @@ protected final long buildTookInMillis() {
308309
}
309310

310311
protected final ShardSearchFailure[] buildShardFailures() {
311-
LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
312+
Queue<ShardSearchFailure> localFailures = shardFailures;
312313
if (localFailures == null) {
313314
return ShardSearchFailure.EMPTY_ARRAY;
314315
}
@@ -319,7 +320,7 @@ protected final ShardSearchFailure[] buildShardFailures() {
319320
// we simply try and return as much as possible
320321
protected final void addShardFailure(ShardSearchFailure failure) {
321322
if (shardFailures == null) {
322-
shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
323+
shardFailures = ConcurrentCollections.newQueue();
323324
}
324325
shardFailures.add(failure);
325326
}

src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.common.collect.ImmutableList;
2323
import com.google.common.collect.Lists;
2424
import com.google.common.collect.Sets;
25-
import jsr166y.LinkedTransferQueue;
2625
import org.elasticsearch.ElasticSearchException;
2726
import org.elasticsearch.ExceptionsHelper;
2827
import org.elasticsearch.action.ActionListener;
@@ -38,13 +37,11 @@
3837
import org.elasticsearch.common.settings.Settings;
3938
import org.elasticsearch.common.transport.TransportAddress;
4039
import org.elasticsearch.common.unit.TimeValue;
40+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4141
import org.elasticsearch.threadpool.ThreadPool;
4242
import org.elasticsearch.transport.*;
4343

44-
import java.util.HashSet;
45-
import java.util.Iterator;
46-
import java.util.List;
47-
import java.util.Set;
44+
import java.util.*;
4845
import java.util.concurrent.CountDownLatch;
4946
import java.util.concurrent.ScheduledFuture;
5047
import java.util.concurrent.atomic.AtomicInteger;
@@ -351,7 +348,7 @@ public synchronized void sample() {
351348
}
352349

353350
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
354-
final LinkedTransferQueue<ClusterStateResponse> clusterStateResponses = new LinkedTransferQueue<ClusterStateResponse>();
351+
final Queue<ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newQueue();
355352
for (final DiscoveryNode listedNode : nodesToPing) {
356353
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
357354
@Override

src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.cluster.action.shard;
2121

22-
import jsr166y.LinkedTransferQueue;
2322
import org.elasticsearch.ElasticSearchException;
2423
import org.elasticsearch.cluster.ClusterService;
2524
import org.elasticsearch.cluster.ClusterState;
@@ -38,6 +37,7 @@
3837
import org.elasticsearch.common.io.stream.Streamable;
3938
import org.elasticsearch.common.io.stream.VoidStreamable;
4039
import org.elasticsearch.common.settings.Settings;
40+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4141
import org.elasticsearch.threadpool.ThreadPool;
4242
import org.elasticsearch.transport.*;
4343

@@ -62,7 +62,7 @@ public class ShardStateAction extends AbstractComponent {
6262

6363
private final ThreadPool threadPool;
6464

65-
private final BlockingQueue<ShardRouting> startedShardsQueue = new LinkedTransferQueue<ShardRouting>();
65+
private final BlockingQueue<ShardRouting> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
6666

6767
@Inject
6868
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,

src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.cluster.service;
2121

22-
import jsr166y.LinkedTransferQueue;
2322
import org.elasticsearch.ElasticSearchException;
2423
import org.elasticsearch.ElasticSearchIllegalStateException;
2524
import org.elasticsearch.cluster.*;
@@ -34,6 +33,7 @@
3433
import org.elasticsearch.common.inject.Inject;
3534
import org.elasticsearch.common.settings.Settings;
3635
import org.elasticsearch.common.unit.TimeValue;
36+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3737
import org.elasticsearch.discovery.Discovery;
3838
import org.elasticsearch.discovery.DiscoveryService;
3939
import org.elasticsearch.node.settings.NodeSettingsService;
@@ -76,7 +76,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
7676
private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
7777
private final List<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
7878

79-
private final Queue<NotifyTimeout> onGoingTimeouts = new LinkedTransferQueue<NotifyTimeout>();
79+
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
8080

8181
private volatile ClusterState clusterState = newClusterStateBuilder().build();
8282

0 commit comments

Comments
 (0)