Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pkg/segmentwriter/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/grafana/pyroscope/pkg/model/pprofsplit"
pprofmodel "github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/segmentwriter/memdb"
"github.com/grafana/pyroscope/pkg/util/bufferpool"
"github.com/grafana/pyroscope/pkg/util/retry"
)

Expand Down Expand Up @@ -238,12 +239,14 @@ func (s *segment) flush(ctx context.Context) (err error) {
return nil
}

// TODO(kolesnikovae): Use buffer pool for blockData.
// blockData is a buffer taken from pool, so it needs to be returned
// bufferpool.Put handles nil value safely
blockData, blockMeta, err := s.flushBlock(stream)
defer bufferpool.Put(blockData)
if err != nil {
return fmt.Errorf("failed to flush block %s: %w", s.ulid.String(), err)
}
if err = s.sw.uploadBlock(ctx, blockData, blockMeta, s); err != nil {
if err = s.sw.uploadBlock(ctx, blockData.Bytes(), blockMeta, s); err != nil {
return fmt.Errorf("failed to upload block %s: %w", s.ulid.String(), err)
}
if err = s.sw.storeMetadata(ctx, blockMeta, s); err != nil {
Expand All @@ -253,7 +256,7 @@ func (s *segment) flush(ctx context.Context) (err error) {
return nil
}

func (s *segment) flushBlock(stream flushStream) ([]byte, *metastorev1.BlockMeta, error) {
func (s *segment) flushBlock(stream flushStream) (*bufferpool.Buffer, *metastorev1.BlockMeta, error) {
start := time.Now()
hostname, _ := os.Hostname()

Expand All @@ -271,7 +274,7 @@ func (s *segment) flushBlock(stream flushStream) ([]byte, *metastorev1.BlockMeta
Datasets: make([]*metastorev1.Dataset, 0, len(stream.heads)),
}

blockFile := bytes.NewBuffer(nil)
blockFile := bufferpool.GetBuffer(2 * 1024) // reserve extra to avoid resizes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we maybe remember the size of the buffer between flushes?

Also 2k is likely too short

Not sure if you have access https://ops.grafana-ops.net/goto/bey8h9q72mu4gd?orgId=1

image

Do you know why was it divided by 1024? Was it a bug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The division was a typo - sorry about that, already fixed before comment.

Good point about buffer size - I tried to take a safe route, but it looks like - esp. considering how bufferpool works - it won't work. Will rethink


w := &writerOffset{Writer: blockFile}
for stream.Next() {
Expand All @@ -297,7 +300,7 @@ func (s *segment) flushBlock(stream flushStream) ([]byte, *metastorev1.BlockMeta
}
meta.Size = uint64(w.offset)
s.debuginfo.flushBlockDuration = time.Since(start)
return blockFile.Bytes(), meta, nil
return blockFile, meta, nil
}

type writerOffset struct {
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/bufferpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,12 @@ func (b *Buffer) ReadFrom(r io.Reader) (int64, error) {
b.B = buf.Bytes()
return n, err
}

func (b *Buffer) Write(p []byte) (n int, err error) {
b.B = append(b.B, p...)
return len(p), nil
}

func (b *Buffer) Bytes() []byte {
return b.B
}
Loading