2 changes: 1 addition & 1 deletion pkg/engine/basic_engine.go
@@ -209,7 +209,7 @@ func (e *Basic) Execute(ctx context.Context, params logql.Params) (logqlmodel.Re
var builder ResultBuilder
switch params.GetExpression().(type) {
case syntax.LogSelectorExpr:
builder = newStreamsResultBuilder()
builder = newStreamsResultBuilder(params.Direction())
case syntax.SampleExpr:
if params.Step() > 0 {
builder = newMatrixResultBuilder()
20 changes: 18 additions & 2 deletions pkg/engine/compat.go
@@ -31,15 +31,18 @@ var (
_ ResultBuilder = &matrixResultBuilder{}
)

func newStreamsResultBuilder() *streamsResultBuilder {
func newStreamsResultBuilder(dir logproto.Direction) *streamsResultBuilder {
return &streamsResultBuilder{
direction: dir,
data: make(logqlmodel.Streams, 0),
streams: make(map[string]int),
rowBuilders: nil,
}
}

type streamsResultBuilder struct {
direction logproto.Direction

streams map[string]int
data logqlmodel.Streams
count int
@@ -213,7 +216,7 @@ func (b *streamsResultBuilder) resetRowBuilder(i int) {
}

func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) {
for rowIdx := 0; rowIdx < numRows; rowIdx++ {
for rowIdx := range numRows {
if col.IsNull(rowIdx) {
continue
}
@@ -222,6 +225,19 @@ func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int))
}

func (b *streamsResultBuilder) Build(s stats.Result, md *metadata.Context) logqlmodel.Result {
// Executor does not guarantee order of entries, so we sort them here.
for _, stream := range b.data {
if b.direction == logproto.BACKWARD {
sort.Slice(stream.Entries, func(a, b int) bool {
return stream.Entries[a].Timestamp.After(stream.Entries[b].Timestamp)
})
} else {
sort.Slice(stream.Entries, func(a, b int) bool {
return stream.Entries[a].Timestamp.Before(stream.Entries[b].Timestamp)
})
}
}

sort.Sort(b.data)
return logqlmodel.Result{
Data: b.data,
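
For context, the per-stream sort that Build now applies mirrors the query direction: BACKWARD returns the newest entries first, FORWARD the oldest first. A minimal standalone sketch of the same ordering logic, using hypothetical entries rather than code from this PR:

package main

import (
	"fmt"
	"sort"
	"time"

	"github.com/grafana/loki/v3/pkg/logproto"
)

func main() {
	// Hypothetical entries, arriving from the executor in no particular order.
	entries := []logproto.Entry{
		{Timestamp: time.Unix(2, 0), Line: "b"},
		{Timestamp: time.Unix(1, 0), Line: "a"},
		{Timestamp: time.Unix(3, 0), Line: "c"},
	}

	dir := logproto.BACKWARD
	if dir == logproto.BACKWARD {
		// Newest first for backward queries.
		sort.Slice(entries, func(i, j int) bool {
			return entries[i].Timestamp.After(entries[j].Timestamp)
		})
	} else {
		// Oldest first for forward queries.
		sort.Slice(entries, func(i, j int) bool {
			return entries[i].Timestamp.Before(entries[j].Timestamp)
		})
	}

	for _, e := range entries {
		fmt.Println(e.Timestamp.Unix(), e.Line) // prints: 3 c, 2 b, 1 a
	}
}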
3 changes: 2 additions & 1 deletion pkg/engine/compat_bench_test.go
@@ -10,6 +10,7 @@ import (

"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)

@@ -68,7 +69,7 @@ func BenchmarkStreamsResultBuilder(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
rb := newStreamsResultBuilder()
rb := newStreamsResultBuilder(logproto.BACKWARD)
// Collect records twice on purpose to see how efficient CollectRecord is when the builder already has
// some data
rb.CollectRecord(record1)
12 changes: 6 additions & 6 deletions pkg/engine/compat_test.go
@@ -25,7 +25,7 @@ import (

func TestStreamsResultBuilder(t *testing.T) {
t.Run("empty builder returns non-nil result", func(t *testing.T) {
builder := newStreamsResultBuilder()
builder := newStreamsResultBuilder(logproto.BACKWARD)
md, _ := metadata.NewContext(t.Context())
require.NotNil(t, builder.Build(stats.Result{}, md).Data)
})
@@ -66,7 +66,7 @@ func TestStreamsResultBuilder(t *testing.T) {
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()

builder := newStreamsResultBuilder()
builder := newStreamsResultBuilder(logproto.BACKWARD)
err := collectResult(context.Background(), pipeline, builder)

require.NoError(t, err)
@@ -147,7 +147,7 @@ func TestStreamsResultBuilder(t *testing.T) {
pipeline := executor.NewBufferedPipeline(record)
defer pipeline.Close()

builder := newStreamsResultBuilder()
builder := newStreamsResultBuilder(logproto.BACKWARD)
err := collectResult(context.Background(), pipeline, builder)

require.NoError(t, err)
@@ -244,7 +244,7 @@ func TestStreamsResultBuilder(t *testing.T) {
record2 := rows2.Record(memory.DefaultAllocator, schema)
defer record2.Release()

builder := newStreamsResultBuilder()
builder := newStreamsResultBuilder(logproto.FORWARD)

// Collect first record
builder.CollectRecord(record1)
@@ -300,7 +300,7 @@ func TestStreamsResultBuilder(t *testing.T) {
nil,
)

builder := newStreamsResultBuilder()
builder := newStreamsResultBuilder(logproto.BACKWARD)

// First record: 5 rows (buffer grows to 5)
rows1 := make(arrowtest.Rows, 5)
@@ -369,7 +369,7 @@ func TestStreamsResultBuilder(t *testing.T) {
nil,
)

builder := newStreamsResultBuilder()
builder := newStreamsResultBuilder(logproto.BACKWARD)

// First record: 3 valid rows
rows1 := make(arrowtest.Rows, 3)
2 changes: 1 addition & 1 deletion pkg/engine/engine.go
@@ -358,7 +358,7 @@ func (e *Engine) collectResult(ctx context.Context, logger log.Logger, params lo
var builder ResultBuilder
switch params.GetExpression().(type) {
case syntax.LogSelectorExpr:
builder = newStreamsResultBuilder()
builder = newStreamsResultBuilder(params.Direction())
case syntax.SampleExpr:
if params.Step() > 0 {
builder = newMatrixResultBuilder()
18 changes: 12 additions & 6 deletions pkg/engine/internal/arrowagg/mapper.go
@@ -74,15 +74,21 @@ func newMapping(schema *arrow.Schema, to []arrow.Field) *mapping {
checked: make(map[*arrow.Schema]struct{}),
}

for i, field := range to {
for i, target := range to {
// Default to -1 for fields that are not found in the schema.
mapping.lookups[i] = -1

for j, schemaField := range schema.Fields() {
if field.Equal(schemaField) {
mapping.lookups[i] = j
break
}
fieldIdxs := schema.FieldIndices(target.Name)
if len(fieldIdxs) == 0 {
continue
} else if len(fieldIdxs) > 1 {
// this should not occur as FQN should make field names unique.
panic("mapper: multiple fields with the same name in schema")
}

// this check might be unnecessary given FQN uniqueness?
if schema.Field(fieldIdxs[0]).Equal(target) {
mapping.lookups[i] = fieldIdxs[0]
}
}

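
As a quick illustration of the lookup the new mapping code relies on: arrow's Schema.FieldIndices returns every field index whose name matches, and an empty result means the field is absent. With fully-qualified names, at most one match is expected. A minimal sketch with a hypothetical two-field schema:

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns},
		{Name: "line", Type: arrow.BinaryTypes.String},
	}, nil)

	// FieldIndices matches by name only; the Equal check in newMapping
	// additionally verifies the field's type and nullability.
	fmt.Println(schema.FieldIndices("line"))    // [1]
	fmt.Println(schema.FieldIndices("missing")) // []
}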
@@ -6,6 +6,7 @@ import (
"errors"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/scalar"
)

@@ -136,3 +137,89 @@ func compareScalars(left, right scalar.Scalar, nullsFirst bool) (int, error) {

return 0, nil
}

// compareArrays compares values at the given indices from two arrays, returning:
//
// - -1 if left < right
// - 0 if left == right
// - 1 if left > right
//
// If nullsFirst is true, then null values are considered to sort before
// non-null values.
//
// compareArrays returns an error if the two arrays are of different types,
// or if the array type is not supported for comparison.
func compareArrays(left, right arrow.Array, leftIdx, rightIdx int, nullsFirst bool) (int, error) {
leftNull := left == nil || !left.IsValid(leftIdx)
rightNull := right == nil || !right.IsValid(rightIdx)

// First, handle one or both of the values being null.
switch {
case leftNull && rightNull:
return 0, nil

case leftNull && !rightNull:
if nullsFirst {
return -1, nil
}
return 1, nil

case !leftNull && rightNull:
if nullsFirst {
return 1, nil
}
return -1, nil
}

if !arrow.TypeEqual(left.DataType(), right.DataType()) {
// We should never hit this, since compareRow is only called for two arrays
// coming from the same [arrow.Field].
return 0, errors.New("received arrays of different types")
}

// Fast-path: if both arrays reference the same underlying data and same index,
// they're equal. This is an optimization for common cases.
if left == right && leftIdx == rightIdx {
return 0, nil
}

// Switch on the array type to compare the values. This is only composed of
// types we know the query engine uses, and types that we know have clear
// sorting semantics.
//
// Unsupported types are treated as equal for consistent sorting, but
// otherwise it's up to the caller to detect unexpected sort types and reject
// the query.
switch left := left.(type) {
case *array.Binary:
right := right.(*array.Binary)
return bytes.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil

case *array.Duration:
right := right.(*array.Duration)
return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil

case *array.Float64:
right := right.(*array.Float64)
return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil

case *array.Uint64:
right := right.(*array.Uint64)
return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil

case *array.Int64:
right := right.(*array.Int64)
return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil

case *array.String:
right := right.(*array.String)
return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil

case *array.Timestamp:
right := right.(*array.Timestamp)
return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil

}

return 0, nil
}
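
A minimal sketch of how compareArrays handles a null on one side. compareArrays is unexported, so this assumes code living in the same package; the arrays are hypothetical:

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func demoCompareArrays() {
	lb := array.NewInt64Builder(memory.DefaultAllocator)
	lb.Append(42)
	left := lb.NewInt64Array()
	defer left.Release()

	rb := array.NewInt64Builder(memory.DefaultAllocator)
	rb.AppendNull()
	right := rb.NewInt64Array()
	defer right.Release()

	// With nullsFirst=true, the null on the right sorts before 42,
	// so the non-null left value compares as greater.
	res, err := compareArrays(left, right, 0, 0, true)
	fmt.Println(res, err) // 1 <nil>
}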
76 changes: 54 additions & 22 deletions pkg/engine/internal/executor/topk_batch.go
@@ -6,17 +6,20 @@ import (

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"

"github.com/grafana/loki/v3/pkg/engine/internal/arrowagg"
"github.com/grafana/loki/v3/pkg/util/topk"
)

// topkBatch calculates the top K rows from a stream of [arrow.Record]s, where
// rows are sorted by the specified Fields.
// rows are ranked by the specified Fields.
//
// Rows with equal values for all sort fields are sorted by the order in which
// Rows with equal values for all the fields are ranked by the order in which
// they were appended.
//
// topkBatch only identifies which rows belong in the top K, but does not
// guarantee any specific ordering of those rows in the compacted output. Callers
// should sort the result if a specific order is required.
type topkBatch struct {
// Fields holds the list of fields to sort by, in order of precedence. If an
// incoming record is missing one of these fields, the value for that field
@@ -144,25 +147,17 @@ func (b *topkBatch) less(left, right *topkReference) bool {
leftArray := b.findRecordArray(left.Record, b.mapper, fieldIndex)
rightArray := b.findRecordArray(right.Record, b.mapper, fieldIndex)

var leftScalar, rightScalar scalar.Scalar

if leftArray != nil {
leftScalar, _ = scalar.GetScalar(leftArray, left.Row)
}
if rightArray != nil {
rightScalar, _ = scalar.GetScalar(rightArray, right.Row)
}

res, err := compareScalars(leftScalar, rightScalar, b.NullsFirst)
// Compare directly from arrays without creating scalars to avoid allocations
res, err := compareArrays(leftArray, rightArray, left.Row, right.Row, b.NullsFirst)
if err != nil {
// Treat failure to compare scalars as equal, so that the sort order is
// Treat failure to compare as equal, so that the sort order is
// consistent. This should only happen when given invalid values to
// compare, as we know leftScalar and rightScalar are of the same type.
// compare, as we know leftArray and rightArray are of the same type.
continue
}
switch res {
case 0: // left == right
// Continue to the next field if two scalars are equal.
// Continue to the next field if two values are equal.
continue
case -1: // left < right
return true
@@ -215,9 +210,9 @@ func (b *topkBatch) Size() (rows int, unused int) {
// the current top K rows.
//
// The returned record will have a combined schema from all of the input
// records. The sort order of fields in the returned record is not guaranteed.
// Rows that did not have one of the combined fields will be filled with null
// values for those fields.
// records. Neither the order of fields nor the order of rows in the returned
// record is guaranteed. Rows that did not have one of the
// combined fields will be filled with null values for those fields.
//
// Compact returns nil if no rows are in the top K.
func (b *topkBatch) Compact() arrow.Record {
@@ -228,13 +223,23 @@ func (b *topkBatch) Compact() arrow.Record {

// Get all row references to compact.
rowRefs := b.heap.PopAll()
slices.Reverse(rowRefs)

compactor := arrowagg.NewRecords(memory.DefaultAllocator)
recordRows := make(map[arrow.Record][]int, len(b.usedCount))
for _, ref := range rowRefs {
compactor.AppendSlice(ref.Record, int64(ref.Row), int64(ref.Row)+1)
recordRows[ref.Record] = append(recordRows[ref.Record], ref.Row)
}

compactor := arrowagg.NewRecords(memory.DefaultAllocator)
for rec, rows := range recordRows {
slices.Sort(rows)
Contributor:
Is it even possible that rows is not sorted?

Contributor (Author):
Yeah, I think so, at least in the case of global topK: the records returned by local topK are no longer guaranteed to be sorted.

iterContiguousRanges(rows, func(start, end int) bool {
compactor.AppendSlice(rec, int64(start), int64(end))
return true
})
}

// Rows are grouped by their source record and appended
// in contiguous ranges for efficiency.
compacted, err := compactor.Aggregate()
if err != nil {
// Aggregate should only fail if we didn't aggregate anything, which we
@@ -257,3 +262,30 @@ func (b *topkBatch) Reset() {
clear(b.usedCount)
clear(b.usedSchemas)
}

// iterContiguousRanges iterates over contiguous ranges of row indices from a sorted
// slice. Rows must be sorted in ascending order.
//
// For example, if rows is [1, 2, 3, 5, 6, 7], it will yield two ranges:
// [1, 4) and [5, 8), representing the contiguous sequences.
//
// The function calls yield for each contiguous range found. If yield returns false,
// iteration stops.
func iterContiguousRanges(rows []int, yield func(start, end int) bool) {
if len(rows) == 0 {
return
}

startRow := rows[0]
for i := 1; i < len(rows); i++ {
// If current row is not contiguous with previous, yield the previous range
if rows[i] != rows[i-1]+1 {
if !yield(startRow, rows[i-1]+1) {
return
}
startRow = rows[i]
}
}
// Yield the final contiguous range
yield(startRow, rows[len(rows)-1]+1)
}
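
To make the range grouping concrete, a small in-package sketch using the example from the doc comment (iterContiguousRanges is unexported, so this assumes same-package code and an "fmt" import):

func demoIterContiguousRanges() {
	rows := []int{1, 2, 3, 5, 6, 7}
	iterContiguousRanges(rows, func(start, end int) bool {
		fmt.Printf("[%d, %d)\n", start, end)
		return true
	})
	// Output:
	// [1, 4)
	// [5, 8)
}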