Skip to content

Commit 01b18ad

Browse files
committed
- fixed terms aggs ordering by sub metric agg - introduced MetricsAggregator to enable access to metrics values without creating buckets
fixes: elastic#4643
1 parent 1584c73 commit 01b18ad

34 files changed

+1175
-193
lines changed

src/main/java/org/elasticsearch/search/aggregations/Aggregator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ public Aggregator parent() {
106106
return parent;
107107
}
108108

109+
public Aggregator[] subAggregators() {
110+
return subAggregators;
111+
}
112+
109113
/**
110114
* @return The current aggregation context.
111115
*/

src/main/java/org/elasticsearch/search/aggregations/bucket/Bucket.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ public interface Bucket {
3535

3636
Aggregations getAggregations();
3737

38-
static class Comparator<B extends Bucket> implements java.util.Comparator<B> {
38+
static class SubAggregationComparator<B extends Bucket> implements java.util.Comparator<B> {
3939

4040
private final String aggName;
4141
private final String valueName;
4242
private final boolean asc;
4343

44-
public Comparator(String expression, boolean asc) {
44+
public SubAggregationComparator(String expression, boolean asc) {
4545
this.asc = asc;
4646
int i = expression.indexOf('.');
4747
if (i < 0) {
@@ -53,7 +53,7 @@ public Comparator(String expression, boolean asc) {
5353
}
5454
}
5555

56-
public Comparator(String aggName, String valueName, boolean asc) {
56+
public SubAggregationComparator(String aggName, String valueName, boolean asc) {
5757
this.aggName = aggName;
5858
this.valueName = valueName;
5959
this.asc = asc;

src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import org.elasticsearch.search.aggregations.support.AggregationContext;
3030

3131
import java.io.IOException;
32+
import java.util.ArrayList;
3233
import java.util.Arrays;
34+
import java.util.List;
3335

3436
/**
3537
*
@@ -38,10 +40,19 @@ public abstract class BucketsAggregator extends Aggregator {
3840

3941
private LongArray docCounts;
4042

43+
private final Aggregator[] collectableSugAggregators;
44+
4145
public BucketsAggregator(String name, BucketAggregationMode bucketAggregationMode, AggregatorFactories factories,
4246
long estimatedBucketsCount, AggregationContext context, Aggregator parent) {
4347
super(name, bucketAggregationMode, factories, estimatedBucketsCount, context, parent);
4448
docCounts = BigArrays.newLongArray(estimatedBucketsCount, context.pageCacheRecycler(), true);
49+
List<Aggregator> collectables = new ArrayList<Aggregator>(subAggregators.length);
50+
for (int i = 0; i < subAggregators.length; i++) {
51+
if (subAggregators[i].shouldCollect()) {
52+
collectables.add((subAggregators[i]));
53+
}
54+
}
55+
collectableSugAggregators = collectables.toArray(new Aggregator[collectables.size()]);
4556
}
4657

4758
/**
@@ -50,17 +61,17 @@ public BucketsAggregator(String name, BucketAggregationMode bucketAggregationMod
5061
protected final void collectBucket(int doc, long bucketOrd) throws IOException {
5162
docCounts = BigArrays.grow(docCounts, bucketOrd + 1);
5263
docCounts.increment(bucketOrd, 1);
53-
for (int i = 0; i < subAggregators.length; i++) {
54-
subAggregators[i].collect(doc, bucketOrd);
64+
for (int i = 0; i < collectableSugAggregators.length; i++) {
65+
collectableSugAggregators[i].collect(doc, bucketOrd);
5566
}
5667
}
5768

5869
/**
5970
* Utility method to collect the given doc in the given bucket but not to update the doc counts of the bucket
6071
*/
6172
protected final void collectBucketNoCounts(int doc, long bucketOrd) throws IOException {
62-
for (int i = 0; i < subAggregators.length; i++) {
63-
subAggregators[i].collect(doc, bucketOrd);
73+
for (int i = 0; i < collectableSugAggregators.length; i++) {
74+
collectableSugAggregators[i].collect(doc, bucketOrd);
6475
}
6576
}
6677

src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalOrder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.io.stream.StreamInput;
2323
import org.elasticsearch.common.io.stream.StreamOutput;
2424
import org.elasticsearch.common.xcontent.XContentBuilder;
25+
import org.elasticsearch.search.aggregations.bucket.Bucket;
2526

2627
import java.io.IOException;
2728
import java.util.Comparator;
@@ -70,11 +71,11 @@ static class Aggregation extends InternalOrder {
7071
static final byte ID = 0;
7172

7273
Aggregation(String key, boolean asc) {
73-
super(ID, key, asc, new HistogramBase.Bucket.Comparator<HistogramBase.Bucket>(key, asc));
74+
super(ID, key, asc, new Bucket.SubAggregationComparator<HistogramBase.Bucket>(key, asc));
7475
}
7576

7677
Aggregation(String aggName, String valueName, boolean asc) {
77-
super(ID, key(aggName, valueName), asc, new HistogramBase.Bucket.Comparator<HistogramBase.Bucket>(aggName, valueName, asc));
78+
super(ID, key(aggName, valueName), asc, new Bucket.SubAggregationComparator<HistogramBase.Bucket>(aggName, valueName, asc));
7879
}
7980

8081
private static String key(String aggName, String valueName) {

src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public InternalTerms reduce(ReduceContext reduceContext) {
147147

148148
// TODO: would it be better to sort the backing array buffer of hppc map directly instead of using a PQ?
149149
final int size = Math.min(requiredSize, buckets.v().size());
150-
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator());
150+
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
151151
boolean[] states = buckets.v().allocated;
152152
Object[] internalBuckets = buckets.v().values;
153153
for (int i = 0; i < states.length; i++) {

src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.index.fielddata.DoubleValues;
2424
import org.elasticsearch.search.aggregations.Aggregator;
2525
import org.elasticsearch.search.aggregations.AggregatorFactories;
26-
import org.elasticsearch.search.aggregations.InternalAggregations;
2726
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
2827
import org.elasticsearch.search.aggregations.bucket.LongHash;
2928
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
@@ -49,7 +48,7 @@ public DoubleTermsAggregator(String name, AggregatorFactories factories, Numeric
4948
InternalOrder order, int requiredSize, int shardSize, AggregationContext aggregationContext, Aggregator parent) {
5049
super(name, BucketAggregationMode.PER_BUCKET, factories, estimatedBucketCount, aggregationContext, parent);
5150
this.valuesSource = valuesSource;
52-
this.order = order;
51+
this.order = InternalOrder.validate(order, this);
5352
this.requiredSize = requiredSize;
5453
this.shardSize = shardSize;
5554
bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.pageCacheRecycler());
@@ -77,24 +76,13 @@ public void collect(int doc, long owningBucketOrdinal) throws IOException {
7776
}
7877
}
7978

80-
// private impl that stores a bucket ord. This allows for computing the aggregations lazily.
81-
static class OrdinalBucket extends DoubleTerms.Bucket {
82-
83-
long bucketOrd;
84-
85-
public OrdinalBucket() {
86-
super(0, 0, (InternalAggregations) null);
87-
}
88-
89-
}
90-
9179
@Override
9280
public DoubleTerms buildAggregation(long owningBucketOrdinal) {
9381
assert owningBucketOrdinal == 0;
9482
final int size = (int) Math.min(bucketOrds.size(), shardSize);
9583

96-
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator());
97-
OrdinalBucket spare = null;
84+
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this));
85+
DoubleTerms.Bucket spare = null;
9886
for (long i = 0; i < bucketOrds.capacity(); ++i) {
9987
final long ord = bucketOrds.id(i);
10088
if (ord < 0) {
@@ -103,17 +91,17 @@ public DoubleTerms buildAggregation(long owningBucketOrdinal) {
10391
}
10492

10593
if (spare == null) {
106-
spare = new OrdinalBucket();
94+
spare = new DoubleTerms.Bucket(0, 0, null);
10795
}
10896
spare.term = Double.longBitsToDouble(bucketOrds.key(i));
10997
spare.docCount = bucketDocCount(ord);
11098
spare.bucketOrd = ord;
111-
spare = (OrdinalBucket) ordered.insertWithOverflow(spare);
99+
spare = (DoubleTerms.Bucket) ordered.insertWithOverflow(spare);
112100
}
113101

114102
final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
115103
for (int i = ordered.size() - 1; i >= 0; --i) {
116-
final OrdinalBucket bucket = (OrdinalBucket) ordered.pop();
104+
final DoubleTerms.Bucket bucket = (DoubleTerms.Bucket) ordered.pop();
117105
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
118106
list[i] = bucket;
119107
}

0 commit comments

Comments
 (0)