@@ -1056,7 +1056,7 @@ impl AggregatorConfig {
         if !self.timestamp_range().contains(&output_timestamp) {
             let delta = (ts as i64) - (UnixTimestamp::now().as_secs() as i64);
             relay_statsd::metric!(
-                histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64
+                histogram(MetricHistograms::InvalidBucketTimestamp) = delta as f64,
             );
             return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
         }
@@ -1092,7 +1092,7 @@ impl AggregatorConfig {
         let delay = UnixTimestamp::now().as_secs() as i64 - bucket_timestamp.as_secs() as i64;
         relay_statsd::metric!(
             histogram(MetricHistograms::BucketsDelay) = delay as f64,
-            backedated = if flush.is_none() { "true" } else { "false" },
+            backdated = if flush.is_none() { "true" } else { "false" },
         );

         // If the initial flush time has passed or cannot be represented, debounce future flushes
@@ -1173,7 +1173,6 @@ impl Ord for QueuedBucket {
 /// This is cheaper to pass around than a (BucketKey, Bucket) pair.
 pub struct HashedBucket {
     // This is only public because pop_flush_buckets is used in benchmark.
-    // TODO: Find better name for this struct
     hashed_key: u64,
     bucket: Bucket,
 }
@@ -1372,8 +1371,8 @@ pub struct FlushBuckets {
 /// A message containing a list of [`Metric`]s to be inserted into the aggregator.
 #[derive(Debug)]
 pub struct InsertMetrics {
-    project_key: ProjectKey,
-    metrics: Vec<Metric>,
+    pub(crate) project_key: ProjectKey,
+    pub(crate) metrics: Vec<Metric>,
 }

 impl InsertMetrics {
@@ -1403,8 +1402,8 @@ impl InsertMetrics {
 /// A message containing a list of [`Bucket`]s to be inserted into the aggregator.
 #[derive(Debug)]
 pub struct MergeBuckets {
-    project_key: ProjectKey,
-    buckets: Vec<Bucket>,
+    pub(crate) project_key: ProjectKey,
+    pub(crate) buckets: Vec<Bucket>,
 }

 impl MergeBuckets {
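
> Note on the visibility change: widening `project_key` and the payload fields of `InsertMetrics` and `MergeBuckets` to `pub(crate)` lets other modules in this crate build or destructure the messages directly instead of going through the constructors and getters. A minimal sketch of what this permits, assuming `ProjectKey::parse` from `relay_common` and a placeholder key string (both illustrative, not part of this diff):

```rust
// Hypothetical in-crate construction enabled by the pub(crate) fields;
// the project key string is a placeholder.
let message = MergeBuckets {
    project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
    buckets: Vec::new(),
};
```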
@@ -1506,6 +1505,7 @@ impl FromMessage<MergeBuckets> for Aggregator {
 ///
 /// Receivers must implement a handler for the [`FlushBuckets`] message.
 pub struct AggregatorService {
+    name: String,
     config: AggregatorConfig,
     buckets: HashMap<BucketKey, QueuedBucket>,
     receiver: Option<Recipient<FlushBuckets, NoResponse>>,
@@ -1521,8 +1521,18 @@ impl AggregatorService {
     pub fn new(
         config: AggregatorConfig,
         receiver: Option<Recipient<FlushBuckets, NoResponse>>,
+    ) -> Self {
+        Self::named("default".to_owned(), config, receiver)
+    }
+
+    /// Like [`Self::new`], but with a provided name.
+    pub(crate) fn named(
+        name: String,
+        config: AggregatorConfig,
+        receiver: Option<Recipient<FlushBuckets, NoResponse>>,
     ) -> Self {
         Self {
+            name,
             config,
             buckets: HashMap::new(),
             receiver,
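
> `new` now delegates to `named` with the name `"default"`, keeping existing callers unchanged while allowing the crate to run several aggregators whose statsd metrics are distinguished by the new `aggregator` tag. A usage sketch; the `"secondary"` name, the `Default` config, and the `None` receiver are assumptions for illustration only:

```rust
// Hypothetical: a second aggregator whose metrics will be tagged
// `aggregator:secondary` rather than `aggregator:default`.
let secondary = AggregatorService::named(
    "secondary".to_owned(),
    AggregatorConfig::default(),
    None,
);
```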
@@ -1703,6 +1713,7 @@ impl AggregatorService {
             Entry::Occupied(mut entry) => {
                 relay_statsd::metric!(
                     counter(MetricCounters::MergeHit) += 1,
+                    aggregator = &self.name,
                     metric_name = metric_name_tag(&entry.key().metric_name),
                 );
                 let bucket_value = &mut entry.get_mut().value;
@@ -1714,10 +1725,12 @@ impl AggregatorService {
             Entry::Vacant(entry) => {
                 relay_statsd::metric!(
                     counter(MetricCounters::MergeMiss) += 1,
+                    aggregator = &self.name,
                     metric_name = metric_name_tag(&entry.key().metric_name),
                 );
                 relay_statsd::metric!(
                     set(MetricSets::UniqueBucketsCreated) = entry.key().hash64() as i64, // 2-complement
+                    aggregator = &self.name,
                     metric_name = metric_name_tag(&entry.key().metric_name),
                 );

@@ -1743,6 +1756,7 @@ impl AggregatorService {
     ) -> Result<(), AggregateMetricsError> {
         relay_statsd::metric!(
             counter(MetricCounters::InsertMetric) += 1,
+            aggregator = &self.name,
             metric_type = metric.value.ty().as_str(),
         );
         let key = BucketKey {
@@ -1797,7 +1811,10 @@ impl AggregatorService {
     ///
     /// Note that this function is primarily intended for tests.
     pub fn pop_flush_buckets(&mut self) -> HashMap<ProjectKey, Vec<HashedBucket>> {
-        relay_statsd::metric!(gauge(MetricGauges::Buckets) = self.buckets.len() as u64);
+        relay_statsd::metric!(
+            gauge(MetricGauges::Buckets) = self.buckets.len() as u64,
+            aggregator = &self.name,
+        );

         // We only emit statsd metrics for the cost on flush (and not when merging the buckets),
         // assuming that this gives us more than enough data points.
@@ -1811,37 +1828,41 @@ impl AggregatorService {

         let mut stats = BTreeMap::new();

-        relay_statsd::metric!(timer(MetricTimers::BucketsScanDuration), {
-            let bucket_interval = self.config.bucket_interval;
-            let cost_tracker = &mut self.cost_tracker;
-            self.buckets.retain(|key, entry| {
-                if force || entry.elapsed() {
-                    // Take the value and leave a placeholder behind. It'll be removed right after.
-                    let value = mem::replace(&mut entry.value, BucketValue::Counter(0.0));
-                    cost_tracker.subtract_cost(key.project_key, key.cost());
-                    cost_tracker.subtract_cost(key.project_key, value.cost());
-
-                    let (bucket_count, item_count) = stats
-                        .entry((metric_type_tag(&value), metric_name_tag(&key.metric_name)))
-                        .or_insert((0usize, 0usize));
-                    *bucket_count += 1;
-                    *item_count += value.len();
-
-                    let bucket = Bucket::from_parts(key.clone(), bucket_interval, value);
-                    buckets
-                        .entry(key.project_key)
-                        .or_default()
-                        .push(HashedBucket {
-                            hashed_key: key.hash64(),
-                            bucket,
-                        });
-
-                    false
-                } else {
-                    true
-                }
-            });
-        });
+        relay_statsd::metric!(
+            timer(MetricTimers::BucketsScanDuration),
+            aggregator = &self.name,
+            {
+                let bucket_interval = self.config.bucket_interval;
+                let cost_tracker = &mut self.cost_tracker;
+                self.buckets.retain(|key, entry| {
+                    if force || entry.elapsed() {
+                        // Take the value and leave a placeholder behind. It'll be removed right after.
+                        let value = mem::replace(&mut entry.value, BucketValue::Counter(0.0));
+                        cost_tracker.subtract_cost(key.project_key, key.cost());
+                        cost_tracker.subtract_cost(key.project_key, value.cost());
+
+                        let (bucket_count, item_count) = stats
+                            .entry((metric_type_tag(&value), metric_name_tag(&key.metric_name)))
+                            .or_insert((0usize, 0usize));
+                        *bucket_count += 1;
+                        *item_count += value.len();
+
+                        let bucket = Bucket::from_parts(key.clone(), bucket_interval, value);
+                        buckets
+                            .entry(key.project_key)
+                            .or_default()
+                            .push(HashedBucket {
+                                hashed_key: key.hash64(),
+                                bucket,
+                            });
+
+                        false
+                    } else {
+                        true
+                    }
+                });
+            }
+        );

         for ((ty, name), (bucket_count, item_count)) in stats.into_iter() {
             relay_statsd::metric!(
@@ -1876,7 +1897,8 @@ impl AggregatorService {

             // Log the distribution of buckets over partition key
             relay_statsd::metric!(
-                histogram(MetricHistograms::PartitionKeys) = partition_key as f64
+                histogram(MetricHistograms::PartitionKeys) = partition_key as f64,
+                aggregator = &self.name,
             );
         }
         partitions
@@ -1895,13 +1917,15 @@ impl AggregatorService {
            .map(|batch| {
                relay_statsd::metric!(
                    histogram(MetricHistograms::BucketsPerBatch) = batch.len() as f64,
+                    aggregator = &self.name,
                );
                process(batch);
            })
            .count();

        relay_statsd::metric!(
            histogram(MetricHistograms::BatchesPerPartition) = num_batches as f64,
+            aggregator = &self.name,
        );
    }
@@ -1923,7 +1947,8 @@ impl AggregatorService {
         for (project_key, project_buckets) in flush_buckets.into_iter() {
             let bucket_count = project_buckets.len() as u64;
             relay_statsd::metric!(
-                histogram(MetricHistograms::BucketsFlushedPerProject) = bucket_count
+                histogram(MetricHistograms::BucketsFlushedPerProject) = bucket_count,
+                aggregator = &self.name,
             );
             total_bucket_count += bucket_count;

@@ -1941,7 +1966,10 @@ impl AggregatorService {
                 });
             }
         }
-        relay_statsd::metric!(histogram(MetricHistograms::BucketsFlushed) = total_bucket_count);
+        relay_statsd::metric!(
+            histogram(MetricHistograms::BucketsFlushed) = total_bucket_count,
+            aggregator = &self.name,
+        );
     }

     fn handle_accepts_metrics(&self, sender: Sender<bool>) {
@@ -2007,9 +2035,8 @@ impl Service for AggregatorService {
         tokio::spawn(async move {
             let mut ticker = tokio::time::interval(FLUSH_INTERVAL);
             let mut shutdown = Controller::shutdown_handle();
-            relay_log::info!("aggregator started");

-            // Note that currently this loop never exists and will run till the tokio runtime shuts
+            // Note that currently this loop never exits and will run till the tokio runtime shuts
             // down. This is about to change with the refactoring for the shutdown process.
             loop {
                 tokio::select! {
@@ -2022,7 +2049,6 @@
                     else => break,
                 }
             }
-            relay_log::info!("aggregator stopped");
         });
     }
 }
@@ -2033,7 +2059,8 @@ impl Drop for AggregatorService {
         if remaining_buckets > 0 {
             relay_log::error!("metrics aggregator dropping {remaining_buckets} buckets");
             relay_statsd::metric!(
-                counter(MetricCounters::BucketsDropped) += remaining_buckets as i64
+                counter(MetricCounters::BucketsDropped) += remaining_buckets as i64,
+                aggregator = &self.name,
             );
         }
     }
@@ -2064,8 +2091,6 @@ mod tests {

     #[derive(Clone, Default)]
     struct TestReceiver {
-        // TODO: Better way to communicate with service after it's started?
-        // Messages, maybe?
         data: Arc<RwLock<ReceivedData>>,
         reject_all: bool,
     }
@@ -2380,7 +2405,6 @@
         MetricValue::Distribution(2.0)
             .merge_into(&mut value)
             .unwrap();
-        // TODO: This should be ordered
         assert_eq!(value, BucketValue::Distribution(dist![1., 2., 3., 2.]));
     }

@@ -3124,8 +3148,8 @@ mod tests {
         let output = run_test_bucket_partitioning(None);
         insta::assert_debug_snapshot!(output, @r###"
         [
-            "metrics.buckets.per_batch:2|h",
-            "metrics.buckets.batches_per_partition:1|h",
+            "metrics.buckets.per_batch:2|h|#aggregator:default",
+            "metrics.buckets.batches_per_partition:1|h|#aggregator:default",
         ]
         "###);
     }
@@ -3138,17 +3162,17 @@ mod tests {
         let (partition_keys, tail) = output.split_at(2);
         insta::assert_debug_snapshot!(BTreeSet::from_iter(partition_keys), @r###"
         {
-            "metrics.buckets.partition_keys:59|h",
-            "metrics.buckets.partition_keys:62|h",
+            "metrics.buckets.partition_keys:59|h|#aggregator:default",
+            "metrics.buckets.partition_keys:62|h|#aggregator:default",
         }
         "###);

         insta::assert_debug_snapshot!(tail, @r###"
         [
-            "metrics.buckets.per_batch:1|h",
-            "metrics.buckets.batches_per_partition:1|h",
-            "metrics.buckets.per_batch:1|h",
-            "metrics.buckets.batches_per_partition:1|h",
+            "metrics.buckets.per_batch:1|h|#aggregator:default",
+            "metrics.buckets.batches_per_partition:1|h|#aggregator:default",
+            "metrics.buckets.per_batch:1|h|#aggregator:default",
+            "metrics.buckets.batches_per_partition:1|h|#aggregator:default",
         ]
         "###);
     }
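
> The snapshot updates confirm how the new tag serializes: statsd lines gain a DogStatsD-style `|#tag:value` suffix, so `metrics.buckets.per_batch:2|h` becomes `metrics.buckets.per_batch:2|h|#aggregator:default`. An aggregator constructed via `named` with a different name would report under its own tag value.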