Merged
36 changes: 35 additions & 1 deletion src/sentry/tsdb/base.py
@@ -15,7 +15,7 @@
from django.utils import timezone
from enum import Enum

from sentry.utils.dates import to_timestamp
from sentry.utils.dates import to_datetime, to_timestamp

ONE_MINUTE = 60
ONE_HOUR = ONE_MINUTE * 60
@@ -181,6 +181,22 @@ def get_optimal_rollup_series(self, start, end=None, rollup=None):

return rollup, sorted(series)

def get_active_series(self, start=None, end=None, timestamp=None):
rollups = {}
for rollup, samples in self.rollups.items():
_, series = self.get_optimal_rollup_series(
start if start is not None else to_datetime(
self.get_earliest_timestamp(
rollup,
timestamp=timestamp,
),
),
end,
rollup=rollup,
)
rollups[rollup] = map(to_datetime, series)
return rollups

def calculate_expiry(self, rollup, samples, timestamp):
"""
Calculate the expiration time for a rollup.
@@ -232,6 +248,12 @@ def merge(self, model, destination, sources, timestamp=None):
"""
raise NotImplementedError

def delete(self, models, keys, start=None, end=None, timestamp=None):
"""
Delete all counters for the given models and keys, optionally limited to the specified time range.
"""
raise NotImplementedError

def get_range(self, model, keys, start, end, rollup=None):
"""
To get a range of data for group ID=[1, 2, 3]:
@@ -311,6 +333,12 @@ def merge_distinct_counts(self, model, destination, sources, timestamp=None):
"""
raise NotImplementedError

def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=None):
"""
Delete all distinct counters for the given models and keys, optionally limited to the specified time range.
"""
raise NotImplementedError

def record_frequency_multi(self, requests, timestamp=None):
"""
Record items in a frequency table.
@@ -380,3 +408,9 @@ def merge_frequencies(self, model, destination, sources, timestamp=None):
key.
"""
raise NotImplementedError

def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None):
"""
Delete all frequency tables for the given models and keys, optionally limited to the specified time range.
"""
raise NotImplementedError
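To make the new base-class surface concrete, here is a minimal usage sketch in plain Python. The import paths, the backend construction, and the model values are assumptions for illustration; nothing here is asserted by the diff itself.

    from datetime import datetime, timedelta

    from sentry.tsdb.base import TSDBModel         # assumed import path
    from sentry.tsdb.inmemory import InMemoryTSDB  # assumed import path

    tsdb = InMemoryTSDB()  # assumes Django settings supply the default rollups
    now = datetime.utcnow()

    # get_active_series maps each configured rollup (in seconds) to the
    # bucket datetimes still inside that rollup's retention window.
    for rollup, series in tsdb.get_active_series(end=now).items():
        print(rollup, list(series)[:3])

    # The three delete* methods share one shape: models, keys, and an
    # optional start/end window (defaulting to everything still retained).
    tsdb.delete([TSDBModel.project], [1, 2], start=now - timedelta(hours=1), end=now)
    tsdb.delete_distinct_counts([TSDBModel.project], [1, 2])
    tsdb.delete_frequencies([TSDBModel.frequent_projects_by_organization], ['organization:1'])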
9 changes: 9 additions & 0 deletions src/sentry/tsdb/dummy.py
@@ -20,6 +20,9 @@ def incr(self, model, key, timestamp=None, count=1):
def merge(self, model, destination, sources, timestamp=None):
pass

def delete(self, models, keys, start=None, end=None, timestamp=None):
pass

def get_range(self, model, keys, start, end, rollup=None):
_, series = self.get_optimal_rollup_series(start, end, rollup)
return {k: [(ts, 0) for ts in series] for k in keys}
@@ -40,6 +43,9 @@ def get_distinct_counts_union(self, model, keys, start, end=None, rollup=None):
def merge_distinct_counts(self, model, destination, sources, timestamp=None):
pass

def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=None):
pass

def record_frequency_multi(self, requests, timestamp=None):
pass

@@ -72,3 +78,6 @@ def get_frequency_totals(self, model, items, start, end=None, rollup=None):

def merge_frequencies(self, model, destination, sources, timestamp=None):
pass

def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None):
pass
39 changes: 39 additions & 0 deletions src/sentry/tsdb/inmemory.py
@@ -40,6 +40,19 @@ def merge(self, model, destination, sources, timestamp=None):
for bucket, count in self.data[model].pop(source, {}).items():
destination[bucket] += count

def delete(self, models, keys, start=None, end=None, timestamp=None):
rollups = self.get_active_series(start, end, timestamp)

for rollup, series in rollups.items():
for model in models:
for key in keys:
data = self.data[model][key]
for timestamp in series:
data.pop(
self.normalize_to_rollup(timestamp, rollup),
0,
)

def get_range(self, model, keys, start, end, rollup=None):
rollup, series = self.get_optimal_rollup_series(start, end, rollup)

@@ -112,6 +125,19 @@ def merge_distinct_counts(self, model, destination, sources, timestamp=None):
for bucket, values in self.sets[model].pop(source, {}).items():
destination[bucket].update(values)

def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=None):
rollups = self.get_active_series(start, end, timestamp)

for rollup, series in rollups.items():
for model in models:
for key in keys:
data = self.sets[model][key]
for timestamp in series:
data.pop(
self.normalize_to_rollup(timestamp, rollup),
set(),
)

def flush(self):
# self.data[model][key][rollup] = count
self.data = defaultdict(
@@ -198,3 +224,16 @@ def merge_frequencies(self, model, destination, sources, timestamp=None):
for source in sources:
for bucket, counter in self.data[model].pop(source, {}).items():
destination[bucket].update(counter)

def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None):
rollups = self.get_active_series(start, end, timestamp)

for rollup, series in rollups.items():
for model in models:
for key in keys:
data = self.data[model][key]
for timestamp in series:
data.pop(
self.normalize_to_rollup(timestamp, rollup),
Counter(),
)
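The in-memory deletion pattern above reduces to popping each rollup bucket with a type-appropriate default, so buckets that were never written are a no-op. A self-contained sketch of that pattern follows; the floor-to-bucket arithmetic mirrors (as an assumption) what BaseTSDB.normalize_to_rollup does for integer timestamps.

    from collections import Counter, defaultdict

    def normalize_to_rollup(ts, rollup):
        # Floor a UNIX timestamp to the start of its rollup bucket.
        return int(ts // rollup) * rollup

    data = defaultdict(Counter)  # data[key][bucket] = count, as in self.data[model]
    data['project:1'][normalize_to_rollup(1493000123, 60)] += 5

    def delete_range(data, keys, start, end, rollup):
        for key in keys:
            buckets = data[key]
            ts = normalize_to_rollup(start, rollup)
            while ts <= end:
                # Pop with a default so missing buckets are ignored,
                # matching the data.pop(bucket, 0) calls in the diff.
                buckets.pop(ts, 0)
                ts += rollup

    delete_range(data, ['project:1'], 1493000000, 1493000300, 60)
    assert data['project:1'] == Counter()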
64 changes: 48 additions & 16 deletions src/sentry/tsdb/redis.py
@@ -214,14 +214,7 @@ def get_range(self, model, keys, start, end, rollup=None):
return dict(results_by_key)

def merge(self, model, destination, sources, timestamp=None):
rollups = {}
for rollup, samples in self.rollups.items():
_, series = self.get_optimal_rollup_series(
to_datetime(self.get_earliest_timestamp(rollup, timestamp=timestamp)),
end=None,
rollup=rollup,
)
rollups[rollup] = map(to_datetime, series)
rollups = self.get_active_series(timestamp=timestamp)

with self.cluster.map() as client:
data = {}
@@ -265,6 +258,24 @@ def merge(self, model, destination, sources, timestamp=None):
),
)

def delete(self, models, keys, start=None, end=None, timestamp=None):
rollups = self.get_active_series(start, end, timestamp)

with self.cluster.map() as client:
for rollup, series in rollups.items():
for timestamp in series:
for model in models:
for key in keys:
model_key = self.get_model_key(key)
client.hdel(
self.make_counter_key(
model,
self.normalize_to_rollup(timestamp, rollup),
model_key,
),
model_key,
)

[Review comment from @LewisJEllis (Contributor), Apr 28, 2017, on the client.hdel call above]
is it worth trying to think "are we going to end up just deleting all keys in this hash?" and then just delete the whole toplevel hash key instead of each entry one by one, or is that not going to be happening much?

def record(self, model, key, values, timestamp=None):
self.record_multi(((model, key, values),), timestamp)

@@ -434,14 +445,7 @@ def merge_aggregates(values):
)

def merge_distinct_counts(self, model, destination, sources, timestamp=None):
rollups = {}
for rollup, samples in self.rollups.items():
_, series = self.get_optimal_rollup_series(
to_datetime(self.get_earliest_timestamp(rollup, timestamp=timestamp)),
end=None,
rollup=rollup,
)
rollups[rollup] = map(to_datetime, series)
rollups = self.get_active_series(timestamp=timestamp)

temporary_id = uuid.uuid1().hex

@@ -499,6 +503,23 @@ def make_temporary_key(key):
),
)

def delete_distinct_counts(self, models, keys, start=None, end=None, timestamp=None):
rollups = self.get_active_series(start, end, timestamp)

with self.cluster.map() as client:
for rollup, series in rollups.items():
for timestamp in series:
for model in models:
for key in keys:
client.delete(
self.make_key(
model,
rollup,
to_timestamp(timestamp),
key,
)
)

def make_frequency_table_keys(self, model, rollup, timestamp, key):
prefix = self.make_key(model, rollup, timestamp, key)
return map(
@@ -697,3 +718,14 @@ def merge_frequencies(self, model, destination, sources, timestamp=None):
self.cluster.execute_commands({
destination: imports,
})

def delete_frequencies(self, models, keys, start=None, end=None, timestamp=None):
rollups = self.get_active_series(start, end, timestamp)

with self.cluster.map() as client:
for rollup, series in rollups.items():
for timestamp in series:
for model in models:
for key in keys:
for k in self.make_frequency_table_keys(model, rollup, to_timestamp(timestamp), key):
client.delete(k)
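The three Redis deletion paths differ in the structure they remove: plain counters are fields of a per-bucket hash (hence HDEL of a single field), distinct counts are one HyperLogLog key per bucket (hence DELETE of the whole key), and frequencies are a pair of keys per bucket. A hedged redis-py sketch of the counter case, with key names invented for illustration (the real names come from make_counter_key); the last line shows the whole-hash DELETE that the review comment above asks about.

    import redis

    r = redis.Redis()

    counter_key = 'ts:1:60:1493000100'  # hypothetical (model, rollup, bucket) hash
    model_key = '1'                     # hypothetical field within the hash

    r.hincrby(counter_key, model_key, 5)   # roughly what incr() does per bucket
    r.hdel(counter_key, model_key)         # what delete() issues per field
    assert r.hget(counter_key, model_key) is None

    r.delete(counter_key)  # the coarser alternative: drop the whole hash at once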
32 changes: 32 additions & 0 deletions tests/sentry/tsdb/test_redis.py
@@ -109,6 +109,14 @@ def timestamp(d):
2: 0,
}

self.db.delete([TSDBModel.project], [1, 2], dts[0], dts[-1])

results = self.db.get_sums(TSDBModel.project, [1, 2], dts[0], dts[-1])
assert results == {
1: 0,
2: 0,
}

def test_count_distinct(self):
now = datetime.utcnow().replace(tzinfo=pytz.UTC) - timedelta(hours=4)
dts = [now + timedelta(hours=i) for i in range(4)]
@@ -211,6 +219,14 @@ def timestamp(d):
assert self.db.get_distinct_counts_union(model, [1, 2], dts[0], dts[-1], rollup=3600) == 3
assert self.db.get_distinct_counts_union(model, [2], dts[0], dts[-1], rollup=3600) == 0

self.db.delete_distinct_counts([model], [1, 2], dts[0], dts[-1])

results = self.db.get_distinct_counts_totals(model, [1, 2], dts[0], dts[-1])
assert results == {
1: 0,
2: 0,
}

def test_frequency_tables(self):
now = datetime.utcnow().replace(tzinfo=pytz.UTC)
model = TSDBModel.frequent_projects_by_organization
@@ -386,6 +402,22 @@ def test_frequency_tables(self):
},
}

self.db.delete_frequencies(
[model],
['organization:1', 'organization:2'],
now - timedelta(hours=1),
now,
)

[Review comment from a Contributor on this delete_frequencies call]
could be nice to have a test case where we delete just a sub-time-interval of the data and assert on the stuff that remains? current cases all just gut the dataset entirely unless I'm misreading something

assert self.db.get_most_frequent(
model,
('organization:1', 'organization:2'),
now,
) == {
'organization:1': [],
'organization:2': [],
}
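
Following up on the review comment above, here is a hedged sketch of the sub-interval test it suggests; the test name and the expected counts are illustrative assumptions rather than anything asserted by this PR.

    def test_delete_partial_interval(self):
        # Hypothetical sketch: one count in each of four hourly buckets,
        # delete only the middle two hours, and assert the flanking
        # buckets survive.
        now = datetime.utcnow().replace(tzinfo=pytz.UTC) - timedelta(hours=4)
        dts = [now + timedelta(hours=i) for i in range(4)]

        for dt in dts:
            self.db.incr(TSDBModel.project, 1, dt)

        self.db.delete([TSDBModel.project], [1], dts[1], dts[2])

        results = self.db.get_range(TSDBModel.project, [1], dts[0], dts[-1], rollup=3600)
        # Buckets 0 and 3 keep their counts; buckets 1 and 2 are gone.
        assert [count for _, count in results[1]] == [1, 0, 0, 1]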

def test_frequency_table_import_export_no_estimators(self):
client = self.db.cluster.get_local_client_for_key('key')
