Skip to content

Commit c1e4045

Browse files
committed
implementing snappy-block encoding
Signed-off-by: Alan Protasio <[email protected]>
1 parent 6099649 commit c1e4045

File tree

3 files changed

+203
-24
lines changed

3 files changed

+203
-24
lines changed

pkg/util/grpcencoding/encoding_test.go

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package grpcencoding
33
import (
44
"bytes"
55
"io"
6+
"io/ioutil"
67
"strings"
78
"testing"
89

910
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappy"
11+
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock"
1012
"github.com/cortexproject/cortex/pkg/util/grpcencoding/zstd"
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
@@ -20,6 +22,9 @@ func TestCompressors(t *testing.T) {
2022
{
2123
name: snappy.Name,
2224
},
25+
{
26+
name: snappyblock.Name,
27+
},
2328
{
2429
name: zstd.Name,
2530
},
@@ -42,30 +47,43 @@ func testCompress(name string, t *testing.T) {
4247
}{
4348
{"empty", ""},
4449
{"short", "hello world"},
45-
{"long", strings.Repeat("123456789", 1024)},
50+
{"long", strings.Repeat("123456789", 2024)},
4651
}
4752
for _, test := range tests {
4853
t.Run(test.test, func(t *testing.T) {
49-
var buf bytes.Buffer
54+
buf := &bytes.Buffer{}
5055
// Compress
51-
w, err := c.Compress(&buf)
56+
w, err := c.Compress(buf)
5257
require.NoError(t, err)
5358
n, err := w.Write([]byte(test.input))
5459
require.NoError(t, err)
5560
assert.Len(t, test.input, n)
5661
err = w.Close()
5762
require.NoError(t, err)
63+
compressedBytes := buf.Bytes()
64+
buf = bytes.NewBuffer(compressedBytes)
65+
5866
// Decompress
59-
r, err := c.Decompress(&buf)
67+
r, err := c.Decompress(buf)
6068
require.NoError(t, err)
6169
out, err := io.ReadAll(r)
6270
require.NoError(t, err)
6371
assert.Equal(t, test.input, string(out))
72+
73+
if sizer, ok := c.(interface {
74+
DecompressedSize(compressedBytes []byte) int
75+
}); ok {
76+
buf = bytes.NewBuffer(compressedBytes)
77+
r, err := c.Decompress(buf)
78+
require.NoError(t, err)
79+
out, err := io.ReadAll(r)
80+
assert.Equal(t, len(out), sizer.DecompressedSize(compressedBytes))
81+
}
6482
})
6583
}
6684
}
6785

68-
func BenchmarkSnappyCompress(b *testing.B) {
86+
func BenchmarkCompress(b *testing.B) {
6987
data := []byte(strings.Repeat("123456789", 1024))
7088

7189
testCases := []struct {
@@ -74,6 +92,9 @@ func BenchmarkSnappyCompress(b *testing.B) {
7492
{
7593
name: snappy.Name,
7694
},
95+
{
96+
name: snappyblock.Name,
97+
},
7798
{
7899
name: zstd.Name,
79100
},
@@ -88,11 +109,12 @@ func BenchmarkSnappyCompress(b *testing.B) {
88109
_, _ = w.Write(data)
89110
_ = w.Close()
90111
}
112+
b.ReportAllocs()
91113
})
92114
}
93115
}
94116

95-
func BenchmarkSnappyDecompress(b *testing.B) {
117+
func BenchmarkDecompress(b *testing.B) {
96118
data := []byte(strings.Repeat("123456789", 1024))
97119

98120
testCases := []struct {
@@ -101,6 +123,9 @@ func BenchmarkSnappyDecompress(b *testing.B) {
101123
{
102124
name: snappy.Name,
103125
},
126+
{
127+
name: snappyblock.Name,
128+
},
104129
{
105130
name: zstd.Name,
106131
},
@@ -112,13 +137,40 @@ func BenchmarkSnappyDecompress(b *testing.B) {
112137
var buf bytes.Buffer
113138
w, _ := c.Compress(&buf)
114139
_, _ = w.Write(data)
115-
reader := bytes.NewReader(buf.Bytes())
140+
w.Close()
116141
b.ResetTimer()
117142
for i := 0; i < b.N; i++ {
118-
r, _ := c.Decompress(reader)
119-
_, _ = io.ReadAll(r)
120-
_, _ = reader.Seek(0, io.SeekStart)
143+
_, _, err := decompress(c, buf.Bytes(), 10000)
144+
require.NoError(b, err)
121145
}
146+
b.ReportAllocs()
122147
})
123148
}
124149
}
150+
151+
// This function was copied from: https://github.com/grpc/grpc-go/blob/70c52915099a3b30848d0cb22e2f8951dd5aed7f/rpc_util.go#L765
152+
func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize int) ([]byte, int, error) {
153+
dcReader, err := compressor.Decompress(bytes.NewReader(d))
154+
if err != nil {
155+
return nil, 0, err
156+
}
157+
if sizer, ok := compressor.(interface {
158+
DecompressedSize(compressedBytes []byte) int
159+
}); ok {
160+
if size := sizer.DecompressedSize(d); size >= 0 {
161+
if size > maxReceiveMessageSize {
162+
return nil, size, nil
163+
}
164+
// size is used as an estimate to size the buffer, but we
165+
// will read more data if available.
166+
// +MinRead so ReadFrom will not reallocate if size is correct.
167+
buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
168+
bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
169+
return buf.Bytes(), int(bytesRead), err
170+
}
171+
}
172+
// Read from LimitReader with limit max+1. So if the underlying
173+
// reader is over limit, the result will be bigger than max.
174+
d, err = ioutil.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
175+
return d, len(d), err
176+
}

pkg/util/grpcencoding/snappy/snappy.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,6 @@ func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
5151
return reader{dr, &c.readersPool}, nil
5252
}
5353

