From 4dcdd0381156f23d84abb18093985abf823f449c Mon Sep 17 00:00:00 2001 From: khushjain Date: Sun, 3 May 2026 12:33:24 -0400 Subject: [PATCH 1/3] add support for countDist in rollup for streaming expressions --- ...SOLR-18220-support-countdist-in-rollup.yml | 7 ++++ .../pages/stream-decorator-reference.adoc | 5 +-- .../stream/metrics/CountDistinctMetric.java | 18 ++++++---- .../solrj/io/stream/StreamDecoratorTest.java | 33 ++++++++++++++++++- .../client/solrj/io/stream/StreamingTest.java | 25 ++++++++++++-- 5 files changed, 77 insertions(+), 11 deletions(-) create mode 100644 changelog/unreleased/SOLR-18220-support-countdist-in-rollup.yml diff --git a/changelog/unreleased/SOLR-18220-support-countdist-in-rollup.yml b/changelog/unreleased/SOLR-18220-support-countdist-in-rollup.yml new file mode 100644 index 00000000000..abb23ccc64f --- /dev/null +++ b/changelog/unreleased/SOLR-18220-support-countdist-in-rollup.yml @@ -0,0 +1,7 @@ +title: "Support 'countDist' (count distinct) metric in rollup for streaming expressions" +type: added +authors: + - name: khushjain +links: + - name: SOLR-18220 + url: https://issues.apache.org/jira/browse/SOLR-18220 diff --git a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc index 7f14117ef1d..5abd5efad11 100644 --- a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc +++ b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc @@ -1448,7 +1448,7 @@ For faster aggregation over low to moderate cardinality fields, the `facet` func * `StreamExpression` (Mandatory) * `over`: (Mandatory) A list of fields to group by. * `metrics`: (Mandatory) The list of metrics to compute. -Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `missing(col)`. +Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `missing(col)`, `countDist(col)`. === rollup Syntax @@ -1466,7 +1466,8 @@ rollup( avg(a_i), avg(a_f), count(*), - missing(a_i) + missing(a_i), + countDist(a_i) ) ---- diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java index 678f6eea84c..09171a06672 100644 --- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java +++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountDistinctMetric.java @@ -17,6 +17,7 @@ package org.apache.solr.client.solrj.io.stream.metrics; import java.io.IOException; +import java.util.HashSet; import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; @@ -29,6 +30,7 @@ public class CountDistinctMetric extends Metric { public static final String APPROX_COUNT_DISTINCT = "hll"; private String columnName; + private HashSet distinctValues = new HashSet<>(); public CountDistinctMetric(String columnName) { this(columnName, false); @@ -53,6 +55,10 @@ public CountDistinctMetric(StreamExpression expression, StreamFactory factory) expression, functionName)); } + if (1 != expression.getParameters().size()) { + throw new IOException( + String.format(Locale.ROOT, "Invalid expression %s - unknown operands found", expression)); + } init(functionName, columnName); } @@ -66,7 +72,10 @@ private void init(String functionName, String columnName) { @Override public void update(Tuple tuple) { - // Nop for now + Object value = tuple.get(columnName); + if (value != null) { + distinctValues.add(value); + } } @Override @@ -81,14 +90,11 @@ public String[] getColumns() { @Override public Number getValue() { - // No op for now - return null; + return distinctValues.size(); } @Override public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { - return new StreamExpression(getFunctionName()) - .withParameter(columnName) - .withParameter(Boolean.toString(outputLong)); + return new StreamExpression(getFunctionName()).withParameter(columnName); } } diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java index 58f6d910ada..3ffdf4826db 100644 --- a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java +++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java @@ -50,6 +50,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; @@ -1363,7 +1364,8 @@ public void testRollupStream() throws Exception { .withFunctionName("min", MinMetric.class) .withFunctionName("max", MaxMetric.class) .withFunctionName("avg", MeanMetric.class) - .withFunctionName("count", CountMetric.class); + .withFunctionName("count", CountMetric.class) + .withFunctionName("countDist", CountDistinctMetric.class); StreamExpression expression; TupleStream stream; @@ -1388,6 +1390,8 @@ public void testRollupStream() throws Exception { + "avg(a_i)," + "avg(a_f)," + "count(*)," + + "countDist(a_i)," + + "countDist(a_s)" + ")"); stream = factory.constructStream(expression); stream.setStreamContext(streamContext); @@ -1408,6 +1412,8 @@ public void testRollupStream() throws Exception { Double avgi = tuple.getDouble("avg(a_i)"); Double avgf = tuple.getDouble("avg(a_f)"); Double count = tuple.getDouble("count(*)"); + Double countDistI = tuple.getDouble("countDist(a_i)"); + Double countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello0", bucket); assertEquals(17.0D, sumi, 0.0); @@ -1419,6 +1425,8 @@ public void testRollupStream() throws Exception { assertEquals(4.25D, avgi, 0.0); assertEquals(4.5D, avgf, 0.0); assertEquals(4, count, 0.0); + assertEquals(4, countDistI, 0.0); + assertEquals(1, countDistS, 0.0); tuple = tuples.get(1); bucket = tuple.getString("a_s"); @@ -1431,6 +1439,8 @@ public void testRollupStream() throws Exception { avgi = tuple.getDouble("avg(a_i)"); avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); + countDistI = tuple.getDouble("countDist(a_i)"); + countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello3", bucket); assertEquals(38.0D, sumi, 0.0); @@ -1442,6 +1452,8 @@ public void testRollupStream() throws Exception { assertEquals(9.5D, avgi, 0.0); assertEquals(6.5D, avgf, 0.0); assertEquals(4, count, 0.0); + assertEquals(4, countDistI, 0.0); + assertEquals(1, countDistS, 0.0); tuple = tuples.get(2); bucket = tuple.getString("a_s"); @@ -1454,6 +1466,8 @@ public void testRollupStream() throws Exception { avgi = tuple.getDouble("avg(a_i)"); avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); + countDistI = tuple.getDouble("countDist(a_i)"); + countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello4", bucket); assertEquals(15, sumi.longValue()); @@ -1465,6 +1479,8 @@ public void testRollupStream() throws Exception { assertEquals(7.5D, avgi, 0.0); assertEquals(5.5D, avgf, 0.0); assertEquals(2, count, 0.0); + assertEquals(2, countDistI, 0.0); + assertEquals(1, countDistS, 0.0); } finally { solrClientCache.close(); @@ -1497,6 +1513,7 @@ public void testHashRollupStream() throws Exception { .withFunctionName("max", MaxMetric.class) .withFunctionName("avg", MeanMetric.class) .withFunctionName("count", CountMetric.class) + .withFunctionName("countDist", CountDistinctMetric.class) .withFunctionName("sort", SortStream.class); StreamExpression expression; @@ -1522,6 +1539,8 @@ public void testHashRollupStream() throws Exception { + "avg(a_i)," + "avg(a_f)," + "count(*)," + + "countDist(a_i)," + + "countDist(a_s)" + "), by=\"avg(a_f) asc\")"); stream = factory.constructStream(expression); stream.setStreamContext(streamContext); @@ -1542,6 +1561,8 @@ public void testHashRollupStream() throws Exception { Double avgi = tuple.getDouble("avg(a_i)"); Double avgf = tuple.getDouble("avg(a_f)"); Double count = tuple.getDouble("count(*)"); + Double countDistI = tuple.getDouble("countDist(a_i)"); + Double countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello0", bucket); assertEquals(17.0D, sumi, 0.0); @@ -1553,6 +1574,8 @@ public void testHashRollupStream() throws Exception { assertEquals(4.25D, avgi, 0.0); assertEquals(4.5D, avgf, 0.0); assertEquals(4, count, 0.0); + assertEquals(4, countDistI, 0.0); + assertEquals(1, countDistS, 0.0); tuple = tuples.get(1); bucket = tuple.getString("a_s"); @@ -1565,6 +1588,8 @@ public void testHashRollupStream() throws Exception { avgi = tuple.getDouble("avg(a_i)"); avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); + countDistI = tuple.getDouble("countDist(a_i)"); + countDistS = tuple.getDouble("countDist(a_s)"); System.out.println("################:bucket" + bucket); @@ -1578,6 +1603,8 @@ public void testHashRollupStream() throws Exception { assertEquals(7.5D, avgi, 0.0); assertEquals(5.5D, avgf, 0.0); assertEquals(2, count, 0.0); + assertEquals(2, countDistI, 0.0); + assertEquals(1, countDistS, 0.0); tuple = tuples.get(2); bucket = tuple.getString("a_s"); @@ -1590,6 +1617,8 @@ public void testHashRollupStream() throws Exception { avgi = tuple.getDouble("avg(a_i)"); avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); + countDistI = tuple.getDouble("countDist(a_i)"); + countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello3", bucket); assertEquals(38.0D, sumi, 0.0); @@ -1601,6 +1630,8 @@ public void testHashRollupStream() throws Exception { assertEquals(9.5D, avgi, 0.0); assertEquals(6.5D, avgf, 0.0); assertEquals(4, count, 0.0); + assertEquals(4, countDistI, 0.0); + assertEquals(1, countDistS, 0.0); } finally { solrClientCache.close(); diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index 601a4fe6f67..2720b6e5a49 100644 --- a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -41,6 +41,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.metrics.Bucket; +import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; @@ -1662,7 +1663,9 @@ public void testRollupStream() throws Exception { new MeanMetric("a_i"), new MeanMetric("a_f"), new CountMetric(), - new MissingMetric("b_f") + new MissingMetric("b_f"), + new CountDistinctMetric("a_i"), + new CountDistinctMetric("a_s") }; RollupStream rollupStream = new RollupStream(stream, buckets, metrics); @@ -1685,6 +1688,8 @@ public void testRollupStream() throws Exception { Double avgf = tuple.getDouble("avg(a_f)"); Double count = tuple.getDouble("count(*)"); Double missingBf = tuple.getDouble("missing(b_f)"); + Double countDistI = tuple.getDouble("countDist(a_i)"); + Double countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello0", bucket); assertEquals(17, sumi, 0.001); @@ -1697,6 +1702,8 @@ public void testRollupStream() throws Exception { assertEquals(4.5, avgf, 0.001); assertEquals(4, count, 0.001); assertEquals(2, missingBf, 0.001); + assertEquals(4, countDistI, 0.001); + assertEquals(1, countDistS, 0.001); tuple = tuples.get(1); bucket = tuple.getString("a_s"); @@ -1710,6 +1717,8 @@ public void testRollupStream() throws Exception { avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); missingBf = tuple.getDouble("missing(b_f)"); + countDistI = tuple.getDouble("countDist(a_i)"); + countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello3", bucket); assertEquals(38, sumi, 0.001); @@ -1722,6 +1731,8 @@ public void testRollupStream() throws Exception { assertEquals(6.5, avgf, 0.001); assertEquals(4, count, 0.001); assertEquals(3, missingBf, 0.001); + assertEquals(4, countDistI, 0.001); + assertEquals(1, countDistS, 0.001); tuple = tuples.get(2); bucket = tuple.getString("a_s"); @@ -1735,6 +1746,8 @@ public void testRollupStream() throws Exception { avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); missingBf = tuple.getDouble("missing(b_f)"); + countDistI = tuple.getDouble("countDist(a_i)"); + countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello4", bucket); assertEquals(15, sumi.longValue()); @@ -1747,6 +1760,8 @@ public void testRollupStream() throws Exception { assertEquals(5.5, avgf, 0.01); assertEquals(2, count, 0.01); assertEquals(0, missingBf, 0.01); + assertEquals(2, countDistI, 0.01); + assertEquals(1, countDistS, 0.01); // Test will null metrics rollupStream = new RollupStream(stream, buckets, metrics); @@ -1785,7 +1800,9 @@ public void testRollupStream() throws Exception { new MeanMetric("a_i"), new MeanMetric("a_f"), new CountMetric(), - new MissingMetric("b_f") + new MissingMetric("b_f"), + new CountDistinctMetric("a_i"), + new CountDistinctMetric("a_s") }; rollupStream = new RollupStream(stream, buckets1, metrics1); @@ -1806,6 +1823,8 @@ public void testRollupStream() throws Exception { avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); missingBf = tuple.getDouble("missing(b_f)"); + countDistI = tuple.getDouble("countDist(a_i)"); + countDistS = tuple.getDouble("countDist(a_s)"); assertEquals(14, sumi, 0.01); assertEquals(10, sumf, 0.01); @@ -1817,6 +1836,8 @@ public void testRollupStream() throws Exception { assertEquals(10, avgf, 0.01); assertEquals(1, count, 0.01); assertEquals(1, missingBf, 0.01); + assertEquals(1, countDistI, 0.01); + assertEquals(0, countDistS, 0.01); } finally { solrClientCache.close(); } From f146f147f13b399a5b59c4aa33d6a7e46a0e8b66 Mon Sep 17 00:00:00 2001 From: khushjain Date: Sun, 3 May 2026 14:45:32 -0400 Subject: [PATCH 2/3] Update tests --- .../solrj/io/stream/StreamDecoratorTest.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java index 3ffdf4826db..e8b9538c33f 100644 --- a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java +++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java @@ -1344,7 +1344,7 @@ public void testRollupStream() throws Exception { new UpdateRequest() .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1") - .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2") + .add(id, "2", "a_s", "hello0", "a_i", "0", "a_f", "2") .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5") @@ -1416,16 +1416,16 @@ public void testRollupStream() throws Exception { Double countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello0", bucket); - assertEquals(17.0D, sumi, 0.0); + assertEquals(15.0D, sumi, 0.0); assertEquals(18.0D, sumf, 0.0); assertEquals(0.0D, mini, 0.0); assertEquals(1.0D, minf, 0.0); assertEquals(14.0D, maxi, 0.0); assertEquals(10.0D, maxf, 0.0); - assertEquals(4.25D, avgi, 0.0); + assertEquals(3.75D, avgi, 0.0); assertEquals(4.5D, avgf, 0.0); assertEquals(4, count, 0.0); - assertEquals(4, countDistI, 0.0); + assertEquals(3, countDistI, 0.0); assertEquals(1, countDistS, 0.0); tuple = tuples.get(1); @@ -1492,7 +1492,7 @@ public void testHashRollupStream() throws Exception { new UpdateRequest() .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1") - .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2") + .add(id, "2", "a_s", "hello0", "a_i", "0", "a_f", "2") .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5") @@ -1565,16 +1565,16 @@ public void testHashRollupStream() throws Exception { Double countDistS = tuple.getDouble("countDist(a_s)"); assertEquals("hello0", bucket); - assertEquals(17.0D, sumi, 0.0); + assertEquals(15.0D, sumi, 0.0); assertEquals(18.0D, sumf, 0.0); assertEquals(0.0D, mini, 0.0); assertEquals(1.0D, minf, 0.0); assertEquals(14.0D, maxi, 0.0); assertEquals(10.0D, maxf, 0.0); - assertEquals(4.25D, avgi, 0.0); + assertEquals(3.75D, avgi, 0.0); assertEquals(4.5D, avgf, 0.0); assertEquals(4, count, 0.0); - assertEquals(4, countDistI, 0.0); + assertEquals(3, countDistI, 0.0); assertEquals(1, countDistS, 0.0); tuple = tuples.get(1); @@ -2024,7 +2024,7 @@ public void testParallelRollupStream() throws Exception { new UpdateRequest() .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1") - .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2") + .add(id, "2", "a_s", "hello0", "a_i", "0", "a_f", "2") .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5") @@ -2101,13 +2101,13 @@ public void testParallelRollupStream() throws Exception { Double count = tuple.getDouble("count(*)"); assertEquals("hello0", bucket); - assertEquals(17.0D, sumi, 0.0); + assertEquals(15.0D, sumi, 0.0); assertEquals(18.0D, sumf, 0.0); assertEquals(0.0D, mini, 0.0); assertEquals(1.0D, minf, 0.0); assertEquals(14.0D, maxi, 0.0); assertEquals(10.0D, maxf, 0.0); - assertEquals(4.25D, avgi, 0.0); + assertEquals(3.75D, avgi, 0.0); assertEquals(4.5D, avgf, 0.0); assertEquals(4, count, 0.0); @@ -2166,7 +2166,7 @@ public void testParallelHashRollupStream() throws Exception { new UpdateRequest() .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1") - .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2") + .add(id, "2", "a_s", "hello0", "a_i", "0", "a_f", "2") .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5") @@ -2244,13 +2244,13 @@ public void testParallelHashRollupStream() throws Exception { Double count = tuple.getDouble("count(*)"); assertEquals("hello0", bucket); - assertEquals(17.0D, sumi, 0.0); + assertEquals(15.0D, sumi, 0.0); assertEquals(18.0D, sumf, 0.0); assertEquals(0.0D, mini, 0.0); assertEquals(1.0D, minf, 0.0); assertEquals(14.0D, maxi, 0.0); assertEquals(10.0D, maxf, 0.0); - assertEquals(4.25D, avgi, 0.0); + assertEquals(3.75D, avgi, 0.0); assertEquals(4.5D, avgf, 0.0); assertEquals(4, count, 0.0); From 0d7fe1a029e48787e7b37ba068807a11c76c1681 Mon Sep 17 00:00:00 2001 From: khushjain Date: Mon, 4 May 2026 10:57:14 -0400 Subject: [PATCH 3/3] Add toExpression test --- .../stream/StreamExpressionToExpressionTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpressionTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpressionTest.java index 37429f7a939..178592c481c 100644 --- a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpressionTest.java +++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpressionTest.java @@ -21,6 +21,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.metrics.CountDistinctMetric; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; @@ -55,6 +56,7 @@ public StreamExpressionToExpressionTest() { .withFunctionName("intersect", IntersectStream.class) .withFunctionName("complement", ComplementStream.class) .withFunctionName("count", CountMetric.class) + .withFunctionName("countDist", CountDistinctMetric.class) .withFunctionName("sum", SumMetric.class) .withFunctionName("min", MinMetric.class) .withFunctionName("max", MaxMetric.class) @@ -626,6 +628,19 @@ public void testCountMetric() throws Exception { assertEquals("count(*)", expressionString); } + @Test + public void testCountDistinctMetric() throws Exception { + + Metric metric; + String expressionString; + + // Basic test + metric = new CountDistinctMetric(StreamExpressionParser.parse("countDist(foo)"), factory); + expressionString = metric.toExpression(factory).toString(); + + assertEquals("countDist(foo)", expressionString); + } + @Test public void testMaxMetric() throws Exception {