-
Notifications
You must be signed in to change notification settings - Fork 16
Description
Actual Behavior:
"chng" "smoothed_adj_outpatient_flu" surprisingly has data with time_value > the as_of, but docs do not mention containing forecasts, and metadata suggests there should not be (related?).
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(epidatr)
api = covidcast_epidata()
ak_chng_flu_as_of_20220901 =
api$sources$chng$signals$smoothed_adj_outpatient_flu$call("state", "ak", epirange(12340101,34560101), as_of=20220901) %>%
fetch_tbl()
range(ak_chng_flu_as_of_20220901$time_value)
#> [1] "2020-02-01" "2022-11-05"
ccm = covidcast::covidcast_meta()
ccm %>%
filter(data_source == "chng", signal=="smoothed_adj_outpatient_flu", geo_type=="state") %>%
select(min_lag, max_lag)
#> A `covidcast_meta` data frame with 1 rows and 2 columns.
#>
#> Number of data sources : 0
#> Number of signals : 0
#>
#> min_lag max_lag
#> 1 5 674Created on 2023-01-02 by the reprex package (v2.0.1)
Brief analysis in "Context:" below suggests that this may be limited to the 2022-09-01 issue, but I have not confirmed this rigorously.
Expected behavior
I expected the documentation to mention forecasts, or to not to produce time_values greater than the as_of for any as_of
Context
Potentially-related issues:
- CHNG historical issues are wrong before 2021-02-21 #1289, but this suggests that the 2022-09-01 as-of data should be okay.
- CHNG min_lags not consistent within signals #1382, regarding
min_lagmetadata
I ran into this originally when trying to do some revision behavior analysis on some hhs and chng influenza data. Work towards analysis included below, which may help find where/when all these negative-version-lag data are:
Analysis:
library(epidatr)
library(epiprocess)
#>
#> Attaching package: 'epiprocess'
#> The following object is masked from 'package:stats':
#>
#> filter
library(data.table)
library(tibble)
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:data.table':
#>
#> between, first, last
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(tidyr)
library(ggplot2)
states_dc_pr_vi = c('al', 'ak', 'az', 'ar', 'ca', 'co', 'ct', 'dc', 'de', 'fl',
'ga', 'hi', 'id', 'il', 'in', 'ia', 'ks', 'ky', 'la', 'me',
'md', 'ma', 'mi', 'mn', 'ms', 'mo', 'mt', 'ne', 'nv', 'nh',
'nj', 'nm', 'ny', 'nc', 'nd', 'oh', 'ok', 'or', 'pa', 'ri',
'sc', 'sd', 'tn', 'tx', 'ut', 'vt', 'va', 'wa', 'wv', 'wi',
'wy', 'pr', 'vi')
# devtools::load_all(here::here("code","direction.forecaster"), export_all=FALSE)
fetch_updating_resource = function(fetch,
check_response,
cache_file_prefix=NULL,
cache_invalidation_period=as.difftime(1L, units="days"),
force_cache_invalidation=FALSE,
silent=FALSE,
make_ancestor_directories=TRUE
) {
cache_filepath =
if (is.null(cache_file_prefix)) {
NULL
} else {
paste0(cache_file_prefix, ".RDS")
}
should_fetch_now =
if (is.null(cache_filepath)) {
TRUE
} else if (force_cache_invalidation) {
TRUE
} else if (!file.exists(cache_filepath)) {
TRUE
} else {
fetch_info = readRDS(cache_filepath) # ==> fetch_info
should_refetch = difftime(Sys.time(), fetch_info$fetch_timestamp) > cache_invalidation_period
should_refetch
}
if (should_fetch_now) {
if (!silent) {
message("No cache, empty cache, expired cache, or forced cache invalidation; fetching data.")
}
# todo prompt for confirmation on fetch/refetch
# todo option to read from cache without considering refetch
#
# `Sys.time()` outputs a POSIXct, which tracks the time elapsed since some
# epoch, paired with timezone metadata. Arithmetic should still reliably
# work even with timezone changes (including DST), while likely providing
# for quicker debugging for users that don't switch timezones frequently.
fetch_response <- fetch()
fetch_timestamp <- Sys.time()
} else {
if (!silent) {
message("Cached version of data used.")
}
fetch_response = fetch_info$fetch_response
fetch_timestamp = fetch_info$fetch_timestamp
}
check_response(fetch_response)
if (!is.null(cache_filepath) && should_fetch_now) {
## Update cache:
fetch_info = list(fetch_response=fetch_response, fetch_timestamp=fetch_timestamp)
if (make_ancestor_directories && !dir.exists(dirname(cache_filepath))) {
dir.create(dirname(cache_filepath), recursive=TRUE)
}
saveRDS(fetch_info, cache_filepath)
}
return (fetch_response)
}
response_to_archive = function(response) {
response %>%
unite(source_signal, c(source, signal)) %>%
rename(version = issue) %>%
pivot_wider(id_cols=c(geo_value, time_value, version), names_from=source_signal, values_from=value) %>%
as_epi_archive(compactify=TRUE)
}
fetch_updating_covidcast_archive = function(data_source, signal) {
fetch_updating_resource(
function() {
covidcast(data_source, signal,
"day", "state",
epirange(12340101, 34560101),
"*",
issues = epirange(12340101, as.integer(format(Sys.Date(), "%Y%m%d")))) %>%
fetch_tbl() %>%
filter(.data$geo_value %in% .env$states_dc_pr_vi)
},
function(response) {
if (!is_tibble(response)) {
abort("result was not a tibble")
}
if (nrow(response) == 0L) {
abort("result had no rows")
}
},
cache_file_prefix = here::here("cache","issue-data",
paste0(data_source, "_", signal))
) %>%
response_to_archive()
}
archive_hhs_confirmed_admissions_influenza_1d_prop_7dav =
fetch_updating_covidcast_archive("hhs", "confirmed_admissions_influenza_1d_prop_7dav") %>%
# filter out some bad data for time_values before reporting became mandatory
# the first time; not sure exactly when influenza reporting compliance was
# broadly achieved, but this is better than nothing:
{
new_min_time_value = .$DT[!is.na(hhs_confirmed_admissions_influenza_1d_prop_7dav) &
hhs_confirmed_admissions_influenza_1d_prop_7dav != 0L &
geo_value != "nv",
min(time_value)]
.$DT <- .$DT[.$DT$time_value >= new_min_time_value, names(.$DT), with=FALSE]
.
}
#> No cache, empty cache, expired cache, or forced cache invalidation; fetching data.
archive_chng_smoothed_adj_outpatient_flu =
fetch_updating_covidcast_archive("chng", "smoothed_adj_outpatient_flu")
#> No cache, empty cache, expired cache, or forced cache invalidation; fetching data.
archive =
epix_merge(archive_hhs_confirmed_admissions_influenza_1d_prop_7dav, archive_chng_smoothed_adj_outpatient_flu,
sync="locf")
version_lag_data =
archive %>%
group_by(geo_value) %>%
epix_slide(~ tibble(data=list(.x)), before = 60L) %>%
ungroup() %>%
transmute(version=time_value, slide_value_data) %>%
unnest(slide_value_data) %>%
mutate(version_lag = as.integer(version - time_value)) %>%
pivot_longer(c("hhs_confirmed_admissions_influenza_1d_prop_7dav","chng_smoothed_adj_outpatient_flu"),
names_to="signal")
# version_lag_data %>%
# inner_join(
# {.} %>%
# filter(version_lag == 60L) %>%
# select(-version_lag) %>%
# rename(anchor = value),
# by = c("signal", "geo_value", "time_value")
# )
version_lag_data %>% distinct(version_lag) %>% print(n=200L)
#> # A tibble: 126 × 1
#> version_lag
#> <int>
#> 1 60
#> 2 59
#> 3 58
#> 4 57
#> 5 56
#> 6 55
#> 7 54
#> 8 53
#> 9 52
#> 10 51
#> 11 50
#> 12 49
#> 13 48
#> 14 47
#> 15 46
#> 16 45
#> 17 44
#> 18 43
#> 19 42
#> 20 41
#> 21 40
#> 22 39
#> 23 38
#> 24 37
#> 25 36
#> 26 35
#> 27 34
#> 28 33
#> 29 32
#> 30 31
#> 31 30
#> 32 29
#> 33 28
#> 34 27
#> 35 26
#> 36 25
#> 37 24
#> 38 23
#> 39 22
#> 40 21
#> 41 20
#> 42 19
#> 43 18
#> 44 17
#> 45 16
#> 46 15
#> 47 14
#> 48 13
#> 49 12
#> 50 11
#> 51 10
#> 52 9
#> 53 8
#> 54 7
#> 55 6
#> 56 5
#> 57 4
#> 58 3
#> 59 2
#> 60 1
#> 61 -6
#> 62 -7
#> 63 -8
#> 64 -9
#> 65 -10
#> 66 -11
#> 67 -12
#> 68 -13
#> 69 -14
#> 70 -15
#> 71 -16
#> 72 -17
#> 73 -18
#> 74 -19
#> 75 -20
#> 76 -21
#> 77 -22
#> 78 -23
#> 79 -24
#> 80 -25
#> 81 -26
#> 82 -27
#> 83 -28
#> 84 -29
#> 85 -30
#> 86 -31
#> 87 -32
#> 88 -33
#> 89 -34
#> 90 -35
#> 91 -36
#> 92 -37
#> 93 -38
#> 94 -39
#> 95 -40
#> 96 -41
#> 97 -42
#> 98 -43
#> 99 -44
#> 100 -45
#> 101 -46
#> 102 -47
#> 103 -48
#> 104 -49
#> 105 -50
#> 106 -51
#> 107 -52
#> 108 -53
#> 109 -54
#> 110 -55
#> 111 -56
#> 112 -57
#> 113 -58
#> 114 -59
#> 115 -60
#> 116 -61
#> 117 -62
#> 118 -63
#> 119 -64
#> 120 -65
#> 121 -5
#> 122 -4
#> 123 -3
#> 124 -2
#> 125 -1
#> 126 0
version_lag_data %>% filter(version_lag == -20L)
#> # A tibble: 4,876 × 6
#> version geo_value time_value version_lag signal value
#> <date> <chr> <date> <int> <chr> <dbl>
#> 1 2022-09-01 ak 2022-09-21 -20 hhs_confirmed_admissions… NA
#> 2 2022-09-01 ak 2022-09-21 -20 chng_smoothed_adj_outpat… 0.0502
#> 3 2022-09-01 al 2022-09-21 -20 hhs_confirmed_admissions… NA
#> 4 2022-09-01 al 2022-09-21 -20 chng_smoothed_adj_outpat… 0.0518
#> 5 2022-09-01 ar 2022-09-21 -20 hhs_confirmed_admissions… NA
#> 6 2022-09-01 ar 2022-09-21 -20 chng_smoothed_adj_outpat… 0.0103
#> 7 2022-09-01 az 2022-09-21 -20 hhs_confirmed_admissions… NA
#> 8 2022-09-01 az 2022-09-21 -20 chng_smoothed_adj_outpat… 0.0125
#> 9 2022-09-01 ca 2022-09-21 -20 hhs_confirmed_admissions… NA
#> 10 2022-09-01 ca 2022-09-21 -20 chng_smoothed_adj_outpat… 0.0212
#> # … with 4,866 more rows
archive_chng_smoothed_adj_outpatient_flu$DT[time_value > version]
#> geo_value time_value version chng_smoothed_adj_outpatient_flu
#> 1: ak 2022-09-07 2022-09-01 0.0498707
#> 2: ak 2022-09-08 2022-09-01 0.0340505
#> 3: ak 2022-09-09 2022-09-01 0.0340368
#> 4: ak 2022-09-10 2022-09-01 0.0257732
#> 5: ak 2022-09-11 2022-09-01 0.0636943
#> ---
#> 3176: wy 2022-11-01 2022-09-01 0.1605823
#> 3177: wy 2022-11-02 2022-09-01 0.1884758
#> 3178: wy 2022-11-03 2022-09-01 0.1903627
#> 3179: wy 2022-11-04 2022-09-01 0.2013854
#> 3180: wy 2022-11-05 2022-09-01 0.1508639Created on 2023-01-02 by the reprex package (v2.0.1)