54-
// If a Compressor implements DecompressedSize(compressedBytes []byte) int,
55-
// gRPC will call it to determine the size of the buffer allocated for the
56-
// result of decompression.
57-
// Return -1 to indicate unknown size.
58-
//
59-
// This is an EXPERIMENTAL feature of grpc-go.
60-
func (c *compressor) DecompressedSize(compressedBytes []byte) int {
61-
decompressedSize, err := snappy.DecodedLen(compressedBytes)
62-
if err != nil {
63-
return -1
64-
}
65-
return decompressedSize
66-
}
67-
6854
type writeCloser struct {
6955
writer *snappy.Writer
7056
pool *sync.Pool
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package snappyblock
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"sync"
7+
8+
"github.com/golang/snappy"
9+
"google.golang.org/grpc/encoding"
10+
)
11+
12+
// Name is the name under which the snappy-block compressor is registered.
const Name = "snappy-block"
14+
15+
func init() {
16+
encoding.RegisterCompressor(newCompressor())
17+
}
18+
19+
type compressor struct {
20+
writersPool sync.Pool
21+
readersPool sync.Pool
22+
}
23+
24+
func newCompressor() *compressor {
25+
c := &compressor{}
26+
c.readersPool = sync.Pool{
27+
New: func() interface{} {
28+
return &reader{
29+
pool: &c.readersPool,
30+
cbuff: bytes.NewBuffer(make([]byte, 0, 512)),
31+
}
32+
},
33+
}
34+
c.writersPool = sync.Pool{
35+
New: func() interface{} {
36+
return &writeCloser{
37+
pool: &c.writersPool,
38+
buff: bytes.NewBuffer(make([]byte, 0, 512)),
39+
}
40+
},
41+
}
42+
return c
43+
}
44+
45+
func (c *compressor) Name() string {
46+
return Name
47+
}
48+
49+
func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
50+
wr := c.writersPool.Get().(*writeCloser)
51+
wr.Reset(w)
52+
return wr, nil
53+
}
54+
55+
func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
56+
dr := c.readersPool.Get().(*reader)
57+
err := dr.Reset(r)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
return dr, nil
63+
}
64+
65+
// DecompressedSize If a Compressor implements DecompressedSize(compressedBytes []byte) int,
66+
// gRPC will call it to determine the size of the buffer allocated for the
67+
// result of decompression.
68+
// Return -1 to indicate unknown size.
69+
//
70+
// This is an EXPERIMENTAL feature of grpc-go.
71+
func (c *compressor) DecompressedSize(compressedBytes []byte) int {
72+
decompressedSize, err := snappy.DecodedLen(compressedBytes)
73+
if err != nil {
74+
return -1
75+
}
76+
return decompressedSize
77+
}
78+
79+
type writeCloser struct {
80+
i io.Writer
81+
pool *sync.Pool
82+
buff *bytes.Buffer
83+
84+
dst []byte
85+
}
86+
87+
func (w *writeCloser) Reset(i io.Writer) {
88+
w.i = i
89+
}
90+
91+
func (w *writeCloser) Write(p []byte) (n int, err error) {
92+
return w.buff.Write(p)
93+
}
94+
95+
func (w *writeCloser) Close() error {
96+
defer func() {
97+
w.buff.Reset()
98+
w.dst = w.dst[:0]
99+
w.pool.Put(w)
100+
}()
101+
102+
if w.i != nil {
103+
w.dst = snappy.Encode(w.dst, w.buff.Bytes())
104+
w.i.Write(w.dst)
105+
}
106+
107+
return nil
108+
}
109+
110+
type reader struct {
111+
pool *sync.Pool
112+
cbuff *bytes.Buffer
113+
dbuff *bytes.Buffer
114+
dst []byte
115+
}
116+
117+
func (r *reader) Reset(ir io.Reader) error {
118+
_, err := r.cbuff.ReadFrom(ir)
119+
120+
if err != nil {
121+
return err
122+
}
123+
124+
r.dst, err = snappy.Decode(r.dst, r.cbuff.Bytes())
125+
126+
if err != nil {
127+
return err
128+
}
129+
130+
r.dbuff = bytes.NewBuffer(r.dst)
131+
return nil
132+
}
133+
134+
func (r *reader) Read(p []byte) (n int, err error) {
135+
n, err = r.dbuff.Read(p)
136+
if err == io.EOF {
137+
r.cbuff.Reset()
138+
r.pool.Put(r)
139+
}
140+
return n, err
141+
}

0 commit comments

Comments
 (0)