Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/Nest/Aggregations/AggregationContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ public interface IAggregationContainer
[JsonProperty("moving_avg")]
IMovingAverageAggregation MovingAverage { get; set; }

[JsonProperty("moving_fn")]
IMovingFunctionAggregation MovingFunction { get; set; }

[JsonProperty("cumulative_sum")]
ICumulativeSumAggregation CumulativeSum { get; set; }

Expand Down Expand Up @@ -295,6 +298,8 @@ public class AggregationContainer : IAggregationContainer

public IMovingAverageAggregation MovingAverage { get; set; }

public IMovingFunctionAggregation MovingFunction { get; set; }

public ICumulativeSumAggregation CumulativeSum { get; set; }

public ISerialDifferencingAggregation SerialDifferencing { get; set; }
Expand Down Expand Up @@ -430,6 +435,8 @@ public class AggregationContainerDescriptor<T> : DescriptorBase<AggregationConta

IMovingAverageAggregation IAggregationContainer.MovingAverage { get; set; }

IMovingFunctionAggregation IAggregationContainer.MovingFunction { get; set; }

ICumulativeSumAggregation IAggregationContainer.CumulativeSum { get; set; }

ISerialDifferencingAggregation IAggregationContainer.SerialDifferencing { get; set; }
Expand Down Expand Up @@ -606,6 +613,10 @@ public AggregationContainerDescriptor<T> MovingAverage(string name,
Func<MovingAverageAggregationDescriptor, IMovingAverageAggregation> selector) =>
_SetInnerAggregation(name, selector, (a, d) => a.MovingAverage = d);

public AggregationContainerDescriptor<T> MovingFunction(string name,
Func<MovingFunctionAggregationDescriptor, IMovingFunctionAggregation> selector) =>
_SetInnerAggregation(name, selector, (a, d) => a.MovingFunction = d);

public AggregationContainerDescriptor<T> CumulativeSum(string name,
Func<CumulativeSumAggregationDescriptor, ICumulativeSumAggregation> selector) =>
_SetInnerAggregation(name, selector, (a, d) => a.CumulativeSum = d);
Expand Down Expand Up @@ -715,5 +726,6 @@ public void Accept(IAggregationVisitor visitor)
((IAggregationContainer)d).Aggregations = ((IAggregationContainer)left).Aggregations;
return d;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Newtonsoft.Json;

namespace Nest
{
[JsonObject(MemberSerialization = MemberSerialization.OptIn)]
[ContractJsonConverter(typeof(AggregationJsonConverter<MovingFunctionAggregation>))]
public interface IMovingFunctionAggregation : IPipelineAggregation
{
[JsonProperty("window")]
int? Window { get; set; }

[JsonProperty("script")]
string Script { get; set; }
}

public class MovingFunctionAggregation
: PipelineAggregationBase, IMovingFunctionAggregation
{
internal MovingFunctionAggregation () { }

public MovingFunctionAggregation(string name, SingleBucketsPath bucketsPath)
: base(name, bucketsPath) { }

internal override void WrapInContainer(AggregationContainer c) => c.MovingFunction = this;

public int? Window { get; set; }
public string Script { get; set; }
}

public class MovingFunctionAggregationDescriptor
: PipelineAggregationDescriptorBase<MovingFunctionAggregationDescriptor, IMovingFunctionAggregation, SingleBucketsPath>
, IMovingFunctionAggregation
{
int? IMovingFunctionAggregation.Window { get; set; }
string IMovingFunctionAggregation.Script { get; set; }

public MovingFunctionAggregationDescriptor Window(int? windowSize) => Assign(a => a.Window = windowSize);

public MovingFunctionAggregationDescriptor Script(string script) => Assign(a => a.Script = script);
}
}
5 changes: 1 addition & 4 deletions src/Nest/Aggregations/Pipeline/PipelineAggregationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ public abstract class PipelineAggregationBase : AggregationBase, IPipelineAggreg
{
internal PipelineAggregationBase() { }

public PipelineAggregationBase(string name, IBucketsPath bucketsPath) : base(name)
{
this.BucketsPath = bucketsPath;
}
public PipelineAggregationBase(string name, IBucketsPath bucketsPath) : base(name) => this.BucketsPath = bucketsPath;

public IBucketsPath BucketsPath { get; set; }
public string Format { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected override void ExpectResponse(ISearchResponse<Project> response)
projectsPerMonth.Buckets.Should().NotBeNull();
projectsPerMonth.Buckets.Count.Should().BeGreaterThan(0);

// average not calculated for the first bucket
// average not calculated for the first bucket so movingAvg.Value is expected to be null there
foreach(var item in projectsPerMonth.Buckets.Skip(1))
{
var movingAvg = item.Sum("commits_moving_avg");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System;
using System.Linq;
using Elastic.Xunit.XunitPlumbing;
using FluentAssertions;
using Nest;
using Tests.Core.Extensions;
using Tests.Core.ManagedElasticsearch.Clusters;
using Tests.Domain;
using Tests.Framework;
using Tests.Framework.Integration;

namespace Tests.Aggregations.Pipeline.MovingFunction
{
public class MovingFunctionAggregationUsageTests : AggregationUsageTestBase
{
public MovingFunctionAggregationUsageTests(ReadOnlyCluster cluster, EndpointUsage usage) : base(cluster, usage) { }

protected override object AggregationJson => new
{
projects_started_per_month = new
{
date_histogram = new
{
field = "startedOn",
interval = "month",
},
aggs = new
{
commits = new
{
sum = new
{
field = "numberOfCommits"
}
},
commits_moving_avg = new
{
moving_fn = new
{
buckets_path = "commits",
window = 30,
script = "MovingFunctions.unweightedAvg(values)"
}
}
}
}
};

protected override Func<AggregationContainerDescriptor<Project>, IAggregationContainer> FluentAggs => a => a
.DateHistogram("projects_started_per_month", dh => dh
.Field(p => p.StartedOn)
.Interval(DateInterval.Month)
.Aggregations(aa => aa
.Sum("commits", sm => sm
.Field(p => p.NumberOfCommits)
)
.MovingFunction("commits_moving_avg", mv => mv
.BucketsPath("commits")
.Window(30)
.Script("MovingFunctions.unweightedAvg(values)")
)
)
);

protected override AggregationDictionary InitializerAggs =>
new DateHistogramAggregation("projects_started_per_month")
{
Field = "startedOn",
Interval = DateInterval.Month,
Aggregations =
new SumAggregation("commits", "numberOfCommits")
&& new MovingFunctionAggregation("commits_moving_avg", "commits")
{
Window = 30,
Script = "MovingFunctions.unweightedAvg(values)"
}
};

protected override void ExpectResponse(ISearchResponse<Project> response)
{
response.ShouldBeValid();

var projectsPerMonth = response.Aggregations.DateHistogram("projects_started_per_month");
projectsPerMonth.Should().NotBeNull();
projectsPerMonth.Buckets.Should().NotBeNull();
projectsPerMonth.Buckets.Count.Should().BeGreaterThan(0);

// average not calculated for the first bucket
foreach(var item in projectsPerMonth.Buckets.Skip(1))
{
var movingAvg = item.Sum("commits_moving_avg");
movingAvg.Should().NotBeNull();
movingAvg.Value.Should().BeGreaterThan(0);
}
}
}
}