From 17e8948f1a82cb9b4c5fe81b01a873e848ed2373 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 27 Sep 2022 17:42:18 -0700 Subject: [PATCH 1/7] Added compaction and clean up workflow to partitioning compactor proposal Signed-off-by: Alex Le --- .../timeseries-partitioning-in-compactor.md | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/docs/proposals/timeseries-partitioning-in-compactor.md b/docs/proposals/timeseries-partitioning-in-compactor.md index f61e25192c4..b4a1a620e3c 100644 --- a/docs/proposals/timeseries-partitioning-in-compactor.md +++ b/docs/proposals/timeseries-partitioning-in-compactor.md @@ -122,6 +122,37 @@ Now that the planner has produced a compaction plan for the T1-T3 compaction gro ![Grouping](/images/proposals/timeseries-partitioning-in-compactor-compact.png) T1 - Partition 1-2 was created with hash % 2 == 0, and in order to avoid having duplication information in blocks produced by compaction group 3-8, compaction group 5-8, and compaction group 7-8, we need apply the filter the `%8 == 0` hash, as that’s the hash of the highest partition count. +### Compaction Workflow + +1. Cortex Compactor initializes grouper and planner. Then, call Thanos Compactor to compact blocks. +2. Thanos Compactor retrieves block meta and call Grouper to group blocks for compaction. +3. Grouper generates compaction plans: + 1. Grouper groups source blocks into unpartitioned groups. + 2. For each unpartitioned group: + 1. Calculates partition number. Partition number indicates how many partitions one unpartitioned group would be partitioned into based on the total size of indices and number of time series from each source blocks in the unpartitioned group. + 2. Assign source blocks into each partition with partition ID (value is in range from 0 to Partition Number - 1). Note that one source block could be used in multiple partitions. So multiple partition ID could be assigned to same source block. 
Check the examples in [Compaction Partitioning Examples](#compaction-partitioning-examples) + 3. Gather all partition IDs got assigned to each source blocks. + 4. For each source block, save compaction plan file into block-storage. Compaction plan file would look like: + ``` + {"partitionIDs": [0, 1, ...]} + ``` +4. Grouper returns compaction plans to Thanos Compactor. +5. Thanos Compactor iterates over each compaction plan (partitioned group). For each iteration, calls Planner to make sure the group is ready for compaction. +6. Planner collects all compaction plans which are ready for compaction. + 1. For each compaction plan and for each blocks in the plan: + 1. Retrieve block partition visit marker file from block-storage based on partition ID of the plan. + 2. If there is no visit marker file or visit marker file is expired and not in completed status, put visit marker file for this block partition in block-storage. The visit marker file would be named like `partition--visit-mark.json` and its content would look like: + ``` + {"compactorID":"","visitTime":"","status":"pending","version":1} + ``` + 4. Otherwise, skip this compaction plan. Because at least one block partition is picked up by other compactor. +7. Return all ready compaction plans to Thanos Compactor. +8. Thanos Compactor starts compacting partitioned blocks from compaction plans. Once compaction completed, Thanos Compactor would upload deletion marker for the source blocks in the plan. Also, it would update `partition--visit-mark.json` to have status set to `completed`. + +### Clean up Workflow + +Cleaner would periodically check any source blocks having a deletion marker. If there is a deletion marker for the block, Cleaner should retrieve compaction plan to get all partition IDs assigned to this block. If visit marker files for all partitions have status set to `completed`, this source block can be deleted. Otherwise, skip deletion. 
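The visit marker handling in step 6 and the Cleaner's deletion check can be sketched as below. This is a minimal Python sketch, not the Cortex implementation:

- The marker fields mirror the example JSON content above.
- The 300-second expiry, the Unix-seconds `visitTime` and every function name are hypothetical.
- Step 6 is worded per block. The sketch instead checks every block of a plan before writing any `pending` marker, so a skipped plan leaves no partial markers behind.

```python
import json
from dataclasses import dataclass
from typing import Dict, Iterable, Optional

# Hypothetical expiry: a "pending" marker older than this is treated as abandoned.
VISIT_MARKER_TIMEOUT_SECONDS = 300


@dataclass
class VisitMarker:
    compactor_id: str
    visit_time: int  # assumed to be Unix seconds
    status: str      # "pending" or "completed"
    version: int = 1

    def to_json(self) -> str:
        # Field names follow the example marker content in the proposal.
        return json.dumps({
            "compactorID": self.compactor_id,
            "visitTime": self.visit_time,
            "status": self.status,
            "version": self.version,
        })


def can_claim(marker: Optional[VisitMarker], now: int,
              timeout: int = VISIT_MARKER_TIMEOUT_SECONDS) -> bool:
    """A block partition is free if it has no marker, or an expired, non-completed one."""
    if marker is None:
        return True
    if marker.status == "completed":
        return False
    return now - marker.visit_time > timeout


def claim_plan(markers: Dict[str, Optional[VisitMarker]], block_ids: Iterable[str],
               compactor_id: str, now: int) -> Optional[Dict[str, VisitMarker]]:
    """Return new "pending" markers for every block in the plan, or None to skip it.

    `markers` maps block ID -> existing marker for this plan's partition ID.
    """
    block_ids = list(block_ids)
    if not all(can_claim(markers.get(b), now) for b in block_ids):
        return None  # at least one block partition is held by another compactor
    return {b: VisitMarker(compactor_id, now, "pending") for b in block_ids}


def can_delete(has_deletion_marker: bool, assigned_partition_ids: Iterable[int],
               markers_by_partition: Dict[int, Optional[VisitMarker]]) -> bool:
    """Cleaner check: delete the source block only once every assigned partition is completed."""
    if not has_deletion_marker:
        return False
    for pid in assigned_partition_ids:
        marker = markers_by_partition.get(pid)
        if marker is None or marker.status != "completed":
            return False
    return True
```

The Cleaner would call `can_delete` with the partition IDs taken from the block's compaction plan file (`{"partitionIDs": [...]}`). A block that still has one `pending` partition is kept.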
+ ## Performance Currently a 400M timeseries takes 12 hours to compact, without taking block download into consideration. If we have a partition count of 2, we can reduce this down to 6 hours, and a partition count of 10 is 3 hours. The scaling is not linear, and I’m still attempting to find out why. The initial result is promising enough to continue though. @@ -264,4 +295,128 @@ T2 partition 4 - Hash(timeseries label) % 4 == 3 && % 8 == 7 T3 partition 8 - Hash(timeseries label) % 8 == 7 ``` +### Compaction Partitioning Examples + +#### Ideal case: + +All source blocks were previously compacted through partitioning compaction. In this case for each time interval, the number of blocks belong to same time interval would be 2^x if multiplier is set to 2. + +``` +Time intervals: +T1, T2, T3 + +Source blocks: +T1: B1, B2 +T2: B3, B4, B5, B6 +T3: B7, B8, B9, B10, B11, B12, B13, B14 + +Total indices size of all source blocks: +200G +``` + +Partition Number = (200G / 64G) => round up to next 2^x = 4 + +Partitioning: +* For T1, there are only 2 blocks which is < 4. So + + * B1 (index 0 in the time interval) can be grouped with other blocks having N % 4 == 0 or 2. Because 0 % 2 == 0. + * B2 (index 1 in the time interval) can be grouped with other blocks having N % 4 == 1 or 3. Because 1 % 2 == 1. + +* For T2, + + * B3 (index 0 in the time interval) can be grouped with other blocks having N % 4 == 0. + * B4 (index 1 in the time interval) can be grouped with other blocks having N % 4 == 1. + * B5 (index 2 in the time interval) can be grouped with other blocks having N % 4 == 2. + * B6 (index 3 in the time interval) can be grouped with other blocks having N % 4 == 3. + +* For T3, + + * B7 (index 0 in the time interval) can be grouped with other blocks having N % 4 == 0. + * B8 (index 1 in the time interval) can be grouped with other blocks having N % 4 == 1. + * B9 (index 2 in the time interval) can be grouped with other blocks having N % 4 == 2. 
+ * B10 (index 3 in the time interval) can be grouped with other blocks having N % 4 == 3. + * B11 (index 4 in the time interval) can be grouped with other blocks having N % 4 == 0. + * B12 (index 5 in the time interval) can be grouped with other blocks having N % 4 == 1. + * B13 (index 6 in the time interval) can be grouped with other blocks having N % 4 == 2. + * B14 (index 7 in the time interval) can be grouped with other blocks having N % 4 == 3. + +Compaction Plans: + +* Partition ID: 0 \ + Partition Number: 4 \ + Blocks: B1, B3, B7, B11 +* Partition ID: 1 \ + Partition Number: 4 \ + Blocks: B2, B4, B8, B12 +* Partition ID: 2 \ + Partition Number: 4 \ + Blocks: B1, B5, B9, B13 +* Partition ID: 3 \ + Partition Number: 4 \ + Blocks: B2, B6, B10, B14 + +--- + +#### Only Level 1 Blocks: + +All source blocks are level 1 blocks. Since number of level 1 blocks in one time interval is not guaranteed to be 2^x, all blocks need to be included in each partition. + +``` +Time intervals: +T1 + +Source blocks: +T1: B1, B2, B3 + +Total indices size of all source blocks: +100G +``` + +Partition Number = (100G / 64G) => round up to next 2^x = 2 + +Partitioning: There is only one time interval from all source blocks which means it is compacting level 1 blocks. Partitioning needs to include all source blocks in each partitions. + +Compaction Plans: + +* Partition ID: 0 \ + Partition Number: 2 \ + Blocks: B1, B2, B3 +* Partition ID: 1 \ + Partition Number: 2 \ + Blocks: B1, B2, B3 + +--- + +#### Legacy High Level Blocks: + +For legacy high level blocks, there would be only one block for each time interval. Since there is only one block in one time interval, that one block would be included in all partitions. + +``` +Time intervals: +T1, T2, T3 + +Source blocks: +T1: B1 +T2: B2 +T3: B3 + +Total indices size of all source blocks: +100G +``` + +Partition Number = (100G / 64G) => round up to next 2^x = 2 + +Partitioning: +* For T1, there is only one source block. 
Include B1 in all partitions. +* For T2, there is only one source block. Include B2 in all partitions. +* For T3, there is only one source block. Include B3 in all partitions. + +Compaction Plans: + +* Partition ID: 0 \ + Partition Number: 2 \ + Blocks: B1, B2, B3 +* Partition ID: 1 \ + Partition Number: 2 \ + Blocks: B1, B2, B3 From d02b10d13959034ca6ace930b4542662a0a4b8c0 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 29 Sep 2022 17:19:57 -0700 Subject: [PATCH 2/7] Make workflow part more concise and add additional partition scenario Signed-off-by: Alex Le --- .../timeseries-partitioning-in-compactor.md | 93 ++++++++++++------- 1 file changed, 61 insertions(+), 32 deletions(-) diff --git a/docs/proposals/timeseries-partitioning-in-compactor.md b/docs/proposals/timeseries-partitioning-in-compactor.md index b4a1a620e3c..2dd28620d3c 100644 --- a/docs/proposals/timeseries-partitioning-in-compactor.md +++ b/docs/proposals/timeseries-partitioning-in-compactor.md @@ -124,34 +124,27 @@ T1 - Partition 1-2 was created with hash % 2 == 0, and in order to avoid having ### Compaction Workflow -1. Cortex Compactor initializes grouper and planner. Then, call Thanos Compactor to compact blocks. -2. Thanos Compactor retrieves block meta and call Grouper to group blocks for compaction. +1. Compactor initializes Grouper and Planner. +2. Compactor retrieves block's meta.json and call Grouper to group blocks for compaction. 3. Grouper generates compaction plans: 1. Grouper groups source blocks into unpartitioned groups. 2. For each unpartitioned group: 1. Calculates partition number. Partition number indicates how many partitions one unpartitioned group would be partitioned into based on the total size of indices and number of time series from each source blocks in the unpartitioned group. 2. Assign source blocks into each partition with partition ID (value is in range from 0 to Partition Number - 1). Note that one source block could be used in multiple partitions. 
So multiple partition ID could be assigned to same source block. Check the examples in [Compaction Partitioning Examples](#compaction-partitioning-examples) - 3. Gather all partition IDs got assigned to each source blocks. - 4. For each source block, save compaction plan file into block-storage. Compaction plan file would look like: - ``` - {"partitionIDs": [0, 1, ...]} - ``` -4. Grouper returns compaction plans to Thanos Compactor. -5. Thanos Compactor iterates over each compaction plan (partitioned group). For each iteration, calls Planner to make sure the group is ready for compaction. + 3. Generates compaction plan that indicates which partition ID each blocks got assigned. +4. Grouper returns compaction plans to Compactor. +5. Compactor iterates over each compaction plan (partitioned group). For each iteration, calls Planner to make sure the group is ready for compaction. 6. Planner collects all compaction plans which are ready for compaction. 1. For each compaction plan and for each blocks in the plan: - 1. Retrieve block partition visit marker file from block-storage based on partition ID of the plan. - 2. If there is no visit marker file or visit marker file is expired and not in completed status, put visit marker file for this block partition in block-storage. The visit marker file would be named like `partition--visit-mark.json` and its content would look like: - ``` - {"compactorID":"","visitTime":"","status":"pending","version":1} - ``` - 4. Otherwise, skip this compaction plan. Because at least one block partition is picked up by other compactor. -7. Return all ready compaction plans to Thanos Compactor. -8. Thanos Compactor starts compacting partitioned blocks from compaction plans. Once compaction completed, Thanos Compactor would upload deletion marker for the source blocks in the plan. Also, it would update `partition--visit-mark.json` to have status set to `completed`. + 1. Make sure the source block fits within the time range of the group. + 2. 
Make sure the source block with assigned partition ID is currently not used by another ongoing compaction. + 2. If all blocks in the group are ready to be compacted, mark status of those blocks along with assigned partition ID as `pending`. +7. Return all ready compaction plans to Compactor. +8. Compactor starts compacting partitioned blocks from compaction plans. Once compaction completed, Compactor would upload deletion marker for the source blocks in the plan. Also, it would mark status of all blocks along with assigned partition ID in the group as `completed`. ### Clean up Workflow -Cleaner would periodically check any source blocks having a deletion marker. If there is a deletion marker for the block, Cleaner should retrieve compaction plan to get all partition IDs assigned to this block. If visit marker files for all partitions have status set to `completed`, this source block can be deleted. Otherwise, skip deletion. +Cleaner would periodically check any source blocks having a deletion marker. If there is a deletion marker for the block, Cleaner should retrieve compaction plan to get all partition IDs assigned to this block. If this source block's all assigned partition ID have status set to `completed`, this source block can be deleted. Otherwise, skip deletion. ## Performance @@ -297,7 +290,7 @@ T3 partition 8 - Hash(timeseries label) % 8 == 7 ### Compaction Partitioning Examples -#### Ideal case: +#### Scenario: All source blocks were compacted by partitioning compaction (Idea case) All source blocks were previously compacted through partitioning compaction. In this case for each time interval, the number of blocks belong to same time interval would be 2^x if multiplier is set to 2. @@ -314,23 +307,18 @@ Total indices size of all source blocks: 200G ``` -Partition Number = (200G / 64G) => round up to next 2^x = 4 +Partition Number = (200G / 64G = 3.125) => round up to next 2^x = 4 Partitioning: * For T1, there are only 2 blocks which is < 4. 
So - * B1 (index 0 in the time interval) can be grouped with other blocks having N % 4 == 0 or 2. Because 0 % 2 == 0. * B2 (index 1 in the time interval) can be grouped with other blocks having N % 4 == 1 or 3. Because 1 % 2 == 1. - * For T2, - * B3 (index 0 in the time interval) can be grouped with other blocks having N % 4 == 0. * B4 (index 1 in the time interval) can be grouped with other blocks having N % 4 == 1. * B5 (index 2 in the time interval) can be grouped with other blocks having N % 4 == 2. * B6 (index 3 in the time interval) can be grouped with other blocks having N % 4 == 3. - * For T3, - * B7 (index 0 in the time interval) can be grouped with other blocks having N % 4 == 0. * B8 (index 1 in the time interval) can be grouped with other blocks having N % 4 == 1. * B9 (index 2 in the time interval) can be grouped with other blocks having N % 4 == 2. @@ -357,7 +345,7 @@ Compaction Plans: --- -#### Only Level 1 Blocks: +#### Scenario: All source blocks are level 1 blocks All source blocks are level 1 blocks. Since number of level 1 blocks in one time interval is not guaranteed to be 2^x, all blocks need to be included in each partition. @@ -368,13 +356,13 @@ T1 Source blocks: T1: B1, B2, B3 -Total indices size of all source blocks: +Total indices size of all source blocks: 100G ``` -Partition Number = (100G / 64G) => round up to next 2^x = 2 +Partition Number = (100G / 64G = 1.5625) => round up to next 2^x = 2 -Partitioning: There is only one time interval from all source blocks which means it is compacting level 1 blocks. Partitioning needs to include all source blocks in each partitions. +Partitioning: There is only one time interval from all source blocks which means it is compacting level 1 blocks. Partitioning needs to include all source blocks in each partition. 
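The partition number arithmetic used in these examples can be sketched as follows. It divides the total index size by a per-partition limit and rounds the result up to the next power of two, assuming the 64G limit taken from the examples. The optional series-count parameters are hypothetical. The workflow says the partition number also depends on the number of time series, but not how, so the sketch simply takes the larger of the two ratios.

```python
import math
from typing import Optional


def next_power_of_two(n: int) -> int:
    """Smallest power of two that is >= n (for n >= 1)."""
    return 1 << (n - 1).bit_length()


def partition_number(total_index_size: float, max_index_size_per_partition: float,
                     total_series: Optional[int] = None,
                     max_series_per_partition: Optional[int] = None) -> int:
    # Ratio of total index size to the per-partition limit, e.g. 200G / 64G = 3.125.
    ratio = total_index_size / max_index_size_per_partition
    if total_series is not None and max_series_per_partition:
        # Hypothetical: the series count can also push the partition number up.
        ratio = max(ratio, total_series / max_series_per_partition)
    # Round up to an integer, then up to the next power of two (multiplier 2).
    return next_power_of_two(max(1, math.ceil(ratio)))
```

With sizes in the same unit, `partition_number(200, 64)` reproduces the 4 partitions of the ideal case, and `partition_number(100, 64)` gives the 2 partitions used in the other scenarios.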
Compaction Plans: @@ -387,9 +375,9 @@ Compaction Plans: --- -#### Legacy High Level Blocks: +#### Scenario: All source blocks are with compaction level > 1 and were generated by compactor without partitioning compaction -For legacy high level blocks, there would be only one block for each time interval. Since there is only one block in one time interval, that one block would be included in all partitions. +If source block was generated by compactor without partitioning compaction, there should be only one block per time interval. Since there is only one block in one time interval, that one block would be included in all partitions. ``` Time intervals: @@ -404,7 +392,7 @@ Total indices size of all source blocks: 100G ``` -Partition Number = (100G / 64G) => round up to next 2^x = 2 +Partition Number = (100G / 64G = 1.5625) => round up to next 2^x = 2 Partitioning: * For T1, there is only one source block. Include B1 in all partitions. @@ -420,3 +408,44 @@ Compaction Plans: Partition Number: 2 \ Blocks: B1, B2, B3 +--- + +#### Scenario: All source blocks are with compaction level > 1 and some of them were generated by compactor with partitioning compaction + +Blocks generated by compactor without partitioning compaction would be included in all partitions. Blocks generated with partitioning compaction would be partitioned based on multiplier. + +``` +Time intervals: +T1, T2, T3 + +Source blocks: +T1: B1 (unpartitioned) +T2: B2, B3 +T3: B4, B5, B6, B7 + +Total indices size of all source blocks: +100G +``` + +Partition Number = (100G / 64G = 1.5625) => round up to next 2^x = 2 + +Partitioning: +* For T1, there is only one source block. Include B1 in all partitions. +* For T2, + * B2 (index 0 in the time interval) can be grouped with other blocks having N % 2 == 0. + * B3 (index 1 in the time interval) can be grouped with other blocks having N % 2 == 1. +* For T3, + * B4 (index 0 in the time interval) can be grouped with other blocks having N % 2 == 0. 
+ * B5 (index 1 in the time interval) can be grouped with other blocks having N % 2 == 1. + * B6 (index 2 in the time interval) can be grouped with other blocks having N % 2 == 0. + * B7 (index 3 in the time interval) can be grouped with other blocks having N % 2 == 1. + +Compaction Plans: + +* Partition ID: 0 \ + Partition Number: 2 \ + Blocks: B1, B2, B4, B6 +* Partition ID: 1 \ + Partition Number: 2 \ + Blocks: B1, B3, B5, B7 + From 452cc198bb97216eb072ffe69d061d5aef100efc Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 30 Sep 2022 11:21:52 -0700 Subject: [PATCH 3/7] Make it clear where to store partition related information and updated some wording Signed-off-by: Alex Le --- .../timeseries-partitioning-in-compactor.md | 86 ++++++++++--------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/docs/proposals/timeseries-partitioning-in-compactor.md b/docs/proposals/timeseries-partitioning-in-compactor.md index 2dd28620d3c..b0bddd3f8d4 100644 --- a/docs/proposals/timeseries-partitioning-in-compactor.md +++ b/docs/proposals/timeseries-partitioning-in-compactor.md @@ -126,25 +126,27 @@ T1 - Partition 1-2 was created with hash % 2 == 0, and in order to avoid having 1. Compactor initializes Grouper and Planner. 2. Compactor retrieves block's meta.json and call Grouper to group blocks for compaction. -3. Grouper generates compaction plans: +3. Grouper generates compaction groups: 1. Grouper groups source blocks into unpartitioned groups. 2. For each unpartitioned group: 1. Calculates partition number. Partition number indicates how many partitions one unpartitioned group would be partitioned into based on the total size of indices and number of time series from each source blocks in the unpartitioned group. 2. Assign source blocks into each partition with partition ID (value is in range from 0 to Partition Number - 1). Note that one source block could be used in multiple partitions. So multiple partition ID could be assigned to same source block. 
Check the examples in [Compaction Partitioning Examples](#compaction-partitioning-examples) - 3. Generates compaction plan that indicates which partition ID each blocks got assigned. -4. Grouper returns compaction plans to Compactor. -5. Compactor iterates over each compaction plan (partitioned group). For each iteration, calls Planner to make sure the group is ready for compaction. -6. Planner collects all compaction plans which are ready for compaction. - 1. For each compaction plan and for each blocks in the plan: + 3. Generates compaction group that indicates which partition ID each blocks got assigned. + 4. Partitioned compaction group information would be stored in block storage in order for cleaner to pick it up later. +4. Grouper returns compaction groups to Compactor. +5. Compactor iterates over each partitioned compaction group. For each iteration, calls Planner to make sure the group is ready for compaction. +6. Planner collects all compaction groups which are ready for compaction. + 1. For each compaction group and for each blocks in the plan: 1. Make sure the source block fits within the time range of the group. - 2. Make sure the source block with assigned partition ID is currently not used by another ongoing compaction. + 2. Make sure the source block with assigned partition ID is currently not used by another ongoing compaction. This could utilize visit marker file that is introduced in #4805 by expanding it for each partition ID of the source block. 2. If all blocks in the group are ready to be compacted, mark status of those blocks along with assigned partition ID as `pending`. -7. Return all ready compaction plans to Compactor. -8. Compactor starts compacting partitioned blocks from compaction plans. Once compaction completed, Compactor would upload deletion marker for the source blocks in the plan. Also, it would mark status of all blocks along with assigned partition ID in the group as `completed`. + 3. 
The status information of each partition ID would be stored in block storage in order for cleaner to pick it up later. +7. Return all ready compaction groups to Compactor. +8. Compactor starts compacting partitioned blocks from compaction groups. Once compaction completed, Compactor would upload deletion marker for the source blocks in the plan. Also, it would mark status of all blocks along with assigned partition ID in the group as `completed`. ### Clean up Workflow -Cleaner would periodically check any source blocks having a deletion marker. If there is a deletion marker for the block, Cleaner should retrieve compaction plan to get all partition IDs assigned to this block. If this source block's all assigned partition ID have status set to `completed`, this source block can be deleted. Otherwise, skip deletion. +Cleaner would periodically check any source blocks having a deletion marker. If there is a deletion marker for the block, Cleaner should retrieve compaction group information from block storage to get all partition IDs assigned to this block as well as status information of each partition ID. If this source block's all assigned partition ID have status set to `completed`, this source block can be deleted. Otherwise, skip deletion. ## Performance @@ -292,10 +294,10 @@ T3 partition 8 - Hash(timeseries label) % 8 == 7 #### Scenario: All source blocks were compacted by partitioning compaction (Idea case) -All source blocks were previously compacted through partitioning compaction. In this case for each time interval, the number of blocks belong to same time interval would be 2^x if multiplier is set to 2. +All source blocks were previously compacted through partitioning compaction. In this case for each time range, the number of blocks belong to same time range would be 2^x if multiplier is set to 2. 
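Here is a minimal sketch of the assignment rule that all four scenarios follow. It assumes each time range's blocks are listed in index order, and that block i of n holds the series whose hash % n == i.

The rule: for a time range with n blocks (n a power of two) and partition number P, block i joins partition p when i % min(n, P) == p % min(n, P).

- A single time range is treated as level 1 blocks, as in the level 1 scenario, so every block goes into every partition.
- Falling back to "every block everywhere" for a non-power-of-two block count is an assumption, not something the proposal states.

All function names are hypothetical.

```python
from typing import Dict, List


def is_power_of_two(n: int) -> bool:
    return n > 0 and n & (n - 1) == 0


def assign_partitions(blocks_by_range: Dict[str, List[str]],
                      partition_number: int) -> Dict[int, List[str]]:
    """Return partition ID -> source blocks for one unpartitioned group.

    `blocks_by_range` maps each time range to its blocks, ordered by their
    index within that time range.
    """
    partitions: Dict[int, List[str]] = {p: [] for p in range(partition_number)}
    # Only one time range means level 1 blocks: nothing is hash-partitioned yet.
    level1 = len(blocks_by_range) == 1
    for blocks in blocks_by_range.values():
        n = len(blocks)
        for p in range(partition_number):
            if level1 or not is_power_of_two(n):
                # Assumed fallback: unpartitioned input goes into every partition.
                partitions[p].extend(blocks)
                continue
            m = min(n, partition_number)
            # Assuming block i holds series with hash % n == i, it can feed
            # partition p whenever i and p agree modulo the smaller count.
            partitions[p].extend(b for i, b in enumerate(blocks) if i % m == p % m)
    return partitions


def partition_ids_by_block(partitions: Dict[int, List[str]]) -> Dict[str, List[int]]:
    """Invert the mapping: block -> sorted partition IDs it was assigned to."""
    result: Dict[str, List[int]] = {}
    for pid in sorted(partitions):
        for block in partitions[pid]:
            result.setdefault(block, []).append(pid)
    return result
```

Feeding in the ideal case below (T1: B1-B2, T2: B3-B6, T3: B7-B14, partition number 4) yields the four partitions listed in that scenario. Inverting the result with `partition_ids_by_block` gives B1 the partition IDs [0, 2], matching its `N % 4 == 0 or 2` rule.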
``` -Time intervals: +Time ranges: T1, T2, T3 Source blocks: @@ -311,24 +313,24 @@ Partition Number = (200G / 64G = 3.125) => round up to next 2^x = 4 Partitioning: * For T1, there are only 2 blocks which is < 4. So - * B1 (index 0 in the time interval) can be grouped with other blocks having N % 4 == 0 or 2. Because 0 % 2 == 0. - * B2 (index 1 in the time interval) can be grouped with other blocks having N % 4 == 1 or 3. Because 1 % 2 == 1. + * B1 (index 0 in the time range) can be grouped with other blocks having N % 4 == 0 or 2. Because 0 % 2 == 0. + * B2 (index 1 in the time range) can be grouped with other blocks having N % 4 == 1 or 3. Because 1 % 2 == 1. * For T2, - * B3 (index 0 in the time interval) can be grouped with other blocks having N % 4 == 0. - * B4 (index 1 in the time interval) can be grouped with other blocks having N % 4 == 1. - * B5 (index 2 in the time interval) can be grouped with other blocks having N % 4 == 2. - * B6 (index 3 in the time interval) can be grouped with other blocks having N % 4 == 3. + * B3 (index 0 in the time range) can be grouped with other blocks having N % 4 == 0. + * B4 (index 1 in the time range) can be grouped with other blocks having N % 4 == 1. + * B5 (index 2 in the time range) can be grouped with other blocks having N % 4 == 2. + * B6 (index 3 in the time range) can be grouped with other blocks having N % 4 == 3. * For T3, - * B7 (index 0 in the time interval) can be grouped with other blocks having N % 4 == 0. - * B8 (index 1 in the time interval) can be grouped with other blocks having N % 4 == 1. - * B9 (index 2 in the time interval) can be grouped with other blocks having N % 4 == 2. - * B10 (index 3 in the time interval) can be grouped with other blocks having N % 4 == 3. - * B11 (index 4 in the time interval) can be grouped with other blocks having N % 4 == 0. - * B12 (index 5 in the time interval) can be grouped with other blocks having N % 4 == 1. 
- * B13 (index 6 in the time interval) can be grouped with other blocks having N % 4 == 2. - * B14 (index 7 in the time interval) can be grouped with other blocks having N % 4 == 3. + * B7 (index 0 in the time range) can be grouped with other blocks having N % 4 == 0. + * B8 (index 1 in the time range) can be grouped with other blocks having N % 4 == 1. + * B9 (index 2 in the time range) can be grouped with other blocks having N % 4 == 2. + * B10 (index 3 in the time range) can be grouped with other blocks having N % 4 == 3. + * B11 (index 4 in the time range) can be grouped with other blocks having N % 4 == 0. + * B12 (index 5 in the time range) can be grouped with other blocks having N % 4 == 1. + * B13 (index 6 in the time range) can be grouped with other blocks having N % 4 == 2. + * B14 (index 7 in the time range) can be grouped with other blocks having N % 4 == 3. -Compaction Plans: +Compaction Groups: * Partition ID: 0 \ Partition Number: 4 \ @@ -347,10 +349,10 @@ Compaction Plans: #### Scenario: All source blocks are level 1 blocks -All source blocks are level 1 blocks. Since number of level 1 blocks in one time interval is not guaranteed to be 2^x, all blocks need to be included in each partition. +All source blocks are level 1 blocks. Since number of level 1 blocks in one time range is not guaranteed to be 2^x, all blocks need to be included in each partition. ``` -Time intervals: +Time ranges: T1 Source blocks: @@ -362,9 +364,9 @@ Total indices size of all source blocks: Partition Number = (100G / 64G = 1.5625) => round up to next 2^x = 2 -Partitioning: There is only one time interval from all source blocks which means it is compacting level 1 blocks. Partitioning needs to include all source blocks in each partition. +Partitioning: There is only one time range from all source blocks which means it is compacting level 1 blocks. Partitioning needs to include all source blocks in each partition. 
-Compaction Plans: +Compaction Groups: * Partition ID: 0 \ Partition Number: 2 \ @@ -377,10 +379,10 @@ Compaction Plans: #### Scenario: All source blocks are with compaction level > 1 and were generated by compactor without partitioning compaction -If source block was generated by compactor without partitioning compaction, there should be only one block per time interval. Since there is only one block in one time interval, that one block would be included in all partitions. +If source block was generated by compactor without partitioning compaction, there should be only one block per time range. Since there is only one block in one time range, that one block would be included in all partitions. ``` -Time intervals: +Time ranges: T1, T2, T3 Source blocks: @@ -399,7 +401,7 @@ Partitioning: * For T2, there is only one source block. Include B2 in all partitions. * For T3, there is only one source block. Include B3 in all partitions. -Compaction Plans: +Compaction Groups: * Partition ID: 0 \ Partition Number: 2 \ @@ -415,7 +417,7 @@ Compaction Plans: Blocks generated by compactor without partitioning compaction would be included in all partitions. Blocks generated with partitioning compaction would be partitioned based on multiplier. ``` -Time intervals: +Time ranges: T1, T2, T3 Source blocks: @@ -432,15 +434,15 @@ Partition Number = (100G / 64G = 1.5625) => round up to next 2^x = 2 Partitioning: * For T1, there is only one source block. Include B1 in all partitions. * For T2, - * B2 (index 0 in the time interval) can be grouped with other blocks having N % 2 == 0. - * B3 (index 1 in the time interval) can be grouped with other blocks having N % 2 == 1. + * B2 (index 0 in the time range) can be grouped with other blocks having N % 2 == 0. + * B3 (index 1 in the time range) can be grouped with other blocks having N % 2 == 1. * For T3, - * B4 (index 0 in the time interval) can be grouped with other blocks having N % 2 == 0. 
- * B5 (index 1 in the time interval) can be grouped with other blocks having N % 2 == 1. - * B6 (index 2 in the time interval) can be grouped with other blocks having N % 2 == 0. - * B7 (index 3 in the time interval) can be grouped with other blocks having N % 2 == 1. + * B4 (index 0 in the time range) can be grouped with other blocks having N % 2 == 0. + * B5 (index 1 in the time range) can be grouped with other blocks having N % 2 == 1. + * B6 (index 2 in the time range) can be grouped with other blocks having N % 2 == 0. + * B7 (index 3 in the time range) can be grouped with other blocks having N % 2 == 1. -Compaction Plans: +Compaction Groups: * Partition ID: 0 \ Partition Number: 2 \ From 838c9aaa10d937fcf68fd9c8937183e84adb78d3 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 30 Sep 2022 14:39:44 -0700 Subject: [PATCH 4/7] Added detail in compaction workflow Signed-off-by: Alex Le --- .../timeseries-partitioning-in-compactor.md | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/docs/proposals/timeseries-partitioning-in-compactor.md b/docs/proposals/timeseries-partitioning-in-compactor.md index b0bddd3f8d4..641bf6c0003 100644 --- a/docs/proposals/timeseries-partitioning-in-compactor.md +++ b/docs/proposals/timeseries-partitioning-in-compactor.md @@ -126,27 +126,31 @@ T1 - Partition 1-2 was created with hash % 2 == 0, and in order to avoid having 1. Compactor initializes Grouper and Planner. 2. Compactor retrieves block's meta.json and call Grouper to group blocks for compaction. -3. Grouper generates compaction groups: +3. Grouper generates partitioned compaction groups: 1. Grouper groups source blocks into unpartitioned groups. 2. For each unpartitioned group: 1. Calculates partition number. Partition number indicates how many partitions one unpartitioned group would be partitioned into based on the total size of indices and number of time series from each source blocks in the unpartitioned group. 2. 
Assign source blocks into each partition with partition ID (value is in range from 0 to Partition Number - 1). Note that one source block could be used in multiple partitions. So multiple partition ID could be assigned to same source block. Check the examples in [Compaction Partitioning Examples](#compaction-partitioning-examples) - 3. Generates compaction group that indicates which partition ID each blocks got assigned. - 4. Partitioned compaction group information would be stored in block storage in order for cleaner to pick it up later. -4. Grouper returns compaction groups to Compactor. + 3. Generates partitioned compaction group that indicates which partition ID each blocks got assigned. + 4. Generates partitioned compaction group ID which is the hash of min and max time of result block. + 5. Partitioned compaction group information would be stored in block storage under the tenant directory it belongs to and the stored file can be picked up by cleaner later. Partitioned compaction group information contains partitioned compaction group ID, number of partitions, list of partitions which has partition ID and list of source blocks. + 6. Store partitioned compaction group ID in block storage under each blocks' directory that are used by the generated partitioned compaction group. +4. Grouper returns partitioned compaction groups to Compactor. Each returned group would have partition ID, partition number, and list of source blocks in memory. 5. Compactor iterates over each partitioned compaction group. For each iteration, calls Planner to make sure the group is ready for compaction. -6. Planner collects all compaction groups which are ready for compaction. - 1. For each compaction group and for each blocks in the plan: - 1. Make sure the source block fits within the time range of the group. - 2. Make sure the source block with assigned partition ID is currently not used by another ongoing compaction. 
This could utilize visit marker file that is introduced in #4805 by expanding it for each partition ID of the source block. - 2. If all blocks in the group are ready to be compacted, mark status of those blocks along with assigned partition ID as `pending`. - 3. The status information of each partition ID would be stored in block storage in order for cleaner to pick it up later. -7. Return all ready compaction groups to Compactor. -8. Compactor starts compacting partitioned blocks from compaction groups. Once compaction completed, Compactor would upload deletion marker for the source blocks in the plan. Also, it would mark status of all blocks along with assigned partition ID in the group as `completed`. +6. Planner collects partitioned compaction group which is ready for compaction. + 1. For each partitions in the group and for each blocks in the partition: + 1. Make sure all source blocks fit within the time range of the group. + 2. Make sure each source block with assigned partition IDs is currently not used by another ongoing compaction. This could utilize visit marker file that is introduced in #4805 by expanding it for each partition ID of the source block. + 3. If all blocks in the partition are ready to be compacted, + 1. mark status of those blocks with assigned partition ID as `pending`. + 2. The status information of each partition ID would be stored in block storage under the corresponding block directory in order for cleaner to pick it up later. + 4. If not all blocks in the partition are ready, continue on next partition +7. Return all ready partitions to Compactor. +8. Compactor starts compacting partitioned blocks. Once compaction completed, Compactor would upload deletion marker for the source blocks in the plan. Also, it would mark status of all blocks along with assigned partition ID in the group as `completed`. ### Clean up Workflow -Cleaner would periodically check any source blocks having a deletion marker. 
If there is a deletion marker for the block, Cleaner should retrieve compaction group information from block storage to get all partition IDs assigned to this block as well as status information of each partition ID. If this source block's all assigned partition ID have status set to `completed`, this source block can be deleted. Otherwise, skip deletion. +Cleaner would periodically check any source blocks having a deletion marker. If there is a deletion marker for the block, Cleaner should retrieve partitioned compaction group ID under current block directory in block storage and use the ID to retrieve partitioned compaction group information from block storage to get all partition IDs assigned to this block. Then, retrieve status information of each partition ID this block got assigned under current block directory in block storage. If all assigned partition ID have status set to `completed`, this source block can be deleted. Otherwise, skip deletion. ## Performance @@ -330,8 +334,7 @@ Partitioning: * B13 (index 6 in the time range) can be grouped with other blocks having N % 4 == 2. * B14 (index 7 in the time range) can be grouped with other blocks having N % 4 == 3. -Compaction Groups: - +Partitions in Partitioned Compaction Group: * Partition ID: 0 \ Partition Number: 4 \ Blocks: B1, B3, B7, B11 @@ -366,8 +369,7 @@ Partition Number = (100G / 64G = 1.5625) => round up to next 2^x = 2 Partitioning: There is only one time range from all source blocks which means it is compacting level 1 blocks. Partitioning needs to include all source blocks in each partition. -Compaction Groups: - +Partitions in Partitioned Compaction Group: * Partition ID: 0 \ Partition Number: 2 \ Blocks: B1, B2, B3 @@ -401,8 +403,7 @@ Partitioning: * For T2, there is only one source block. Include B2 in all partitions. * For T3, there is only one source block. Include B3 in all partitions. 
-Compaction Groups: - +Partitions in Partitioned Compaction Group: * Partition ID: 0 \ Partition Number: 2 \ Blocks: B1, B2, B3 @@ -442,8 +443,7 @@ Partitioning: * B6 (index 2 in the time range) can be grouped with other blocks having N % 2 == 0. * B7 (index 3 in the time range) can be grouped with other blocks having N % 2 == 1. -Compaction Groups: - +Partitions in Partitioned Compaction Group: * Partition ID: 0 \ Partition Number: 2 \ Blocks: B1, B2, B4, B6 From edef15e8f20dc098ed5b5ca991bf4ddf1143ad55 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 30 Sep 2022 14:57:23 -0700 Subject: [PATCH 5/7] Added detail in compaction workflow Signed-off-by: Alex Le --- .../timeseries-partitioning-in-compactor.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/proposals/timeseries-partitioning-in-compactor.md b/docs/proposals/timeseries-partitioning-in-compactor.md index 641bf6c0003..634d4b71cf3 100644 --- a/docs/proposals/timeseries-partitioning-in-compactor.md +++ b/docs/proposals/timeseries-partitioning-in-compactor.md @@ -129,12 +129,13 @@ T1 - Partition 1-2 was created with hash % 2 == 0, and in order to avoid having 3. Grouper generates partitioned compaction groups: 1. Grouper groups source blocks into unpartitioned groups. 2. For each unpartitioned group: - 1. Calculates partition number. Partition number indicates how many partitions one unpartitioned group would be partitioned into based on the total size of indices and number of time series from each source blocks in the unpartitioned group. - 2. Assign source blocks into each partition with partition ID (value is in range from 0 to Partition Number - 1). Note that one source block could be used in multiple partitions. So multiple partition ID could be assigned to same source block. Check the examples in [Compaction Partitioning Examples](#compaction-partitioning-examples) - 3. Generates partitioned compaction group that indicates which partition ID each blocks got assigned. 
- 4. Generates partitioned compaction group ID which is the hash of min and max time of result block. - 5. Partitioned compaction group information would be stored in block storage under the tenant directory it belongs to and the stored file can be picked up by cleaner later. Partitioned compaction group information contains partitioned compaction group ID, number of partitions, list of partitions which has partition ID and list of source blocks. - 6. Store partitioned compaction group ID in block storage under each blocks' directory that are used by the generated partitioned compaction group. + 1. Generates partitioned compaction group ID which is hash of min and max time of result block. + 2. If the ID exists under the tenant directory in block storage, continue on next unpartitioned group. + 3. Calculates partition number. Partition number indicates how many partitions one unpartitioned group would be partitioned into based on the total size of indices and number of time series from each source blocks in the unpartitioned group. + 4. Assign source blocks into each partition with partition ID (value is in range from 0 to Partition Number - 1). Note that one source block could be used in multiple partitions. So multiple partition ID could be assigned to same source block. Check the examples in [Compaction Partitioning Examples](#compaction-partitioning-examples) + 5. Generates partitioned compaction group that indicates which partition ID each blocks got assigned. + 6. Partitioned compaction group information would be stored in block storage under the tenant directory it belongs to and the stored file can be picked up by cleaner later. Partitioned compaction group information contains partitioned compaction group ID, number of partitions, list of partitions which has partition ID and list of source blocks. + 7. Store partitioned compaction group ID in block storage under each blocks' directory that are used by the generated partitioned compaction group. 4. 
Grouper returns partitioned compaction groups to Compactor. Each returned group would have partition ID, partition number, and list of source blocks in memory. 5. Compactor iterates over each partitioned compaction group. For each iteration, calls Planner to make sure the group is ready for compaction. 6. Planner collects partitioned compaction group which is ready for compaction. From 1aab1cc8443372ea45f523e919b55810100b312a Mon Sep 17 00:00:00 2001 From: Alex Le Date: Sat, 1 Oct 2022 12:38:24 -0700 Subject: [PATCH 6/7] Updated wording Signed-off-by: Alex Le --- .../timeseries-partitioning-in-compactor.md | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/proposals/timeseries-partitioning-in-compactor.md b/docs/proposals/timeseries-partitioning-in-compactor.md index 634d4b71cf3..e24e52d67a3 100644 --- a/docs/proposals/timeseries-partitioning-in-compactor.md +++ b/docs/proposals/timeseries-partitioning-in-compactor.md @@ -131,12 +131,12 @@ T1 - Partition 1-2 was created with hash % 2 == 0, and in order to avoid having 2. For each unpartitioned group: 1. Generates partitioned compaction group ID which is hash of min and max time of result block. 2. If the ID exists under the tenant directory in block storage, continue on next unpartitioned group. - 3. Calculates partition number. Partition number indicates how many partitions one unpartitioned group would be partitioned into based on the total size of indices and number of time series from each source blocks in the unpartitioned group. - 4. Assign source blocks into each partition with partition ID (value is in range from 0 to Partition Number - 1). Note that one source block could be used in multiple partitions. So multiple partition ID could be assigned to same source block. Check the examples in [Compaction Partitioning Examples](#compaction-partitioning-examples) + 3. Calculates number of partitions. 
Number of partitions indicates how many partitions one unpartitioned group would be partitioned into based on the total size of indices and number of time series from each source blocks in the unpartitioned group. + 4. Assign source blocks into each partition with partition ID (value is in range from 0 to number_of_partitions - 1). Note that one source block could be used in multiple partitions (explanation in [Planning the compaction](#planning-the-compaction) and [Compaction](#compaction)). So multiple partition ID could be assigned to same source block. Check more partitioning examples in [Compaction Partitioning Examples](#compaction-partitioning-examples) 5. Generates partitioned compaction group that indicates which partition ID each blocks got assigned. 6. Partitioned compaction group information would be stored in block storage under the tenant directory it belongs to and the stored file can be picked up by cleaner later. Partitioned compaction group information contains partitioned compaction group ID, number of partitions, list of partitions which has partition ID and list of source blocks. 7. Store partitioned compaction group ID in block storage under each blocks' directory that are used by the generated partitioned compaction group. -4. Grouper returns partitioned compaction groups to Compactor. Each returned group would have partition ID, partition number, and list of source blocks in memory. +4. Grouper returns partitioned compaction groups to Compactor. Each returned group would have partition ID, number of partitions, and list of source blocks in memory. 5. Compactor iterates over each partitioned compaction group. For each iteration, calls Planner to make sure the group is ready for compaction. 6. Planner collects partitioned compaction group which is ready for compaction. 1. 
For each partitions in the group and for each blocks in the partition: @@ -314,7 +314,7 @@ Total indices size of all source blocks: 200G ``` -Partition Number = (200G / 64G = 3.125) => round up to next 2^x = 4 +Number of Partitions = (200G / 64G = 3.125) => round up to next 2^x = 4 Partitioning: * For T1, there are only 2 blocks which is < 4. So @@ -337,16 +337,16 @@ Partitioning: Partitions in Partitioned Compaction Group: * Partition ID: 0 \ - Partition Number: 4 \ + Number of Partitions: 4 \ Blocks: B1, B3, B7, B11 * Partition ID: 1 \ - Partition Number: 4 \ + Number of Partitions: 4 \ Blocks: B2, B4, B8, B12 * Partition ID: 2 \ - Partition Number: 4 \ + Number of Partitions: 4 \ Blocks: B1, B5, B9, B13 * Partition ID: 3 \ - Partition Number: 4 \ + Number of Partitions: 4 \ Blocks: B2, B6, B10, B14 --- @@ -366,16 +366,16 @@ Total indices size of all source blocks: 100G ``` -Partition Number = (100G / 64G = 1.5625) => round up to next 2^x = 2 +Number of Partitions = (100G / 64G = 1.5625) => round up to next 2^x = 2 Partitioning: There is only one time range from all source blocks which means it is compacting level 1 blocks. Partitioning needs to include all source blocks in each partition. Partitions in Partitioned Compaction Group: * Partition ID: 0 \ - Partition Number: 2 \ + Number of Partitions: 2 \ Blocks: B1, B2, B3 * Partition ID: 1 \ - Partition Number: 2 \ + Number of Partitions: 2 \ Blocks: B1, B2, B3 --- @@ -397,7 +397,7 @@ Total indices size of all source blocks: 100G ``` -Partition Number = (100G / 64G = 1.5625) => round up to next 2^x = 2 +Number of Partitions = (100G / 64G = 1.5625) => round up to next 2^x = 2 Partitioning: * For T1, there is only one source block. Include B1 in all partitions. 
@@ -406,10 +406,10 @@ Partitioning: Partitions in Partitioned Compaction Group: * Partition ID: 0 \ - Partition Number: 2 \ + Number of Partitions: 2 \ Blocks: B1, B2, B3 * Partition ID: 1 \ - Partition Number: 2 \ + Number of Partitions: 2 \ Blocks: B1, B2, B3 --- @@ -431,7 +431,7 @@ Total indices size of all source blocks: 100G ``` -Partition Number = (100G / 64G = 1.5625) => round up to next 2^x = 2 +Number of Partitions = (100G / 64G = 1.5625) => round up to next 2^x = 2 Partitioning: * For T1, there is only one source block. Include B1 in all partitions. @@ -446,9 +446,9 @@ Partitioning: Partitions in Partitioned Compaction Group: * Partition ID: 0 \ - Partition Number: 2 \ + Number of Partitions: 2 \ Blocks: B1, B2, B4, B6 * Partition ID: 1 \ - Partition Number: 2 \ + Number of Partitions: 2 \ Blocks: B1, B3, B5, B7 From 032b7f8098b81912e1cebeb70207fc25a39f6916 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 20 Oct 2022 15:11:31 -0700 Subject: [PATCH 7/7] Updated cleaner logic to be more straightforward and keep checking completion logic in compactor Signed-off-by: Alex Le --- docs/proposals/timeseries-partitioning-in-compactor.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/proposals/timeseries-partitioning-in-compactor.md b/docs/proposals/timeseries-partitioning-in-compactor.md index e24e52d67a3..5b733717b31 100644 --- a/docs/proposals/timeseries-partitioning-in-compactor.md +++ b/docs/proposals/timeseries-partitioning-in-compactor.md @@ -147,11 +147,11 @@ T1 - Partition 1-2 was created with hash % 2 == 0, and in order to avoid having 2. The status information of each partition ID would be stored in block storage under the corresponding block directory in order for cleaner to pick it up later. 4. If not all blocks in the partition are ready, continue on next partition 7. Return all ready partitions to Compactor. -8. Compactor starts compacting partitioned blocks. 
Once compaction completed, Compactor would upload deletion marker for the source blocks in the plan. Also, it would mark status of all blocks along with assigned partition ID in the group as `completed`. +8. Compactor starts compacting partitioned blocks. Once compaction completes, Compactor would mark the status of each block and its assigned partition ID in the group as `completed`. Compactor should then use the partitioned compaction group ID to retrieve the partitioned compaction group information from block storage and get all partition IDs assigned to each block. Next, it retrieves the status information of each partition ID assigned to the block from the block's directory in block storage. If all partition IDs assigned to the block have status set to `completed`, Compactor uploads a deletion marker for this block. Otherwise, no deletion marker is uploaded. ### Clean up Workflow -Cleaner would periodically check any source blocks having a deletion marker. If there is a deletion marker for the block, Cleaner should retrieve partitioned compaction group ID under current block directory in block storage and use the ID to retrieve partitioned compaction group information from block storage to get all partition IDs assigned to this block. Then, retrieve status information of each partition ID this block got assigned under current block directory in block storage. If all assigned partition ID have status set to `completed`, this source block can be deleted. Otherwise, skip deletion. +Cleaner would periodically check for tenants that have a deletion marker. If a tenant has a deletion marker, Cleaner should remove all of its blocks and then, after the tenant cleanup delay, clean up the remaining files, including partitioned compaction group information files. If a tenant has no deletion marker, Cleaner should scan for source blocks that have a deletion marker and delete each such block. ## Performance
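The partition-count arithmetic used in the examples and the deletion-marker check that the final patch moves into Compactor can be sketched in Go as below. This is a minimal illustration under stated assumptions, not Cortex code: the 64 GiB per-partition index budget is taken from the worked examples, and `numberOfPartitions`, `readyForDeletionMarker`, `PartitionStatus` and the in-memory status map are hypothetical stand-ins for the status files stored in each block's directory.

```go
package main

import "fmt"

// targetIndexSize is an assumed per-partition index budget (64 GiB, as in
// the proposal's examples); it is not a real Cortex configuration name.
const targetIndexSize = 64 << 30

// numberOfPartitions divides the total index size of the source blocks by
// the target size and rounds the result up to the next power of two.
func numberOfPartitions(totalIndexSize int64) int {
	ratio := float64(totalIndexSize) / float64(targetIndexSize)
	n := 1
	for float64(n) < ratio {
		n *= 2
	}
	return n
}

// PartitionStatus is a hypothetical model of the per-block, per-partition
// status that Planner and Compactor write to block storage.
type PartitionStatus string

const (
	StatusPending   PartitionStatus = "pending"
	StatusCompleted PartitionStatus = "completed"
)

// readyForDeletionMarker reports whether every partition ID assigned to a
// block is `completed`. statuses maps partition ID to the status read from
// the block's directory; a missing entry counts as not completed. A block
// with no assigned partitions is conservatively kept (an assumption).
func readyForDeletionMarker(assigned []int, statuses map[int]PartitionStatus) bool {
	if len(assigned) == 0 {
		return false
	}
	for _, id := range assigned {
		if statuses[id] != StatusCompleted {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(numberOfPartitions(200 << 30)) // 4, as in the 200G example
	fmt.Println(numberOfPartitions(100 << 30)) // 2, as in the 100G examples

	// In the first example, B1 is assigned to partitions 0 and 2.
	statuses := map[int]PartitionStatus{0: StatusCompleted, 2: StatusPending}
	fmt.Println(readyForDeletionMarker([]int{0, 2}, statuses)) // false
	statuses[2] = StatusCompleted
	fmt.Println(readyForDeletionMarker([]int{0, 2}, statuses)) // true
}
```

Keeping this check in Compactor means Cleaner only has to act on deletion markers that already exist, which is the simplification the last patch describes.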