Skip to content

Commit 40d8240

Browse files
authored
Avoid deletion of blocks which are not shipped (#3346)
* Avoid deletion of blocks which are not shipped (Signed-off-by: Ganesh Vernekar <[email protected]>)
* Add CHANGELOG entry
* Use the DefaultBlocksToDelete from TSDB
* Fix lint and tests
* Add unit test and fix review comments
* Fix review comments
1 parent 6907f16 commit 40d8240

File tree

3 files changed

+115
-0
lines changed

3 files changed

+115
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
* [BUGFIX] Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added. #3299
9898
* [BUGFIX] Fixed Gossip memberlist members joining when addresses are configured using DNS-based service discovery. #3360
9999
* [BUGFIX] Ingester: fail to start an ingester running the blocks storage, if unable to load any existing TSDB at startup. #3354
100+
* [BUGFIX] Blocks storage: Avoid deletion of blocks in the ingester which are not shipped to the storage yet. #3346
100101

101102
## 1.4.0 / 2020-10-02
102103

pkg/ingester/ingester_v2.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/go-kit/kit/log/level"
14+
"github.com/oklog/ulid"
1415
"github.com/pkg/errors"
1516
"github.com/prometheus/client_golang/prometheus"
1617
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -110,6 +111,32 @@ func (u *userTSDB) PostDeletion(metrics ...labels.Labels) {
110111
}
111112
}
112113

114+
// blocksToDelete filters the input blocks and returns the blocks which are safe to be deleted from the ingester.
115+
func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
116+
if u.DB == nil {
117+
return nil
118+
}
119+
deletable := tsdb.DefaultBlocksToDelete(u.DB)(blocks)
120+
if u.shipper == nil {
121+
return deletable
122+
}
123+
124+
shipperMeta, err := shipper.ReadMetaFile(u.Dir())
125+
if err != nil {
126+
// If there is any issue with the shipper, we should be conservative and not delete anything.
127+
level.Error(util.Logger).Log("msg", "failed to read shipper meta during deletion of blocks", "user", u.userID, "err", err)
128+
return nil
129+
}
130+
131+
result := map[ulid.ULID]struct{}{}
132+
for _, shippedID := range shipperMeta.Uploaded {
133+
if _, ok := deletable[shippedID]; ok {
134+
result[shippedID] = struct{}{}
135+
}
136+
}
137+
return result
138+
}
139+
113140
func (u *userTSDB) isIdle(now time.Time, idle time.Duration) bool {
114141
lu := u.lastUpdate.Load()
115142

@@ -972,6 +999,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
972999
StripeSize: i.cfg.BlocksStorageConfig.TSDB.StripeSize,
9731000
WALCompression: i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled,
9741001
SeriesLifecycleCallback: userDB,
1002+
BlocksToDelete: userDB.blocksToDelete,
9751003
})
9761004
if err != nil {
9771005
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)

pkg/ingester/ingester_v2_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"testing"
1818
"time"
1919

20+
"github.com/oklog/ulid"
2021
"github.com/pkg/errors"
2122
"github.com/prometheus/client_golang/prometheus"
2223
"github.com/prometheus/client_golang/prometheus/testutil"
@@ -27,6 +28,7 @@ import (
2728
"github.com/stretchr/testify/assert"
2829
"github.com/stretchr/testify/mock"
2930
"github.com/stretchr/testify/require"
31+
"github.com/thanos-io/thanos/pkg/shipper"
3032
"github.com/weaveworks/common/httpgrpc"
3133
"github.com/weaveworks/common/middleware"
3234
"github.com/weaveworks/common/user"
@@ -2213,3 +2215,87 @@ func TestIngester_CloseTSDBsOnShutdown(t *testing.T) {
22132215
db = i.getTSDB(userID)
22142216
require.Nil(t, db)
22152217
}
// TestIngesterNotDeleteUnshippedBlocks verifies that, even when TSDB retention
// says a block should go, the ingester keeps it until the shipper meta file
// lists it as uploaded. It does so by compacting three times and advancing the
// shipper meta file between rounds, asserting exactly which ULIDs survive.
func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
	chunkRange := 2 * time.Hour
	chunkRangeMilliSec := chunkRange.Milliseconds()
	cfg := defaultIngesterTestConfig()
	cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{chunkRange}
	cfg.BlocksStorageConfig.TSDB.Retention = time.Millisecond // Which means delete all but first block.
	cfg.LifecyclerConfig.JoinAfter = 0

	// Create ingester
	i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil)
	require.NoError(t, err)
	t.Cleanup(cleanup)

	require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
	t.Cleanup(func() {
		_ = services.StopAndAwaitTerminated(context.Background(), i)
	})

	// Wait until it's ACTIVE
	test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} {
		return i.lifecycler.GetState()
	})

	// Push some data to create 3 blocks.
	// Samples are spaced one chunk range apart so each lands in its own block.
	ctx := user.InjectOrgID(context.Background(), userID)
	for j := int64(0); j < 5; j++ {
		req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
		_, err := i.v2Push(ctx, req)
		require.NoError(t, err)
	}

	db := i.getTSDB(userID)
	require.NotNil(t, db)
	require.Nil(t, db.Compact())

	oldBlocks := db.Blocks()
	require.Equal(t, 3, len(oldBlocks))

	// Saying that we have shipped the second block, so only that should get deleted.
	require.Nil(t, shipper.WriteMetaFile(nil, db.Dir(), &shipper.Meta{
		Version:  shipper.MetaVersion1,
		Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
	}))

	// Add more samples that could trigger another compaction and hence reload of blocks.
	for j := int64(5); j < 6; j++ {
		req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
		_, err := i.v2Push(ctx, req)
		require.NoError(t, err)
	}
	require.Nil(t, db.Compact())

	// Only the second block should be gone along with a new block.
	// Blocks 1 and 3, though past retention, are retained because unshipped.
	newBlocks := db.Blocks()
	require.Equal(t, 3, len(newBlocks))
	require.Equal(t, oldBlocks[0].Meta().ULID, newBlocks[0].Meta().ULID)    // First block remains same.
	require.Equal(t, oldBlocks[2].Meta().ULID, newBlocks[1].Meta().ULID)    // 3rd block becomes 2nd now.
	require.NotEqual(t, oldBlocks[1].Meta().ULID, newBlocks[2].Meta().ULID) // The new block won't match previous 2nd block.

	// Shipping 2 more blocks, hence all the blocks from first round.
	require.Nil(t, shipper.WriteMetaFile(nil, db.Dir(), &shipper.Meta{
		Version:  shipper.MetaVersion1,
		Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
	}))

	// Add more samples that could trigger another compaction and hence reload of blocks.
	for j := int64(6); j < 7; j++ {
		req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
		_, err := i.v2Push(ctx, req)
		require.NoError(t, err)
	}
	require.Nil(t, db.Compact())

	// All blocks from the old blocks should be gone now.
	// Only the round-2 block and the freshly created round-3 block remain.
	newBlocks2 := db.Blocks()
	require.Equal(t, 2, len(newBlocks2))

	require.Equal(t, newBlocks[2].Meta().ULID, newBlocks2[0].Meta().ULID) // Block created in last round.
	for _, b := range oldBlocks {
		// Second block is not one among old blocks.
		require.NotEqual(t, b.Meta().ULID, newBlocks2[1].Meta().ULID)
	}
}

0 commit comments

Comments
 (0)