From 6bbc92be269526f461aff4ef261d5b3bb3a77a43 Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Thu, 27 Apr 2017 17:26:22 -0700 Subject: [PATCH 1/7] Add deletion methods to abstract TSDB backend. --- src/sentry/tsdb/base.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/sentry/tsdb/base.py b/src/sentry/tsdb/base.py index cac6b7ec4a98ca..fcbcf7500acf00 100644 --- a/src/sentry/tsdb/base.py +++ b/src/sentry/tsdb/base.py @@ -232,6 +232,12 @@ def merge(self, model, destination, sources, timestamp=None): """ raise NotImplementedError + def delete(self, models, keys, start=None, end=None, timestamp=None): + """ + Delete all counters. + """ + raise NotImplementedError + def get_range(self, model, keys, start, end, rollup=None): """ To get a range of data for group ID=[1, 2, 3]: @@ -311,6 +317,12 @@ def merge_distinct_counts(self, model, destination, sources, timestamp=None): """ raise NotImplementedError + def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=None): + """ + Delete all distinct counters. + """ + raise NotImplementedError + def record_frequency_multi(self, requests, timestamp=None): """ Record items in a frequency table. @@ -380,3 +392,9 @@ def merge_frequencies(self, model, destination, sources, timestamp=None): key. """ raise NotImplementedError + + def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None): + """ + Delete all distinct counters. + """ + raise NotImplementedError From 77ef560216562e45f689e435b04666b8b04249de Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Thu, 27 Apr 2017 17:43:37 -0700 Subject: [PATCH 2/7] Add deletion methods to `RedisTSDB` --- src/sentry/tsdb/redis.py | 82 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/src/sentry/tsdb/redis.py b/src/sentry/tsdb/redis.py index a660b4c91f34ea..d03ff8686ff516 100644 --- a/src/sentry/tsdb/redis.py +++ b/src/sentry/tsdb/redis.py @@ -265,6 +265,36 @@ def merge(self, model, destination, sources, timestamp=None): ), ) + def delete(self, models, keys, start=None, end=None, timestamp=None): + rollups = {} + for rollup, samples in self.rollups.items(): + _, series = self.get_optimal_rollup_series( + start if start is not None else to_datetime( + self.get_earliest_timestamp( + rollup, + timestamp=timestamp if timestamp is not None else timezone.now(), + ), + ), + end=end, + rollup=rollup, + ) + rollups[rollup] = map(to_datetime, series) + + with self.cluster.map() as client: + for rollup, series in rollups.items(): + for timestamp in series: + for model in models: + for key in keys: + model_key = self.get_model_key(key) + client.hdel( + self.make_counter_key( + model, + self.normalize_to_rollup(timestamp, rollup), + model_key, + ), + model_key, + ) + def record(self, model, key, values, timestamp=None): self.record_multi(((model, key, values),), timestamp) @@ -499,6 +529,35 @@ def make_temporary_key(key): ), ) + def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=None): + rollups = {} + for rollup, samples in self.rollups.items(): + _, series = self.get_optimal_rollup_series( + start if start is not None else to_datetime( + self.get_earliest_timestamp( + rollup, + timestamp=timestamp if timestamp is not None else timezone.now(), + ), + ), + end=end, + rollup=rollup, + ) + rollups[rollup] = map(to_datetime, series) + + with self.cluster.map() as client: + for rollup, series in rollups.items(): + for timestamp in series: + for model in models: + for key in keys: + client.delete( + self.make_key( + model, + rollup, + timestamp, + key, + ) + ) + def make_frequency_table_keys(self, model, rollup, timestamp, key): prefix = self.make_key(model, rollup, timestamp, key) return map( @@ -697,3 +756,26 @@ def merge_frequencies(self, model, destination, sources, timestamp=None): self.cluster.execute_commands({ destination: imports, }) + + def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None): + rollups = {} + for rollup, samples in self.rollups.items(): + _, series = self.get_optimal_rollup_series( + start if start is not None else to_datetime( + self.get_earliest_timestamp( + rollup, + timestamp=timestamp if timestamp is not None else timezone.now(), + ), + ), + end=end, + rollup=rollup, + ) + rollups[rollup] = map(to_datetime, series) + + with self.cluster.map() as client: + for rollup, series in rollups.items(): + for timestamp in series: + for model in models: + for key in keys: + for k in self.make_frequency_table_keys(model, rollup, to_timestamp(timestamp), key): + client.delete(k) From dd92fa0fd10e2c4d61fa8a8bc42a6b159255527f Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 28 Apr 2017 11:37:58 -0700 Subject: [PATCH 3/7] Factor out `get_active_series` method. (This name is terrible, sorry.) --- src/sentry/tsdb/base.py | 18 +++++++++++- src/sentry/tsdb/redis.py | 60 ++++------------------------------------ 2 files changed, 22 insertions(+), 56 deletions(-) diff --git a/src/sentry/tsdb/base.py b/src/sentry/tsdb/base.py index fcbcf7500acf00..1ceacee12bffb6 100644 --- a/src/sentry/tsdb/base.py +++ b/src/sentry/tsdb/base.py @@ -15,7 +15,7 @@ from django.utils import timezone from enum import Enum -from sentry.utils.dates import to_timestamp +from sentry.utils.dates import to_datetime, to_timestamp ONE_MINUTE = 60 ONE_HOUR = ONE_MINUTE * 60 @@ -181,6 +181,22 @@ def get_optimal_rollup_series(self, start, end=None, rollup=None): return rollup, sorted(series) + def get_active_series(self, start=None, end=None, timestamp=None): + rollups = {} + for rollup, samples in self.rollups.items(): + _, series = self.get_optimal_rollup_series( + start if start is not None else to_datetime( + self.get_earliest_timestamp( + rollup, + timestamp=timestamp, + ), + ), + end, + rollup=rollup, + ) + rollups[rollup] = map(to_datetime, series) + return rollups + def calculate_expiry(self, rollup, samples, timestamp): """ Calculate the expiration time for a rollup. diff --git a/src/sentry/tsdb/redis.py b/src/sentry/tsdb/redis.py index d03ff8686ff516..fd70fe116f7cb2 100644 --- a/src/sentry/tsdb/redis.py +++ b/src/sentry/tsdb/redis.py @@ -214,14 +214,7 @@ def get_range(self, model, keys, start, end, rollup=None): return dict(results_by_key) def merge(self, model, destination, sources, timestamp=None): - rollups = {} - for rollup, samples in self.rollups.items(): - _, series = self.get_optimal_rollup_series( - to_datetime(self.get_earliest_timestamp(rollup, timestamp=timestamp)), - end=None, - rollup=rollup, - ) - rollups[rollup] = map(to_datetime, series) + rollups = self.get_active_series(timestamp=timestamp) with self.cluster.map() as client: data = {} @@ -266,19 +259,7 @@ def merge(self, model, destination, sources, timestamp=None): ) def delete(self, models, keys, start=None, end=None, timestamp=None): - rollups = {} - for rollup, samples in self.rollups.items(): - _, series = self.get_optimal_rollup_series( - start if start is not None else to_datetime( - self.get_earliest_timestamp( - rollup, - timestamp=timestamp if timestamp is not None else timezone.now(), - ), - ), - end=end, - rollup=rollup, - ) - rollups[rollup] = map(to_datetime, series) + rollups = self.get_active_series(start, end, timestamp) with self.cluster.map() as client: for rollup, series in rollups.items(): @@ -464,14 +445,7 @@ def merge_aggregates(values): ) def merge_distinct_counts(self, model, destination, sources, timestamp=None): - rollups = {} - for rollup, samples in self.rollups.items(): - _, series = self.get_optimal_rollup_series( - to_datetime(self.get_earliest_timestamp(rollup, timestamp=timestamp)), - end=None, - rollup=rollup, - ) - rollups[rollup] = map(to_datetime, series) + rollups = self.get_active_series(timestamp=timestamp) temporary_id = uuid.uuid1().hex @@ -530,19 +504,7 @@ def make_temporary_key(key): ) def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=None): - rollups = {} - for rollup, samples in self.rollups.items(): - _, series = self.get_optimal_rollup_series( - start if start is not None else to_datetime( - self.get_earliest_timestamp( - rollup, - timestamp=timestamp if timestamp is not None else timezone.now(), - ), - ), - end=end, - rollup=rollup, - ) - rollups[rollup] = map(to_datetime, series) + rollups = self.get_active_series(start, end, timestamp) with self.cluster.map() as client: for rollup, series in rollups.items(): @@ -758,19 +720,7 @@ def merge_frequencies(self, model, destination, sources, timestamp=None): }) def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None): - rollups = {} - for rollup, samples in self.rollups.items(): - _, series = self.get_optimal_rollup_series( - start if start is not None else to_datetime( - self.get_earliest_timestamp( - rollup, - timestamp=timestamp if timestamp is not None else timezone.now(), - ), - ), - end=end, - rollup=rollup, - ) - rollups[rollup] = map(to_datetime, series) + rollups = self.get_active_series(start, end, timestamp) with self.cluster.map() as client: for rollup, series in rollups.items(): From 32132315d574b506122f7dc8788b1af7134e1ffd Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 28 Apr 2017 13:11:12 -0700 Subject: [PATCH 4/7] Add test coverage to `RedisTSDB`. --- src/sentry/tsdb/redis.py | 2 +- tests/sentry/tsdb/test_redis.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/sentry/tsdb/redis.py b/src/sentry/tsdb/redis.py index fd70fe116f7cb2..a5822e4f4ce3aa 100644 --- a/src/sentry/tsdb/redis.py +++ b/src/sentry/tsdb/redis.py @@ -515,7 +515,7 @@ def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=N self.make_key( model, rollup, - timestamp, + to_timestamp(timestamp), key, ) ) diff --git a/tests/sentry/tsdb/test_redis.py b/tests/sentry/tsdb/test_redis.py index fa8f086061eabd..f0f44a3686c501 100644 --- a/tests/sentry/tsdb/test_redis.py +++ b/tests/sentry/tsdb/test_redis.py @@ -109,6 +109,14 @@ def timestamp(d): 2: 0, } + self.db.delete([TSDBModel.project], [1, 2], dts[0], dts[-1]) + + results = self.db.get_sums(TSDBModel.project, [1, 2], dts[0], dts[-1]) + assert results == { + 1: 0, + 2: 0, + } + def test_count_distinct(self): now = datetime.utcnow().replace(tzinfo=pytz.UTC) - timedelta(hours=4) dts = [now + timedelta(hours=i) for i in range(4)] @@ -211,6 +219,14 @@ def timestamp(d): assert self.db.get_distinct_counts_union(model, [1, 2], dts[0], dts[-1], rollup=3600) == 3 assert self.db.get_distinct_counts_union(model, [2], dts[0], dts[-1], rollup=3600) == 0 + self.db.delete_distinct_counts([model], [1, 2], dts[0], dts[-1]) + + results = self.db.get_distinct_counts_totals(model, [1, 2], dts[0], dts[-1]) + assert results == { + 1: 0, + 2: 0, + } + def test_frequency_tables(self): now = datetime.utcnow().replace(tzinfo=pytz.UTC) model = TSDBModel.frequent_projects_by_organization @@ -386,6 +402,22 @@ def test_frequency_tables(self): }, } + self.db.delete_frequencies( + [model], + ['organization:1', 'organization:2'], + now - timedelta(hours=1), + now, + ) + + assert self.db.get_most_frequent( + model, + ('organization:1', 'organization:2'), + now, + ) == { + 'organization:1': [], + 'organization:2': [], + } + def test_frequency_table_import_export_no_estimators(self): client = self.db.cluster.get_local_client_for_key('key') From 54fdc154cb2fd2eaba6c018741225a3ff26359c5 Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 28 Apr 2017 13:16:17 -0700 Subject: [PATCH 5/7] Add deletion methods to `DummyTSDB`. --- src/sentry/tsdb/dummy.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/sentry/tsdb/dummy.py b/src/sentry/tsdb/dummy.py index 0d6cdd968a6e61..348f47cb4d88a6 100644 --- a/src/sentry/tsdb/dummy.py +++ b/src/sentry/tsdb/dummy.py @@ -20,6 +20,9 @@ def incr(self, model, key, timestamp=None, count=1): def merge(self, model, destination, sources, timestamp=None): pass + def delete(self, models, keys, start=None, end=None, timestamp=None): + pass + def get_range(self, model, keys, start, end, rollup=None): _, series = self.get_optimal_rollup_series(start, end, rollup) return {k: [(ts, 0) for ts in series] for k in keys} @@ -40,6 +43,9 @@ def get_distinct_counts_union(self, model, keys, start, end=None, rollup=None): def merge_distinct_counts(self, model, destination, sources, timestamp=None): pass + def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=None): + pass + def record_frequency_multi(self, requests, timestamp=None): pass @@ -72,3 +78,6 @@ def get_frequency_totals(self, model, items, start, end=None, rollup=None): def merge_frequencies(self, model, destination, sources, timestamp=None): pass + + def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None): + pass From 1f07b7285eb0efa01bf5126e3b99f3cfb7f4bf30 Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 28 Apr 2017 13:55:01 -0700 Subject: [PATCH 6/7] Add deletion methods to `InMemoryTSDB`. --- src/sentry/tsdb/inmemory.py | 39 +++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/sentry/tsdb/inmemory.py b/src/sentry/tsdb/inmemory.py index 8e2cc82e5f1060..1b04f52383e315 100644 --- a/src/sentry/tsdb/inmemory.py +++ b/src/sentry/tsdb/inmemory.py @@ -40,6 +40,19 @@ def merge(self, model, destination, sources, timestamp=None): for bucket, count in self.data[model].pop(source, {}).items(): destination[bucket] += count + def delete(self, models, keys, start=None, end=None, timestamp=None): + rollups = self.get_active_series(start, end, timestamp) + + for rollup, series in rollups.items(): + for model in models: + for key in keys: + data = self.data[model][key] + for timestamp in series: + data.pop( + self.normalize_to_rollup(timestamp, rollup), + 0, + ) + def get_range(self, model, keys, start, end, rollup=None): rollup, series = self.get_optimal_rollup_series(start, end, rollup) @@ -112,6 +125,19 @@ def merge_distinct_counts(self, model, destination, sources, timestamp=None): for bucket, values in self.sets[model].pop(source, {}).items(): destination[bucket].update(values) + def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=None): + rollups = self.get_active_series(start, end, timestamp) + + for rollup, series in rollups.items(): + for model in models: + for key in keys: + data = self.data[model][key] + for timestamp in series: + data.pop( + self.normalize_to_rollup(timestamp, rollup), + set(), + ) + def flush(self): # self.data[model][key][rollup] = count self.data = defaultdict( @@ -198,3 +224,16 @@ def merge_frequencies(self, model, destination, sources, timestamp=None): for source in sources: for bucket, counter in self.data[model].pop(source, {}).items(): destination[bucket].update(counter) + + def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None): + rollups = self.get_active_series(start, end, timestamp) + + for rollup, series in rollups.items(): + for model in models: + for key in keys: + data = self.data[model][key] + for timestamp in series: + data.pop( + self.normalize_to_rollup(timestamp, rollup), + Counter(), + ) From 7cb738e3591cdb2c8476197fa55c6a588ce773ad Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 28 Apr 2017 14:04:22 -0700 Subject: [PATCH 7/7] Fix bad copy/paste comment. --- src/sentry/tsdb/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/tsdb/base.py b/src/sentry/tsdb/base.py index 1ceacee12bffb6..9c493bc635fb4e 100644 --- a/src/sentry/tsdb/base.py +++ b/src/sentry/tsdb/base.py @@ -411,6 +411,6 @@ def merge_frequencies(self, model, destination, sources, timestamp=None): def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None): """ - Delete all distinct counters. + Delete all frequency tables. """ raise NotImplementedError