27
27
import org .elasticsearch .common .component .AbstractLifecycleComponent ;
28
28
import org .elasticsearch .common .inject .Inject ;
29
29
import org .elasticsearch .common .settings .Settings ;
30
- import org .elasticsearch .common .timer .Timeout ;
31
- import org .elasticsearch .common .timer .TimerTask ;
32
30
import org .elasticsearch .common .unit .TimeValue ;
33
31
import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
34
32
import org .elasticsearch .common .util .concurrent .ConcurrentMapLong ;
50
48
import org .elasticsearch .search .internal .InternalSearchRequest ;
51
49
import org .elasticsearch .search .internal .SearchContext ;
52
50
import org .elasticsearch .search .query .*;
51
+ import org .elasticsearch .threadpool .ThreadPool ;
53
52
import org .elasticsearch .timer .TimerService ;
54
53
55
54
import javax .annotation .Nullable ;
56
55
import java .io .IOException ;
57
56
import java .util .HashMap ;
58
57
import java .util .Map ;
59
- import java .util .concurrent .TimeUnit ;
58
+ import java .util .concurrent .ScheduledFuture ;
60
59
import java .util .concurrent .atomic .AtomicLong ;
61
60
62
61
import static org .elasticsearch .common .unit .TimeValue .*;
@@ -81,7 +80,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
81
80
private final FetchPhase fetchPhase ;
82
81
83
82
84
- private final TimeValue defaultKeepAlive ;
83
+ private final long defaultKeepAlive ;
84
+
85
+ private final ScheduledFuture keepAliveReaper ;
85
86
86
87
87
88
private final AtomicLong idGenerator = new AtomicLong ();
@@ -92,7 +93,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
92
93
93
94
private final ImmutableMap <String , SearchParseElement > elementParsers ;
94
95
95
- @ Inject public SearchService (Settings settings , ClusterService clusterService , IndicesService indicesService , IndicesLifecycle indicesLifecycle , TimerService timerService ,
96
+ @ Inject public SearchService (Settings settings , ClusterService clusterService , IndicesService indicesService , IndicesLifecycle indicesLifecycle , ThreadPool threadPool , TimerService timerService ,
96
97
ScriptService scriptService , DfsPhase dfsPhase , QueryPhase queryPhase , FetchPhase fetchPhase ) {
97
98
super (settings );
98
99
this .clusterService = clusterService ;
@@ -103,15 +104,18 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
103
104
this .queryPhase = queryPhase ;
104
105
this .fetchPhase = fetchPhase ;
105
106
107
+ TimeValue keepAliveInterval = componentSettings .getAsTime ("keep_alive_interval" , timeValueMinutes (1 ));
106
108
// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
107
- this .defaultKeepAlive = componentSettings .getAsTime ("default_keep_alive" , timeValueMinutes (5 ));
109
+ this .defaultKeepAlive = componentSettings .getAsTime ("default_keep_alive" , timeValueMinutes (5 )). millis () ;
108
110
109
111
Map <String , SearchParseElement > elementParsers = new HashMap <String , SearchParseElement >();
110
112
elementParsers .putAll (dfsPhase .parseElements ());
111
113
elementParsers .putAll (queryPhase .parseElements ());
112
114
elementParsers .putAll (fetchPhase .parseElements ());
113
115
this .elementParsers = ImmutableMap .copyOf (elementParsers );
114
116
indicesLifecycle .addListener (indicesLifecycleListener );
117
+
118
+ this .keepAliveReaper = threadPool .scheduleWithFixedDelay (new Reaper (), keepAliveInterval );
115
119
}
116
120
117
121
@ Override protected void doStart () throws ElasticSearchException {
@@ -125,6 +129,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
125
129
}
126
130
127
131
@ Override protected void doClose () throws ElasticSearchException {
132
+ keepAliveReaper .cancel (false );
128
133
indicesService .indicesLifecycle ().removeListener (indicesLifecycleListener );
129
134
}
130
135
@@ -344,9 +349,9 @@ private SearchContext createContext(InternalSearchRequest request) throws Elasti
344
349
fetchPhase .preProcess (context );
345
350
346
351
// compute the context keep alive
347
- TimeValue keepAlive = defaultKeepAlive ;
352
+ long keepAlive = defaultKeepAlive ;
348
353
if (request .scroll () != null && request .scroll ().keepAlive () != null ) {
349
- keepAlive = request .scroll ().keepAlive ();
354
+ keepAlive = request .scroll ().keepAlive (). millis () ;
350
355
}
351
356
context .keepAlive (keepAlive );
352
357
} catch (RuntimeException e ) {
@@ -371,18 +376,12 @@ private void freeContext(SearchContext context) {
371
376
}
372
377
373
378
private void contextProcessing (SearchContext context ) {
374
- if (context .keepAliveTimeout () != null ) {
375
- ((KeepAliveTimerTask ) context .keepAliveTimeout ().getTask ()).processing ();
376
- }
379
+ // disable timeout while executing a search
380
+ context .accessed (-1 );
377
381
}
378
382
379
383
private void contextProcessedSuccessfully (SearchContext context ) {
380
- if (context .keepAliveTimeout () != null ) {
381
- ((KeepAliveTimerTask ) context .keepAliveTimeout ().getTask ()).doneProcessing ();
382
- } else {
383
- context .accessed (timerService .estimatedTimeInMillis ());
384
- context .keepAliveTimeout (timerService .newTimeout (new KeepAliveTimerTask (context ), context .keepAlive (), TimerService .ExecutionType .DEFAULT ));
385
- }
384
+ context .accessed (timerService .estimatedTimeInMillis ());
386
385
}
387
386
388
387
private void cleanContext (SearchContext context ) {
@@ -451,7 +450,7 @@ private void processScroll(InternalScrollSearchRequest request, SearchContext co
451
450
context .scroll (request .scroll ());
452
451
// update the context keep alive based on the new scroll value
453
452
if (request .scroll () != null && request .scroll ().keepAlive () != null ) {
454
- context .keepAlive (request .scroll ().keepAlive ());
453
+ context .keepAlive (request .scroll ().keepAlive (). millis () );
455
454
}
456
455
}
457
456
@@ -466,35 +465,15 @@ class CleanContextOnIndicesLifecycleListener extends IndicesLifecycle.Listener {
466
465
}
467
466
}
468
467
469
- class KeepAliveTimerTask implements TimerTask {
470
-
471
- private final SearchContext context ;
472
-
473
- KeepAliveTimerTask (SearchContext context ) {
474
- this .context = context ;
475
- }
476
-
477
- public void processing () {
478
- context .keepAliveTimeout ().cancel ();
479
- }
480
-
481
- public void doneProcessing () {
482
- context .accessed (timerService .estimatedTimeInMillis ());
483
- context .keepAliveTimeout (timerService .newTimeout (this , context .keepAlive (), TimerService .ExecutionType .DEFAULT ));
484
- }
485
-
486
- @ Override public void run (Timeout timeout ) throws Exception {
487
- if (timeout .isCancelled ()) {
488
- return ;
489
- }
490
- long currentTime = timerService .estimatedTimeInMillis ();
491
- long nextDelay = context .keepAlive ().millis () - (currentTime - context .lastAccessTime ());
492
- if (nextDelay <= 0 ) {
493
- // Time out, free the context (and remove it from the active context)
494
- freeContext (context .id ());
495
- } else {
496
- // Read occurred before the timeout - set a new timeout with shorter delay.
497
- context .keepAliveTimeout (timerService .newTimeout (this , nextDelay , TimeUnit .MILLISECONDS , TimerService .ExecutionType .DEFAULT ));
468
+ class Reaper implements Runnable {
469
+ @ Override public void run () {
470
+ for (SearchContext context : activeContexts .values ()) {
471
+ if (context .lastAccessTime () == -1 ) { // its being processed or timeout is disabled
472
+ continue ;
473
+ }
474
+ if ((timerService .estimatedTimeInMillis () - context .lastAccessTime () > context .keepAlive ())) {
475
+ freeContext (context );
476
+ }
498
477
}
499
478
}
500
479
}
0 commit comments