diff --git a/docs/proposals/timeseries-partitioning-in-compactor.md b/docs/proposals/timeseries-partitioning-in-compactor.md index f61e25192c4..5b733717b31 100644 --- a/docs/proposals/timeseries-partitioning-in-compactor.md +++ b/docs/proposals/timeseries-partitioning-in-compactor.md @@ -122,6 +122,37 @@ Now that the planner has produced a compaction plan for the T1-T3 compaction gro ![Grouping](/images/proposals/timeseries-partitioning-in-compactor-compact.png) T1 - Partition 1-2 was created with hash % 2 == 0, and in order to avoid having duplication information in blocks produced by compaction group 3-8, compaction group 5-8, and compaction group 7-8, we need apply the filter the `%8 == 0` hash, as that’s the hash of the highest partition count. +### Compaction Workflow + +1. Compactor initializes Grouper and Planner. +2. Compactor retrieves block's meta.json and call Grouper to group blocks for compaction. +3. Grouper generates partitioned compaction groups: + 1. Grouper groups source blocks into unpartitioned groups. + 2. For each unpartitioned group: + 1. Generates partitioned compaction group ID which is hash of min and max time of result block. + 2. If the ID exists under the tenant directory in block storage, continue on next unpartitioned group. + 3. Calculates number of partitions. Number of partitions indicates how many partitions one unpartitioned group would be partitioned into based on the total size of indices and number of time series from each source blocks in the unpartitioned group. + 4. Assign source blocks into each partition with partition ID (value is in range from 0 to number_of_partitions - 1). Note that one source block could be used in multiple partitions (explanation in [Planning the compaction](#planning-the-compaction) and [Compaction](#compaction)). So multiple partition ID could be assigned to same source block. Check more partitioning examples in [Compaction Partitioning Examples](#compaction-partitioning-examples) + 5. Generates partitioned compaction group that indicates which partition ID each blocks got assigned. + 6. Partitioned compaction group information would be stored in block storage under the tenant directory it belongs to and the stored file can be picked up by cleaner later. Partitioned compaction group information contains partitioned compaction group ID, number of partitions, list of partitions which has partition ID and list of source blocks. + 7. Store partitioned compaction group ID in block storage under each blocks' directory that are used by the generated partitioned compaction group. +4. Grouper returns partitioned compaction groups to Compactor. Each returned group would have partition ID, number of partitions, and list of source blocks in memory. +5. Compactor iterates over each partitioned compaction group. For each iteration, calls Planner to make sure the group is ready for compaction. +6. Planner collects partitioned compaction group which is ready for compaction. + 1. For each partitions in the group and for each blocks in the partition: + 1. Make sure all source blocks fit within the time range of the group. + 2. Make sure each source block with assigned partition IDs is currently not used by another ongoing compaction. This could utilize visit marker file that is introduced in #4805 by expanding it for each partition ID of the source block. + 3. If all blocks in the partition are ready to be compacted, + 1. mark status of those blocks with assigned partition ID as `pending`. + 2. The status information of each partition ID would be stored in block storage under the corresponding block directory in order for cleaner to pick it up later. + 4. If not all blocks in the partition are ready, continue on next partition +7. Return all ready partitions to Compactor. +8. Compactor starts compacting partitioned blocks. Once compaction completed, Compactor would mark status of all blocks along with assigned partition ID in the group as `completed`. Compactor should use partitioned compaction group ID to retrieve partitioned compaction group information from block storage to get all partition IDs assigned to each block. Then, retrieve status information of each partition ID this assigned to block under current block directory in block storage. If all assigned partition ID of the block have status set to `completed`, upload deletion marker for this block. Otherwise, no deletion marker would be uploaded. + +### Clean up Workflow + +Cleaner would periodically check any tenants having deletion marker. If there is a deletion marker for the tenant, Cleaner should remove all blocks and then clean up other files including partitioned group information files after tenant clean up delay. If there is no deletion marker for tenant, Clean should scan any source blocks having a deletion marker. If there is a deletion marker for the block, Cleaner should delete it. + ## Performance Currently a 400M timeseries takes 12 hours to compact, without taking block download into consideration. If we have a partition count of 2, we can reduce this down to 6 hours, and a partition count of 10 is 3 hours. The scaling is not linear, and I’m still attempting to find out why. The initial result is promising enough to continue though. @@ -264,4 +295,160 @@ T2 partition 4 - Hash(timeseries label) % 4 == 3 && % 8 == 7 T3 partition 8 - Hash(timeseries label) % 8 == 7 ``` +### Compaction Partitioning Examples + +#### Scenario: All source blocks were compacted by partitioning compaction (Idea case) + +All source blocks were previously compacted through partitioning compaction. In this case for each time range, the number of blocks belong to same time range would be 2^x if multiplier is set to 2. + +``` +Time ranges: +T1, T2, T3 + +Source blocks: +T1: B1, B2 +T2: B3, B4, B5, B6 +T3: B7, B8, B9, B10, B11, B12, B13, B14 + +Total indices size of all source blocks: +200G +``` + +Number of Partitions = (200G / 64G = 3.125) => round up to next 2^x = 4 + +Partitioning: +* For T1, there are only 2 blocks which is < 4. So + * B1 (index 0 in the time range) can be grouped with other blocks having N % 4 == 0 or 2. Because 0 % 2 == 0. + * B2 (index 1 in the time range) can be grouped with other blocks having N % 4 == 1 or 3. Because 1 % 2 == 1. +* For T2, + * B3 (index 0 in the time range) can be grouped with other blocks having N % 4 == 0. + * B4 (index 1 in the time range) can be grouped with other blocks having N % 4 == 1. + * B5 (index 2 in the time range) can be grouped with other blocks having N % 4 == 2. + * B6 (index 3 in the time range) can be grouped with other blocks having N % 4 == 3. +* For T3, + * B7 (index 0 in the time range) can be grouped with other blocks having N % 4 == 0. + * B8 (index 1 in the time range) can be grouped with other blocks having N % 4 == 1. + * B9 (index 2 in the time range) can be grouped with other blocks having N % 4 == 2. + * B10 (index 3 in the time range) can be grouped with other blocks having N % 4 == 3. + * B11 (index 4 in the time range) can be grouped with other blocks having N % 4 == 0. + * B12 (index 5 in the time range) can be grouped with other blocks having N % 4 == 1. + * B13 (index 6 in the time range) can be grouped with other blocks having N % 4 == 2. + * B14 (index 7 in the time range) can be grouped with other blocks having N % 4 == 3. + +Partitions in Partitioned Compaction Group: +* Partition ID: 0 \ + Number of Partitions: 4 \ + Blocks: B1, B3, B7, B11 +* Partition ID: 1 \ + Number of Partitions: 4 \ + Blocks: B2, B4, B8, B12 +* Partition ID: 2 \ + Number of Partitions: 4 \ + Blocks: B1, B5, B9, B13 +* Partition ID: 3 \ + Number of Partitions: 4 \ + Blocks: B2, B6, B10, B14 + +--- + +#### Scenario: All source blocks are level 1 blocks + +All source blocks are level 1 blocks. Since number of level 1 blocks in one time range is not guaranteed to be 2^x, all blocks need to be included in each partition. + +``` +Time ranges: +T1 + +Source blocks: +T1: B1, B2, B3 + +Total indices size of all source blocks: +100G +``` + +Number of Partitions = (100G / 64G = 1.5625) => round up to next 2^x = 2 + +Partitioning: There is only one time range from all source blocks which means it is compacting level 1 blocks. Partitioning needs to include all source blocks in each partition. + +Partitions in Partitioned Compaction Group: +* Partition ID: 0 \ + Number of Partitions: 2 \ + Blocks: B1, B2, B3 +* Partition ID: 1 \ + Number of Partitions: 2 \ + Blocks: B1, B2, B3 + +--- + +#### Scenario: All source blocks are with compaction level > 1 and were generated by compactor without partitioning compaction + +If source block was generated by compactor without partitioning compaction, there should be only one block per time range. Since there is only one block in one time range, that one block would be included in all partitions. + +``` +Time ranges: +T1, T2, T3 + +Source blocks: +T1: B1 +T2: B2 +T3: B3 + +Total indices size of all source blocks: +100G +``` + +Number of Partitions = (100G / 64G = 1.5625) => round up to next 2^x = 2 + +Partitioning: +* For T1, there is only one source block. Include B1 in all partitions. +* For T2, there is only one source block. Include B2 in all partitions. +* For T3, there is only one source block. Include B3 in all partitions. + +Partitions in Partitioned Compaction Group: +* Partition ID: 0 \ + Number of Partitions: 2 \ + Blocks: B1, B2, B3 +* Partition ID: 1 \ + Number of Partitions: 2 \ + Blocks: B1, B2, B3 + +--- + +#### Scenario: All source blocks are with compaction level > 1 and some of them were generated by compactor with partitioning compaction + +Blocks generated by compactor without partitioning compaction would be included in all partitions. Blocks generated with partitioning compaction would be partitioned based on multiplier. + +``` +Time ranges: +T1, T2, T3 + +Source blocks: +T1: B1 (unpartitioned) +T2: B2, B3 +T3: B4, B5, B6, B7 + +Total indices size of all source blocks: +100G +``` + +Number of Partitions = (100G / 64G = 1.5625) => round up to next 2^x = 2 + +Partitioning: +* For T1, there is only one source block. Include B1 in all partitions. +* For T2, + * B2 (index 0 in the time range) can be grouped with other blocks having N % 2 == 0. + * B3 (index 1 in the time range) can be grouped with other blocks having N % 2 == 1. +* For T3, + * B4 (index 0 in the time range) can be grouped with other blocks having N % 2 == 0. + * B5 (index 1 in the time range) can be grouped with other blocks having N % 2 == 1. + * B6 (index 2 in the time range) can be grouped with other blocks having N % 2 == 0. + * B7 (index 3 in the time range) can be grouped with other blocks having N % 2 == 1. + +Partitions in Partitioned Compaction Group: +* Partition ID: 0 \ + Number of Partitions: 2 \ + Blocks: B1, B2, B4, B6 +* Partition ID: 1 \ + Number of Partitions: 2 \ + Blocks: B1, B3, B5, B7