Commit 690bf95

Merge pull request #564 from cmu-delphi/sgratzl/covidcast_meta_file

create meta data covidcast file + use in /meta

2 parents be502e8 + 21917fe

File tree

11 files changed, +881 -287 lines

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion

@@ -84,7 +84,7 @@ jobs:
           docker stop delphi_database_epidata delphi_web_epidata
           docker network remove delphi-net

-  build_js_clients:
+  build_js_client:
     runs-on: ubuntu-latest
     defaults:
       run:
.github/workflows/ (new workflow file)

Lines changed: 37 additions & 0 deletions

@@ -0,0 +1,37 @@
+name: Update Google Docs Meta Data
+on:
+  workflow_dispatch:
+jobs:
+  update_gdocs:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Check out code
+        uses: actions/checkout@v2
+        with:
+          branch: dev
+          ssh-key: ${{ secrets.CMU_DELPHI_DEPLOY_MACHINE_SSH }}
+      - name: Set up Python 3.8
+        uses: actions/setup-python@v2
+        with:
+          python-version: 3.8
+      - uses: actions/cache@v2
+        with:
+          path: ~/.cache/pip
+          key: ${{ runner.os }}-pipd-${{ hashFiles('requirements-dev.txt') }}
+          restore-keys: |
+            ${{ runner.os }}-pipd-
+      - name: Install Dependencies
+        run: pip install -r requirements-dev.txt
+      - name: Update Docs
+        run: inv update-gdoc
+      - name: Create pull request into dev
+        uses: peter-evans/create-pull-request@v3
+        with:
+          branch: bot/update-docs
+          commit-message: 'chore: update docs'
+          title: Update Google Docs Meta Data
+          labels: chore
+          reviewers: krivard
+          assignees: krivard
+          body: |
+            Updating Google Docs Meta Data
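
This workflow only runs on manual dispatch (workflow_dispatch). As a rough sketch of how it could be kicked off from a script, the snippet below calls the standard GitHub REST API for workflow dispatches; the workflow filename, repository path, and token are placeholders, not values taken from this commit:

# Sketch: trigger the manual workflow via the GitHub REST API.
# WORKFLOW_FILE, the repo path, and the token are assumptions, not part of this commit.
import os
import requests

WORKFLOW_FILE = "update_gdocs_meta.yaml"  # hypothetical filename for the new workflow
url = f"https://api.github.com/repos/cmu-delphi/delphi-epidata/actions/workflows/{WORKFLOW_FILE}/dispatches"
resp = requests.post(
    url,
    headers={
        "Accept": "application/vnd.github.v3+json",
        "Authorization": f"token {os.environ['GITHUB_TOKEN']}",
    },
    json={"ref": "dev"},  # branch the workflow checks out
)
resp.raise_for_status()  # GitHub returns 204 No Content on success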

integrations/server/test_covidcast_endpoints.py

Lines changed: 10 additions & 3 deletions

@@ -339,7 +339,7 @@ def test_meta(self):
         """Request a signal the /meta endpoint."""

         num_rows = 10
-        rows = [CovidcastRow(time_value=20200401 + i, value=i) for i in range(num_rows)]
+        rows = [CovidcastRow(time_value=20200401 + i, value=i, source="fb-survey", signal="smoothed_cli") for i in range(num_rows)]
         self._insert_rows(rows)
         first = rows[0]
         last = rows[-1]
@@ -349,7 +349,10 @@ def test_meta(self):
         with self.subTest("plain"):
             out = self._fetch("/meta")
             self.assertEqual(len(out), 1)
-            stats = out[0]
+            data_source = out[0]
+            self.assertEqual(data_source["source"], first.source)
+            self.assertEqual(len(data_source["signals"]), 1)
+            stats = data_source["signals"][0]
             self.assertEqual(stats["source"], first.source)
             self.assertEqual(stats["signal"], first.signal)
             self.assertEqual(stats["min_time"], first.time_value)
@@ -364,7 +367,11 @@ def test_meta(self):
         with self.subTest("filtered"):
             out = self._fetch("/meta", signal=f"{first.source}:*")
             self.assertEqual(len(out), 1)
-            self.assertEqual(out[0]["source"], first.source)
+            data_source = out[0]
+            self.assertEqual(data_source["source"], first.source)
+            self.assertEqual(len(data_source["signals"]), 1)
+            stats = data_source["signals"][0]
+            self.assertEqual(stats["source"], first.source)
             out = self._fetch("/meta", signal=f"{first.source}:X")
             self.assertEqual(len(out), 0)
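
These assertions reflect the new nesting of the /meta response: instead of a flat list of per-signal stats, the endpoint now returns one entry per data source with the per-signal metadata nested under a "signals" list. A minimal sketch of the shape the test now expects (field values are illustrative only; the nesting and field names come from the test):

# Illustrative shape of the new /meta payload asserted by test_meta;
# values are made up, only the structure and key names are taken from the test.
expected_shape = [
    {
        "source": "fb-survey",           # one entry per data source
        "signals": [                     # per-signal metadata nested under the source
            {
                "source": "fb-survey",
                "signal": "smoothed_cli",
                "min_time": 20200401,    # first inserted time_value
                "max_time": 20200410,    # last inserted time_value
            }
        ],
    }
]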

requirements.dev.txt

Lines changed: 1 addition & 0 deletions

@@ -5,3 +5,4 @@ mypy>=0.790
 pytest
 tenacity==7.0.0
 bump2version
+requests

src/server/endpoints/covidcast.py

Lines changed: 115 additions & 33 deletions

@@ -1,4 +1,4 @@
-from typing import List, Optional, Union, Tuple, Dict, Any, Set
+from typing import List, Optional, Union, Tuple, Dict, Any
 from itertools import groupby
 from datetime import date, datetime, timedelta
 from flask import Blueprint, request
@@ -33,8 +33,9 @@
     require_any,
 )
 from .._pandas import as_pandas, print_pandas
-from .covidcast_utils import compute_trend, compute_trends, compute_correlations, compute_trend_value, CovidcastMetaEntry, AllSignalsMap
+from .covidcast_utils import compute_trend, compute_trends, compute_correlations, compute_trend_value, CovidcastMetaEntry
 from ..utils import shift_time_value, date_to_time_value, time_value_to_iso, time_value_to_date
+from .covidcast_utils.model import TimeType, data_sources, create_source_signal_alias_mapper

 # first argument is the endpoint name
 bp = Blueprint("covidcast", __name__)
@@ -124,6 +125,7 @@ def guess_index_to_use(time: List[TimePair], geo: List[GeoPair], issues: Optiona
 @bp.route("/", methods=("GET", "POST"))
 def handle():
     source_signal_pairs = parse_source_signal_pairs()
+    source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(source_signal_pairs)
     time_pairs = parse_time_pairs()
     geo_pairs = parse_geo_pairs()

@@ -137,8 +139,8 @@ def handle():
     fields_string = ["geo_value", "signal"]
     fields_int = ["time_value", "direction", "issue", "lag", "missing_value", "missing_stderr", "missing_sample_size"]
     fields_float = ["value", "stderr", "sample_size"]
-
-    if is_compatibility_mode():
+    is_compatibility = is_compatibility_mode()
+    if is_compatibility:
         q.set_order("signal", "time_value", "geo_value", "issue")
     else:
         # transfer also the new detail columns
@@ -158,14 +160,22 @@ def handle():

     _handle_lag_issues_as_of(q, issues, lag, as_of)

+    def transform_row(row, _):
+        if is_compatibility or not alias_mapper:
+            return row
+        row["source"] = alias_mapper(row["source"], row["signal"])
+        return row
+
     # send query
-    return execute_query(str(q), q.params, fields_string, fields_int, fields_float)
+    return execute_query(str(q), q.params, fields_string, fields_int, fields_float, transform=transform_row)


 @bp.route("/trend", methods=("GET", "POST"))
 def handle_trend():
     require_all("date", "window")
     source_signal_pairs = parse_source_signal_pairs()
+    source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(source_signal_pairs)
+    # TODO alias
     geo_pairs = parse_geo_pairs()

     time_value = parse_day_arg("date")
@@ -192,7 +202,10 @@ def handle_trend():

     def gen(rows):
         for key, group in groupby((parse_row(row, fields_string, fields_int, fields_float) for row in rows), lambda row: (row["geo_type"], row["geo_value"], row["source"], row["signal"])):
-            trend = compute_trend(key[0], key[1], key[2], key[3], time_value, basis_time_value, ((row["time_value"], row["value"]) for row in group))
+            geo_type, geo_value, source, signal = key
+            if alias_mapper:
+                source = alias_mapper(source, signal)
+            trend = compute_trend(geo_type, geo_value, source, signal, time_value, basis_time_value, ((row["time_value"], row["value"]) for row in group))
             yield trend.asdict()

     # execute first query
@@ -209,6 +222,7 @@ def gen(rows):
 def handle_trendseries():
     require_all("window")
     source_signal_pairs = parse_source_signal_pairs()
+    source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(source_signal_pairs)
     geo_pairs = parse_geo_pairs()

     time_window = parse_day_range_arg("window")
@@ -238,9 +252,12 @@ def handle_trendseries():

     def gen(rows):
         for key, group in groupby((parse_row(row, fields_string, fields_int, fields_float) for row in rows), lambda row: (row["geo_type"], row["geo_value"], row["source"], row["signal"])):
-            trends = compute_trends(key[0], key[1], key[2], key[3], shifter, ((row["time_value"], row["value"]) for row in group))
-            for trend in trends:
-                yield trend.asdict()
+            geo_type, geo_value, source, signal = key
+            if alias_mapper:
+                source = alias_mapper(source, signal)
+            trends = compute_trends(geo_type, geo_value, source, signal, shifter, ((row["time_value"], row["value"]) for row in group))
+            for t in trends:
+                yield t.asdict()

     # execute first query
     try:
@@ -257,6 +274,7 @@ def handle_correlation():
     require_all("reference", "window", "others", "geo")
     reference = parse_single_source_signal_arg("reference")
     other_pairs = parse_source_signal_arg("others")
+    source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(other_pairs + [reference])
     geo_pairs = parse_geo_arg()
     time_window = parse_day_range_arg("window")
     lag = extract_integer("lag")
@@ -272,7 +290,11 @@ def handle_correlation():
     q.set_fields(fields_string, fields_int, fields_float)
     q.set_order("geo_type", "geo_value", "source", "signal", "time_value")

-    q.where_source_signal_pairs("source", "signal", other_pairs + [reference])
+    q.where_source_signal_pairs(
+        "source",
+        "signal",
+        source_signal_pairs,
+    )
     q.where_geo_pairs("geo_type", "geo_value", geo_pairs)
     q.where_time_pairs("time_type", "time_value", [TimePair("day", [time_window])])

@@ -305,6 +327,8 @@ def gen():
                 continue  # no other signals

             for (source, signal), other_group in other_groups:
+                if alias_mapper:
+                    source = alias_mapper(source, signal)
                 for cor in compute_correlations(geo_type, geo_value, source, signal, lag, reference_group, other_group):
                     yield cor.asdict()

@@ -315,6 +339,7 @@ def gen():
 @bp.route("/csv", methods=("GET", "POST"))
 def handle_export():
     source, signal = request.args.get("signal", "jhu-csse:confirmed_incidence_num").split(":")
+    source_signal_pairs, alias_mapper = create_source_signal_alias_mapper([SourceSignalPair(source, [signal])])
     start_day = request.args.get("start_day", "2020-04-01")
     end_day = request.args.get("end_day", "2020-09-01")
     geo_type = request.args.get("geo_type", "county")
@@ -336,7 +361,8 @@ def handle_export():

     q.set_fields(["geo_value", "signal", "time_value", "issue", "lag", "value", "stderr", "sample_size", "geo_type", "source"], [], [])
     q.set_order("time_value", "geo_value")
-    q.where(source=source, signal=signal, time_type="day")
+    q.where(time_type="day")
+    q.where_source_signal_pairs("source", "signal", source_signal_pairs)
     q.conditions.append("time_value BETWEEN :start_day AND :end_day")
     q.params["start_day"] = date_to_time_value(start_day)
     q.params["end_day"] = date_to_time_value(end_day)
@@ -362,7 +388,7 @@ def parse_row(i, row):
             "stderr": row["stderr"],
             "sample_size": row["sample_size"],
             "geo_type": row["geo_type"],
-            "data_source": row["source"],
+            "data_source": alias_mapper(row["source"], row["signal"]) if alias_mapper else row["source"],
         }

     def gen(first_row, rows):
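
For the /csv endpoint, the change swaps the direct source/signal filter for the alias-aware pair filter and maps the exported data_source column back through the alias mapper. A quick way to exercise it is sketched below; the base URL is an assumption about the deployment, not something specified in this commit, while the query parameters and their defaults come straight from handle_export():

# Hedged example request against the /csv endpoint; BASE is an assumed deployment URL.
import requests

BASE = "https://api.covidcast.cmu.edu/epidata/covidcast"  # assumption, adjust as needed

resp = requests.get(
    f"{BASE}/csv",
    params={
        "signal": "jhu-csse:confirmed_incidence_num",  # "source:signal", the handler default
        "start_day": "2020-04-01",
        "end_day": "2020-09-01",
        "geo_type": "county",
    },
)
resp.raise_for_status()
print(resp.text.splitlines()[0])  # CSV header row; data_source reflects any requested alias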
@@ -394,6 +420,9 @@ def handle_backfill():
     """
     require_all("geo", "time", "signal")
     signal_pair = parse_single_source_signal_arg("signal")
+    source_signal_pairs, _ = create_source_signal_alias_mapper([signal_pair])
+    # don't need the alias mapper since we don't return the source
+
     time_pair = parse_single_time_arg("time")
     geo_pair = parse_single_geo_arg("geo")
     reference_anchor_lag = extract_integer("anchor_lag")  # in days
@@ -410,7 +439,7 @@ def handle_backfill():
     q.set_order(time_value=True, issue=True)
     q.set_fields(fields_string, fields_int, fields_float, ["is_latest_issue"])

-    q.where_source_signal_pairs("source", "signal", [signal_pair])
+    q.where_source_signal_pairs("source", "signal", source_signal_pairs)
     q.where_geo_pairs("geo_type", "geo_value", [geo_pair])
     q.where_time_pairs("time_type", "time_value", [time_pair])

@@ -463,31 +492,79 @@ def handle_meta():
     similar to /covidcast_meta but in a structured optimized JSON form for the app
     """

-    signal = parse_source_signal_arg("signal")
+    filter_signal = parse_source_signal_arg("signal")
+    flags = ",".join(request.values.getlist("flags")).split(",")
+    filter_smoothed: Optional[bool] = None
+    filter_weighted: Optional[bool] = None
+    filter_cumulative: Optional[bool] = None
+    filter_active: Optional[bool] = None
+    filter_time_type: Optional[TimeType] = None
+
+    if "smoothed" in flags:
+        filter_smoothed = True
+    elif "not_smoothed" in flags:
+        filter_smoothed = False
+    if "weighted" in flags:
+        filter_weighted = True
+    elif "not_weighted" in flags:
+        filter_weighted = False
+    if "cumulative" in flags:
+        filter_cumulative = True
+    elif "not_cumulative" in flags:
+        filter_cumulative = False
+    if "active" in flags:
+        filter_active = True
+    elif "inactive" in flags:
+        filter_active = False
+    if "day" in flags:
+        filter_active = TimeType.day
+    elif "week" in flags:
+        filter_active = TimeType.week

     row = db.execute(text("SELECT epidata FROM covidcast_meta_cache LIMIT 1")).fetchone()

     data = loads(row["epidata"]) if row and row["epidata"] else []

-    all_signals: AllSignalsMap = {}
+    by_signal: Dict[Tuple[str, str], List[Dict[str, Any]]] = {}
     for row in data:
-        if row["time_type"] != "day":
-            continue
-        entry: Set[str] = all_signals.setdefault(row["data_source"], set())
-        entry.add(row["signal"])
+        entry = by_signal.setdefault((row["data_source"], row["signal"]), [])
+        entry.append(row)

-    out: Dict[str, CovidcastMetaEntry] = {}
-    for row in data:
-        if row["time_type"] != "day":
+    sources: List[Dict[str, Any]] = []
+    for source in data_sources:
+        if filter_active is not None and source.active != filter_active:
             continue
-        if signal and all((not s.matches(row["data_source"], row["signal"]) for s in signal)):
+
+        meta_signals: List[Dict[str, Any]] = []
+
+        for signal in source.signals:
+            if filter_signal and all((not s.matches(signal.source, signal.signal) for s in filter_signal)):
+                continue
+            if filter_smoothed is not None and signal.is_smoothed != filter_smoothed:
+                continue
+            if filter_weighted is not None and signal.is_weighted != filter_weighted:
+                continue
+            if filter_cumulative is not None and signal.is_cumulative != filter_cumulative:
+                continue
+            if filter_time_type is not None and signal.time_type != filter_time_type:
+                continue
+            meta_data = by_signal.get(signal.key)
+            if not meta_data:
+                continue
+            row = meta_data[0]
+            entry = CovidcastMetaEntry(signal, row["min_time"], row["max_time"], row["max_issue"])
+            for row in meta_data:
+                entry.intergrate(row)
+            meta_signals.append(entry.asdict())
+
+        if not meta_signals:  # none found or no signals
             continue
-        entry = out.setdefault(
-            f"{row['data_source']}:{row['signal']}", CovidcastMetaEntry(row["data_source"], row["signal"], row["min_time"], row["max_time"], row["max_issue"], {}, all_signals=all_signals)
-        )
-        entry.intergrate(row)

-    return jsonify([r.asdict() for r in out.values()])
+        s = source.asdict()
+        s["signals"] = meta_signals
+        sources.append(s)
+
+    return jsonify(sources)


 @bp.route("/coverage", methods=("GET", "POST"))
@@ -496,7 +573,8 @@ def handle_coverage():
     similar to /signal_dashboard_coverage for a specific signal returns the coverage (number of locations for a given geo_type)
     """

-    signal = parse_source_signal_pairs()
+    source_signal_pairs = parse_source_signal_pairs()
+    source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(source_signal_pairs)
     geo_type = request.args.get("geo_type", "county")
     if "window" in request.values:
         time_window = parse_day_range_arg("window")
@@ -523,14 +601,20 @@ def handle_coverage():
         q.conditions.append('geo_value not like "%000"')
     else:
         q.where(geo_type=geo_type)
-    q.where_source_signal_pairs("source", "signal", signal)
+    q.where_source_signal_pairs("source", "signal", source_signal_pairs)
    q.where_time_pairs("time_type", "time_value", [TimePair("day", [time_window])])
     q.group_by = "c.source, c.signal, c.time_value"
     q.set_order("source", "signal", "time_value")

     _handle_lag_issues_as_of(q, None, None, None)

-    return execute_query(q.query, q.params, fields_string, fields_int, [])
+    def transform_row(row, _):
+        if not alias_mapper:
+            return row
+        row["source"] = alias_mapper(row["source"], row["signal"])
+        return row
+
+    return execute_query(q.query, q.params, fields_string, fields_int, [], transform=transform_row)


 @bp.route("/anomalies", methods=("GET", "POST"))
@@ -539,8 +623,6 @@ def handle_anomalies():
     proxy to the excel sheet about data anomalies
     """

-    signal = parse_source_signal_arg("signal")
-
     df = read_csv(
         "https://docs.google.com/spreadsheets/d/e/2PACX-1vToGcf9x5PNJg-eSrxadoR5b-LM2Cqs9UML97587OGrIX0LiQDcU1HL-L2AA8o5avbU7yod106ih0_n/pub?gid=0&single=true&output=csv", skip_blank_lines=True
     )
