From b1059cabf9abff983447c86c627c78671e543c08 Mon Sep 17 00:00:00 2001 From: Jeff Lin Date: Thu, 1 Jul 2021 16:39:22 -0400 Subject: [PATCH 1/3] TBD Discussion on StreamSets and the chaining [ch13505] --- btrdb/stream.py | 56 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/btrdb/stream.py b/btrdb/stream.py index 109346c..d2ef884 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -1107,13 +1107,19 @@ def clone(self): setattr(clone, attr, deepcopy(val)) return clone - def windows(self, width, depth): + def windows(self, *args, **kwargs): """ Stores the request for a windowing operation when the query is eventually materialized. Parameters ---------- + start : int or datetime like object + the inclusive start of the query (see :func:`btrdb.utils.timez.to_nanoseconds` + for valid input types) + end : int or datetime like object + the exclusive end of the query (see :func:`btrdb.utils.timez.to_nanoseconds` + for valid input types) width : int The number of nanoseconds to use for each window size. depth : int @@ -1145,6 +1151,26 @@ def windows(self, width, depth): specified. This is much faster to execute on the database side. """ + start, end = None, None + if len(args) < 2 and len(kwargs) < 2: + raise UserWarning("Require 'width' and 'depth'") + elif len(args) == 2 and len(kwargs) == 0: + width, depth = args + elif len(args) == 0 and len(kwargs) == 2: + width, depth = kwargs.get('width', None) and kwargs.get('depth', None) + elif len(args) == 0 and len(kwargs) == 4: + start, end, width, depth = [kwargs.get(k, None) for k in ('start', 'end', 'width', 'depth')] + elif len(args) == 4 and len(kwargs) == 0: + start, end, width, depth = args + else: + if kwargs.get('width', None) and kwargs.get('depth', None): + start, end = args + width, depth = kwargs.get('width', None) and kwargs.get('depth', None) + else: + width, depth = args + start, end = kwargs.get('start', None), kwargs.get('end', None) + if start is not None and end is not None: + self = self.filter(start, end) # because the `filter` returns an copy object. if not self.allow_window: raise InvalidOperation("A window operation is already requested") @@ -1153,13 +1179,19 @@ def windows(self, width, depth): self.depth = int(depth) return self - def aligned_windows(self, pointwidth): + def aligned_windows(self, *args, **kwargs): """ Stores the request for an aligned windowing operation when the query is eventually materialized. Parameters ---------- + start : int or datetime like object + the inclusive start of the query (see :func:`btrdb.utils.timez.to_nanoseconds` + for valid input types) + end : int or datetime like object + the exclusive end of the query (see :func:`btrdb.utils.timez.to_nanoseconds` + for valid input types) pointwidth : int The length of each returned window as computed by 2^pointwidth. @@ -1181,6 +1213,26 @@ def aligned_windows(self, pointwidth): omitted. """ + start, end = None, None + if len(args) < 1 and len(kwargs) < 1: + raise UserWarning("Require 'width' and 'depth'") + elif len(args) == 1 and len(kwargs) == 0: + pointwidth = args[0] + elif len(args) == 0 and len(kwargs) == 1: + pointwidth = kwargs.get('pointwidth', None) + elif len(args) == 0 and len(kwargs) == 3: + start, end, pointwidth = [kwargs.get(k, None) for k in ('start', 'end', 'pointwidth')] + elif len(args) == 3 and len(kwargs) == 0: + start, end, pointwidth = args + else: + if kwargs.get('pointwidth', None): + start, end = args + pointwidth = kwargs.get('pointwidth', None) + else: + pointwidth = args[0] + start, end = kwargs.get('start', None), kwargs.get('end', None) + if start is not None and end is not None: + self = self.filter(start, end) # because the `filter` returns an copy object. if not self.allow_window: raise InvalidOperation("A window operation is already requested") From 86bafc09eb77cf456f0c6fea23d3dfe645d01760 Mon Sep 17 00:00:00 2001 From: Jeff Lin Date: Fri, 2 Jul 2021 11:25:43 -0400 Subject: [PATCH 2/3] fix TypeError for tuple unpacking --- btrdb/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/btrdb/stream.py b/btrdb/stream.py index d2ef884..c1ad1b8 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -1165,7 +1165,7 @@ def windows(self, *args, **kwargs): else: if kwargs.get('width', None) and kwargs.get('depth', None): start, end = args - width, depth = kwargs.get('width', None) and kwargs.get('depth', None) + width, depth = kwargs.get('width', None), kwargs.get('depth', None) else: width, depth = args start, end = kwargs.get('start', None), kwargs.get('end', None) From 3bfba3d91db0fce97942a53278e7ab32029ef1b8 Mon Sep 17 00:00:00 2001 From: Jeff Lin Date: Fri, 2 Jul 2021 11:35:53 -0400 Subject: [PATCH 3/3] fix TypeError for tuple unpacking (2) --- btrdb/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/btrdb/stream.py b/btrdb/stream.py index c1ad1b8..280be7c 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -1157,7 +1157,7 @@ def windows(self, *args, **kwargs): elif len(args) == 2 and len(kwargs) == 0: width, depth = args elif len(args) == 0 and len(kwargs) == 2: - width, depth = kwargs.get('width', None) and kwargs.get('depth', None) + width, depth = kwargs.get('width', None), kwargs.get('depth', None) elif len(args) == 0 and len(kwargs) == 4: start, end, width, depth = [kwargs.get(k, None) for k in ('start', 'end', 'width', 'depth')] elif len(args) == 4 and len(kwargs) == 0: @@ -1215,7 +1215,7 @@ def aligned_windows(self, *args, **kwargs): """ start, end = None, None if len(args) < 1 and len(kwargs) < 1: - raise UserWarning("Require 'width' and 'depth'") + raise UserWarning("Require 'pointwidth'") elif len(args) == 1 and len(kwargs) == 0: pointwidth = args[0] elif len(args) == 0 and len(kwargs) == 1: