Skip to content

Commit 05b35bb

Browse files
authored
Merge pull request #1122 from grafana/20181117_fix_cassandra_storage_tests
fix cassandra storage test fixture, set a correct From parameter
2 parents b5f8b82 + d53e6b3 commit 05b35bb

File tree

10 files changed

+47
-40
lines changed

10 files changed

+47
-40
lines changed

pkg/chunk/aws/dynamodb_storage_client.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,10 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c
523523
for _, chunk := range chunks {
524524
key := chunk.ExternalKey()
525525
chunksByKey[key] = chunk
526-
tableName := a.schemaCfg.ChunkTableFor(chunk.From)
526+
tableName, err := a.schemaCfg.ChunkTableFor(chunk.From)
527+
if err != nil {
528+
return nil, err
529+
}
527530
outstanding.Add(tableName, key, placeholder)
528531
}
529532

@@ -646,7 +649,11 @@ func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chu
646649
}
647650
key := chunks[i].ExternalKey()
648651

649-
table := a.schemaCfg.ChunkTableFor(chunks[i].From)
652+
table, err := a.schemaCfg.ChunkTableFor(chunks[i].From)
653+
if err != nil {
654+
return err
655+
}
656+
650657
dynamoDBWrites.Add(table, key, placeholder, buf)
651658
}
652659

pkg/chunk/aws/dynamodb_storage_client_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"testing"
66

7+
"github.com/prometheus/common/model"
8+
79
"github.com/stretchr/testify/require"
810

911
"github.com/cortexproject/cortex/pkg/chunk/testutils"
@@ -26,7 +28,7 @@ func TestChunksPartialError(t *testing.T) {
2628
}
2729
ctx := context.Background()
2830
// Create more chunks than we can read in one batch
29-
_, chunks, err := testutils.CreateChunks(0, dynamoDBMaxReadBatchSize+50)
31+
_, chunks, err := testutils.CreateChunks(0, dynamoDBMaxReadBatchSize+50, model.Now())
3032
require.NoError(t, err)
3133
err = client.PutChunks(ctx, chunks)
3234
require.NoError(t, err)

pkg/chunk/aws/fixtures.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/cortexproject/cortex/pkg/chunk"
88
"github.com/cortexproject/cortex/pkg/chunk/testutils"
99
"github.com/cortexproject/cortex/pkg/util"
10-
"github.com/prometheus/common/model"
1110
)
1211

1312
type fixture struct {
@@ -32,7 +31,7 @@ var Fixtures = []testutils.Fixture{
3231
fixture{
3332
name: "S3 chunks",
3433
clients: func() (chunk.IndexClient, chunk.ObjectClient, chunk.TableClient, chunk.SchemaConfig, error) {
35-
schemaConfig := chunk.SchemaConfig{} // Defaults == S3
34+
schemaConfig := testutils.DefaultSchemaConfig("s3")
3635
dynamoDB := newMockDynamoDB(0, 0)
3736
table := &dynamoTableClient{
3837
DynamoDB: dynamoDB,
@@ -61,16 +60,7 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) testutils.Fix
6160
provisionedErr, gangsize, maxParallelism),
6261
clients: func() (chunk.IndexClient, chunk.ObjectClient, chunk.TableClient, chunk.SchemaConfig, error) {
6362
dynamoDB := newMockDynamoDB(0, provisionedErr)
64-
schemaCfg := chunk.SchemaConfig{
65-
Configs: []chunk.PeriodConfig{{
66-
IndexType: "aws",
67-
From: model.Now(),
68-
ChunkTables: chunk.PeriodicTableConfig{
69-
Prefix: "chunks",
70-
Period: 10 * time.Minute,
71-
},
72-
}},
73-
}
63+
schemaCfg := testutils.DefaultSchemaConfig("aws")
7464
table := &dynamoTableClient{
7565
DynamoDB: dynamoDB,
7666
}

pkg/chunk/cassandra/fixtures.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66

77
"github.com/cortexproject/cortex/pkg/chunk"
88
"github.com/cortexproject/cortex/pkg/chunk/testutils"
9-
"github.com/prometheus/common/model"
109
)
1110

1211
// GOCQL doesn't provide nice mocks, so we use a real Cassandra instance.
@@ -49,7 +48,7 @@ func Fixtures() ([]testutils.Fixture, error) {
4948
}
5049

5150
// Get a SchemaConfig with the defaults.
52-
schemaConfig := chunk.DefaultSchemaConfig("cassandra", "v1", model.Now())
51+
schemaConfig := testutils.DefaultSchemaConfig("cassandra")
5352

5453
storageClient, err := NewStorageClient(cfg, schemaConfig)
5554
if err != nil {

pkg/chunk/cassandra/storage_client.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,10 @@ func (s *StorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
270270
return errors.WithStack(err)
271271
}
272272
key := chunks[i].ExternalKey()
273-
tableName := s.schemaCfg.ChunkTableFor(chunks[i].From)
273+
tableName, err := s.schemaCfg.ChunkTableFor(chunks[i].From)
274+
if err != nil {
275+
return err
276+
}
274277

275278
// Must provide a range key, even though its not useds - hence 0x00.
276279
q := s.session.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)",
@@ -289,12 +292,16 @@ func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c
289292
}
290293

291294
func (s *StorageClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
292-
tableName := s.schemaCfg.ChunkTableFor(input.From)
295+
tableName, err := s.schemaCfg.ChunkTableFor(input.From)
296+
if err != nil {
297+
return input, err
298+
}
299+
293300
var buf []byte
294301
if err := s.session.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), input.ExternalKey()).
295302
WithContext(ctx).Scan(&buf); err != nil {
296303
return input, errors.WithStack(err)
297304
}
298-
err := input.Decode(decodeContext, buf)
305+
err = input.Decode(decodeContext, buf)
299306
return input, err
300307
}

