From 5958e2058330f8cb3e506636b766b7cce858afd9 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 4 May 2022 17:19:12 -0700 Subject: [PATCH 1/8] Uploading no compact markers to the global marker index --- pkg/storage/tsdb/bucketindex/markers.go | 6 + .../tsdb/bucketindex/markers_bucket_client.go | 29 +++- .../bucketindex/markers_bucket_client_test.go | 137 ++++++++++++++---- 3 files changed, 139 insertions(+), 33 deletions(-) diff --git a/pkg/storage/tsdb/bucketindex/markers.go b/pkg/storage/tsdb/bucketindex/markers.go index 608a442c443..af1bcc8f46f 100644 --- a/pkg/storage/tsdb/bucketindex/markers.go +++ b/pkg/storage/tsdb/bucketindex/markers.go @@ -27,6 +27,12 @@ func BlockDeletionMarkFilepath(blockID ulid.ULID) string { return fmt.Sprintf("%s/%s-%s", MarkersPathname, blockID.String(), metadata.DeletionMarkFilename) } +// NoCompactMarkFilenameMarkFilepath returns the path, relative to the tenant's bucket location, +// of a block no compact mark in the bucket markers location. +func NoCompactMarkFilenameMarkFilepath(blockID ulid.ULID) string { + return fmt.Sprintf("%s/%s-%s", MarkersPathname, blockID.String(), metadata.NoCompactMarkFilename) +} + // IsBlockDeletionMarkFilename returns whether the input filename matches the expected pattern // of block deletion markers stored in the markers location. func IsBlockDeletionMarkFilename(name string) (ulid.ULID, bool) { diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go index d49acbcb139..3b9b5a4cabd 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go @@ -29,7 +29,7 @@ func BucketWithGlobalMarkers(b objstore.Bucket) objstore.Bucket { // Upload implements objstore.Bucket. func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Reader) error { - blockID, ok := b.isBlockDeletionMark(name) + globalMarkPath, ok := b.isMark(name) if !ok { return b.parent.Upload(ctx, name, r) } @@ -46,7 +46,6 @@ func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Read } // Upload it to the global markers location too. - globalMarkPath := path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID))) return b.parent.Upload(ctx, globalMarkPath, bytes.NewBuffer(body)) } @@ -58,8 +57,7 @@ func (b *globalMarkersBucket) Delete(ctx context.Context, name string) error { } // Delete the marker in the global markers location too. - if blockID, ok := b.isBlockDeletionMark(name); ok { - globalMarkPath := path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID))) + if globalMarkPath, ok := b.isMark(name); ok { if err := b.parent.Delete(ctx, globalMarkPath); err != nil { if !b.parent.IsObjNotFoundErr(err) { return err @@ -128,6 +126,29 @@ func (b *globalMarkersBucket) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpe return b } +func (b *globalMarkersBucket) isMark(name string) (string, bool) { + marks := map[string]func(ulid.ULID) string{ + metadata.DeletionMarkFilename: BlockDeletionMarkFilepath, + metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath, + } + + for mark, f := range marks { + if path.Base(name) == mark { + // Parse the block ID in the path. If there's not block ID, then it's not the per-block + // deletion mark. 
+ id, ok := block.IsBlockDir(path.Dir(name)) + + if ok { + return path.Clean(path.Join(path.Dir(name), "../", f(id))), ok + } + + return "", ok + } + } + + return "", false +} + func (b *globalMarkersBucket) isBlockDeletionMark(name string) (ulid.ULID, bool) { if path.Base(name) != metadata.DeletionMarkFilename { return ulid.ULID{}, false diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go index e76edb285c0..20c64b966d5 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go @@ -11,43 +11,114 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) -func TestGlobalMarkersBucket_Delete_ShouldSucceedIfDeletionMarkDoesNotExistInTheBlockButExistInTheGlobalLocation(t *testing.T) { - bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) +func TestGlobalMarker_ShouldUploadGlobalLocation(t *testing.T) { + block1 := ulid.MustNew(1, nil) - ctx := context.Background() - bkt = BucketWithGlobalMarkers(bkt) + tests := []struct { + mark string + globalpath string + }{ + { + mark: metadata.DeletionMarkFilename, + globalpath: "markers/" + block1.String() + "-deletion-mark.json", + }, + { + mark: metadata.NoCompactMarkFilename, + globalpath: "markers/" + block1.String() + "-no-compact-mark.json", + }, + } + + for _, tc := range tests { + t.Run(tc.mark, func(t *testing.T) { + originalPath := block1.String() + "/" + tc.mark + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + bkt = BucketWithGlobalMarkers(bkt) + + bkt.Upload(ctx, originalPath, strings.NewReader("{}")) - // Create a mocked block deletion mark in the global location. - blockID := ulid.MustNew(1, nil) - globalPath := BlockDeletionMarkFilepath(blockID) - require.NoError(t, bkt.Upload(ctx, globalPath, strings.NewReader("{}"))) + // Ensure it exists on originalPath + ok, err := bkt.Exists(ctx, originalPath) + require.NoError(t, err) + require.True(t, ok) - // Ensure it exists before deleting it. - ok, err := bkt.Exists(ctx, globalPath) - require.NoError(t, err) - require.True(t, ok) + // Ensure it exists on globalPath + ok, err = bkt.Exists(ctx, tc.globalpath) + require.NoError(t, err) + require.True(t, ok) - require.NoError(t, bkt.Delete(ctx, globalPath)) + bkt.Delete(ctx, originalPath) - // Ensure has been actually deleted. 
- ok, err = bkt.Exists(ctx, globalPath) - require.NoError(t, err) - require.False(t, ok) + // Ensure it deleted on originalPath + ok, err = bkt.Exists(ctx, originalPath) + require.NoError(t, err) + require.False(t, ok) + + // Ensure it exists on globalPath + ok, err = bkt.Exists(ctx, tc.globalpath) + require.NoError(t, err) + require.False(t, ok) + }) + } } -func TestGlobalMarkersBucket_isBlockDeletionMark(t *testing.T) { +func TestGlobalMarkersBucket_Delete_ShouldSucceedIfMarkDoesNotExistInTheBlockButExistInTheGlobalLocation(t *testing.T) { + tests := []struct { + name string + pathF func(ulid.ULID) string + }{ + { + name: metadata.DeletionMarkFilename, + pathF: BlockDeletionMarkFilepath, + }, + { + name: metadata.NoCompactMarkFilename, + pathF: NoCompactMarkFilenameMarkFilepath, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + bkt = BucketWithGlobalMarkers(bkt) + + // Create a mocked block deletion mark in the global location. + blockID := ulid.MustNew(1, nil) + globalPath := tc.pathF(blockID) + require.NoError(t, bkt.Upload(ctx, globalPath, strings.NewReader("{}"))) + + // Ensure it exists before deleting it. + ok, err := bkt.Exists(ctx, globalPath) + require.NoError(t, err) + require.True(t, ok) + + require.NoError(t, bkt.Delete(ctx, globalPath)) + + // Ensure has been actually deleted. + ok, err = bkt.Exists(ctx, globalPath) + require.NoError(t, err) + require.False(t, ok) + }) + } +} + +func TestGlobalMarkersBucket_isMark(t *testing.T) { block1 := ulid.MustNew(1, nil) tests := []struct { - name string - expectedOk bool - expectedID ulid.ULID + name string + expectedOk bool + expectedGlobalPath string }{ { name: "", @@ -59,13 +130,21 @@ func TestGlobalMarkersBucket_isBlockDeletionMark(t *testing.T) { name: block1.String() + "/index", expectedOk: false, }, { - name: block1.String() + "/deletion-mark.json", - expectedOk: true, - expectedID: block1, + name: block1.String() + "/deletion-mark.json", + expectedOk: true, + expectedGlobalPath: "markers/" + block1.String() + "-deletion-mark.json", + }, { + name: "/path/to/" + block1.String() + "/deletion-mark.json", + expectedOk: true, + expectedGlobalPath: "/path/to/markers/" + block1.String() + "-deletion-mark.json", + }, { + name: block1.String() + "/no-compact-mark.json", + expectedOk: true, + expectedGlobalPath: "markers/" + block1.String() + "-no-compact-mark.json", }, { - name: "/path/to/" + block1.String() + "/deletion-mark.json", - expectedOk: true, - expectedID: block1, + name: "/path/to/" + block1.String() + "/no-compact-mark.json", + expectedOk: true, + expectedGlobalPath: "/path/to/markers/" + block1.String() + "-no-compact-mark.json", }, } @@ -73,9 +152,9 @@ func TestGlobalMarkersBucket_isBlockDeletionMark(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - actualID, actualOk := b.isBlockDeletionMark(tc.name) + globalPath, actualOk := b.isMark(tc.name) assert.Equal(t, tc.expectedOk, actualOk) - assert.Equal(t, tc.expectedID, actualID) + assert.Equal(t, tc.expectedGlobalPath, globalPath) }) } } From cd9707cc1217b456a115ae1b9604cf6f6778c87e Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 4 May 2022 18:04:56 -0700 Subject: [PATCH 2/8] Creating new Metric cortex_compactor_blocks_marked_for_no_compaction_on_storage_total --- pkg/compactor/blocks_cleaner.go | 29 ++++++++++++------- pkg/storage/tsdb/bucketindex/index.go | 3 ++ pkg/storage/tsdb/bucketindex/markers.go | 18 ++++++++++++ 
.../tsdb/bucketindex/markers_bucket_client.go | 10 ------- pkg/storage/tsdb/bucketindex/updater.go | 25 ++++++++++------ 5 files changed, 55 insertions(+), 30 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 4a412e4aa24..df1900a705d 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -45,17 +45,18 @@ type BlocksCleaner struct { lastOwnedUsers []string // Metrics. - runsStarted prometheus.Counter - runsCompleted prometheus.Counter - runsFailed prometheus.Counter - runsLastSuccess prometheus.Gauge - blocksCleanedTotal prometheus.Counter - blocksFailedTotal prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - tenantBlocks *prometheus.GaugeVec - tenantMarkedBlocks *prometheus.GaugeVec - tenantPartialBlocks *prometheus.GaugeVec - tenantBucketIndexLastUpdate *prometheus.GaugeVec + runsStarted prometheus.Counter + runsCompleted prometheus.Counter + runsFailed prometheus.Counter + runsLastSuccess prometheus.Gauge + blocksCleanedTotal prometheus.Counter + blocksFailedTotal prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + tenantBlocks *prometheus.GaugeVec + tenantMarkedBlocks *prometheus.GaugeVec + tenantPartialBlocks *prometheus.GaugeVec + tenantBucketIndexLastUpdate *prometheus.GaugeVec + blocksMarkedForNoCompactionOnStorage *prometheus.GaugeVec } func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { @@ -69,6 +70,10 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, use Name: "cortex_compactor_block_cleanup_started_total", Help: "Total number of blocks cleanup runs started.", }), + blocksMarkedForNoCompactionOnStorage: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_compactor_blocks_marked_for_no_compaction_on_storage_total", + Help: "Total number of blocks marked for on storage.", + }, []string{"user"}), runsCompleted: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_block_cleanup_completed_total", Help: "Total number of blocks cleanup runs successfully completed.", @@ -241,6 +246,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID c.tenantBlocks.DeleteLabelValues(userID) c.tenantMarkedBlocks.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) + c.blocksMarkedForNoCompactionOnStorage.DeleteLabelValues(userID) if deletedBlocks > 0 { level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks) @@ -331,6 +337,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b // Generate an updated in-memory version of the bucket index. w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger) idx, partials, err := w.UpdateIndex(ctx, idx) + c.blocksMarkedForNoCompactionOnStorage.WithLabelValues(userID).Set(float64(idx.TotalBlocksBlocksMarkedForNoCompaction)) if err != nil { return err } diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index 5c5f6cb5d4b..54b8cc4edbd 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -41,6 +41,9 @@ type Index struct { // UpdatedAt is a unix timestamp (seconds precision) of when the index has been updated // (written in the storage) the last time. 
UpdatedAt int64 `json:"updated_at"` + + // TotalBlocksBlocksMarkedForNoCompaction is then number of blocks marked for no compaction + TotalBlocksBlocksMarkedForNoCompaction int64 `json:"total_blocks_marked_for_no_compaction"` } func (idx *Index) GetUpdatedAt() time.Time { diff --git a/pkg/storage/tsdb/bucketindex/markers.go b/pkg/storage/tsdb/bucketindex/markers.go index af1bcc8f46f..395d789f8fb 100644 --- a/pkg/storage/tsdb/bucketindex/markers.go +++ b/pkg/storage/tsdb/bucketindex/markers.go @@ -51,6 +51,24 @@ func IsBlockDeletionMarkFilename(name string) (ulid.ULID, bool) { return id, err == nil } +// IsBlockNoCompactMarkFilename returns whether the input filename matches the expected pattern +// of block no compact markers stored in the markers location. +func IsBlockNoCompactMarkFilename(name string) (ulid.ULID, bool) { + parts := strings.SplitN(name, "-", 2) + if len(parts) != 2 { + return ulid.ULID{}, false + } + + // Ensure the 2nd part matches the block deletion mark filename. + if parts[1] != metadata.NoCompactMarkFilename { + return ulid.ULID{}, false + } + + // Ensure the 1st part is a valid block ID. + id, err := ulid.Parse(filepath.Base(parts[0])) + return id, err == nil +} + // MigrateBlockDeletionMarksToGlobalLocation list all tenant's blocks and, for each of them, look for // a deletion mark in the block location. Found deletion marks are copied to the global markers location. // The migration continues on error and returns once all blocks have been checked. diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go index 3b9b5a4cabd..c628ec44e43 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go @@ -148,13 +148,3 @@ func (b *globalMarkersBucket) isMark(name string) (string, bool) { return "", false } - -func (b *globalMarkersBucket) isBlockDeletionMark(name string) (ulid.ULID, bool) { - if path.Base(name) != metadata.DeletionMarkFilename { - return ulid.ULID{}, false - } - - // Parse the block ID in the path. If there's not block ID, then it's not the per-block - // deletion mark. 
- return block.IsBlockDir(path.Dir(name)) -} diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index 91a8f3e4b29..a3281af565a 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -57,16 +57,17 @@ func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid return nil, nil, err } - blockDeletionMarks, err := w.updateBlockDeletionMarks(ctx, oldBlockDeletionMarks) + blockDeletionMarks, totalBlocksBlocksMarkedForNoCompaction, err := w.updateBlockMarks(ctx, oldBlockDeletionMarks) if err != nil { return nil, nil, err } return &Index{ - Version: IndexVersion1, - Blocks: blocks, - BlockDeletionMarks: blockDeletionMarks, - UpdatedAt: time.Now().Unix(), + Version: IndexVersion1, + Blocks: blocks, + BlockDeletionMarks: blockDeletionMarks, + UpdatedAt: time.Now().Unix(), + TotalBlocksBlocksMarkedForNoCompaction: totalBlocksBlocksMarkedForNoCompaction, }, partials, nil } @@ -163,19 +164,25 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo return block, nil } -func (w *Updater) updateBlockDeletionMarks(ctx context.Context, old []*BlockDeletionMark) ([]*BlockDeletionMark, error) { +func (w *Updater) updateBlockMarks(ctx context.Context, old []*BlockDeletionMark) ([]*BlockDeletionMark, int64, error) { out := make([]*BlockDeletionMark, 0, len(old)) discovered := map[ulid.ULID]struct{}{} + totalBlocksBlocksMarkedForNoCompaction := int64(0) // Find all markers in the storage. err := w.bkt.Iter(ctx, MarkersPathname+"/", func(name string) error { if blockID, ok := IsBlockDeletionMarkFilename(path.Base(name)); ok { discovered[blockID] = struct{}{} } + + if _, ok := IsBlockNoCompactMarkFilename(path.Base(name)); ok { + totalBlocksBlocksMarkedForNoCompaction++ + } + return nil }) if err != nil { - return nil, errors.Wrap(err, "list block deletion marks") + return nil, totalBlocksBlocksMarkedForNoCompaction, errors.Wrap(err, "list block deletion marks") } // Since deletion marks are immutable, all markers already existing in the index can just be copied. 
@@ -199,13 +206,13 @@ func (w *Updater) updateBlockDeletionMarks(ctx context.Context, old []*BlockDele continue } if err != nil { - return nil, err + return nil, totalBlocksBlocksMarkedForNoCompaction, err } out = append(out, m) } - return out, nil + return out, totalBlocksBlocksMarkedForNoCompaction, nil } func (w *Updater) updateBlockDeletionMarkIndexEntry(ctx context.Context, id ulid.ULID) (*BlockDeletionMark, error) { From 0a60c99ac96abd519ac23f2b976fbfad46b53da4 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 4 May 2022 18:30:26 -0700 Subject: [PATCH 3/8] Fix Lint --- pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go index 20c64b966d5..8666b62fbad 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go @@ -43,7 +43,8 @@ func TestGlobalMarker_ShouldUploadGlobalLocation(t *testing.T) { ctx := context.Background() bkt = BucketWithGlobalMarkers(bkt) - bkt.Upload(ctx, originalPath, strings.NewReader("{}")) + err := bkt.Upload(ctx, originalPath, strings.NewReader("{}")) + require.NoError(t, err) // Ensure it exists on originalPath ok, err := bkt.Exists(ctx, originalPath) @@ -55,7 +56,8 @@ func TestGlobalMarker_ShouldUploadGlobalLocation(t *testing.T) { require.NoError(t, err) require.True(t, ok) - bkt.Delete(ctx, originalPath) + err = bkt.Delete(ctx, originalPath) + require.NoError(t, err) // Ensure it deleted on originalPath ok, err = bkt.Exists(ctx, originalPath) From 91744744ee21190b6fb823d7f9e725cf36804022 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 4 May 2022 20:50:16 -0700 Subject: [PATCH 4/8] Update MigrateBlockDeletionMarksToGlobalLocation to also migrate non compact markers --- pkg/storage/tsdb/bucketindex/markers.go | 39 +++++++++++-------- .../tsdb/bucketindex/markers_bucket_client.go | 4 +- pkg/storage/tsdb/bucketindex/markers_test.go | 16 +++++--- 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/pkg/storage/tsdb/bucketindex/markers.go b/pkg/storage/tsdb/bucketindex/markers.go index 395d789f8fb..d223f854c0c 100644 --- a/pkg/storage/tsdb/bucketindex/markers.go +++ b/pkg/storage/tsdb/bucketindex/markers.go @@ -76,6 +76,11 @@ func MigrateBlockDeletionMarksToGlobalLocation(ctx context.Context, bkt objstore bucket := bucket.NewUserBucketClient(userID, bkt, cfgProvider) userBucket := bucket.WithExpectedErrs(bucket.IsObjNotFoundErr) + marks := map[string]func(ulid.ULID) string{ + metadata.DeletionMarkFilename: BlockDeletionMarkFilepath, + metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath, + } + // Find all blocks in the storage. var blocks []ulid.ULID err := userBucket.Iter(ctx, "", func(name string) error { @@ -91,22 +96,24 @@ func MigrateBlockDeletionMarksToGlobalLocation(ctx context.Context, bkt objstore errs := tsdb_errors.NewMulti() for _, blockID := range blocks { - // Look up the deletion mark (if any). - reader, err := userBucket.Get(ctx, path.Join(blockID.String(), metadata.DeletionMarkFilename)) - if userBucket.IsObjNotFoundErr(err) { - continue - } else if err != nil { - errs.Add(err) - continue - } - - // Upload it to the global markers location. 
- uploadErr := userBucket.Upload(ctx, BlockDeletionMarkFilepath(blockID), reader) - if closeErr := reader.Close(); closeErr != nil { - errs.Add(closeErr) - } - if uploadErr != nil { - errs.Add(uploadErr) + for mark, globalFilePath := range marks { + // Look up mark (if any). + reader, err := userBucket.Get(ctx, path.Join(blockID.String(), mark)) + if userBucket.IsObjNotFoundErr(err) { + continue + } else if err != nil { + errs.Add(err) + continue + } + + // Upload it to the global markers location. + uploadErr := userBucket.Upload(ctx, globalFilePath(blockID), reader) + if closeErr := reader.Close(); closeErr != nil { + errs.Add(closeErr) + } + if uploadErr != nil { + errs.Add(uploadErr) + } } } diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go index c628ec44e43..a39b509a0ef 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go @@ -132,14 +132,14 @@ func (b *globalMarkersBucket) isMark(name string) (string, bool) { metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath, } - for mark, f := range marks { + for mark, globalFilePath := range marks { if path.Base(name) == mark { // Parse the block ID in the path. If there's not block ID, then it's not the per-block // deletion mark. id, ok := block.IsBlockDir(path.Dir(name)) if ok { - return path.Clean(path.Join(path.Dir(name), "../", f(id))), ok + return path.Clean(path.Join(path.Dir(name), "../", globalFilePath(id))), ok } return "", ok diff --git a/pkg/storage/tsdb/bucketindex/markers_test.go b/pkg/storage/tsdb/bucketindex/markers_test.go index 4bf1d53f825..2aebc4ad18d 100644 --- a/pkg/storage/tsdb/bucketindex/markers_test.go +++ b/pkg/storage/tsdb/bucketindex/markers_test.go @@ -49,8 +49,10 @@ func TestMigrateBlockDeletionMarksToGlobalLocation(t *testing.T) { block1 := ulid.MustNew(1, nil) block2 := ulid.MustNew(2, nil) block3 := ulid.MustNew(3, nil) + block4 := ulid.MustNew(4, nil) require.NoError(t, bkt.Upload(ctx, path.Join("user-1", block1.String(), metadata.DeletionMarkFilename), strings.NewReader("{}"))) require.NoError(t, bkt.Upload(ctx, path.Join("user-1", block3.String(), metadata.DeletionMarkFilename), strings.NewReader("{}"))) + require.NoError(t, bkt.Upload(ctx, path.Join("user-1", block4.String(), metadata.NoCompactMarkFilename), strings.NewReader("{}"))) t.Run("doesn't increase thanos_objstore_bucket_operation_failures_total for NotFound deletion markers", func(t *testing.T) { reg := prometheus.NewPedanticRegistry() @@ -77,14 +79,16 @@ func TestMigrateBlockDeletionMarksToGlobalLocation(t *testing.T) { // Ensure deletion marks have been copied. 
for _, tc := range []struct { - blockID ulid.ULID - expectedExists bool + blockID ulid.ULID + expectedExists bool + globalFilePathFunc func(ulid.ULID) string }{ - {block1, true}, - {block2, false}, - {block3, true}, + {block1, true, BlockDeletionMarkFilepath}, + {block2, false, BlockDeletionMarkFilepath}, + {block3, true, BlockDeletionMarkFilepath}, + {block4, true, NoCompactMarkFilenameMarkFilepath}, } { - ok, err := bkt.Exists(ctx, path.Join("user-1", BlockDeletionMarkFilepath(tc.blockID))) + ok, err := bkt.Exists(ctx, path.Join("user-1", tc.globalFilePathFunc(tc.blockID))) require.NoError(t, err) require.Equal(t, tc.expectedExists, ok) } From 11387a79047ec6f932c36eb956095722819bf8f4 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 4 May 2022 21:12:40 -0700 Subject: [PATCH 5/8] Not saving the NonCompactionMark information on the index yet --- pkg/compactor/blocks_cleaner.go | 4 +-- pkg/compactor/blocks_cleaner_test.go | 2 +- pkg/storage/tsdb/bucketindex/index.go | 3 --- pkg/storage/tsdb/bucketindex/storage_test.go | 4 +-- pkg/storage/tsdb/bucketindex/updater.go | 17 ++++++------- pkg/storage/tsdb/bucketindex/updater_test.go | 25 ++++++++++++------- pkg/storage/tsdb/testutil/block_mock.go | 20 +++++++++++++++ pkg/storegateway/gateway_test.go | 2 +- .../metadata_fetcher_filters_test.go | 2 +- 9 files changed, 51 insertions(+), 28 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index df1900a705d..2433e50d98d 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -336,8 +336,8 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b // Generate an updated in-memory version of the bucket index. w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger) - idx, partials, err := w.UpdateIndex(ctx, idx) - c.blocksMarkedForNoCompactionOnStorage.WithLabelValues(userID).Set(float64(idx.TotalBlocksBlocksMarkedForNoCompaction)) + idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx) + c.blocksMarkedForNoCompactionOnStorage.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction)) if err != nil { return err } diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 56ebdf3d889..bf03128da2f 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -421,7 +421,7 @@ func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) { id3 := createTSDBBlock(t, bucketClient, "user-1", 7000, 8000, nil) w := bucketindex.NewUpdater(bucketClient, "user-1", nil, logger) - idx, _, err := w.UpdateIndex(ctx, nil) + idx, _, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assert.ElementsMatch(t, []ulid.ULID{id1, id2, id3}, idx.Blocks.GetULIDs()) diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index 54b8cc4edbd..5c5f6cb5d4b 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -41,9 +41,6 @@ type Index struct { // UpdatedAt is a unix timestamp (seconds precision) of when the index has been updated // (written in the storage) the last time. 
UpdatedAt int64 `json:"updated_at"` - - // TotalBlocksBlocksMarkedForNoCompaction is then number of blocks marked for no compaction - TotalBlocksBlocksMarkedForNoCompaction int64 `json:"total_blocks_marked_for_no_compaction"` } func (idx *Index) GetUpdatedAt() time.Time { diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index 548d9842260..c27e37aac4a 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -52,7 +52,7 @@ func TestReadIndex_ShouldReturnTheParsedIndexOnSuccess(t *testing.T) { // Write the index. u := NewUpdater(bkt, userID, nil, logger) - expectedIdx, _, err := u.UpdateIndex(ctx, nil) + expectedIdx, _, _, err := u.UpdateIndex(ctx, nil) require.NoError(t, err) require.NoError(t, WriteIndex(ctx, bkt, userID, nil, expectedIdx)) @@ -89,7 +89,7 @@ func BenchmarkReadIndex(b *testing.B) { // Write the index. u := NewUpdater(bkt, userID, nil, logger) - idx, _, err := u.UpdateIndex(ctx, nil) + idx, _, _, err := u.UpdateIndex(ctx, nil) require.NoError(b, err) require.NoError(b, WriteIndex(ctx, bkt, userID, nil, idx)) diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index a3281af565a..9988d887358 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -42,7 +42,7 @@ func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantCon // UpdateIndex generates the bucket index and returns it, without storing it to the storage. // If the old index is not passed in input, then the bucket index will be generated from scratch. -func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid.ULID]error, error) { +func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid.ULID]error, int64, error) { var oldBlocks []*Block var oldBlockDeletionMarks []*BlockDeletionMark @@ -54,21 +54,20 @@ func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid blocks, partials, err := w.updateBlocks(ctx, oldBlocks) if err != nil { - return nil, nil, err + return nil, nil, 0, err } blockDeletionMarks, totalBlocksBlocksMarkedForNoCompaction, err := w.updateBlockMarks(ctx, oldBlockDeletionMarks) if err != nil { - return nil, nil, err + return nil, nil, 0, err } return &Index{ - Version: IndexVersion1, - Blocks: blocks, - BlockDeletionMarks: blockDeletionMarks, - UpdatedAt: time.Now().Unix(), - TotalBlocksBlocksMarkedForNoCompaction: totalBlocksBlocksMarkedForNoCompaction, - }, partials, nil + Version: IndexVersion1, + Blocks: blocks, + BlockDeletionMarks: blockDeletionMarks, + UpdatedAt: time.Now().Unix(), + }, partials, totalBlocksBlocksMarkedForNoCompaction, nil } func (w *Updater) updateBlocks(ctx context.Context, old []*Block) (blocks []*Block, partials map[ulid.ULID]error, _ error) { diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index a90a595b443..42bc8fc19a7 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -36,7 +36,7 @@ func TestUpdater_UpdateIndex(t *testing.T) { block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) w := NewUpdater(bkt, userID, nil, logger) - returnedIdx, _, err := w.UpdateIndex(ctx, nil) + returnedIdx, _, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, []tsdb.BlockMeta{block1, block2}, @@ -47,7 +47,7 @@ func 
TestUpdater_UpdateIndex(t *testing.T) { block4 := testutil.MockStorageBlock(t, bkt, userID, 40, 50) block4Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block4) - returnedIdx, _, err = w.UpdateIndex(ctx, returnedIdx) + returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, []tsdb.BlockMeta{block1, block2, block3, block4}, @@ -56,7 +56,7 @@ func TestUpdater_UpdateIndex(t *testing.T) { // Hard delete a block and update the index. require.NoError(t, block.Delete(ctx, log.NewNopLogger(), bucket.NewUserBucketClient(userID, bkt, nil), block2.ULID)) - returnedIdx, _, err = w.UpdateIndex(ctx, returnedIdx) + returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, []tsdb.BlockMeta{block1, block3, block4}, @@ -82,7 +82,7 @@ func TestUpdater_UpdateIndex_ShouldSkipPartialBlocks(t *testing.T) { require.NoError(t, bkt.Delete(ctx, path.Join(userID, block3.ULID.String(), metadata.MetaFilename))) w := NewUpdater(bkt, userID, nil, logger) - idx, partials, err := w.UpdateIndex(ctx, nil) + idx, partials, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, []tsdb.BlockMeta{block1, block2}, @@ -105,20 +105,23 @@ func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) { block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) + block4 := testutil.MockStorageBlock(t, bkt, userID, 50, 50) block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) + testutil.MockStorageNonCompactionMark(t, bkt, userID, block4) // Overwrite a block's meta.json with invalid data. require.NoError(t, bkt.Upload(ctx, path.Join(userID, block3.ULID.String(), metadata.MetaFilename), bytes.NewReader([]byte("invalid!}")))) w := NewUpdater(bkt, userID, nil, logger) - idx, partials, err := w.UpdateIndex(ctx, nil) + idx, partials, nonCompactBlocks, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, - []tsdb.BlockMeta{block1, block2}, + []tsdb.BlockMeta{block1, block2, block4}, []*metadata.DeletionMark{block2Mark}) assert.Len(t, partials, 1) assert.True(t, errors.Is(partials[block3.ULID], ErrBlockMetaCorrupted)) + assert.Equal(t, nonCompactBlocks, int64(1)) } func TestUpdater_UpdateIndex_ShouldSkipCorruptedDeletionMarks(t *testing.T) { @@ -134,18 +137,22 @@ func TestUpdater_UpdateIndex_ShouldSkipCorruptedDeletionMarks(t *testing.T) { block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) + block4 := testutil.MockStorageBlock(t, bkt, userID, 40, 50) block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) + block4Mark := testutil.MockStorageNonCompactionMark(t, bkt, userID, block4) // Overwrite a block's deletion-mark.json with invalid data. 
require.NoError(t, bkt.Upload(ctx, path.Join(userID, block2Mark.ID.String(), metadata.DeletionMarkFilename), bytes.NewReader([]byte("invalid!}")))) + require.NoError(t, bkt.Upload(ctx, path.Join(userID, block4Mark.ID.String(), metadata.NoCompactMarkFilename), bytes.NewReader([]byte("invalid!}")))) w := NewUpdater(bkt, userID, nil, logger) - idx, partials, err := w.UpdateIndex(ctx, nil) + idx, partials, nonCompactBlocks, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, - []tsdb.BlockMeta{block1, block2, block3}, + []tsdb.BlockMeta{block1, block2, block3, block4}, []*metadata.DeletionMark{}) assert.Empty(t, partials) + assert.Equal(t, nonCompactBlocks, int64(1)) } func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) { @@ -156,7 +163,7 @@ func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) { for _, oldIdx := range []*Index{nil, {}} { w := NewUpdater(bkt, userID, nil, log.NewNopLogger()) - idx, partials, err := w.UpdateIndex(ctx, oldIdx) + idx, partials, _, err := w.UpdateIndex(ctx, oldIdx) require.NoError(t, err) assert.Equal(t, IndexVersion1, idx.Version) diff --git a/pkg/storage/tsdb/testutil/block_mock.go b/pkg/storage/tsdb/testutil/block_mock.go index a2a931aa59c..5f577700999 100644 --- a/pkg/storage/tsdb/testutil/block_mock.go +++ b/pkg/storage/tsdb/testutil/block_mock.go @@ -66,3 +66,23 @@ func MockStorageDeletionMark(t testing.TB, bucket objstore.Bucket, userID string return &mark } + +func MockStorageNonCompactionMark(t testing.TB, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) *metadata.NoCompactMark { + mark := metadata.NoCompactMark{ + ID: meta.ULID, + Version: metadata.NoCompactMarkVersion1, + NoCompactTime: time.Now().Unix(), + Reason: metadata.OutOfOrderChunksNoCompactReason, + } + + markContent, err := json.Marshal(mark) + if err != nil { + panic("failed to marshal mocked block meta") + } + + markContentReader := strings.NewReader(string(markContent)) + markPath := fmt.Sprintf("%s/%s/%s", userID, meta.ULID.String(), metadata.NoCompactMarkFilename) + require.NoError(t, bucket.Upload(context.Background(), markPath, markContentReader)) + + return &mark +} diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index acb08d698f6..ac18a7e1650 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -1213,7 +1213,7 @@ func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string, func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) *bucketindex.Index { updater := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger()) - idx, _, err := updater.UpdateIndex(context.Background(), nil) + idx, _, _, err := updater.UpdateIndex(context.Background(), nil) require.NoError(t, err) require.NoError(t, bucketindex.WriteIndex(context.Background(), bkt, userID, nil, idx)) diff --git a/pkg/storegateway/metadata_fetcher_filters_test.go b/pkg/storegateway/metadata_fetcher_filters_test.go index 99c8bc44e22..d7e877bfd94 100644 --- a/pkg/storegateway/metadata_fetcher_filters_test.go +++ b/pkg/storegateway/metadata_fetcher_filters_test.go @@ -69,7 +69,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) { var err error u := bucketindex.NewUpdater(bkt, userID, nil, logger) - idx, _, err = u.UpdateIndex(ctx, nil) + idx, _, _, err = u.UpdateIndex(ctx, nil) require.NoError(t, err) require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, idx)) } From ea48b6dee9d73042a596b5e41c4e4e5dbc0492ef Mon Sep 
17 00:00:00 2001 From: Alan Protasio Date: Fri, 6 May 2022 12:35:26 -0700 Subject: [PATCH 6/8] Update Changelog and rename the metric to be in the same pattern of cortex_bucket_blocks_marked_for_deletion_count --- CHANGELOG.md | 2 ++ pkg/compactor/blocks_cleaner.go | 39 ++++++++++++++-------------- pkg/compactor/blocks_cleaner_test.go | 14 ++++++++++ pkg/compactor/compactor_test.go | 8 ++++++ 4 files changed, 44 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d46774c7e93..64422420b84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ * [BUGFIX] Query Frontend: If 'LogQueriesLongerThan' is set to < 0, log all queries as described in the docs. #4633 * [BUGFIX] Distributor: update defaultReplicationStrategy to not fail with extend-write when a single instance is unhealthy. #4636 * [BUGFIX] Distributor: Fix race condition on `/series` introduced by #4683. #4716 +* [ENHANCEMENT] Compactor: uploading blocks no compaction marks to the global location and introduce a new metric #4729 + * `cortex_bucket_blocks_marked_for_no_compaction_count`: Total number of blocks marked for no compaction in the bucket. ## 1.11.0 2021-11-25 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 2433e50d98d..124193aac98 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -45,18 +45,18 @@ type BlocksCleaner struct { lastOwnedUsers []string // Metrics. - runsStarted prometheus.Counter - runsCompleted prometheus.Counter - runsFailed prometheus.Counter - runsLastSuccess prometheus.Gauge - blocksCleanedTotal prometheus.Counter - blocksFailedTotal prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - tenantBlocks *prometheus.GaugeVec - tenantMarkedBlocks *prometheus.GaugeVec - tenantPartialBlocks *prometheus.GaugeVec - tenantBucketIndexLastUpdate *prometheus.GaugeVec - blocksMarkedForNoCompactionOnStorage *prometheus.GaugeVec + runsStarted prometheus.Counter + runsCompleted prometheus.Counter + runsFailed prometheus.Counter + runsLastSuccess prometheus.Gauge + blocksCleanedTotal prometheus.Counter + blocksFailedTotal prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + tenantBlocks *prometheus.GaugeVec + tenantMarkedBlocks *prometheus.GaugeVec + tenantPartialBlocks *prometheus.GaugeVec + tenantBucketIndexLastUpdate *prometheus.GaugeVec + tenantMarkedNoCompactBlocks *prometheus.GaugeVec } func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { @@ -70,10 +70,6 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, use Name: "cortex_compactor_block_cleanup_started_total", Help: "Total number of blocks cleanup runs started.", }), - blocksMarkedForNoCompactionOnStorage: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_compactor_blocks_marked_for_no_compaction_on_storage_total", - Help: "Total number of blocks marked for on storage.", - }, []string{"user"}), runsCompleted: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_block_cleanup_completed_total", Help: "Total number of blocks cleanup runs successfully completed.", @@ -111,6 +107,10 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, use Name: "cortex_bucket_blocks_marked_for_deletion_count", Help: "Total number of blocks marked for deletion in the bucket.", }, []string{"user"}), + 
tenantMarkedNoCompactBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_bucket_blocks_marked_for_no_compaction_count", + Help: "Total number of blocks marked for no compaction in the bucket.", + }, []string{"user"}), tenantPartialBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_bucket_blocks_partials_count", Help: "Total number of partial blocks.", @@ -174,6 +174,7 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context, firstRun bool) error { if !isActive[userID] && !isDeleted[userID] { c.tenantBlocks.DeleteLabelValues(userID) c.tenantMarkedBlocks.DeleteLabelValues(userID) + c.tenantMarkedNoCompactBlocks.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) } @@ -245,8 +246,8 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID // Given all blocks have been deleted, we can also remove the metrics. c.tenantBlocks.DeleteLabelValues(userID) c.tenantMarkedBlocks.DeleteLabelValues(userID) + c.tenantMarkedNoCompactBlocks.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) - c.blocksMarkedForNoCompactionOnStorage.DeleteLabelValues(userID) if deletedBlocks > 0 { level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks) @@ -337,7 +338,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b // Generate an updated in-memory version of the bucket index. w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger) idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx) - c.blocksMarkedForNoCompactionOnStorage.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction)) + c.tenantMarkedNoCompactBlocks.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction)) if err != nil { return err } @@ -375,8 +376,8 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks))) c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks))) - c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials))) c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime() + c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials))) return nil } diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index bf03128da2f..e848dd64d36 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -98,6 +98,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions user4DebugMetaFile := path.Join("user-4", block.DebugMetas, "meta.json") require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here"))) + // No Compact blocks marker + createTSDBBlock(t, bucketClient, "user-5", 10, 30, nil) + block12 := createTSDBBlock(t, bucketClient, "user-5", 30, 50, nil) + createNoCompactionMark(t, bucketClient, "user-5", block12) + // The fixtures have been created. If the bucket client wasn't wrapped to write // deletion marks to the global location too, then this is the right time to do it. 
if options.markersMigrationEnabled { @@ -202,17 +207,26 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions # TYPE cortex_bucket_blocks_count gauge cortex_bucket_blocks_count{user="user-1"} 2 cortex_bucket_blocks_count{user="user-2"} 1 + cortex_bucket_blocks_count{user="user-5"} 2 # HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket. # TYPE cortex_bucket_blocks_marked_for_deletion_count gauge cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 1 cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0 + cortex_bucket_blocks_marked_for_deletion_count{user="user-5"} 0 + # HELP cortex_bucket_blocks_marked_for_no_compaction_count Total number of blocks marked for no compaction in the bucket. + # TYPE cortex_bucket_blocks_marked_for_no_compaction_count gauge + cortex_bucket_blocks_marked_for_no_compaction_count{user="user-1"} 0 + cortex_bucket_blocks_marked_for_no_compaction_count{user="user-2"} 0 + cortex_bucket_blocks_marked_for_no_compaction_count{user="user-5"} 1 # HELP cortex_bucket_blocks_partials_count Total number of partial blocks. # TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 2 cortex_bucket_blocks_partials_count{user="user-2"} 0 + cortex_bucket_blocks_partials_count{user="user-5"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", + "cortex_bucket_blocks_marked_for_no_compaction_count", "cortex_bucket_blocks_partials_count", )) } diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index f419b9d248f..13121974875 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1382,6 +1382,14 @@ func createDeletionMark(t *testing.T, bkt objstore.Bucket, userID string, blockI require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content))) } +func createNoCompactionMark(t *testing.T, bkt objstore.Bucket, userID string, blockID ulid.ULID) { + content := mockNoCompactBlockJSON(blockID.String()) + blockPath := path.Join(userID, blockID.String()) + markPath := path.Join(blockPath, metadata.NoCompactMarkFilename) + + require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content))) +} + func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) { var compactor *Compactor var log *concurrency.SyncBuffer From 5563e2bbe84179909b0f6b7f60d686e9f02e659d Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 6 May 2022 12:42:59 -0700 Subject: [PATCH 7/8] rename var names --- pkg/compactor/blocks_cleaner.go | 42 ++++++++++++++++----------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 124193aac98..d178781fd8d 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -45,18 +45,18 @@ type BlocksCleaner struct { lastOwnedUsers []string // Metrics. 
- runsStarted prometheus.Counter - runsCompleted prometheus.Counter - runsFailed prometheus.Counter - runsLastSuccess prometheus.Gauge - blocksCleanedTotal prometheus.Counter - blocksFailedTotal prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - tenantBlocks *prometheus.GaugeVec - tenantMarkedBlocks *prometheus.GaugeVec - tenantPartialBlocks *prometheus.GaugeVec - tenantBucketIndexLastUpdate *prometheus.GaugeVec - tenantMarkedNoCompactBlocks *prometheus.GaugeVec + runsStarted prometheus.Counter + runsCompleted prometheus.Counter + runsFailed prometheus.Counter + runsLastSuccess prometheus.Gauge + blocksCleanedTotal prometheus.Counter + blocksFailedTotal prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + tenantBlocks *prometheus.GaugeVec + tenantBlocksMarkedForDelete *prometheus.GaugeVec + tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec + tenantPartialBlocks *prometheus.GaugeVec + tenantBucketIndexLastUpdate *prometheus.GaugeVec } func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { @@ -103,11 +103,11 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, use Name: "cortex_bucket_blocks_count", Help: "Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.", }, []string{"user"}), - tenantMarkedBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_bucket_blocks_marked_for_deletion_count", Help: "Total number of blocks marked for deletion in the bucket.", }, []string{"user"}), - tenantMarkedNoCompactBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + tenantBlocksMarkedForNoCompaction: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_bucket_blocks_marked_for_no_compaction_count", Help: "Total number of blocks marked for no compaction in the bucket.", }, []string{"user"}), @@ -173,8 +173,8 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context, firstRun bool) error { for _, userID := range c.lastOwnedUsers { if !isActive[userID] && !isDeleted[userID] { c.tenantBlocks.DeleteLabelValues(userID) - c.tenantMarkedBlocks.DeleteLabelValues(userID) - c.tenantMarkedNoCompactBlocks.DeleteLabelValues(userID) + c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID) + c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) } @@ -237,7 +237,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID // to delete. We also consider them all marked for deletion given the next run will try // to delete them again. c.tenantBlocks.WithLabelValues(userID).Set(float64(failed)) - c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(failed)) + c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(failed)) c.tenantPartialBlocks.WithLabelValues(userID).Set(0) return errors.Errorf("failed to delete %d blocks", failed) @@ -245,8 +245,8 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID // Given all blocks have been deleted, we can also remove the metrics. 
c.tenantBlocks.DeleteLabelValues(userID) - c.tenantMarkedBlocks.DeleteLabelValues(userID) - c.tenantMarkedNoCompactBlocks.DeleteLabelValues(userID) + c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID) + c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) if deletedBlocks > 0 { @@ -338,7 +338,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b // Generate an updated in-memory version of the bucket index. w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger) idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx) - c.tenantMarkedNoCompactBlocks.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction)) if err != nil { return err } @@ -375,7 +374,8 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b } c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks))) - c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks))) + c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks))) + c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction)) c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime() c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials))) From d718bc8391a51e25ca9feb023365a0db02272396 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 17 May 2022 09:38:50 -0700 Subject: [PATCH 8/8] Sharing the MarkersMap var --- pkg/storage/tsdb/bucketindex/markers.go | 14 ++++++++------ .../tsdb/bucketindex/markers_bucket_client.go | 8 +------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/pkg/storage/tsdb/bucketindex/markers.go b/pkg/storage/tsdb/bucketindex/markers.go index d223f854c0c..4e3c8985406 100644 --- a/pkg/storage/tsdb/bucketindex/markers.go +++ b/pkg/storage/tsdb/bucketindex/markers.go @@ -21,6 +21,13 @@ const ( MarkersPathname = "markers" ) +var ( + MarkersMap = map[string]func(ulid.ULID) string{ + metadata.DeletionMarkFilename: BlockDeletionMarkFilepath, + metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath, + } +) + // BlockDeletionMarkFilepath returns the path, relative to the tenant's bucket location, // of a block deletion mark in the bucket markers location. func BlockDeletionMarkFilepath(blockID ulid.ULID) string { @@ -76,11 +83,6 @@ func MigrateBlockDeletionMarksToGlobalLocation(ctx context.Context, bkt objstore bucket := bucket.NewUserBucketClient(userID, bkt, cfgProvider) userBucket := bucket.WithExpectedErrs(bucket.IsObjNotFoundErr) - marks := map[string]func(ulid.ULID) string{ - metadata.DeletionMarkFilename: BlockDeletionMarkFilepath, - metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath, - } - // Find all blocks in the storage. var blocks []ulid.ULID err := userBucket.Iter(ctx, "", func(name string) error { @@ -96,7 +98,7 @@ func MigrateBlockDeletionMarksToGlobalLocation(ctx context.Context, bkt objstore errs := tsdb_errors.NewMulti() for _, blockID := range blocks { - for mark, globalFilePath := range marks { + for mark, globalFilePath := range MarkersMap { // Look up mark (if any). 
reader, err := userBucket.Get(ctx, path.Join(blockID.String(), mark)) if userBucket.IsObjNotFoundErr(err) { diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go index a39b509a0ef..df7f9193eb1 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go @@ -7,9 +7,7 @@ import ( "io/ioutil" "path" - "github.com/oklog/ulid" "github.com/thanos-io/thanos/pkg/block" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" ) @@ -127,12 +125,8 @@ func (b *globalMarkersBucket) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpe } func (b *globalMarkersBucket) isMark(name string) (string, bool) { - marks := map[string]func(ulid.ULID) string{ - metadata.DeletionMarkFilename: BlockDeletionMarkFilepath, - metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath, - } - for mark, globalFilePath := range marks { + for mark, globalFilePath := range MarkersMap { if path.Base(name) == mark { // Parse the block ID in the path. If there's not block ID, then it's not the per-block // deletion mark.
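// A minimal, standalone sketch (not part of the patch series) of the
// per-block -> global marker path mapping that the isMark helper above
// implements: a marker stored at "<block ULID>/<mark filename>" maps to
// "markers/<block ULID>-<mark filename>" alongside the block directory.
// It assumes only the two marker filenames handled in this series and
// omits the ULID validation (block.IsBlockDir) done by the real helper.
package main

import (
	"fmt"
	"path"
)

// Marker filenames that get mirrored to the global markers location.
var markerFilenames = []string{
	"deletion-mark.json",   // metadata.DeletionMarkFilename
	"no-compact-mark.json", // metadata.NoCompactMarkFilename
}

// globalMarkerPath returns the global markers location for a per-block marker
// object name, or false if the name is not a known per-block marker.
func globalMarkerPath(name string) (string, bool) {
	for _, filename := range markerFilenames {
		if path.Base(name) != filename {
			continue
		}
		// The parent directory is assumed to be the block ULID; the real
		// code verifies this with block.IsBlockDir before mapping the path.
		blockID := path.Base(path.Dir(name))
		return path.Clean(path.Join(path.Dir(name), "..", "markers", blockID+"-"+filename)), true
	}
	return "", false
}

func main() {
	// Mirrors the expectations in TestGlobalMarkersBucket_isMark.
	fmt.Println(globalMarkerPath("01ARZ3NDEKTSV4RRFFQ69G5FAV/no-compact-mark.json"))      // markers/01ARZ3NDEKTSV4RRFFQ69G5FAV-no-compact-mark.json true
	fmt.Println(globalMarkerPath("/path/to/01ARZ3NDEKTSV4RRFFQ69G5FAV/deletion-mark.json")) // /path/to/markers/01ARZ3NDEKTSV4RRFFQ69G5FAV-deletion-mark.json true
	fmt.Println(globalMarkerPath("01ARZ3NDEKTSV4RRFFQ69G5FAV/index"))                      // "" false
}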