From f065efe10375ece301ff7379cc09dfb808ddba32 Mon Sep 17 00:00:00 2001 From: Oleg Kozliuk Date: Tue, 16 Sep 2025 08:31:48 +0000 Subject: [PATCH 1/2] TODO cleanup: use buffer pool instead of generic --- pkg/segmentwriter/segment.go | 13 ++++++++----- pkg/util/bufferpool/pool.go | 9 +++++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/segmentwriter/segment.go b/pkg/segmentwriter/segment.go index 4557380d10..aa54c4b97b 100644 --- a/pkg/segmentwriter/segment.go +++ b/pkg/segmentwriter/segment.go @@ -33,6 +33,7 @@ import ( "github.com/grafana/pyroscope/pkg/model/pprofsplit" pprofmodel "github.com/grafana/pyroscope/pkg/pprof" "github.com/grafana/pyroscope/pkg/segmentwriter/memdb" + "github.com/grafana/pyroscope/pkg/util/bufferpool" "github.com/grafana/pyroscope/pkg/util/retry" ) @@ -238,12 +239,14 @@ func (s *segment) flush(ctx context.Context) (err error) { return nil } - // TODO(kolesnikovae): Use buffer pool for blockData. + // blockData is a buffer taken from pool, so it needs to be returned + // bufferpool.Put handles nil value safely blockData, blockMeta, err := s.flushBlock(stream) + defer bufferpool.Put(blockData) if err != nil { return fmt.Errorf("failed to flush block %s: %w", s.ulid.String(), err) } - if err = s.sw.uploadBlock(ctx, blockData, blockMeta, s); err != nil { + if err = s.sw.uploadBlock(ctx, blockData.Bytes(), blockMeta, s); err != nil { return fmt.Errorf("failed to upload block %s: %w", s.ulid.String(), err) } if err = s.sw.storeMetadata(ctx, blockMeta, s); err != nil { @@ -253,7 +256,7 @@ func (s *segment) flush(ctx context.Context) (err error) { return nil } -func (s *segment) flushBlock(stream flushStream) ([]byte, *metastorev1.BlockMeta, error) { +func (s *segment) flushBlock(stream flushStream) (*bufferpool.Buffer, *metastorev1.BlockMeta, error) { start := time.Now() hostname, _ := os.Hostname() @@ -271,7 +274,7 @@ func (s *segment) flushBlock(stream flushStream) ([]byte, *metastorev1.BlockMeta Datasets: make([]*metastorev1.Dataset, 0, len(stream.heads)), } - blockFile := bytes.NewBuffer(nil) + blockFile := bufferpool.GetBuffer(len(stream.heads) * 2 / 1024) // rough estimate, 2kb per head w := &writerOffset{Writer: blockFile} for stream.Next() { @@ -297,7 +300,7 @@ func (s *segment) flushBlock(stream flushStream) ([]byte, *metastorev1.BlockMeta } meta.Size = uint64(w.offset) s.debuginfo.flushBlockDuration = time.Since(start) - return blockFile.Bytes(), meta, nil + return blockFile, meta, nil } type writerOffset struct { diff --git a/pkg/util/bufferpool/pool.go b/pkg/util/bufferpool/pool.go index b4fa477460..9ed00851ae 100644 --- a/pkg/util/bufferpool/pool.go +++ b/pkg/util/bufferpool/pool.go @@ -96,3 +96,12 @@ func (b *Buffer) ReadFrom(r io.Reader) (int64, error) { b.B = buf.Bytes() return n, err } + +func (b *Buffer) Write(p []byte) (n int, err error) { + b.B = append(b.B, p...) + return len(p), nil +} + +func (b *Buffer) Bytes() []byte { + return b.B +} From 411cb4f7822b2b4bdbe000614c943858d713d9d8 Mon Sep 17 00:00:00 2001 From: Oleg Kozliuk Date: Tue, 16 Sep 2025 08:38:38 +0000 Subject: [PATCH 2/2] Use fixed initial size --- pkg/segmentwriter/segment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/segmentwriter/segment.go b/pkg/segmentwriter/segment.go index aa54c4b97b..4aca293e7f 100644 --- a/pkg/segmentwriter/segment.go +++ b/pkg/segmentwriter/segment.go @@ -274,7 +274,7 @@ func (s *segment) flushBlock(stream flushStream) (*bufferpool.Buffer, *metastore Datasets: make([]*metastorev1.Dataset, 0, len(stream.heads)), } - blockFile := bufferpool.GetBuffer(len(stream.heads) * 2 / 1024) // rough estimate, 2kb per head + blockFile := bufferpool.GetBuffer(2 * 1024) // reserve extra to avoid resizes w := &writerOffset{Writer: blockFile} for stream.Next() {