pkg/chunk/gcp/bigtable_object_client.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ func (s *bigtableObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chu
5252
return err
5353
}
5454
key := chunks[i].ExternalKey()
55-
tableName := s.schemaCfg.ChunkTableFor(chunks[i].From)
55+
tableName, err := s.schemaCfg.ChunkTableFor(chunks[i].From)
56+
if err != nil {
57+
return err
58+
}
5659
keys[tableName] = append(keys[tableName], key)
5760

5861
mut := bigtable.NewMutation()
@@ -83,7 +86,10 @@ func (s *bigtableObjectClient) GetChunks(ctx context.Context, input []chunk.Chun
8386
chunks := map[string]map[string]chunk.Chunk{}
8487
keys := map[string]bigtable.RowList{}
8588
for _, c := range input {
86-
tableName := s.schemaCfg.ChunkTableFor(c.From)
89+
tableName, err := s.schemaCfg.ChunkTableFor(c.From)
90+
if err != nil {
91+
return nil, err
92+
}
8793
key := c.ExternalKey()
8894
keys[tableName] = append(keys[tableName], key)
8995
if _, ok := chunks[tableName]; !ok {

pkg/chunk/gcp/fixtures.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ package gcp
22

33
import (
44
"context"
5-
"time"
65

76
"cloud.google.com/go/bigtable"
87
"cloud.google.com/go/bigtable/bttest"
98
"github.com/fsouza/fake-gcs-server/fakestorage"
10-
"github.com/prometheus/common/model"
119
"google.golang.org/api/option"
1210
"google.golang.org/grpc"
1311

@@ -56,16 +54,7 @@ func (f *fixture) Clients() (
5654
return
5755
}
5856

59-
schemaConfig = chunk.SchemaConfig{
60-
Configs: []chunk.PeriodConfig{{
61-
IndexType: "gcp",
62-
From: model.Now(),
63-
ChunkTables: chunk.PeriodicTableConfig{
64-
Prefix: "chunks",
65-
Period: 10 * time.Minute,
66-
},
67-
}},
68-
}
57+
schemaConfig = testutils.DefaultSchemaConfig("gcp-columnkey")
6958
tClient = &tableClient{
7059
client: adminClient,
7160
}

pkg/chunk/schema_config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -422,13 +422,13 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr
422422
}
423423

424424
// ChunkTableFor calculates the chunk table shard for a given point in time.
425-
func (cfg SchemaConfig) ChunkTableFor(t model.Time) string {
425+
func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) {
426426
for i := range cfg.Configs {
427427
if t > cfg.Configs[i].From && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From) {
428-
return cfg.Configs[i].ChunkTables.TableFor(t)
428+
return cfg.Configs[i].ChunkTables.TableFor(t), nil
429429
}
430430
}
431-
return ""
431+
return "", fmt.Errorf("no chunk table found for time %v", t)
432432
}
433433

434434
// TableFor calculates the table shard for a given point in time.

pkg/chunk/storage/object_client_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/cortexproject/cortex/pkg/chunk"
1414
"github.com/cortexproject/cortex/pkg/chunk/testutils"
15+
"github.com/prometheus/common/model"
1516
)
1617

1718
func TestChunksBasic(t *testing.T) {
@@ -23,7 +24,7 @@ func TestChunksBasic(t *testing.T) {
2324
// Write a few batches of chunks.
2425
written := []string{}
2526
for i := 0; i < 5; i++ {
26-
keys, chunks, err := testutils.CreateChunks(i, batchSize)
27+
keys, chunks, err := testutils.CreateChunks(i, batchSize, model.Now())
2728
require.NoError(t, err)
2829
written = append(written, keys...)
2930
err = client.PutChunks(ctx, chunks)

pkg/chunk/testutils/testutils.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ type Fixture interface {
2323
Teardown() error
2424
}
2525

26+
// DefaultSchemaConfig returns default schema for use in test fixtures
27+
func DefaultSchemaConfig(kind string) chunk.SchemaConfig {
28+
schemaConfig := chunk.DefaultSchemaConfig(kind, "v1", model.Now().Add(-time.Hour*2))
29+
return schemaConfig
30+
}
31+
2632
// Setup a fixture with initial tables
2733
func Setup(fixture Fixture, tableName string) (chunk.IndexClient, chunk.ObjectClient, error) {
2834
var tbmConfig chunk.TableManagerConfig
@@ -49,11 +55,11 @@ func Setup(fixture Fixture, tableName string) (chunk.IndexClient, chunk.ObjectCl
4955
}
5056

5157
// CreateChunks creates some chunks for testing
52-
func CreateChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) {
58+
func CreateChunks(startIndex, batchSize int, start model.Time) ([]string, []chunk.Chunk, error) {
5359
keys := []string{}
5460
chunks := []chunk.Chunk{}
5561
for j := 0; j < batchSize; j++ {
56-
chunk := dummyChunkFor(model.Now(), model.Metric{
62+
chunk := dummyChunkFor(start, model.Metric{
5763
model.MetricNameLabel: "foo",
5864
"index": model.LabelValue(strconv.Itoa(startIndex*batchSize + j)),
5965
})

0 commit comments

Comments
 (0)