Update proposal for timeseries partitioning in compactor (#4882)
* Added compaction and clean up workflow to partitioning compactor proposal
Signed-off-by: Alex Le <[email protected]>
* Make workflow part more concise and add additional partition scenario
Signed-off-by: Alex Le <[email protected]>
* Make it clear where to store partition related information and updated some wording
Signed-off-by: Alex Le <[email protected]>
* Added detail in compaction workflow
Signed-off-by: Alex Le <[email protected]>
* Added detail in compaction workflow
Signed-off-by: Alex Le <[email protected]>
* Updated wording
Signed-off-by: Alex Le <[email protected]>
* Updated cleaner logic to be more straightforward and keep checking completion logic in compactor
Signed-off-by: Alex Le <[email protected]>
Signed-off-by: Alex Le <[email protected]>
T1 - Partition 1-2 was created with hash % 2 == 0. To avoid duplicated information in the blocks produced by compaction group 3-8, compaction group 5-8, and compaction group 7-8, we need to apply the `% 8 == 0` hash filter, because 8 is the highest partition count.
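
The filtering idea above can be illustrated with a minimal sketch. It assumes that each series has an integer hash and that a partition keeps a series when `hash % partition_count == partition_id`; the function name and the sample hashes are hypothetical and not taken from the compactor code.

```python
def filter_series(series_hashes, partition_id, partition_count):
    """Keep only the series hashes that fall into the given partition."""
    return [h for h in series_hashes if h % partition_count == partition_id]


# Hypothetical hashes held by a source block that was itself produced
# with the filter hash % 2 == 0.
source_block = filter_series(range(32), partition_id=0, partition_count=2)

# Re-partitioning the same block into 8 partitions: each output partition
# applies its own hash % 8 filter, so no series lands in two outputs.
outputs = {p: filter_series(source_block, p, 8) for p in range(8)}
```

In this sketch the source block only contributes to the even-numbered outputs, and the `% 8` filter is what keeps those outputs disjoint.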

### Compaction Workflow

1. Compactor initializes Grouper and Planner.
2. Compactor retrieves each block's meta.json and calls Grouper to group blocks for compaction.
3. Grouper generates partitioned compaction groups:
    1. Grouper groups source blocks into unpartitioned groups.
    2. For each unpartitioned group:
        1. Generate the partitioned compaction group ID, which is a hash of the min and max time of the result block.
        2. If the ID already exists under the tenant directory in block storage, continue to the next unpartitioned group.
        3. Calculate the number of partitions, that is, how many partitions the unpartitioned group is split into. It is based on the total size of the indices and the number of time series of the source blocks in the unpartitioned group.
        4. Assign source blocks to each partition by partition ID (a value in the range 0 to number_of_partitions - 1). Note that one source block can be used in multiple partitions (explained in [Planning the compaction](#planning-the-compaction) and [Compaction](#compaction)), so multiple partition IDs can be assigned to the same source block. See [Compaction Partitioning Examples](#compaction-partitioning-examples) for more partitioning examples.
        5. Generate the partitioned compaction group, which records the partition IDs assigned to each block.
        6. Store the partitioned compaction group information in block storage under the tenant directory it belongs to, so that the cleaner can pick up the stored file later. The information contains the partitioned compaction group ID, the number of partitions, and the list of partitions, each with its partition ID and list of source blocks.
        7. Store the partitioned compaction group ID in block storage under the directory of each block used by the generated partitioned compaction group.
4. Grouper returns the partitioned compaction groups to Compactor. Each returned group holds its partition ID, number of partitions, and list of source blocks in memory.
5. Compactor iterates over the partitioned compaction groups. For each group, it calls Planner to make sure the group is ready for compaction.
6. Planner collects the partitioned compaction groups that are ready for compaction.
    1. For each partition in the group and for each block in the partition:
        1. Make sure all source blocks fit within the time range of the group.
        2. Make sure each source block with assigned partition IDs is not currently used by another ongoing compaction. This could reuse the visit marker file introduced in #4805 by extending it to cover each partition ID of the source block.
        3. If all blocks in the partition are ready to be compacted:
            1. Mark the status of those blocks, for the assigned partition ID, as `pending`.
            2. Store the status of each partition ID in block storage under the corresponding block directory so that the cleaner can pick it up later.
        4. If not all blocks in the partition are ready, continue to the next partition.
7. Return all ready partitions to Compactor.
8. Compactor starts compacting the partitioned blocks. Once a compaction completes, Compactor marks the status of every block in the group, for the assigned partition ID, as `completed`. Compactor then uses the partitioned compaction group ID to retrieve the partitioned compaction group information from block storage and get all partition IDs assigned to each block. Next, it retrieves the status of each partition ID assigned to the block from the block's directory in block storage. If every partition ID assigned to the block has status `completed`, Compactor uploads a deletion marker for the block. Otherwise, no deletion marker is uploaded.

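
The completion check in the last step can be sketched as follows. This is a minimal illustration, not the compactor's code: `Status`, `should_mark_for_deletion`, and the in-memory `statuses` mapping are hypothetical stand-ins for the status files stored under each block directory.

```python
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    COMPLETED = "completed"


def should_mark_for_deletion(assigned_partition_ids, statuses):
    """Return True only when every partition ID assigned to the block is completed.

    assigned_partition_ids: partition IDs read from the partitioned
        compaction group information.
    statuses: mapping of partition ID -> Status read from the block's
        directory; a missing entry means that partition was never planned.
    """
    return bool(assigned_partition_ids) and all(
        statuses.get(pid) is Status.COMPLETED for pid in assigned_partition_ids
    )
```

A block assigned to partitions 0 and 2 would only receive a deletion marker once both entries are `completed`; a `pending` or missing entry keeps the block in place.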
### Clean up Workflow

Cleaner periodically checks whether any tenant has a deletion marker. If a tenant has a deletion marker, Cleaner removes all of its blocks and then, after the tenant clean up delay, cleans up the remaining files, including the partitioned group information files. If a tenant has no deletion marker, Cleaner scans its source blocks for deletion markers and deletes every block that has one.

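
The decision the cleaner makes for one tenant can be sketched as a small function. The function name, the action tuples, and the arguments are hypothetical; the sketch only mirrors the two branches described above and does not touch real storage.

```python
def cleaner_actions(tenant_has_deletion_marker, blocks_with_deletion_marker, all_blocks):
    """Return the ordered list of actions the cleaner would take for one tenant."""
    if tenant_has_deletion_marker:
        # Whole tenant is going away: remove every block, then the remaining
        # files (including partitioned group info) after the clean up delay.
        actions = [("delete_block", b) for b in all_blocks]
        actions.append(("delete_tenant_files_after_delay", None))
        return actions
    # Otherwise only blocks that carry their own deletion marker are removed.
    return [("delete_block", b) for b in all_blocks if b in blocks_with_deletion_marker]
```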
## Performance
Currently, compacting 400M timeseries takes 12 hours, without taking block download into consideration. With a partition count of 2 this drops to 6 hours, and with a partition count of 10 to 3 hours. The scaling is not linear, and I’m still attempting to find out why. The initial result is promising enough to continue, though.

#### Scenario: All source blocks were compacted by partitioning compaction (Ideal case)

All source blocks were previously compacted through partitioning compaction. In this case, the number of blocks belonging to the same time range is 2^x for every time range, if the multiplier is set to 2.

```
Time ranges:
T1, T2, T3

Source blocks:
T1: B1, B2
T2: B3, B4, B5, B6
T3: B7, B8, B9, B10, B11, B12, B13, B14

Total indices size of all source blocks:
200G
```

Number of Partitions = (200G / 64G = 3.125) => round up to next 2^x = 4

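
The partition count calculation used in these examples can be sketched as below. It assumes the 64G per-partition index size seen in the examples and a multiplier of 2, and it ignores the number of time series, which the workflow also takes into account. The function and parameter names are hypothetical.

```python
def partition_count(total_index_size_gb, max_index_size_gb=64):
    """Smallest power of two whose partitions each stay within the size limit."""
    count = 1
    # Double the count until total / count fits under the per-partition limit.
    while count * max_index_size_gb < total_index_size_gb:
        count *= 2
    return count
```

For the 200G example this doubles from 1 to 2 to 4 and stops, because 4 partitions of 64G cover the total.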
Partitioning:
* For T1, there are only 2 blocks, which is < 4. So
    * B1 (index 0 in the time range) can be grouped with other blocks having N % 4 == 0 or 2, because 0 % 2 == 0.
    * B2 (index 1 in the time range) can be grouped with other blocks having N % 4 == 1 or 3, because 1 % 2 == 1.
* For T2,
    * B3 (index 0 in the time range) can be grouped with other blocks having N % 4 == 0.
    * B4 (index 1 in the time range) can be grouped with other blocks having N % 4 == 1.
    * B5 (index 2 in the time range) can be grouped with other blocks having N % 4 == 2.
    * B6 (index 3 in the time range) can be grouped with other blocks having N % 4 == 3.
* For T3,
    * B7 (index 0 in the time range) can be grouped with other blocks having N % 4 == 0.
    * B8 (index 1 in the time range) can be grouped with other blocks having N % 4 == 1.
    * B9 (index 2 in the time range) can be grouped with other blocks having N % 4 == 2.
    * B10 (index 3 in the time range) can be grouped with other blocks having N % 4 == 3.
    * B11 (index 4 in the time range) can be grouped with other blocks having N % 4 == 0.
    * B12 (index 5 in the time range) can be grouped with other blocks having N % 4 == 1.
    * B13 (index 6 in the time range) can be grouped with other blocks having N % 4 == 2.
    * B14 (index 7 in the time range) can be grouped with other blocks having N % 4 == 3.

Partitions in Partitioned Compaction Group:
* Partition ID: 0 \
  Number of Partitions: 4 \
  Blocks: B1, B3, B7, B11
* Partition ID: 1 \
  Number of Partitions: 4 \
  Blocks: B2, B4, B8, B12
* Partition ID: 2 \
  Number of Partitions: 4 \
  Blocks: B1, B5, B9, B13
* Partition ID: 3 \
  Number of Partitions: 4 \
  Blocks: B2, B6, B10, B14

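
The assignment rule used in this scenario can be sketched as a short function. It assumes that both the number of blocks in a time range and the partition count are powers of two, as in the ideal case; the names `partitions_for_block` and `NUM_PARTITIONS` are hypothetical.

```python
def partitions_for_block(index, blocks_in_range, partition_count):
    """Partition IDs that the block at `index` in its time range is assigned to."""
    if blocks_in_range >= partition_count:
        # Enough blocks in the range: each block maps to exactly one partition.
        return [index % partition_count]
    # Fewer blocks than partitions: the block serves every partition whose
    # ID maps back to its index.
    return [p for p in range(partition_count) if p % blocks_in_range == index]


NUM_PARTITIONS = 4
time_ranges = {
    "T1": ["B1", "B2"],
    "T2": ["B3", "B4", "B5", "B6"],
    "T3": ["B7", "B8", "B9", "B10", "B11", "B12", "B13", "B14"],
}

partitions = {p: [] for p in range(NUM_PARTITIONS)}
for blocks in time_ranges.values():
    for index, block in enumerate(blocks):
        for p in partitions_for_block(index, len(blocks), NUM_PARTITIONS):
            partitions[p].append(block)
```

Running the loop over the three time ranges yields the same four block lists as the partitions listed above.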
---

#### Scenario: All source blocks are level 1 blocks

All source blocks are level 1 blocks. Since the number of level 1 blocks in one time range is not guaranteed to be 2^x, all blocks need to be included in each partition.

```
Time ranges:
T1

Source blocks:
T1: B1, B2, B3

Total indices size of all source blocks:
100G
```

Number of Partitions = (100G / 64G = 1.5625) => round up to next 2^x = 2

Partitioning: There is only one time range across all source blocks, which means level 1 blocks are being compacted. Every partition needs to include all source blocks.

Partitions in Partitioned Compaction Group:
* Partition ID: 0 \
  Number of Partitions: 2 \
  Blocks: B1, B2, B3
* Partition ID: 1 \
  Number of Partitions: 2 \
  Blocks: B1, B2, B3

---

#### Scenario: All source blocks have compaction level > 1 and were generated by compactor without partitioning compaction

If the source blocks were generated by compactor without partitioning compaction, there should be only one block per time range. Since there is only one block in each time range, that block is included in all partitions.

```
Time ranges:
T1, T2, T3

Source blocks:
T1: B1
T2: B2
T3: B3

Total indices size of all source blocks:
100G
```

Number of Partitions = (100G / 64G = 1.5625) => round up to next 2^x = 2

Partitioning:
* For T1, there is only one source block. Include B1 in all partitions.
* For T2, there is only one source block. Include B2 in all partitions.
* For T3, there is only one source block. Include B3 in all partitions.

Partitions in Partitioned Compaction Group:
* Partition ID: 0 \
  Number of Partitions: 2 \
  Blocks: B1, B2, B3
* Partition ID: 1 \
  Number of Partitions: 2 \
  Blocks: B1, B2, B3

---

#### Scenario: All source blocks have compaction level > 1 and some of them were generated by compactor with partitioning compaction

Blocks generated by compactor without partitioning compaction are included in all partitions. Blocks generated with partitioning compaction are partitioned based on the multiplier.

```
Time ranges:
T1, T2, T3

Source blocks:
T1: B1 (unpartitioned)
T2: B2, B3
T3: B4, B5, B6, B7

Total indices size of all source blocks:
100G
```

Number of Partitions = (100G / 64G = 1.5625) => round up to next 2^x = 2

Partitioning:
* For T1, there is only one source block. Include B1 in all partitions.
* For T2,
    * B2 (index 0 in the time range) can be grouped with other blocks having N % 2 == 0.
    * B3 (index 1 in the time range) can be grouped with other blocks having N % 2 == 1.
* For T3,
    * B4 (index 0 in the time range) can be grouped with other blocks having N % 2 == 0.
    * B5 (index 1 in the time range) can be grouped with other blocks having N % 2 == 1.
    * B6 (index 2 in the time range) can be grouped with other blocks having N % 2 == 0.
    * B7 (index 3 in the time range) can be grouped with other blocks having N % 2 == 1.
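
Applying the rules listed above to this mixed scenario can be sketched as follows. This is an illustration under stated assumptions rather than the compactor's implementation: `TimeRange`, its `partitioned` flag, and `build_partitions` are hypothetical, and block counts of partitioned time ranges are assumed to be powers of two.

```python
from dataclasses import dataclass


@dataclass
class TimeRange:
    blocks: list       # block IDs, ordered by their index in the time range
    partitioned: bool  # True if the blocks came from partitioning compaction


def build_partitions(time_ranges, num_partitions):
    """Return {partition ID: [block IDs]} for one partitioned compaction group."""
    partitions = {p: [] for p in range(num_partitions)}
    for tr in time_ranges:
        for index, block in enumerate(tr.blocks):
            for p in range(num_partitions):
                # Unpartitioned blocks may hold any series, so every partition
                # reads them. Partitioned blocks are matched by index.
                if not tr.partitioned or p % len(tr.blocks) == index % num_partitions:
                    partitions[p].append(block)
    return partitions


group = build_partitions(
    [
        TimeRange(["B1"], partitioned=False),
        TimeRange(["B2", "B3"], partitioned=True),
        TimeRange(["B4", "B5", "B6", "B7"], partitioned=True),
    ],
    num_partitions=2,
)
```

Under these assumptions, partition 0 collects B1, B2, B4, and B6, and partition 1 collects B1, B3, B5, and B7, which agrees with the per-block rules in the list above.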