diff --git a/gs_quant/timeseries/helper.py b/gs_quant/timeseries/helper.py index 6b0afce6..4d9db3e7 100644 --- a/gs_quant/timeseries/helper.py +++ b/gs_quant/timeseries/helper.py @@ -13,6 +13,7 @@ specific language governing permissions and limitations under the License. """ + import datetime as dt import inspect import logging @@ -33,25 +34,33 @@ from gs_quant.errors import MqValueError, MqRequestError from gs_quant.timeseries.measure_registry import register_measure -ENABLE_DISPLAY_NAME = 'GSQ_ENABLE_MEASURE_DISPLAY_NAME' +ENABLE_DISPLAY_NAME = "GSQ_ENABLE_MEASURE_DISPLAY_NAME" USE_DISPLAY_NAME = os.environ.get(ENABLE_DISPLAY_NAME) == "1" _logger = logging.getLogger(__name__) class Entitlement(Enum): - INTERNAL = 'internal' + INTERNAL = "internal" try: from quant_extensions.timeseries.rolling import rolling_apply except ImportError as e: - _logger.debug('unable to import rolling_apply extension: %s', e) + _logger.debug("unable to import rolling_apply extension: %s", e) - def rolling_apply(s: pd.Series, offset: pd.DateOffset, function: Callable[[np.ndarray], float]) -> pd.Series: + def rolling_apply( + s: pd.Series, offset: pd.DateOffset, function: Callable[[np.ndarray], float] + ) -> pd.Series: if isinstance(s.index, pd.DatetimeIndex): - values = [function(s.loc[(s.index > (idx - offset)) & (s.index <= idx)]) for idx in s.index] + values = [ + function(s.loc[(s.index > (idx - offset)) & (s.index <= idx)]) + for idx in s.index + ] else: - values = [function(s.loc[(s.index > (idx - offset).date()) & (s.index <= idx)]) for idx in s.index] + values = [ + function(s.loc[(s.index > (idx - offset).date()) & (s.index <= idx)]) + for idx in s.index + ] return pd.Series(values, index=s.index, dtype=np.double) @@ -65,39 +74,40 @@ def _create_int_enum(name, mappings): def _to_offset(tenor: str) -> pd.DateOffset: import re - matcher = re.fullmatch('(\\d+)([hdwmy])', tenor) + + matcher = re.fullmatch("(\\d+)([hdwmy])", tenor) if not matcher: - raise MqValueError('invalid tenor ' + tenor) + raise MqValueError("invalid tenor " + tenor) ab = matcher.group(2) - if ab == 'h': - name = 'hours' - elif ab == 'd': - name = 'days' - elif ab == 'w': - name = 'weeks' - elif ab == 'm': - name = 'months' + if ab == "h": + name = "hours" + elif ab == "d": + name = "days" + elif ab == "w": + name = "weeks" + elif ab == "m": + name = "months" else: - assert ab == 'y' - name = 'years' + assert ab == "y" + name = "years" kwarg = {name: int(matcher.group(1))} return pd.DateOffset(**kwarg) def _tenor_to_month(relative_date: str) -> int: - matcher = re.fullmatch('([1-9]\\d*)([my])', relative_date) + matcher = re.fullmatch("([1-9]\\d*)([my])", relative_date) if matcher: mag = int(matcher.group(1)) - return mag if matcher.group(2) == 'm' else mag * 12 - raise MqValueError('invalid input: relative date must be in months or years') + return mag if matcher.group(2) == "m" else mag * 12 + raise MqValueError("invalid input: relative date must be in months or years") -Interpolate = _create_enum('Interpolate', ['intersect', 'step', 'nan', 'zero', 'time']) -Returns = _create_enum('Returns', ['simple', 'logarithmic', 'absolute']) -SeriesType = _create_enum('SeriesType', ['prices', 'returns']) -CurveType = _create_enum('CurveType', ['prices', 'excess_returns']) +Interpolate = _create_enum("Interpolate", ["intersect", "step", "nan", "zero", "time"]) +Returns = _create_enum("Returns", ["simple", "logarithmic", "absolute"]) +SeriesType = _create_enum("SeriesType", ["prices", "returns"]) +CurveType = _create_enum("CurveType", ["prices", "excess_returns"]) class Window: @@ -125,61 +135,71 @@ class Window: """ - def __init__(self, w: Union[int, str, None] = None, r: Union[int, str, None] = None): + def __init__( + self, w: Union[int, str, None] = None, r: Union[int, str, None] = None + ): self.w = w self.r = w if r is None else r def as_dict(self): - return { - 'w': self.w, - 'r': self.r - } + return {"w": self.w, "r": self.r} @classmethod def from_dict(cls, obj): - return Window(w=obj.get('w'), r=obj.get('r')) + return Window(w=obj.get("w"), r=obj.get("r")) def _check_window(series_length: int, window: Window): if series_length > 0 and isinstance(window.w, int) and isinstance(window.r, int): if window.w <= 0: - raise MqValueError('Window value must be greater than zero.') + raise MqValueError("Window value must be greater than zero.") if window.r > series_length or window.r < 0: - raise MqValueError('Ramp value must be less than the length of the series and greater than zero.') + raise MqValueError( + "Ramp value must be less than the length of the series and greater than zero." + ) def apply_ramp(x: pd.Series, window: Window) -> pd.Series: _check_window(len(x), window) - if isinstance(window.w, int) and window.w > len(x): # does not restrict window size when it is a DataOffset + if isinstance(window.w, int) and window.w > len( + x + ): # does not restrict window size when it is a DataOffset return pd.Series(dtype=float) if isinstance(window.r, pd.DateOffset): if np.issubdtype(x.index, dt.date): - return x.loc[(x.index[0] + window.r).date():] + return x.loc[(x.index[0] + window.r).date() :] else: - return x.loc[(x.index[0] + window.r).to_pydatetime():] + return x.loc[(x.index[0] + window.r).to_pydatetime() :] else: - return x[window.r:] + return x[window.r :] -def normalize_window(x: Union[pd.Series, pd.DataFrame], window: Union[Window, int, str, None], - default_window: int = None) -> Window: +def normalize_window( + x: Union[pd.Series, pd.DataFrame], + window: Union[Window, int, str, None], + default_window: int = None, +) -> Window: if default_window is None: default_window = len(x) + # Convert string window to offset just once if isinstance(window, int): window = Window(window, window) elif isinstance(window, str): - window = Window(_to_offset(window), _to_offset(window)) + offset = _to_offset(window) + window = Window(offset, offset) else: if window is None: window = Window(default_window, 0) else: - if isinstance(window.w, str): - window = Window(_to_offset(window.w), window.r) - if isinstance(window.r, str): - window = Window(window.w, _to_offset(window.r)) - if window.w is None: - window = Window(default_window, window.r) + w_val, r_val = window.w, window.r + if isinstance(w_val, str): + w_val = _to_offset(w_val) + if isinstance(r_val, str): + r_val = _to_offset(r_val) + if w_val is None: + w_val = default_window + window = Window(w_val, r_val) _check_window(default_window, window) return window @@ -198,17 +218,24 @@ def plot_session_function(fn): def check_forward_looking(pricing_date, source, name="function"): - if pricing_date is not None or source != 'plottool': + if pricing_date is not None or source != "plottool": return if DataContext.current.end_date <= dt.date.today(): - msg = (f'{name}() requires a forward looking date range e.g. [0d, 3y]. ' - 'Please update the date range via the date picker.') + msg = ( + f"{name}() requires a forward looking date range e.g. [0d, 3y]. " + "Please update the date range via the date picker." + ) raise MqValueError(msg) -def plot_measure(asset_class: tuple, asset_type: Optional[tuple] = None, - dependencies: Optional[List[QueryType]] = tuple(), asset_type_excluded: Optional[tuple] = None, - display_name: Optional[str] = None, entitlements: Optional[List[Entitlement]] = []): +def plot_measure( + asset_class: tuple, + asset_type: Optional[tuple] = None, + dependencies: Optional[List[QueryType]] = tuple(), + asset_type_excluded: Optional[tuple] = None, + display_name: Optional[str] = None, + entitlements: Optional[List[Entitlement]] = [], +): # Indicates that fn should be exported to plottool as a member function / pseudo-measure. # Set category to None for no restrictions, else provide a tuple of allowed values. def decorator(fn): @@ -238,7 +265,9 @@ def decorator(fn): return decorator -def plot_measure_entity(entity_type: EntityType, dependencies: Optional[Iterable[QueryType]] = tuple()): +def plot_measure_entity( + entity_type: EntityType, dependencies: Optional[Iterable[QueryType]] = tuple() +): def decorator(fn): assert isinstance(entity_type, EntityType) if dependencies is not None: @@ -266,7 +295,7 @@ def plot_method(fn): # Allows fn to accept and ignore real_time argument even if it is not defined in the signature @wraps(fn) def ignore_extra_argument(*args, **kwargs): - for arg in ('real_time', 'interval', 'time_filter'): + for arg in ("real_time", "interval", "time_filter"): if arg not in inspect.signature(fn).parameters: kwargs.pop(arg, None) return fn(*args, **kwargs) @@ -279,7 +308,7 @@ def outer(fn): @wraps(fn) def inner(*args, **kwargs): response = fn(*args, **kwargs) - logger.debug('%s: %s', message, response) + logger.debug("%s: %s", message, response) return response return inner @@ -306,30 +335,46 @@ def get_df_with_retries(fetcher, start_date, end_date, exchange, retries=1): result = fetcher() if not result.empty: break - kwargs = {'exchanges': [exchange]} if exchange else {} + kwargs = {"exchanges": [exchange]} if exchange else {} # no need to include any part of the previous date range since it's known to be empty - end_date = RelativeDate('-1b', base_date=start_date).apply_rule(**kwargs) + end_date = RelativeDate("-1b", base_date=start_date).apply_rule(**kwargs) start_date = end_date retries -= 1 return result -def get_dataset_data_with_retries(dataset: Dataset, - *, - start: dt.date, - end: dt.date, - count: int = 0, - max_retries: int = 5, - **kwargs) -> pd.DataFrame: +def get_dataset_data_with_retries( + dataset: Dataset, + *, + start: dt.date, + end: dt.date, + count: int = 0, + max_retries: int = 5, + **kwargs, +) -> pd.DataFrame: try: data = dataset.get_data(start=start, end=end, **kwargs) except MqRequestError as e: if count < max_retries: mid = start + (end - start) / 2 count += 1 - first_half = partial(get_dataset_data_with_retries, dataset, start=start, end=mid, count=count, **kwargs) + first_half = partial( + get_dataset_data_with_retries, + dataset, + start=start, + end=mid, + count=count, + **kwargs, + ) mid = mid + dt.timedelta(days=1) - second_half = partial(get_dataset_data_with_retries, dataset, start=mid, end=end, count=count, **kwargs) + second_half = partial( + get_dataset_data_with_retries, + dataset, + start=mid, + end=end, + count=count, + **kwargs, + ) results = ThreadPoolManager.run_async([first_half, second_half]) first_half_results, second_half_results = results[0], results[1] data = pd.concat([first_half_results, second_half_results]).sort_index() @@ -339,23 +384,31 @@ def get_dataset_data_with_retries(dataset: Dataset, def get_dataset_with_many_assets( - ds: Dataset, - *, - assets: List[str], - start: dt.date, - end: dt.date, - batch_limit: int = 100, - **kwargs - + ds: Dataset, + *, + assets: List[str], + start: dt.date, + end: dt.date, + batch_limit: int = 100, + **kwargs, ) -> pd.DataFrame: - tasks = [partial(ds.get_data, assetId=assets[i:i + batch_limit], start=start, end=end, - return_type=None, **kwargs) for i in range(0, len(assets), batch_limit)] + tasks = [ + partial( + ds.get_data, + assetId=assets[i : i + batch_limit], + start=start, + end=end, + return_type=None, + **kwargs, + ) + for i in range(0, len(assets), batch_limit) + ] results = ThreadPoolManager.run_async(tasks) return pd.concat(results) def _month_to_tenor(months: int) -> str: - return f'{months // 12}y' if months % 12 == 0 else f'{months}m' + return f"{months // 12}y" if months % 12 == 0 else f"{months}m" def _split_where_conditions(where): @@ -379,10 +432,12 @@ def _pandas_roll(s: pd.Series, window_str: str, method_name: str): return getattr(s.rolling(window_str), method_name)() -def rolling_offset(s: pd.Series, - offset: pd.DateOffset, - function: Callable[[np.ndarray], float], - method_name: str = None) -> pd.Series: +def rolling_offset( + s: pd.Series, + offset: pd.DateOffset, + function: Callable[[np.ndarray], float], + method_name: str = None, +) -> pd.Series: """ Perform rolling window calculations. If offset has a fixed frequency and method name is provided, will use `Series.rolling< https://pandas.pydata.org/docs/reference/api/pandas.Series.rolling.html>`_ for best performance. @@ -394,22 +449,19 @@ def rolling_offset(s: pd.Series, :return: result time series """ # frequencies that can be passed to Series.rolling - fixed = { - 'hour': 'h', - 'hours': 'h', - 'day': 'D', - 'days': 'D' - } + fixed = {"hour": "h", "hours": "h", "day": "D", "days": "D"} if method_name and len(offset.kwds) == 1: freq, count = offset.kwds.popitem() if freq in fixed: - window_str = f'{count}{fixed[freq]}' + window_str = f"{count}{fixed[freq]}" if np.issubdtype(s.index, np.datetime64): return _pandas_roll(s, window_str, method_name) else: t = s.copy(deep=False) t.index = pd.to_datetime(t.index) # needed for Series.rolling - return pd.Series(_pandas_roll(t, window_str, method_name), index=s.index) + return pd.Series( + _pandas_roll(t, window_str, method_name), index=s.index + ) return rolling_apply(s, offset, function) diff --git a/gs_quant/timeseries/statistics.py b/gs_quant/timeseries/statistics.py index c00799b3..44861145 100644 --- a/gs_quant/timeseries/statistics.py +++ b/gs_quant/timeseries/statistics.py @@ -28,8 +28,16 @@ from .algebra import ceil, floor from .datetime import interpolate -from .helper import Window, normalize_window, rolling_offset, apply_ramp, plot_function, rolling_apply, Interpolate, \ - plot_method +from .helper import ( + Window, + normalize_window, + rolling_offset, + apply_ramp, + plot_function, + rolling_apply, + Interpolate, + plot_method, +) from ..data import DataContext from ..errors import MqValueError, MqTypeError from ..models.epidemiology import SIR, SEIR, EpidemicModel @@ -43,12 +51,15 @@ try: from quant_extensions.timeseries.statistics import rolling_std except ImportError: + def rolling_std(x: pd.Series, offset: pd.DateOffset) -> pd.Series: size = len(x) index = x.index results = np.empty(size, dtype=np.double) results[0] = np.nan - values = np.array(x.values, dtype=np.double) # put data into np arrays to save time on slicing later + values = np.array( + x.values, dtype=np.double + ) # put data into np arrays to save time on slicing later start = 0 for i in range(1, size): @@ -56,7 +67,7 @@ def rolling_std(x: pd.Series, offset: pd.DateOffset) -> pd.Series: if pd.Timestamp(index[j]) > index[i] - offset: start = j break - section = values[start:i + 1] + section = values[start : i + 1] results[i] = np.std(section[section == section], ddof=1) return pd.Series(results, index=index, dtype=np.double) @@ -69,13 +80,15 @@ def _concat_series(series: List[pd.Series]): if s.min() != s.max(): curves.append(s) else: - constants[f'temp{k}'] = s.min() + constants[f"temp{k}"] = s.min() k += 1 return pd.concat(curves, axis=1).assign(**constants) @plot_function -def min_(x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Window(None, 0)) -> pd.Series: +def min_( + x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Window(None, 0) +) -> pd.Series: """ Minimum value of series over given window @@ -136,15 +149,23 @@ def min_(x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Wind w = normalize_window(x, w) assert x.index.is_monotonic_increasing, "series index is monotonic increasing" if isinstance(w.w, pd.DateOffset): - values = rolling_offset(x, w.w, np.nanmin, 'min') if isinstance(x, pd.Series) else [ - x.loc[(x.index > (idx - w.w).datetime()) & (x.index <= idx)].min() for idx in x.index] + values = ( + rolling_offset(x, w.w, np.nanmin, "min") + if isinstance(x, pd.Series) + else [ + x.loc[(x.index > (idx - w.w).datetime()) & (x.index <= idx)].min() + for idx in x.index + ] + ) return apply_ramp(pd.Series(values, index=x.index, dtype=np.dtype(float)), w) else: return apply_ramp(x.rolling(w.w, 0).min(), w) @plot_function -def max_(x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Window(None, 0)) -> pd.Series: +def max_( + x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Window(None, 0) +) -> pd.Series: """ Maximum value of series over given window @@ -203,8 +224,13 @@ def max_(x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Wind w = normalize_window(x, w) assert x.index.is_monotonic_increasing, "series index is monotonic increasing" if isinstance(w.w, pd.DateOffset): - values = rolling_offset(x, w.w, np.nanmax, 'max') if isinstance(x, pd.Series) else [ - x.loc[(x.index > idx - w.w) & (x.index <= idx)].max() for idx in x.index] + values = ( + rolling_offset(x, w.w, np.nanmax, "max") + if isinstance(x, pd.Series) + else [ + x.loc[(x.index > idx - w.w) & (x.index <= idx)].max() for idx in x.index + ] + ) return apply_ramp(pd.Series(values, index=x.index, dtype=np.dtype(float)), w) else: return apply_ramp(x.rolling(w.w, 0).max(), w) @@ -252,7 +278,9 @@ def range_(x: pd.Series, w: Union[Window, int, str] = Window(None, 0)) -> pd.Ser @plot_function -def mean(x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Window(None, 0)) -> pd.Series: +def mean( + x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Window(None, 0) +) -> pd.Series: """ Arithmetic mean of series over given window @@ -268,13 +296,13 @@ def mean(x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Wind If a timeseries is provided: - :math:`R_t = \\frac{\\sum_{i=t-w+1}^{t} X_i}{N}` + :math:`R_t = \frac{\sum_{i=t-w+1}^{t} X_i}{N}` where :math:`N` is the number of observations in each rolling window, :math:`w`. If an array of timeseries is provided: - :math:`R_t = \\frac{\\sum_{i=t-w+1}^{t} {\\sum_{j=1}^{n}} X_{ij}}{N}` + :math:`R_t = \frac{\sum_{i=t-w+1}^{t} {\sum_{j=1}^{n}} X_{ij}}{N}` where :math:`n` is the number of series, and :math:`N` is the number of observations in each rolling window, :math:`w`. @@ -300,15 +328,17 @@ def mean(x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Wind assert x.index.is_monotonic_increasing, "series index is monotonic increasing" if isinstance(w.w, pd.DateOffset): if isinstance(x, pd.Series): - values = rolling_offset(x, w.w, np.nanmean, 'mean') + values = rolling_offset(x, w.w, np.nanmean, "mean") else: - values = [np.nanmean(x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)]) for idx in x.index] + # For DataFrame/dateoffset, preserve original code path + values = [ + np.nanmean(x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)]) + for idx in x.index + ] else: - if isinstance(x, pd.Series): - values = x.rolling(w.w, 0).mean() # faster than slicing in Python - else: - values = [np.nanmean(x.iloc[max(idx - w.w + 1, 0): idx + 1]) for idx in range(0, len(x))] - return apply_ramp(pd.Series(values, index=x.index, dtype=np.dtype(float)), w) + # Use Pandas vectorized rolling for both Series and DataFrame for higher performance + values = x.rolling(w.w, 0).mean() + return apply_ramp(pd.Series(values, index=x.index), w) @plot_function @@ -349,8 +379,14 @@ def median(x: pd.Series, w: Union[Window, int, str] = Window(None, 0)) -> pd.Ser w = normalize_window(x, w) assert x.index.is_monotonic_increasing, "series index is monotonic increasing" if isinstance(w.w, pd.DateOffset): - values = rolling_offset(x, w.w, np.nanmedian, 'median') if isinstance(x, pd.Series) else [ - x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].median() for idx in x.index] + values = ( + rolling_offset(x, w.w, np.nanmedian, "median") + if isinstance(x, pd.Series) + else [ + x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].median() + for idx in x.index + ] + ) return apply_ramp(pd.Series(values, index=x.index, dtype=np.dtype(float)), w) else: return apply_ramp(x.rolling(w.w, 0).median(), w) @@ -389,15 +425,27 @@ def mode(x: pd.Series, w: Union[Window, int, str] = Window(None, 0)) -> pd.Serie w = normalize_window(x, w) assert x.index.is_monotonic_increasing, "series index is monotonic increasing" if isinstance(w.w, pd.DateOffset): - values = rolling_apply(x, w.w, lambda a: stats.mode(a).mode[0]) if isinstance(x, pd.Series) else [ - stats.mode(x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)]).mode[0] for idx in x.index] + values = ( + rolling_apply(x, w.w, lambda a: stats.mode(a).mode[0]) + if isinstance(x, pd.Series) + else [ + stats.mode( + x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)] + ).mode[0] + for idx in x.index + ] + ) return apply_ramp(pd.Series(values, index=x.index, dtype=np.dtype(float)), w) else: - return apply_ramp(x.rolling(w.w, 0).apply(lambda y: stats.mode(y).mode, raw=True), w) + return apply_ramp( + x.rolling(w.w, 0).apply(lambda y: stats.mode(y).mode, raw=True), w + ) @plot_function -def sum_(x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Window(None, 0)) -> pd.Series: +def sum_( + x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Window(None, 0) +) -> pd.Series: """ Rolling sum of series over given window @@ -443,8 +491,8 @@ def sum_(x: Union[pd.Series, List[pd.Series]], w: Union[Window, int, str] = Wind w = normalize_window(x, w) assert x.index.is_monotonic_increasing if isinstance(w.w, pd.DateOffset): - assert isinstance(x, pd.Series), 'expected a series' - values = rolling_offset(x, w.w, np.nansum, 'sum') + assert isinstance(x, pd.Series), "expected a series" + values = rolling_offset(x, w.w, np.nansum, "sum") return apply_ramp(values, w) else: return apply_ramp(x.rolling(w.w, 0).sum(), w) @@ -484,8 +532,14 @@ def product(x: pd.Series, w: Union[Window, int, str] = Window(None, 0)) -> pd.Se w = normalize_window(x, w) assert x.index.is_monotonic_increasing if isinstance(w.w, pd.DateOffset): - values = rolling_offset(x, w.w, np.nanprod, 'prod') if isinstance(x, pd.Series) else [ - x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].prod() for idx in x.index] + values = ( + rolling_offset(x, w.w, np.nanprod, "prod") + if isinstance(x, pd.Series) + else [ + x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].prod() + for idx in x.index + ] + ) return apply_ramp(pd.Series(values, index=x.index, dtype=np.dtype(float)), w) else: return apply_ramp(x.rolling(w.w, 0).agg(pd.Series.prod), w) @@ -624,15 +678,23 @@ def var(x: pd.Series, w: Union[Window, int, str] = Window(None, 0)) -> pd.Series w = normalize_window(x, w) assert x.index.is_monotonic_increasing, "series index is monotonic increasing" if isinstance(w.w, pd.DateOffset): - values = rolling_offset(x, w.w, lambda a: np.nanvar(a, ddof=1), 'var') if isinstance(x, pd.Series) else [ - x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].var() for idx in x.index] + values = ( + rolling_offset(x, w.w, lambda a: np.nanvar(a, ddof=1), "var") + if isinstance(x, pd.Series) + else [ + x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].var() + for idx in x.index + ] + ) return apply_ramp(pd.Series(values, index=x.index, dtype=np.dtype(float)), w) else: return apply_ramp(x.rolling(w.w, 0).var(), w) @plot_function -def cov(x: pd.Series, y: pd.Series, w: Union[Window, int, str] = Window(None, 0)) -> pd.Series: +def cov( + x: pd.Series, y: pd.Series, w: Union[Window, int, str] = Window(None, 0) +) -> pd.Series: """ Rolling co-variance of series over given window @@ -673,7 +735,10 @@ def cov(x: pd.Series, y: pd.Series, w: Union[Window, int, str] = Window(None, 0) w = normalize_window(x, w) assert x.index.is_monotonic_increasing, "series index is monotonic increasing" if isinstance(w.w, pd.DateOffset): - values = [x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].cov(y) for idx in x.index] + values = [ + x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].cov(y) + for idx in x.index + ] return apply_ramp(pd.Series(values, index=x.index, dtype=np.dtype(float)), w) else: return apply_ramp(x.rolling(w.w, 0).cov(y), w) @@ -727,27 +792,40 @@ def zscores(x: pd.Series, w: Union[Window, int, str] = Window(None, 0)) -> pd.Se if isinstance(w, int): w = normalize_window(x, w) elif isinstance(w, str): - if not (isinstance(x.index, pd.DatetimeIndex) or isinstance(x.index[0], dt.date)): - raise MqValueError("When string is passed window index must be a DatetimeIndex or of type datetime.date") + if not ( + isinstance(x.index, pd.DatetimeIndex) or isinstance(x.index[0], dt.date) + ): + raise MqValueError( + "When string is passed window index must be a DatetimeIndex or of type datetime.date" + ) w = normalize_window(x, w) if not w.w: if x.size == 1: return pd.Series([0.0], index=x.index, dtype=np.dtype(float)) clean_series = x.dropna() - zscore_series = pd.Series(stats.zscore(clean_series, ddof=1), clean_series.index, dtype=np.dtype(float)) + zscore_series = pd.Series( + stats.zscore(clean_series, ddof=1), + clean_series.index, + dtype=np.dtype(float), + ) return interpolate(zscore_series, x, Interpolate.NAN) if not isinstance(w.w, int): w = normalize_window(x, w) dt_idx = pd.DatetimeIndex(x.index).date - values = [_zscore(x.loc[(dt_idx > (idx - w.w).date()) & (dt_idx <= idx)]) for idx in dt_idx] + values = [ + _zscore(x.loc[(dt_idx > (idx - w.w).date()) & (dt_idx <= idx)]) + for idx in dt_idx + ] return apply_ramp(pd.Series(values, index=x.index, dtype=np.dtype(float)), w) else: return apply_ramp(x.rolling(w.w, 0).apply(_zscore, raw=False), w) @plot_function -def winsorize(x: pd.Series, limit: float = 2.5, w: Union[Window, int, str] = Window(None, 0)) -> pd.Series: +def winsorize( + x: pd.Series, limit: float = 2.5, w: Union[Window, int, str] = Window(None, 0) +) -> pd.Series: """ Limit extreme values in series @@ -808,12 +886,14 @@ def winsorize(x: pd.Series, limit: float = 2.5, w: Union[Window, int, str] = Win class Direction(Enum): - START_TODAY = 'start_today' - END_TODAY = 'end_today' + START_TODAY = "start_today" + END_TODAY = "end_today" @plot_function -def generate_series(length: int, direction: Direction = Direction.START_TODAY) -> pd.Series: +def generate_series( + length: int, direction: Direction = Direction.START_TODAY +) -> pd.Series: """ Generate sample timeseries @@ -861,7 +941,11 @@ def generate_series(length: int, direction: Direction = Direction.START_TODAY) - @plot_function -def percentiles(x: pd.Series, y: Optional[pd.Series] = None, w: Union[Window, int, str] = Window(None, 0)) -> pd.Series: +def percentiles( + x: pd.Series, + y: Optional[pd.Series] = None, + w: Union[Window, int, str] = Window(None, 0), +) -> pd.Series: """ Rolling percentiles over given window @@ -904,7 +988,7 @@ def percentiles(x: pd.Series, y: Optional[pd.Series] = None, w: Union[Window, in w = normalize_window(y, w) if isinstance(w.r, int) and w.r > len(y): - raise ValueError('Ramp value must be less than the length of the series y.') + raise ValueError("Ramp value must be less than the length of the series y.") if isinstance(w.w, int) and w.w > len(x): return pd.Series(dtype=float) @@ -914,20 +998,26 @@ def percentiles(x: pd.Series, y: Optional[pd.Series] = None, w: Union[Window, in if isinstance(w.w, pd.DateOffset): for idx, val in y.items(): - sample = x.loc[(x.index > ((idx - w.w).date() if convert_to_date else idx - w.w)) & (x.index <= idx)] - res.loc[idx] = percentileofscore(sample, val, kind='mean') + sample = x.loc[ + (x.index > ((idx - w.w).date() if convert_to_date else idx - w.w)) + & (x.index <= idx) + ] + res.loc[idx] = percentileofscore(sample, val, kind="mean") elif not y.empty: min_periods = 0 if isinstance(w.r, pd.DateOffset) else w.r - rolling_window = x[:y.index[-1]].rolling(w.w, min_periods) - percentile_on_x_index = rolling_window.apply(lambda a: percentileofscore(a, y[a.index[-1]:].iloc[0], - kind="mean")) + rolling_window = x[: y.index[-1]].rolling(w.w, min_periods) + percentile_on_x_index = rolling_window.apply( + lambda a: percentileofscore(a, y[a.index[-1] :].iloc[0], kind="mean") + ) joined_index = pd.concat([x, y], axis=1).index res = percentile_on_x_index.reindex(joined_index, method="ffill")[y.index] return apply_ramp(res, w) @plot_function -def percentile(x: pd.Series, n: float, w: Union[Window, int, str] = None) -> Union[pd.Series, float]: +def percentile( + x: pd.Series, n: float, w: Union[Window, int, str] = None +) -> Union[pd.Series, float]: """ Returns the nth percentile of a series. @@ -951,7 +1041,7 @@ def percentile(x: pd.Series, n: float, w: Union[Window, int, str] = None) -> Uni """ if not 0 <= n <= 100: - raise MqValueError('percentile must be in range [0, 100]') + raise MqValueError("percentile must be in range [0, 100]") x = x.dropna() if x.size < 1: return x @@ -963,11 +1053,17 @@ def percentile(x: pd.Series, n: float, w: Union[Window, int, str] = None) -> Uni if isinstance(w.w, pd.DateOffset): try: if isinstance(x.index, pd.DatetimeIndex): - values = [x.loc[(x.index > (idx - w.w)) & (x.index <= idx)].quantile(n) for idx in x.index] + values = [ + x.loc[(x.index > (idx - w.w)) & (x.index <= idx)].quantile(n) + for idx in x.index + ] else: - values = [x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].quantile(n) for idx in x.index] + values = [ + x.loc[(x.index > (idx - w.w).date()) & (x.index <= idx)].quantile(n) + for idx in x.index + ] except TypeError: - raise MqTypeError(f'cannot use relative dates with index {x.index}') + raise MqTypeError(f"cannot use relative dates with index {x.index}") res = pd.Series(values, index=x.index, dtype=np.dtype(float)) else: res = x.rolling(w.w, 0).quantile(n) @@ -999,19 +1095,32 @@ class LinearRegression: """ - def __init__(self, X: Union[pd.Series, List[pd.Series]], y: pd.Series, fit_intercept: bool = True): + def __init__( + self, + X: Union[pd.Series, List[pd.Series]], + y: pd.Series, + fit_intercept: bool = True, + ): if not isinstance(fit_intercept, bool): raise MqTypeError('expected a boolean value for "fit_intercept"') df = pd.concat(X, axis=1) if isinstance(X, list) else X.to_frame() df = sm.add_constant(df) if fit_intercept else df - df.columns = range(len(df.columns)) if fit_intercept else range(1, len(df.columns) + 1) + df.columns = ( + range(len(df.columns)) if fit_intercept else range(1, len(df.columns) + 1) + ) - df = df[~df.isin([np.nan, np.inf, -np.inf]).any(axis=1)] # filter out nan and inf + df = df[ + ~df.isin([np.nan, np.inf, -np.inf]).any(axis=1) + ] # filter out nan and inf y = y[~y.isin([np.nan, np.inf, -np.inf])] - df_aligned, y_aligned = df.align(y, 'inner', axis=0) # align series + df_aligned, y_aligned = df.align(y, "inner", axis=0) # align series - self._index_scope = range(0, len(df.columns)) if fit_intercept else range(1, len(df.columns) + 1) + self._index_scope = ( + range(0, len(df.columns)) + if fit_intercept + else range(1, len(df.columns) + 1) + ) self._res = sm.OLS(y_aligned, df_aligned).fit() self._fit_intercept = fit_intercept @@ -1051,7 +1160,11 @@ def predict(self, X_predict: Union[pd.Series, List[pd.Series]]) -> pd.Series: :param X_predict: the values for which to predict :return: predicted values """ - df = pd.concat(X_predict, axis=1) if isinstance(X_predict, list) else X_predict.to_frame() + df = ( + pd.concat(X_predict, axis=1) + if isinstance(X_predict, list) + else X_predict.to_frame() + ) return self._res.predict(sm.add_constant(df) if self._fit_intercept else df) @plot_method @@ -1093,20 +1206,32 @@ class RollingLinearRegression: """ - def __init__(self, X: Union[pd.Series, List[pd.Series]], y: pd.Series, w: int, fit_intercept: bool = True): + def __init__( + self, + X: Union[pd.Series, List[pd.Series]], + y: pd.Series, + w: int, + fit_intercept: bool = True, + ): if not isinstance(fit_intercept, bool): raise MqTypeError('expected a boolean value for "fit_intercept"') df = pd.concat(X, axis=1) if isinstance(X, list) else X.to_frame() df = sm.add_constant(df) if fit_intercept else df - df.columns = range(len(df.columns)) if fit_intercept else range(1, len(df.columns) + 1) + df.columns = ( + range(len(df.columns)) if fit_intercept else range(1, len(df.columns) + 1) + ) if w <= len(df.columns): - raise MqValueError('Window length must be larger than the number of explanatory variables') + raise MqValueError( + "Window length must be larger than the number of explanatory variables" + ) - df = df[~df.isin([np.nan, np.inf, -np.inf]).any(axis=1)] # filter out nan and inf + df = df[ + ~df.isin([np.nan, np.inf, -np.inf]).any(axis=1) + ] # filter out nan and inf y = y[~y.isin([np.nan, np.inf, -np.inf])] - df_aligned, y_aligned = df.align(y, 'inner', axis=0) # align series + df_aligned, y_aligned = df.align(y, "inner", axis=0) # align series self._X = df_aligned.copy() self._res = RollingOLS(y_aligned, df_aligned, w).fit() @@ -1187,10 +1312,17 @@ class SIRModel: """ - def __init__(self, beta: float = None, gamma: float = None, s: Union[pd.Series, float] = None, - i: Union[pd.Series, float] = None, r: Union[pd.Series, float] = None, - n: Union[pd.Series, float] = None, fit: bool = True, - fit_period: int = None): + def __init__( + self, + beta: float = None, + gamma: float = None, + s: Union[pd.Series, float] = None, + i: Union[pd.Series, float] = None, + r: Union[pd.Series, float] = None, + n: Union[pd.Series, float] = None, + fit: bool = True, + fit_period: int = None, + ): if not isinstance(fit, bool): raise MqTypeError('expected a boolean value for "fit"') @@ -1201,11 +1333,15 @@ def __init__(self, beta: float = None, gamma: float = None, s: Union[pd.Series, i = 1 if i is None else i r = 0 if r is None else r - data_start = [ts.index.min().date() for ts in [s, i, r] if isinstance(ts, pd.Series)] + data_start = [ + ts.index.min().date() for ts in [s, i, r] if isinstance(ts, pd.Series) + ] data_start.append(DataContext.current.start_date) start_date = max(data_start) - data_end = [ts.index.max().date() for ts in [s, i, r] if isinstance(ts, pd.Series)] + data_end = [ + ts.index.max().date() for ts in [s, i, r] if isinstance(ts, pd.Series) + ] data_end.append(DataContext.current.end_date) end_date = max(data_end) @@ -1227,19 +1363,35 @@ def __init__(self, beta: float = None, gamma: float = None, s: Union[pd.Series, beta_init = self.beta_init if self.beta_init is not None else 0.9 gamma_init = self.gamma_init if self.gamma_init is not None else 0.01 - parameters, initial_conditions = SIR.get_parameters(self.s.iloc[0], self.i.iloc[0], self.r.iloc[0], n, - beta=beta_init, gamma=gamma_init, - beta_fixed=self.beta_fixed, gamma_fixed=self.gamma_fixed, - S0_fixed=True, I0_fixed=True, R0_fixed=True) + parameters, initial_conditions = SIR.get_parameters( + self.s.iloc[0], + self.i.iloc[0], + self.r.iloc[0], + n, + beta=beta_init, + gamma=gamma_init, + beta_fixed=self.beta_fixed, + gamma_fixed=self.gamma_fixed, + S0_fixed=True, + I0_fixed=True, + R0_fixed=True, + ) self.parameters = parameters - self._model = EpidemicModel(SIR, parameters=parameters, data=data, initial_conditions=initial_conditions, - fit_period=self.fit_period) + self._model = EpidemicModel( + SIR, + parameters=parameters, + data=data, + initial_conditions=initial_conditions, + fit_period=self.fit_period, + ) if self.fit: self._model.fit(verbose=False) t = np.arange((end_date - start_date).days + 1) - predict = self._model.solve(t, (self.s0(), self.i0(), self.r0()), (self.beta(), self.gamma(), n)) + predict = self._model.solve( + t, (self.s0(), self.i0(), self.r0()), (self.beta(), self.gamma(), n) + ) predict_dates = pd.date_range(start_date, end_date) @@ -1255,8 +1407,8 @@ def s0(self) -> float: :return: initial susceptible individuals """ if self.fit: - return self._model.fitted_parameters['S0'] - return self.parameters['S0'].value + return self._model.fitted_parameters["S0"] + return self.parameters["S0"].value @plot_method def i0(self) -> float: @@ -1266,8 +1418,8 @@ def i0(self) -> float: :return: initial infectious individuals """ if self.fit: - return self._model.fitted_parameters['I0'] - return self.parameters['I0'].value + return self._model.fitted_parameters["I0"] + return self.parameters["I0"].value @plot_method def r0(self) -> float: @@ -1277,8 +1429,8 @@ def r0(self) -> float: :return: initial recovered individuals """ if self.fit: - return self._model.fitted_parameters['R0'] - return self.parameters['R0'].value + return self._model.fitted_parameters["R0"] + return self.parameters["R0"].value @plot_method def beta(self) -> float: @@ -1288,8 +1440,8 @@ def beta(self) -> float: :return: beta """ if self.fit: - return self._model.fitted_parameters['beta'] - return self.parameters['beta'].value + return self._model.fitted_parameters["beta"] + return self.parameters["beta"].value @plot_method def gamma(self) -> float: @@ -1299,8 +1451,8 @@ def gamma(self) -> float: :return: beta """ if self.fit: - return self._model.fitted_parameters['gamma'] - return self.parameters['gamma'].value + return self._model.fitted_parameters["gamma"] + return self.parameters["gamma"].value @plot_method def predict_s(self) -> pd.Series: @@ -1371,10 +1523,19 @@ class SEIRModel(SIRModel): """ - def __init__(self, beta: float = None, gamma: float = None, sigma: float = None, s: Union[pd.Series, float] = None, - e: Union[pd.Series, float] = None, i: Union[pd.Series, float] = None, - r: Union[pd.Series, float] = None, n: Union[pd.Series, float] = None, - fit: bool = True, fit_period: int = None): + def __init__( + self, + beta: float = None, + gamma: float = None, + sigma: float = None, + s: Union[pd.Series, float] = None, + e: Union[pd.Series, float] = None, + i: Union[pd.Series, float] = None, + r: Union[pd.Series, float] = None, + n: Union[pd.Series, float] = None, + fit: bool = True, + fit_period: int = None, + ): if not isinstance(fit, bool): raise MqTypeError('expected a boolean value for "fit"') @@ -1386,11 +1547,15 @@ def __init__(self, beta: float = None, gamma: float = None, sigma: float = None, i = 1 if i is None else i r = 0 if r is None else r - data_start = [ts.index.min().date() for ts in [s, i, r] if isinstance(ts, pd.Series)] + data_start = [ + ts.index.min().date() for ts in [s, i, r] if isinstance(ts, pd.Series) + ] data_start.append(DataContext.current.start_date) start_date = max(data_start) - data_end = [ts.index.max().date() for ts in [s, i, r] if isinstance(ts, pd.Series)] + data_end = [ + ts.index.max().date() for ts in [s, i, r] if isinstance(ts, pd.Series) + ] data_end.append(DataContext.current.end_date) end_date = max(data_end) @@ -1416,25 +1581,45 @@ def __init__(self, beta: float = None, gamma: float = None, sigma: float = None, gamma_init = self.gamma_init if self.gamma_init is not None else 0.01 sigma_init = self.sigma_init if self.sigma_init is not None else 0.2 - parameters, initial_conditions = SEIR.get_parameters(self.s.iloc[0], self.e.iloc[0], - self.i.iloc[0], self.r.iloc[0], n, - beta=beta_init, gamma=gamma_init, sigma=sigma_init, - beta_fixed=self.beta_fixed, - gamma_fixed=self.gamma_fixed, - sigma_fixed=self.sigma_fixed, - S0_fixed=True, I0_fixed=True, - R0_fixed=True, E0_fixed=True, S0_max=5e6, I0_max=5e6, - E0_max=10e6, R0_max=10e6) + parameters, initial_conditions = SEIR.get_parameters( + self.s.iloc[0], + self.e.iloc[0], + self.i.iloc[0], + self.r.iloc[0], + n, + beta=beta_init, + gamma=gamma_init, + sigma=sigma_init, + beta_fixed=self.beta_fixed, + gamma_fixed=self.gamma_fixed, + sigma_fixed=self.sigma_fixed, + S0_fixed=True, + I0_fixed=True, + R0_fixed=True, + E0_fixed=True, + S0_max=5e6, + I0_max=5e6, + E0_max=10e6, + R0_max=10e6, + ) self.parameters = parameters - self._model = EpidemicModel(SEIR, parameters=parameters, data=data, initial_conditions=initial_conditions, - fit_period=self.fit_period) + self._model = EpidemicModel( + SEIR, + parameters=parameters, + data=data, + initial_conditions=initial_conditions, + fit_period=self.fit_period, + ) if self.fit: self._model.fit(verbose=False) t = np.arange((end_date - start_date).days + 1) - predict = self._model.solve(t, (self.s0(), self.e0(), self.i0(), self.r0()), - (self.beta(), self.gamma(), self.sigma(), n)) + predict = self._model.solve( + t, + (self.s0(), self.e0(), self.i0(), self.r0()), + (self.beta(), self.gamma(), self.sigma(), n), + ) predict_dates = pd.date_range(start_date, end_date) @@ -1451,8 +1636,8 @@ def e0(self) -> float: :return: initial exposed individuals """ if self.fit: - return self._model.fitted_parameters['E0'] - return self.parameters['E0'].value + return self._model.fitted_parameters["E0"] + return self.parameters["E0"].value @plot_method def beta(self) -> float: @@ -1462,8 +1647,8 @@ def beta(self) -> float: :return: beta """ if self.fit: - return self._model.fitted_parameters['beta'] - return self.parameters['beta'].value + return self._model.fitted_parameters["beta"] + return self.parameters["beta"].value @plot_method def gamma(self) -> float: @@ -1473,8 +1658,8 @@ def gamma(self) -> float: :return: gamma """ if self.fit: - return self._model.fitted_parameters['gamma'] - return self.parameters['gamma'].value + return self._model.fitted_parameters["gamma"] + return self.parameters["gamma"].value @plot_method def sigma(self) -> float: @@ -1484,8 +1669,8 @@ def sigma(self) -> float: :return: sigma """ if self.fit: - return self._model.fitted_parameters['sigma'] - return self.parameters['sigma'].value + return self._model.fitted_parameters["sigma"] + return self.parameters["sigma"].value @plot_method def predict_e(self) -> pd.Series: