From 17b90c5090afbd3b25be29daf9f5075fdb1c74bd Mon Sep 17 00:00:00 2001 From: Eoin Condron Date: Thu, 25 Sep 2025 12:33:52 +0100 Subject: [PATCH 1/6] Add nanops_numba module with numba implementations of nanops including parallel chunking for large arrays --- pandas/core/nanops_numba.py | 869 ++++++++++++++++++++++++++++++++++++ 1 file changed, 869 insertions(+) create mode 100644 pandas/core/nanops_numba.py diff --git a/pandas/core/nanops_numba.py b/pandas/core/nanops_numba.py new file mode 100644 index 0000000000000..e0d107fac2b6d --- /dev/null +++ b/pandas/core/nanops_numba.py @@ -0,0 +1,869 @@ +# Module for implementing nanops in numba +from typing import ( + Any, + Callable, + Literal, + Optional, + Tuple, + TypeVar, +) + +import numba as nb +from numba.core.extending import overload +from numba.typed import List as NumbaList +import numpy as np + +from pandas._typing import ( + AxisInt, + npt, +) + +T = TypeVar("T") +R = TypeVar("R") +F = TypeVar("F", bound=Callable[..., Any]) + + +MIN_INT = np.iinfo(np.int64).min + + +def is_null(x): + """ + Check if a value is considered null/NA. + + Parameters + ---------- + x : scalar + Value to check + + Returns + ------- + bool + True if value is null, False otherwise + + Notes + ----- + This function is overloaded with specialized implementations for + various numeric types via Numba's overload mechanism. + """ + dtype = np.asarray(x).dtype + if np.issubdtype(dtype, np.float64): + return np.isnan(x) + + elif np.issubdtype(dtype, np.int64): + return x == MIN_INT + + else: + return False + + +@overload(is_null) # type: ignore +def jit_is_null(x): + if isinstance(x, nb.types.Integer): + + def is_null(x): + return x == MIN_INT + + return is_null + + elif isinstance(x, nb.types.Float) or isinstance(x, float): + + def is_null(x): + return np.isnan(x) + + return is_null + + elif isinstance(x, nb.types.Boolean): + + def is_null(x): + return False + + return is_null + + else: + + return np.isnat + + +def null_for_np_type(dtype): + """ + Return the appropriate null value for a given numpy dtype. + + Parameters + ---------- + dtype : np.dtype + NumPy data type to get null value for + + Returns + ------- + scalar + NaT for datetime/timedelta types, np.nan for other types + + Notes + ----- + For datetime64 and timedelta64 dtypes (kind 'm' or 'M'), returns + the appropriate NaT (Not a Time) value. For all other dtypes, + returns np.nan. + """ + if dtype.kind in "mM": + return np.array(["NaT"], dtype=dtype)[0] + else: + return np.nan + + +@nb.njit(nogil=True) +def _get_initial_value( + arr, skipna: bool = True, mask: Optional[np.ndarray] = None +) -> Tuple[int, T]: + """ + Find the first value in an array for use in a reduction, and its location. + If skipna is True then we find the first non-null value, other wise just the first value. + If the array is empty, return a null value. + + Parameters + ---------- + arr : array-like + Array to search for non-null values + mask: np.ndarray + Boolean mask where True indicates null values in an extension array + + Returns + ------- + tuple + (index, value) of first non-null value, or (-1, np.nan) if all values are null + + Notes + ----- + This function is JIT-compiled with Numba for performance. + """ + if not skipna: + if mask is not None and mask[0]: + return 0, np.nan + elif is_null(arr[0]): + return 0, np.nan + else: + return 0, arr[0] + elif mask is not None: + for i, x in enumerate(arr): + if not mask[i]: + return i, x + else: + for i, x in enumerate(arr): + if not is_null(x): + return i, x + return -1, np.nan + + +_SCALAR_SIGNATURES = [ + "float64(float64, float64)", + "uint64(uint64, uint64)", + "int64(int64, int64)", +] + + +def _njit_scalar_reduce(func): + """ + Decorator to create numba-compiled scalar reduction functions. + + Parameters + ---------- + func : callable + Scalar reduction function taking two arguments + + Returns + ------- + staticmethod + Numba-compiled version of the function with standard signatures + + Notes + ----- + This decorator compiles the function with predefined signatures for + common numeric types (float64, uint64, int64) and enables nogil mode + for better performance in multithreaded environments. + """ + return staticmethod(nb.njit(_SCALAR_SIGNATURES, nogil=True)(func)) + + +class NumbaReductionOps: + """ + Collection of numba implementations of scalar reduction ops""" + + @_njit_scalar_reduce + def count(x, y): + return x + 1 + + @_njit_scalar_reduce + def min(x, y): + return x if x <= y else y + + @_njit_scalar_reduce + def max(x, y): + return x if x >= y else y + + @_njit_scalar_reduce + def sum(x, y): + return x + y + + @_njit_scalar_reduce + def prod(x, y): + return x + y + + @_njit_scalar_reduce + def sum_square(x, y): + return x + float(y) ** 2 + + @_njit_scalar_reduce + def any(x, y): + return x or y + + @_njit_scalar_reduce + def all(x, y): + return x and y + + +@nb.njit(nogil=True) +def _nb_reduce_single_arr( + reduce_func: Callable, + arr: np.ndarray, + skipna: bool = True, + find_initial_value: bool = True, + mask: Optional[np.ndarray] = None, +) -> Tuple[float | int, int]: + """ + Apply a reduction function to a numpy array, with NA/null handling. + Returns the count of non-nulls as well as the reduction. + + Parameters + ---------- + reduce_func : callable + Function that combines two values (e.g., min, max, sum) + arr : array-like + Array to reduce + skipna : bool, default True + Whether to skip NA/null values + initial_value: + Initial_value for each reduction. Should be 0 or None. + If None, we find the first_non_null value before commencing the reduction + + Returns + ------- + scalar + Result of the reduction operation + + Notes + ----- + This function is JIT-compiled with Numba for performance. + """ + if not find_initial_value: + initial_value = 0.0 + initial_loc = -1 + count = 0 + + else: + # find the initial non-null value to pass through the reduction + # If the array is empty then this returns the type-appropriate null + initial_loc, initial_value = _get_initial_value(arr, skipna=skipna, mask=mask) + if is_null( + initial_value + ): # skipna is False and initial value is null, or all values are null + return np.nan, 0 + else: + count = 1 + + arr = arr[initial_loc + 1 :] + result = initial_value + if mask is not None: + mask = mask[initial_loc + 1 :] + for x, mask_i in zip(arr, mask): + if mask_i: + if skipna: + continue + else: + return np.nan, count + + result = reduce_func(result, x) + count += 1 + + else: + for x in arr: + if is_null(x): + if skipna: + continue + else: + return np.nan, count + + result = reduce_func(result, x) + count += 1 + + return result, count + + +@nb.njit(nogil=True, parallel=True) +def _nb_reduce_arr_list_in_parallel( + reduce_func: Callable, + arr_list: NumbaList[np.ndarray] | np.ndarray, # type: ignore + target: np.ndarray, + mask_list: Optional[np.ndarray] | np.ndarray, + skipna: bool = True, + find_initial_value: bool = True, +) -> Tuple[float | int, int]: + counts = np.zeros(len(arr_list), dtype=np.int64) + for i in nb.prange(len(arr_list)): + arr = arr_list[i] + if mask_list is None: + mask = None + else: + mask = mask_list[i] + + target[i], counts[i] = _nb_reduce_single_arr( + reduce_func, + arr, + skipna=skipna, + find_initial_value=find_initial_value, + mask=mask, + ) + + return target, counts + + +def reduction_return_type_and_empty_result_for_op_and_type( + dtype, op: Literal["count", "min", "max", "sum", "sum_square", "mean"] +): + """ + Determine the return dtype and empty result value for a reduction operation. + + Parameters + ---------- + dtype : np.dtype + Input array dtype + op : {"count", "min", "max", "sum", "sum_square", "mean"} + Reduction operation to perform + + Returns + ------- + tuple + (return_dtype, empty_result_value) for the given operation and input dtype + + Notes + ----- + This function defines the type promotion rules and empty result values + for various reduction operations on different input dtypes. + """ + if op == "count": + return np.int64, 0 + elif op in ("min", "max"): + return dtype, null_for_np_type(dtype) + elif op == "sum": + match dtype.kind: + case "f": + return np.dtype("float64"), 0.0 + case "u": + return np.dtype("uint64"), 0 + case "m": + return dtype, np.timedelta64(0) + case "M": + return dtype, np.datetime64(0, "ns") + case _: + return np.dtype("int64"), 0 + elif op == "mean": + # always use floats for mean/var/std calculation to avoid overflow + if dtype.kind in "mM": + return dtype, null_for_np_type(dtype) + else: + return np.dtype("float64"), np.nan + elif op == "sum_square": + return np.dtype("float64"), np.nan + else: + raise ValueError( + 'op must be one of ["count", "min", "max", "sum", "sum_square"]' + ) + + +def _nullify_below_mincount(result, count, min_count): + """ + Set result elements to null where count is below minimum threshold. + + Parameters + ---------- + result : np.ndarray + Result array to modify + count : np.ndarray + Count of valid values for each result element + min_count : int + Minimum number of non-null values required + + Returns + ------- + np.ndarray + Modified result array with nullified values + + Notes + ----- + For unsigned integer dtypes, uses MIN_INT as null value. + For all other dtypes, uses np.nan as null value. + """ + if result.dtype.kind in "ui": + null = MIN_INT + else: + null = np.nan + + result[count < min_count] = null + + return result + + +def _reduce_empty_array(op, values: np.ndarray, axis: int, min_count: int = 0): + return_type, empty_result = reduction_return_type_and_empty_result_for_op_and_type( + values.dtype, op + ) + if min_count > 0: + empty_result = null_for_np_type(return_type) + if values.ndim == 2 and axis is not None: + n = values.shape[1 - axis] + return np.full(n, empty_result), np.zeros(n) + else: + return empty_result, 0 + + +def _chunk_arr_into_arr_list( + values: np.ndarray, + multi_threading: bool, + axis: Optional[int], + mask: Optional[np.ndarray] = None, +) -> NumbaList: + """ + Split arrays into chunks for parallel processing in reduction operations. + + Parameters + ---------- + values : np.ndarray + Input array to be chunked. Must be 1D or 2D. + multi_threading : bool + If True, split array into multiple chunks for parallel processing. + If False, return single chunk (no parallelization). + axis : int or None + Reduction axis. For 2D arrays: + - axis=0: transpose array so reduction operates along columns + - axis=1: keep array as-is, reduction operates along rows + - axis=None: flatten to 1D for scalar reduction + mask : np.ndarray, optional + Boolean mask indicating null values. If provided, will be split + consistently with values array. + + Returns + ------- + tuple + - arr_list : NumbaList + List of array chunks ready for parallel processing + - mask_list : NumbaList + List of corresponding mask chunks (empty if mask=None) + - final_length : int + Length of the final reduction dimension. 0 for 1D arrays, + number of columns/rows for 2D arrays. + + Notes + ----- + Thread count is determined automatically based on array size when + multi_threading=True, with a maximum of 6 threads and minimum of 1. + Arrays smaller than 1 million elements use single threading. + + For 1D arrays, the array is split into n_threads chunks along axis 0. + For 2D arrays, the array is either transposed (axis=0) or used as-is + (axis=1) to prepare for row-wise or column-wise reductions. + + Raises + ------ + ValueError + If input array has more than 2 dimensions. + + Examples + -------- + >>> arr = np.array([[1, 2, 3], [4, 5, 6]]) + >>> arr_list, mask_list, final_length = _chunk_arr_into_arr_list( + ... arr, multi_threading=False, axis=0 + ... ) + >>> final_length + 3 + >>> len(arr_list) + 3 + """ + ndim = values.ndim + if multi_threading: + # TODO: be smarter about this choice. numba is handling the distribution of the compute + # so don't need to worry about setting it too high + max_n_threads = min(6, values.size // 1e6) + n_threads = max(1, max_n_threads) + else: + n_threads = 1 + + if mask is None: + mask = np.array([]) + + if ndim == 1: + axis = 0 + arr_list = np.array_split(values, n_threads) # type: ignore + mask_list = np.array_split(mask, n_threads) # type: ignore + final_length = 0 + + elif ndim == 2: + if axis == 0: + arr_list = values.T + mask_list = mask.T + else: + arr_list = values + mask_list = mask + final_length = 0 if axis is None else len(arr_list) + else: + raise ValueError("Only arrays of 1 or 2 dimensions are supported") + + if len(arr_list) < n_threads: + arr_list = [ + chunk for row in arr_list for chunk in np.array_split(row, n_threads) + ] + mask_list = [ + chunk for row in mask_list for chunk in np.array_split(row, n_threads) + ] + + arr_list, mask_list = NumbaList(arr_list), NumbaList(mask_list) + return arr_list, mask_list, final_length + + +def _reduce_chunked_results( + op, + chunk_results: np.ndarray, + counts: np.ndarray, + final_length: int, + return_dtype: np.dtype, + **kwargs, +): + chunk_reducer = ( + NumbaReductionOps.sum if op == "sum_square" else getattr(NumbaReductionOps, op) + ) + + if final_length == 0: + # we chunked and want to reduce both axes + result, _ = _nb_reduce_single_arr(chunk_reducer, arr=chunk_results, **kwargs) + count = counts.sum() + elif len(chunk_results) > final_length: + # We chunked along both axes and want to reduce a single axis + arr_list = np.array_split(chunk_results, final_length) + target = np.zeros(final_length, dtype=np.float64) # type: ignore + result, _ = _nb_reduce_arr_list_in_parallel( + chunk_reducer, arr_list=arr_list, mask_list=None, target=target, **kwargs + ) + count = [c.sum() for c in np.array_split(counts, final_length)] + else: + result, count = chunk_results, counts + + result, count = map(np.atleast_1d, (result, count)) + + return result, count + + +def nb_reduce( + op: Literal["count", "min", "max", "sum", "sum_square"], + values: np.ndarray, + axis: Optional[int] = None, + skipna: bool = True, + min_count: int = 0, + mask: Optional[np.ndarray] = None, + multi_threading: bool = True, +) -> Tuple[np.ndarray, np.ndarray]: + """ + Apply a reduction operation to a numpy array using Numba-accelerated functions. + + Parameters + ---------- + op : {"count", "min", "max", "sum", "sum_square"} + The reduction operation to perform + arr : np.ndarray + Input array to reduce (1D or 2D) + axis : int, optional + Axis along which to perform the reduction. If None, reduces over all elements. + For 2D arrays, axis=0 reduces along rows, axis=1 reduces along columns. + skipna : bool, default True + Whether to skip NA/null values during reduction + multi_threading : bool, default True + Whether to use parallel processing by splitting array into chunks (1D only) + + Returns + ------- + tuple[float | int | np.ndarray, int | np.ndarray] + Two-element tuple containing: + - Reduction result (scalar for 1D or axis=None, array for 2D with specified axis) + - Count of non-null values processed (scalar or array matching result shape) + + Notes + ----- + This function provides high-performance reduction operations by leveraging + Numba's JIT compilation and optional parallel processing. For 1D arrays with + multi_threading=True, the array is split into chunks processed in parallel. + + Supports arrays up to 2 dimensions: + - 1D arrays: reduces to scalar + - 2D arrays: reduces along specified axis or to scalar if axis=None + + The function handles null values according to the skipna parameter: + - If skipna=True: null values are ignored in the reduction + - If skipna=False: any null value causes early termination + + For integer arrays, MIN_INT is used as the null sentinel value. + For float arrays, NaN is used as the null value. + + Examples + -------- + >>> import numpy as np + >>> # 1D array reduction + >>> arr = np.array([1.0, 2.0, np.nan, 4.0]) + >>> result, count = nb_reduce("sum", arr, skipna=True) + >>> result, count + (np.float64(7.0), 3) + + >>> # 2D array reduction along axis + >>> arr_2d = np.array([[1.0, 2.0], [3.0, np.nan]]) + >>> result, count = nb_reduce("sum", arr_2d, axis=0, skipna=True) + >>> result, count + (array([4., 2.]), array([2, 1])) + """ + values = np.asarray(values) + if values.dtype.kind in "bui" and mask is None: + skipna = False + + elif values.dtype.kind == "c": + kwargs = locals().copy() + real_piece, count = nb_reduce(**(kwargs | {"values": values.real})) + imaginary_piece, count = nb_reduce(**(kwargs | {"values": values.imag})) + return real_piece + 1j * imaginary_piece, count + + if values.size == 0: + return _reduce_empty_array(op, values, axis=axis, min_count=min_count) + + return_dtype, _ = reduction_return_type_and_empty_result_for_op_and_type( + values.dtype, op + ) + + is_timelike = values.dtype.kind in "mM" + if is_timelike: + values = values.view(int) + + ndim = np.ndim(values) + if not (axis is None or axis < ndim): + raise ValueError(f"axis {axis} out-of-bounds for array of dimension {ndim}") + + return_scalar = ndim == 1 or axis is None + + reduce_op = "sum" if op == "mean" else op + reduce_func = getattr(NumbaReductionOps, reduce_op) + + arr_list, mask_list, final_length = _chunk_arr_into_arr_list( + values, multi_threading=multi_threading, axis=axis, mask=mask + ) + + kwargs = { + "skipna": skipna, + "find_initial_value": "sum" not in reduce_op, + } + target = np.zeros(len(arr_list), dtype=np.float64) # type: ignore + + result, count = _nb_reduce_arr_list_in_parallel( + reduce_func=reduce_func, + arr_list=arr_list, + target=target, + mask_list=None if mask is None else mask_list, + **kwargs, + ) + result, count = _reduce_chunked_results( + reduce_op, + result, + count, + final_length=final_length, + return_dtype=return_dtype, + **kwargs, + ) + + if op in ["mean", "sum_square"]: + if op == "mean": + with np.errstate(invalid="ignore", divide="ignore"): + result = result / count + if not skipna: + # null integers need to be nullified here as dividing by the count + # causes MIN_INT results to increase + null = count < values.shape[axis or 0] + result[null] = np.nan + + if min_count > 0: + result = _nullify_below_mincount(result, count, min_count) + + if return_dtype.kind in "mM": + result = _cast_to_timelike(result, return_dtype) + + elif return_dtype.kind == "f" or not np.isnan(result).any(): + result = result.astype(return_dtype, copy=False) + + if return_scalar: + result, count = result[0], int(count[0]) + result = result.dtype.type(result) + + return result, count + + +def _cast_to_timelike(arr, to_dtype): + """ + Convert a float array to timelike (datetime/timedelta) dtype. + + Parameters + ---------- + arr : np.ndarray + Float array to convert + to_dtype : np.dtype + Target datetime or timedelta dtype + + Returns + ------- + np.ndarray + Array converted to timelike dtype with NaN values replaced by MIN_INT + + Notes + ----- + This function is used to convert float arrays back to timelike dtypes + after reduction operations. NaN values are replaced with MIN_INT before + conversion to preserve null representation in integer-based time types. + """ + isnan = np.isnan(arr) + if isnan.any(): + arr[isnan] = MIN_INT + arr = arr.astype(int, copy=False).astype(to_dtype, copy=False) + + return arr + + +def nanmax( + values: "np.ndarray", + *, + axis: "AxisInt | None" = None, + skipna: "bool" = True, + min_count: "int" = 0, + mask: "npt.NDArray[np.bool_] | None" = None, + multi_threading: bool = True, +) -> np.ndarray: + return nb_reduce("max", **locals())[0] + + +def nanmin( + values: "np.ndarray", + *, + axis: "AxisInt | None" = None, + skipna: "bool" = True, + min_count: "int" = 0, + mask: "npt.NDArray[np.bool_] | None" = None, + multi_threading: bool = True, +) -> np.ndarray: + return nb_reduce("min", **locals())[0] + + +def nansum( + values: "np.ndarray", + *, + axis: "AxisInt | None" = None, + skipna: "bool" = True, + min_count: "int" = 0, + mask: "npt.NDArray[np.bool_] | None" = None, + multi_threading: bool = True, +) -> np.ndarray: + return nb_reduce("sum", **locals())[0] + + +def nanmean( + values: "np.ndarray", + *, + axis: "AxisInt | None" = None, + skipna: "bool" = True, + min_count: "int" = 0, + mask: "npt.NDArray[np.bool_] | None" = None, + multi_threading: bool = True, +) -> np.ndarray: + return nb_reduce("mean", **locals())[0] + + +def _nanvar_std_sem( + values: "np.ndarray", + *, + axis: "AxisInt | None" = None, + skipna: "bool" = True, + ddof: int = 1, + mask: "npt.NDArray[np.bool_] | None" = None, + multi_threading: bool = True, + std: bool = False, + sem: bool = False, +) -> np.ndarray: + kwargs = locals().copy() + if values.dtype.kind == "c": + return sum( + _nanvar_std_sem(**(kwargs | {"values": x})) + for x in [values.real, values.imag] + ) + + dtype = values.dtype + is_timelike = dtype.kind in "mM" + + del kwargs["ddof"], kwargs["std"], kwargs["sem"] + mean, count = nb_reduce("mean", **kwargs) + + if np.ndim(mean) == 1 and len(mean) == len(values): + kwargs["values"] = (values.T - mean).T + else: + kwargs["values"] = values - mean + + sum_of_squares, count = nb_reduce("sum_square", **kwargs) + + if np.ndim(mean) == 0: + if is_null(mean) or count <= ddof: + return np.timedelta64(MIN_INT) if is_timelike else np.nan + + result = sum_of_squares / (count - ddof) + + if std or sem: + result = result.astype(float, copy=False) ** 0.5 + if sem: + result = result / np.sqrt(count) + if is_timelike: + result = np.array(result).astype(dtype.str.replace("M", "m")) + if np.ndim(result) == 0: + result = np.timedelta64(result) + + return result + + +def nanvar( + values: "np.ndarray", + *, + axis: "AxisInt | None" = None, + skipna: "bool" = True, + ddof: int = 1, + mask: "npt.NDArray[np.bool_] | None" = None, + multi_threading: bool = True, +) -> np.ndarray: + return _nanvar_std_sem(**locals()) + + +def nanstd( + values: "np.ndarray", + *, + axis: "AxisInt | None" = None, + skipna: "bool" = True, + ddof: int = 1, + mask: "npt.NDArray[np.bool_] | None" = None, + multi_threading: bool = True, +) -> np.ndarray: + return _nanvar_std_sem(**locals(), std=True) + + +def nansem( + values: "np.ndarray", + *, + axis: "AxisInt | None" = None, + skipna: "bool" = True, + ddof: int = 1, + mask: "npt.NDArray[np.bool_] | None" = None, + multi_threading: bool = True, +) -> np.ndarray: + return _nanvar_std_sem(**locals(), sem=True) From 24f75e2ef0b659887fe24d8b54523feea3a07193 Mon Sep 17 00:00:00 2001 From: Eoin Condron Date: Thu, 25 Sep 2025 12:34:36 +0100 Subject: [PATCH 2/6] Add a numba_switch decorator to nanops and replace most of the bottleneck switches --- pandas/core/nanops.py | 46 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/pandas/core/nanops.py b/pandas/core/nanops.py index 7bcf4371a0bcd..437d5849d5dcf 100644 --- a/pandas/core/nanops.py +++ b/pandas/core/nanops.py @@ -48,6 +48,10 @@ notna, ) + +from pandas.core.util.numba_ import GLOBAL_USE_NUMBA +from pandas.core import nanops_numba + if TYPE_CHECKING: from collections.abc import Callable @@ -97,6 +101,38 @@ def _f(*args, **kwargs): return cast(F, _f) +class numba_switch: + def __init__(self, name=None, **kwargs) -> None: + self.name = name + self.kwargs = kwargs + + def __call__(self, alt: F) -> F: + nb_name = self.name or alt.__name__ + + try: + nb_func = getattr(nanops_numba, nb_name) + except (AttributeError, NameError): # pragma: no cover + nb_func = None + + @functools.wraps(alt) + def f( + values: np.ndarray, + *, + axis: AxisInt | None = None, + skipna: bool = True, + **kwds, + ): + disallowed = values.dtype == "O" + if GLOBAL_USE_NUMBA and not disallowed: + result = nb_func(values, skipna=skipna, axis=axis, **kwds) + else: + result = alt(values, axis=axis, skipna=skipna, **kwds) + + return result + + return cast(F, f) + + class bottleneck_switch: def __init__(self, name=None, **kwargs) -> None: self.name = name @@ -593,6 +629,7 @@ def nanall( return values.all(axis) # type: ignore[return-value] +@numba_switch() @disallow("M8") @_datetimelike_compat @maybe_operate_rowwise @@ -660,7 +697,7 @@ def _mask_datetimelike_result( return result -@bottleneck_switch() +@numba_switch() @_datetimelike_compat def nanmean( values: np.ndarray, @@ -910,7 +947,7 @@ def _get_counts_nanvar( return count, d -@bottleneck_switch(ddof=1) +@numba_switch(ddof=1) def nanstd( values, *, @@ -957,7 +994,7 @@ def nanstd( @disallow("M8", "m8") -@bottleneck_switch(ddof=1) +@numba_switch(ddof=1) def nanvar( values: np.ndarray, *, @@ -1035,6 +1072,7 @@ def nanvar( return result +@numba_switch() @disallow("M8", "m8") def nansem( values: np.ndarray, @@ -1089,7 +1127,7 @@ def nansem( def _nanminmax(meth, fill_value_typ): - @bottleneck_switch(name=f"nan{meth}") + @numba_switch(name=f"nan{meth}") @_datetimelike_compat def reduction( values: np.ndarray, From 00863c416aed210b1194fa7b8f6622d091bf072e Mon Sep 17 00:00:00 2001 From: Eoin Condron Date: Thu, 25 Sep 2025 12:35:20 +0100 Subject: [PATCH 3/6] Use isclose instead of exact equality when testing sum of large Series --- pandas/tests/reductions/test_reductions.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pandas/tests/reductions/test_reductions.py b/pandas/tests/reductions/test_reductions.py index db27572b9da26..1093a1e9c1e81 100644 --- a/pandas/tests/reductions/test_reductions.py +++ b/pandas/tests/reductions/test_reductions.py @@ -751,9 +751,8 @@ def test_sum_overflow_float(self, use_bottleneck, dtype): with pd.option_context("use_bottleneck", use_bottleneck): v = np.arange(5000000, dtype=dtype) s = Series(v) - result = s.sum(skipna=False) - assert result == v.sum(dtype=dtype) + assert np.isclose(result, v.sum(dtype=dtype)) result = s.min(skipna=False) assert np.allclose(float(result), 0.0) result = s.max(skipna=False) From c6a7e20252150db089fa4c92696e5e5a9a11b1ac Mon Sep 17 00:00:00 2001 From: Eoin Condron Date: Fri, 26 Sep 2025 09:40:26 +0100 Subject: [PATCH 4/6] Add comprehensive unit tests for private methods in pandas.core.nanops_numba MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Tests all 9 private methods prefixed with underscore - 37 test cases organized in 8 test classes - Comprehensive coverage of Numba-accelerated reduction operations - Tests edge cases: NaN handling, empty arrays, masks, different dtypes - Uses pytest fixtures and parameterization to avoid code duplication - Tests NumbaList usage for parallel processing - Uses pandas._testing for consistent assertion helpers - All tests pass in pandas-dev environment Functions tested: - _get_initial_value: Finding first valid values in arrays - _nb_reduce_single_arr: Single array reduction operations - _nullify_below_mincount: Minimum count validation - _reduce_empty_array: Empty array handling - _chunk_arr_into_arr_list: Array chunking for parallel processing - _nb_reduce_arr_list_in_parallel: Parallel reduction operations - _reduce_chunked_results: Combining chunked results - _cast_to_timelike: DateTime/timedelta type casting - _nanvar_std_sem: Variance/standard deviation/standard error calculations 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pandas/tests/reductions/test_nanops_numba.py | 779 +++++++++++++++++++ 1 file changed, 779 insertions(+) create mode 100644 pandas/tests/reductions/test_nanops_numba.py diff --git a/pandas/tests/reductions/test_nanops_numba.py b/pandas/tests/reductions/test_nanops_numba.py new file mode 100644 index 0000000000000..5468dd0e27077 --- /dev/null +++ b/pandas/tests/reductions/test_nanops_numba.py @@ -0,0 +1,779 @@ +""" +Unit tests for private methods in pandas.core.nanops_numba module. + +This module tests only the private methods (prefixed with underscore). +""" + +import numpy as np +import pytest +from numba.typed import List as NumbaList + +from pandas.core.nanops_numba import ( + MIN_INT, + NumbaReductionOps, + _cast_to_timelike, + _chunk_arr_into_arr_list, + _get_initial_value, + _nb_reduce_arr_list_in_parallel, + _nb_reduce_single_arr, + _nanvar_std_sem, + _nullify_below_mincount, + _reduce_chunked_results, + _reduce_empty_array, + nb_reduce, +) + +import pandas._testing as tm + + +class TestGetInitialValue: + """Test the _get_initial_value private function.""" + + @pytest.fixture + def float_array_with_nan(self): + return np.array([np.nan, 2.0, 3.0, np.nan]) + + @pytest.fixture + def valid_array(self): + return np.array([1.0, 2.0, 3.0]) + + def test_skipna_true_with_leading_nulls(self, float_array_with_nan): + index, value = _get_initial_value(float_array_with_nan, skipna=True) + assert index == 1 + assert value == 2.0 + + def test_skipna_false_with_leading_null(self, float_array_with_nan): + index, value = _get_initial_value(float_array_with_nan, skipna=False) + assert index == 0 + assert np.isnan(value) + + def test_skipna_false_with_valid_first(self, valid_array): + index, value = _get_initial_value(valid_array, skipna=False) + assert index == 0 + assert value == 1.0 + + def test_all_null_values(self): + arr = np.array([np.nan, np.nan, np.nan]) + index, value = _get_initial_value(arr, skipna=True) + assert index == -1 + assert np.isnan(value) + + def test_with_mask_skipna_true(self): + arr = np.array([1.0, 2.0, 3.0]) + mask = np.array([True, False, False]) # First element masked + index, value = _get_initial_value(arr, skipna=True, mask=mask) + assert index == 1 + assert value == 2.0 + + def test_with_mask_skipna_false(self): + arr = np.array([1.0, 2.0, 3.0]) + mask = np.array([True, False, False]) # First element masked + index, value = _get_initial_value(arr, skipna=False, mask=mask) + assert index == 0 + assert np.isnan(value) + + +class TestNbReduceSingleArr: + """Test _nb_reduce_single_arr private function.""" + + @pytest.fixture + def sample_array(self): + return np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + + @pytest.fixture + def array_with_nan(self): + return np.array([1.0, np.nan, 3.0, 4.0, np.nan]) + + def test_sum_no_nulls(self, sample_array): + result, count = _nb_reduce_single_arr( + NumbaReductionOps.sum, sample_array, skipna=True + ) + assert result == 15.0 + assert count == 5 + + def test_sum_with_nans_skipna_true(self, array_with_nan): + result, count = _nb_reduce_single_arr( + NumbaReductionOps.sum, array_with_nan, skipna=True + ) + assert result == 8.0 # 1 + 3 + 4 + assert count == 3 + + def test_sum_with_nans_skipna_false(self, array_with_nan): + result, count = _nb_reduce_single_arr( + NumbaReductionOps.sum, array_with_nan, skipna=False + ) + assert np.isnan(result) + assert count == 1 + + def test_min_operation(self, sample_array): + result, count = _nb_reduce_single_arr( + NumbaReductionOps.min, sample_array, skipna=True + ) + assert result == 1.0 + assert count == 5 + + def test_max_operation(self, sample_array): + result, count = _nb_reduce_single_arr( + NumbaReductionOps.max, sample_array, skipna=True + ) + assert result == 5.0 + assert count == 5 + + def test_with_mask(self): + arr = np.array([1.0, 2.0, 3.0, 4.0]) + mask = np.array([False, True, False, True]) # mask 2.0 and 4.0 + result, count = _nb_reduce_single_arr( + NumbaReductionOps.sum, arr, skipna=True, mask=mask + ) + assert result == 4.0 # 1 + 3 + assert count == 2 + + def test_find_initial_value_false(self, sample_array): + result, count = _nb_reduce_single_arr( + NumbaReductionOps.sum, sample_array, find_initial_value=False + ) + # Should start with 0 and add all values + assert result == 15.0 + assert count == 5 + + +class TestNullifyBelowMincount: + """Test _nullify_below_mincount private function.""" + + def test_float_array(self): + result = np.array([1.0, 2.0, 3.0]) + count = np.array([2, 1, 3]) + min_count = 2 + + modified = _nullify_below_mincount(result, count, min_count) + + expected = np.array([1.0, np.nan, 3.0]) + tm.assert_numpy_array_equal(modified, expected) + + def test_int_array(self): + result = np.array([1, 2, 3], dtype=np.int64) + count = np.array([2, 1, 3]) + min_count = 2 + + modified = _nullify_below_mincount(result, count, min_count) + + expected = np.array([1, MIN_INT, 3]) + tm.assert_numpy_array_equal(modified, expected) + + +class TestReduceEmptyArray: + """Test _reduce_empty_array private function.""" + + def test_1d_empty_array(self): + arr = np.array([], dtype=np.float64) + result, count = _reduce_empty_array("sum", arr, axis=None, min_count=0) + assert result == 0.0 + assert count == 0 + + def test_1d_empty_array_with_min_count(self): + arr = np.array([], dtype=np.float64) + result, count = _reduce_empty_array("sum", arr, axis=None, min_count=1) + assert np.isnan(result) + assert count == 0 + + def test_2d_empty_array_axis_0(self): + arr = np.array([[], []], dtype=np.float64) + result, count = _reduce_empty_array("sum", arr, axis=0, min_count=0) + # Empty array along axis 0 means no elements to reduce + tm.assert_numpy_array_equal(result, np.array([])) + tm.assert_numpy_array_equal(count, np.array([])) + + +class TestChunkArrIntoArrList: + """Test _chunk_arr_into_arr_list private function.""" + + def test_1d_array_single_thread(self): + arr = np.array([1, 2, 3, 4, 5]) + arr_list, _, final_length = _chunk_arr_into_arr_list( + arr, multi_threading=False, axis=None + ) + assert len(arr_list) == 1 + tm.assert_numpy_array_equal(arr_list[0], arr) + assert final_length == 0 + + def test_2d_array_axis_0(self): + arr = np.array([[1, 2], [3, 4], [5, 6]]) + arr_list, _, final_length = _chunk_arr_into_arr_list( + arr, multi_threading=False, axis=0 + ) + # For axis=0, returns transposed array, so 2 columns (arrays) + assert len(arr_list) == 2 + assert final_length == 2 + # arr_list is the transpose of the original array + expected_transposed = arr.T + for i in range(len(arr_list)): + tm.assert_numpy_array_equal(arr_list[i], expected_transposed[i]) + + def test_2d_array_axis_1(self): + arr = np.array([[1, 2], [3, 4], [5, 6]]) + arr_list, _, final_length = _chunk_arr_into_arr_list( + arr, multi_threading=False, axis=1 + ) + # For axis=1, returns original array, so 3 rows + assert len(arr_list) == 3 + assert final_length == 3 + # arr_list is the original array + for i in range(len(arr_list)): + tm.assert_numpy_array_equal(arr_list[i], arr[i]) + + +class TestNbReduceArrListInParallel: + """Test _nb_reduce_arr_list_in_parallel private function.""" + + @pytest.fixture + def array_list(self): + # Create a NumbaList of arrays for parallel processing + arr_list = NumbaList() + arr_list.append(np.array([1.0, 2.0, 3.0])) + arr_list.append(np.array([4.0, 5.0, 6.0])) + arr_list.append(np.array([7.0, 8.0, 9.0])) + return arr_list + + def test_parallel_sum_without_mask(self, array_list): + target = np.zeros(len(array_list), dtype=np.float64) + result, counts = _nb_reduce_arr_list_in_parallel( + NumbaReductionOps.sum, array_list, target, mask_list=None, skipna=True + ) + + expected_results = np.array([6.0, 15.0, 24.0]) # [1+2+3, 4+5+6, 7+8+9] + expected_counts = np.array([3, 3, 3]) + + tm.assert_numpy_array_equal(result, expected_results) + tm.assert_numpy_array_equal(counts, expected_counts) + + def test_parallel_with_mask(self): + # Create array list with some elements that should be masked + arr_list = NumbaList() + arr_list.append(np.array([1.0, 2.0, 3.0])) + arr_list.append(np.array([4.0, 5.0, 6.0])) + + # Create corresponding mask list + mask_list = NumbaList() + mask_list.append(np.array([False, True, False])) # Mask middle element + mask_list.append(np.array([True, False, False])) # Mask first element + + target = np.zeros(len(arr_list), dtype=np.float64) + result, counts = _nb_reduce_arr_list_in_parallel( + NumbaReductionOps.sum, arr_list, target, mask_list=mask_list, skipna=True + ) + + expected_results = np.array([4.0, 11.0]) # [1+3, 5+6] + expected_counts = np.array([2, 2]) + + tm.assert_numpy_array_equal(result, expected_results) + tm.assert_numpy_array_equal(counts, expected_counts) + + +class TestReduceChunkedResults: + """Test _reduce_chunked_results private function.""" + + def test_single_chunk_reduction(self): + # Test when final_length == 0 (reduce both axes) + chunk_results = np.array([1.0, 2.0, 3.0]) + counts = np.array([2, 2, 2]) + final_length = 0 + return_dtype = np.dtype("float64") + + result, count = _reduce_chunked_results( + "sum", chunk_results, counts, final_length, return_dtype, + skipna=True, find_initial_value=True + ) + + # Should reduce the chunk_results array itself + expected_result = np.array([6.0]) # 1 + 2 + 3 + expected_count = np.array([6]) # 2 + 2 + 2 + + tm.assert_numpy_array_equal(result, expected_result) + tm.assert_numpy_array_equal(count, expected_count) + + def test_no_chunking_needed(self): + # Test when chunk_results and counts are already in final form + chunk_results = np.array([10.0, 20.0]) + counts = np.array([3, 4]) + final_length = 2 + return_dtype = np.dtype("float64") + + result, count = _reduce_chunked_results( + "sum", chunk_results, counts, final_length, return_dtype, + skipna=True, find_initial_value=True + ) + + # Should return results as-is (no further reduction needed) + tm.assert_numpy_array_equal(result, chunk_results) + tm.assert_numpy_array_equal(count, counts) + + +class TestCastToTimelike: + """Test _cast_to_timelike private function.""" + + def test_cast_to_datetime(self): + arr = np.array([1000000000000000000, 2000000000000000000], dtype=np.int64) + result = _cast_to_timelike(arr, np.dtype("datetime64[ns]")) + assert result.dtype == np.dtype("datetime64[ns]") + + def test_cast_to_timedelta(self): + arr = np.array([1000000000, 2000000000], dtype=np.int64) + result = _cast_to_timelike(arr, np.dtype("timedelta64[ns]")) + assert result.dtype == np.dtype("timedelta64[ns]") + + def test_cast_with_nan_values(self): + arr = np.array([1000000000000000000, np.nan], dtype=np.float64) + result = _cast_to_timelike(arr, np.dtype("datetime64[ns]")) + assert result.dtype == np.dtype("datetime64[ns]") + assert np.isnat(result[1]) + + +class TestNanvarStdSem: + """Test the _nanvar_std_sem private function.""" + + @pytest.fixture + def sample_data(self): + return np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + + def test_variance_calculation(self, sample_data): + result = _nanvar_std_sem(sample_data) + # Sample variance with ddof=1 + # mean = 3.0, deviations = [-2,-1,0,1,2], sum of squares = 10 + # variance = 10/4 = 2.5 + expected = 2.5 + tm.assert_almost_equal(result, expected) + + def test_standard_deviation(self, sample_data): + result = _nanvar_std_sem(sample_data, std=True) + expected = np.sqrt(2.5) + tm.assert_almost_equal(result, expected) + + def test_standard_error_of_mean(self, sample_data): + result = _nanvar_std_sem(sample_data, sem=True) + expected = np.sqrt(2.5) / np.sqrt(5) # std / sqrt(n) + tm.assert_almost_equal(result, expected) + + def test_with_ddof_0(self, sample_data): + result = _nanvar_std_sem(sample_data, ddof=0) + # Population variance: sum((x - mean)^2) / n = 10/5 = 2.0 + expected = 2.0 + tm.assert_almost_equal(result, expected) + + def test_insufficient_data_for_ddof(self): + arr = np.array([1.0]) # Only one value + result = _nanvar_std_sem(arr, ddof=1) # ddof=1 requires at least 2 values + assert np.isnan(result) + + def test_with_nan_values(self): + arr = np.array([1.0, np.nan, 3.0, 4.0, np.nan]) + result = _nanvar_std_sem(arr, skipna=True) + # Should not raise error and return finite result + assert np.isfinite(result) + + def test_complex_array(self): + arr = np.array([1+2j, 3+4j]) + result = _nanvar_std_sem(arr) + # Should handle complex numbers by processing real and imag parts + assert np.isfinite(result) + + def test_datetime_array(self): + arr = np.array( + ["2020-01-01", "2020-01-02", "2020-01-03"], dtype="datetime64[ns]" + ) + result = _nanvar_std_sem(arr, std=True) + # Should return timedelta type for datetime input with std=True + assert result.dtype.kind == "m" # timedelta type + + def test_2d_array_calculation(self): + arr = np.array([[1.0, 2.0], [3.0, 4.0]]) + result = _nanvar_std_sem(arr, axis=0) + # Variance along axis 0: var([1,3]) and var([2,4]) + expected = np.array([1.0, 1.0]) # Each column has variance 1.0 with ddof=1 + tm.assert_numpy_array_equal(result, expected) + + +class TestNbReduceMultithreading: + """Test nb_reduce with large arrays to trigger multi-threading.""" + + @pytest.fixture + def large_2d_array(self): + """Create a large 2D array to trigger multi-threading.""" + # Create array large enough to trigger multi-threading (> 1e6 elements) + rows, cols = 2000, 600 # 1.2M elements + rng = np.random.default_rng(42) # For reproducible tests + arr = rng.standard_normal((rows, cols)).astype(np.float64) + return arr + + @pytest.fixture + def large_2d_array_with_nans(self): + """Create a large 2D array with some NaN values.""" + rows, cols = 2000, 600 + rng = np.random.default_rng(42) + arr = rng.standard_normal((rows, cols)).astype(np.float64) + # Add some NaN values at random positions + nan_mask = rng.random((rows, cols)) < 0.05 # 5% NaN values + arr[nan_mask] = np.nan + return arr + + def test_nb_reduce_sum_axis_none_multithreaded(self, large_2d_array): + """Test sum reduction with axis=None on large array (multi-threaded).""" + result, count = nb_reduce( + "sum", large_2d_array, axis=None, multi_threading=True + ) + + # Compare with numpy result + expected = np.sum(large_2d_array) + expected_count = large_2d_array.size + + tm.assert_almost_equal(result, expected, rtol=1e-10) + assert count == expected_count + + def test_nb_reduce_sum_axis_0_multithreaded(self, large_2d_array): + """Test sum reduction along axis 0 on large array (multi-threaded).""" + result, count = nb_reduce("sum", large_2d_array, axis=0, multi_threading=True) + + # Compare with numpy result + expected = np.sum(large_2d_array, axis=0) + expected_count = np.full(large_2d_array.shape[1], large_2d_array.shape[0]) + + tm.assert_numpy_array_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_sum_axis_1_multithreaded(self, large_2d_array): + """Test sum reduction along axis 1 on large array (multi-threaded).""" + result, count = nb_reduce("sum", large_2d_array, axis=1, multi_threading=True) + + # Compare with numpy result + expected = np.sum(large_2d_array, axis=1) + expected_count = np.full(large_2d_array.shape[0], large_2d_array.shape[1]) + + np.testing.assert_array_almost_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_min_axis_none_multithreaded(self, large_2d_array): + """Test min reduction with axis=None on large array (multi-threaded).""" + result, count = nb_reduce( + "min", large_2d_array, axis=None, multi_threading=True + ) + + # Compare with numpy result + expected = np.min(large_2d_array) + expected_count = large_2d_array.size + + tm.assert_almost_equal(result, expected) + assert count == expected_count + + def test_nb_reduce_min_axis_0_multithreaded(self, large_2d_array): + """Test min reduction along axis 0 on large array (multi-threaded).""" + result, count = nb_reduce("min", large_2d_array, axis=0, multi_threading=True) + + # Compare with numpy result + expected = np.min(large_2d_array, axis=0) + expected_count = np.full(large_2d_array.shape[1], large_2d_array.shape[0]) + + tm.assert_numpy_array_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_max_axis_1_multithreaded(self, large_2d_array): + """Test max reduction along axis 1 on large array (multi-threaded).""" + result, count = nb_reduce("max", large_2d_array, axis=1, multi_threading=True) + + # Compare with numpy result + expected = np.max(large_2d_array, axis=1) + expected_count = np.full(large_2d_array.shape[0], large_2d_array.shape[1]) + + tm.assert_numpy_array_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_with_nans_skipna_true_multithreaded( + self, large_2d_array_with_nans + ): + """Test sum with NaN values and skipna=True on large array (multi-threaded).""" + result, count = nb_reduce( + "sum", large_2d_array_with_nans, axis=None, + skipna=True, multi_threading=True + ) + + # Compare with numpy nansum + expected = np.nansum(large_2d_array_with_nans) + expected_count = np.sum(~np.isnan(large_2d_array_with_nans)) + + tm.assert_almost_equal(result, expected, rtol=1e-10) + assert count == expected_count + + def test_nb_reduce_with_nans_axis_0_multithreaded(self, large_2d_array_with_nans): + """Test sum with NaN values along axis 0 (multi-threaded).""" + result, count = nb_reduce("sum", large_2d_array_with_nans, axis=0, + skipna=True, multi_threading=True) + + # Compare with numpy nansum + expected = np.nansum(large_2d_array_with_nans, axis=0) + expected_count = np.sum(~np.isnan(large_2d_array_with_nans), axis=0) + + tm.assert_numpy_array_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_with_nans_axis_1_multithreaded(self, large_2d_array_with_nans): + """Test sum with NaN values along axis 1 (multi-threaded).""" + result, count = nb_reduce("sum", large_2d_array_with_nans, axis=1, + skipna=True, multi_threading=True) + + # Compare with numpy nansum + expected = np.nansum(large_2d_array_with_nans, axis=1) + expected_count = np.sum(~np.isnan(large_2d_array_with_nans), axis=1) + + np.testing.assert_array_almost_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_single_thread_vs_multithread_consistency(self, large_2d_array): + """Test that single-threaded and multi-threaded results are identical.""" + # Single-threaded result + result_st, count_st = nb_reduce("sum", large_2d_array, axis=0, + multi_threading=False) + + # Multi-threaded result + result_mt, count_mt = nb_reduce("sum", large_2d_array, axis=0, + multi_threading=True) + + # Results should be identical + tm.assert_numpy_array_equal(result_st, result_mt) + tm.assert_numpy_array_equal(count_st, count_mt) + + @pytest.mark.parametrize("op", ["sum", "min", "max"]) + @pytest.mark.parametrize("axis", [None, 0, 1]) + def test_nb_reduce_operations_multithreaded(self, large_2d_array, op, axis): + """Test various operations with different axes on large array.""" + result, count = nb_reduce(op, large_2d_array, axis=axis, multi_threading=True) + + # Verify result shape is correct + if axis is None: + assert np.isscalar(result) + assert np.isscalar(count) + elif axis == 0: + assert result.shape == (large_2d_array.shape[1],) + assert count.shape == (large_2d_array.shape[1],) + elif axis == 1: + assert result.shape == (large_2d_array.shape[0],) + assert count.shape == (large_2d_array.shape[0],) + + # Verify count values are reasonable + if axis is None: + assert count == large_2d_array.size + elif axis == 0: + assert np.all(count == large_2d_array.shape[0]) + elif axis == 1: + assert np.all(count == large_2d_array.shape[1]) + + def test_nb_reduce_min_count_multithreaded(self, large_2d_array_with_nans): + """Test min_count parameter with large array (multi-threaded).""" + min_count = 100 # Require at least 100 non-NaN values per column + + result, count = nb_reduce("sum", large_2d_array_with_nans, axis=0, + skipna=True, min_count=min_count, + multi_threading=True) + + # Check that columns with insufficient data are NaN + valid_columns = count >= min_count + assert np.all(np.isfinite(result[valid_columns])) + + # Some columns should be nullified due to min_count + if not np.all(valid_columns): + assert np.any(np.isnan(result[~valid_columns])) + + def test_nb_reduce_mean_axis_none_multithreaded(self, large_2d_array): + """Test mean reduction with axis=None on large array (multi-threaded).""" + result, count = nb_reduce("mean", large_2d_array, axis=None, + multi_threading=True) + + # Compare with numpy result + expected = np.mean(large_2d_array) + expected_count = large_2d_array.size + + tm.assert_almost_equal(result, expected, rtol=1e-10) + assert count == expected_count + + def test_nb_reduce_mean_axis_0_multithreaded(self, large_2d_array): + """Test mean reduction along axis 0 on large array (multi-threaded).""" + result, count = nb_reduce("mean", large_2d_array, axis=0, + multi_threading=True) + + # Compare with numpy result + expected = np.mean(large_2d_array, axis=0) + expected_count = np.full(large_2d_array.shape[1], large_2d_array.shape[0]) + + tm.assert_numpy_array_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_mean_axis_1_multithreaded(self, large_2d_array): + """Test mean reduction along axis 1 on large array (multi-threaded).""" + result, count = nb_reduce("mean", large_2d_array, axis=1, + multi_threading=True) + + # Compare with numpy result + expected = np.mean(large_2d_array, axis=1) + expected_count = np.full(large_2d_array.shape[0], large_2d_array.shape[1]) + + np.testing.assert_array_almost_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_sum_square_axis_none_multithreaded(self, large_2d_array): + """Test sum_square reduction with axis=None on large array.""" + result, count = nb_reduce("sum_square", large_2d_array, axis=None, + multi_threading=True) + + # Compare with numpy result (sum of squares) + expected = np.sum(large_2d_array ** 2) + expected_count = large_2d_array.size + + tm.assert_almost_equal(result, expected, rtol=1e-10) + assert count == expected_count + + def test_nb_reduce_sum_square_axis_0_multithreaded(self, large_2d_array): + """Test sum_square reduction along axis 0 on large array.""" + result, count = nb_reduce("sum_square", large_2d_array, axis=0, + multi_threading=True) + + # Compare with numpy result (sum of squares along axis 0) + expected = np.sum(large_2d_array ** 2, axis=0) + expected_count = np.full(large_2d_array.shape[1], large_2d_array.shape[0]) + + tm.assert_numpy_array_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_sum_square_axis_1_multithreaded(self, large_2d_array): + """Test sum_square reduction along axis 1 on large array.""" + result, count = nb_reduce("sum_square", large_2d_array, axis=1, + multi_threading=True) + + # Compare with numpy result (sum of squares along axis 1) + expected = np.sum(large_2d_array ** 2, axis=1) + expected_count = np.full(large_2d_array.shape[0], large_2d_array.shape[1]) + + np.testing.assert_array_almost_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + +class TestNbReduceTimedelta64: + """Test nb_reduce with timedelta64 dtype arrays.""" + + @pytest.fixture + def timedelta64_2d_array(self): + """Create a 2D array of timedelta64 values.""" + # Create timedelta values in different units + rows, cols = 1000, 300 # Large enough for multi-threading + rng = np.random.default_rng(42) + + # Generate random days between 0 and 365 + days = rng.integers(0, 365, size=(rows, cols)) + td_array = np.array(days, dtype="timedelta64[D]") + + return td_array + + @pytest.fixture + def timedelta64_2d_array_with_nat(self): + """Create a 2D array of timedelta64 values with some NaT values.""" + rows, cols = 1000, 300 + rng = np.random.default_rng(42) + + # Generate random days + days = rng.integers(0, 365, size=(rows, cols)) + td_array = np.array(days, dtype="timedelta64[D]") + + # Add some NaT (Not a Time) values + nat_mask = rng.random((rows, cols)) < 0.05 # 5% NaT values + td_array[nat_mask] = np.timedelta64("NaT") + + return td_array + + def test_nb_reduce_timedelta64_sum_axis_none(self, timedelta64_2d_array): + """Test sum reduction on timedelta64 array with axis=None.""" + result, count = nb_reduce("sum", timedelta64_2d_array, axis=None, + multi_threading=True) + + # Compare with numpy result + expected = np.sum(timedelta64_2d_array) + expected_count = timedelta64_2d_array.size + + assert result == expected + assert count == expected_count + + def test_nb_reduce_timedelta64_sum_axis_0(self, timedelta64_2d_array): + """Test sum reduction on timedelta64 array along axis 0.""" + result, count = nb_reduce("sum", timedelta64_2d_array, axis=0, + multi_threading=True) + + # Compare with numpy result + expected = np.sum(timedelta64_2d_array, axis=0) + expected_count = np.full(timedelta64_2d_array.shape[1], + timedelta64_2d_array.shape[0]) + + tm.assert_numpy_array_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_timedelta64_sum_axis_1(self, timedelta64_2d_array): + """Test sum reduction on timedelta64 array along axis 1.""" + result, count = nb_reduce("sum", timedelta64_2d_array, axis=1, + multi_threading=True) + + # Compare with numpy result + expected = np.sum(timedelta64_2d_array, axis=1) + expected_count = np.full(timedelta64_2d_array.shape[0], + timedelta64_2d_array.shape[1]) + + tm.assert_numpy_array_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) + + def test_nb_reduce_timedelta64_min_max(self, timedelta64_2d_array): + """Test min/max reduction on timedelta64 array.""" + # Test min + result_min, count_min = nb_reduce("min", timedelta64_2d_array, axis=None, + multi_threading=True) + expected_min = np.min(timedelta64_2d_array) + assert result_min == expected_min + assert count_min == timedelta64_2d_array.size + + # Test max + result_max, count_max = nb_reduce("max", timedelta64_2d_array, axis=None, + multi_threading=True) + expected_max = np.max(timedelta64_2d_array) + assert result_max == expected_max + assert count_max == timedelta64_2d_array.size + + def test_nb_reduce_timedelta64_with_nat_skipna_true( + self, timedelta64_2d_array_with_nat + ): + """Test reduction on timedelta64 array with NaT values, skipna=True.""" + result, count = nb_reduce("sum", timedelta64_2d_array_with_nat, axis=None, + skipna=True, multi_threading=True) + + # Compare with numpy result + # For timedelta64 with NaT, we need to use nansum equivalent + valid_mask = ~np.isnat(timedelta64_2d_array_with_nat) + expected = np.sum(timedelta64_2d_array_with_nat[valid_mask]) + expected_count = np.sum(valid_mask) + + assert result == expected + assert count == expected_count + + def test_nb_reduce_timedelta64_with_nat_skipna_false( + self, timedelta64_2d_array_with_nat + ): + """Test reduction on timedelta64 array with NaT values, skipna=False.""" + result, count = nb_reduce("sum", timedelta64_2d_array_with_nat, axis=None, + skipna=False, multi_threading=True) + + # When skipna=False and there are NaT values, result should be NaT + assert np.isnat(result) + + def test_nb_reduce_timedelta64_mean_axis_0(self, timedelta64_2d_array): + """Test mean reduction on timedelta64 array along axis 0.""" + result, count = nb_reduce("mean", timedelta64_2d_array, axis=0, + multi_threading=True) + + # Compare with numpy result + expected = np.mean(timedelta64_2d_array, axis=0) + expected_count = np.full(timedelta64_2d_array.shape[1], + timedelta64_2d_array.shape[0]) + + tm.assert_numpy_array_equal(result, expected) + tm.assert_numpy_array_equal(count, expected_count) From c095e1a4e0bc9c910fba0add2c5dfd63361001cb Mon Sep 17 00:00:00 2001 From: Eoin Condron Date: Fri, 26 Sep 2025 16:42:39 +0100 Subject: [PATCH 5/6] (linting for test_nanops_numba) --- pandas/tests/reductions/test_nanops_numba.py | 164 ++++++++++++------- 1 file changed, 103 insertions(+), 61 deletions(-) diff --git a/pandas/tests/reductions/test_nanops_numba.py b/pandas/tests/reductions/test_nanops_numba.py index 5468dd0e27077..450b63d0006d5 100644 --- a/pandas/tests/reductions/test_nanops_numba.py +++ b/pandas/tests/reductions/test_nanops_numba.py @@ -4,27 +4,26 @@ This module tests only the private methods (prefixed with underscore). """ +from numba.typed import List as NumbaList import numpy as np import pytest -from numba.typed import List as NumbaList +import pandas._testing as tm from pandas.core.nanops_numba import ( MIN_INT, NumbaReductionOps, _cast_to_timelike, _chunk_arr_into_arr_list, _get_initial_value, + _nanvar_std_sem, _nb_reduce_arr_list_in_parallel, _nb_reduce_single_arr, - _nanvar_std_sem, _nullify_below_mincount, _reduce_chunked_results, _reduce_empty_array, nb_reduce, ) -import pandas._testing as tm - class TestGetInitialValue: """Test the _get_initial_value private function.""" @@ -255,7 +254,7 @@ def test_parallel_with_mask(self): # Create corresponding mask list mask_list = NumbaList() mask_list.append(np.array([False, True, False])) # Mask middle element - mask_list.append(np.array([True, False, False])) # Mask first element + mask_list.append(np.array([True, False, False])) # Mask first element target = np.zeros(len(arr_list), dtype=np.float64) result, counts = _nb_reduce_arr_list_in_parallel( @@ -280,13 +279,18 @@ def test_single_chunk_reduction(self): return_dtype = np.dtype("float64") result, count = _reduce_chunked_results( - "sum", chunk_results, counts, final_length, return_dtype, - skipna=True, find_initial_value=True + "sum", + chunk_results, + counts, + final_length, + return_dtype, + skipna=True, + find_initial_value=True, ) # Should reduce the chunk_results array itself expected_result = np.array([6.0]) # 1 + 2 + 3 - expected_count = np.array([6]) # 2 + 2 + 2 + expected_count = np.array([6]) # 2 + 2 + 2 tm.assert_numpy_array_equal(result, expected_result) tm.assert_numpy_array_equal(count, expected_count) @@ -299,8 +303,13 @@ def test_no_chunking_needed(self): return_dtype = np.dtype("float64") result, count = _reduce_chunked_results( - "sum", chunk_results, counts, final_length, return_dtype, - skipna=True, find_initial_value=True + "sum", + chunk_results, + counts, + final_length, + return_dtype, + skipna=True, + find_initial_value=True, ) # Should return results as-is (no further reduction needed) @@ -371,7 +380,7 @@ def test_with_nan_values(self): assert np.isfinite(result) def test_complex_array(self): - arr = np.array([1+2j, 3+4j]) + arr = np.array([1 + 2j, 3 + 4j]) result = _nanvar_std_sem(arr) # Should handle complex numbers by processing real and imag parts assert np.isfinite(result) @@ -490,8 +499,11 @@ def test_nb_reduce_with_nans_skipna_true_multithreaded( ): """Test sum with NaN values and skipna=True on large array (multi-threaded).""" result, count = nb_reduce( - "sum", large_2d_array_with_nans, axis=None, - skipna=True, multi_threading=True + "sum", + large_2d_array_with_nans, + axis=None, + skipna=True, + multi_threading=True, ) # Compare with numpy nansum @@ -503,8 +515,9 @@ def test_nb_reduce_with_nans_skipna_true_multithreaded( def test_nb_reduce_with_nans_axis_0_multithreaded(self, large_2d_array_with_nans): """Test sum with NaN values along axis 0 (multi-threaded).""" - result, count = nb_reduce("sum", large_2d_array_with_nans, axis=0, - skipna=True, multi_threading=True) + result, count = nb_reduce( + "sum", large_2d_array_with_nans, axis=0, skipna=True, multi_threading=True + ) # Compare with numpy nansum expected = np.nansum(large_2d_array_with_nans, axis=0) @@ -515,8 +528,9 @@ def test_nb_reduce_with_nans_axis_0_multithreaded(self, large_2d_array_with_nans def test_nb_reduce_with_nans_axis_1_multithreaded(self, large_2d_array_with_nans): """Test sum with NaN values along axis 1 (multi-threaded).""" - result, count = nb_reduce("sum", large_2d_array_with_nans, axis=1, - skipna=True, multi_threading=True) + result, count = nb_reduce( + "sum", large_2d_array_with_nans, axis=1, skipna=True, multi_threading=True + ) # Compare with numpy nansum expected = np.nansum(large_2d_array_with_nans, axis=1) @@ -528,12 +542,14 @@ def test_nb_reduce_with_nans_axis_1_multithreaded(self, large_2d_array_with_nans def test_nb_reduce_single_thread_vs_multithread_consistency(self, large_2d_array): """Test that single-threaded and multi-threaded results are identical.""" # Single-threaded result - result_st, count_st = nb_reduce("sum", large_2d_array, axis=0, - multi_threading=False) + result_st, count_st = nb_reduce( + "sum", large_2d_array, axis=0, multi_threading=False + ) # Multi-threaded result - result_mt, count_mt = nb_reduce("sum", large_2d_array, axis=0, - multi_threading=True) + result_mt, count_mt = nb_reduce( + "sum", large_2d_array, axis=0, multi_threading=True + ) # Results should be identical tm.assert_numpy_array_equal(result_st, result_mt) @@ -568,9 +584,14 @@ def test_nb_reduce_min_count_multithreaded(self, large_2d_array_with_nans): """Test min_count parameter with large array (multi-threaded).""" min_count = 100 # Require at least 100 non-NaN values per column - result, count = nb_reduce("sum", large_2d_array_with_nans, axis=0, - skipna=True, min_count=min_count, - multi_threading=True) + result, count = nb_reduce( + "sum", + large_2d_array_with_nans, + axis=0, + skipna=True, + min_count=min_count, + multi_threading=True, + ) # Check that columns with insufficient data are NaN valid_columns = count >= min_count @@ -582,8 +603,9 @@ def test_nb_reduce_min_count_multithreaded(self, large_2d_array_with_nans): def test_nb_reduce_mean_axis_none_multithreaded(self, large_2d_array): """Test mean reduction with axis=None on large array (multi-threaded).""" - result, count = nb_reduce("mean", large_2d_array, axis=None, - multi_threading=True) + result, count = nb_reduce( + "mean", large_2d_array, axis=None, multi_threading=True + ) # Compare with numpy result expected = np.mean(large_2d_array) @@ -594,8 +616,7 @@ def test_nb_reduce_mean_axis_none_multithreaded(self, large_2d_array): def test_nb_reduce_mean_axis_0_multithreaded(self, large_2d_array): """Test mean reduction along axis 0 on large array (multi-threaded).""" - result, count = nb_reduce("mean", large_2d_array, axis=0, - multi_threading=True) + result, count = nb_reduce("mean", large_2d_array, axis=0, multi_threading=True) # Compare with numpy result expected = np.mean(large_2d_array, axis=0) @@ -606,8 +627,7 @@ def test_nb_reduce_mean_axis_0_multithreaded(self, large_2d_array): def test_nb_reduce_mean_axis_1_multithreaded(self, large_2d_array): """Test mean reduction along axis 1 on large array (multi-threaded).""" - result, count = nb_reduce("mean", large_2d_array, axis=1, - multi_threading=True) + result, count = nb_reduce("mean", large_2d_array, axis=1, multi_threading=True) # Compare with numpy result expected = np.mean(large_2d_array, axis=1) @@ -618,11 +638,12 @@ def test_nb_reduce_mean_axis_1_multithreaded(self, large_2d_array): def test_nb_reduce_sum_square_axis_none_multithreaded(self, large_2d_array): """Test sum_square reduction with axis=None on large array.""" - result, count = nb_reduce("sum_square", large_2d_array, axis=None, - multi_threading=True) + result, count = nb_reduce( + "sum_square", large_2d_array, axis=None, multi_threading=True + ) # Compare with numpy result (sum of squares) - expected = np.sum(large_2d_array ** 2) + expected = np.sum(large_2d_array**2) expected_count = large_2d_array.size tm.assert_almost_equal(result, expected, rtol=1e-10) @@ -630,11 +651,12 @@ def test_nb_reduce_sum_square_axis_none_multithreaded(self, large_2d_array): def test_nb_reduce_sum_square_axis_0_multithreaded(self, large_2d_array): """Test sum_square reduction along axis 0 on large array.""" - result, count = nb_reduce("sum_square", large_2d_array, axis=0, - multi_threading=True) + result, count = nb_reduce( + "sum_square", large_2d_array, axis=0, multi_threading=True + ) # Compare with numpy result (sum of squares along axis 0) - expected = np.sum(large_2d_array ** 2, axis=0) + expected = np.sum(large_2d_array**2, axis=0) expected_count = np.full(large_2d_array.shape[1], large_2d_array.shape[0]) tm.assert_numpy_array_equal(result, expected) @@ -642,11 +664,12 @@ def test_nb_reduce_sum_square_axis_0_multithreaded(self, large_2d_array): def test_nb_reduce_sum_square_axis_1_multithreaded(self, large_2d_array): """Test sum_square reduction along axis 1 on large array.""" - result, count = nb_reduce("sum_square", large_2d_array, axis=1, - multi_threading=True) + result, count = nb_reduce( + "sum_square", large_2d_array, axis=1, multi_threading=True + ) # Compare with numpy result (sum of squares along axis 1) - expected = np.sum(large_2d_array ** 2, axis=1) + expected = np.sum(large_2d_array**2, axis=1) expected_count = np.full(large_2d_array.shape[0], large_2d_array.shape[1]) np.testing.assert_array_almost_equal(result, expected) @@ -687,8 +710,9 @@ def timedelta64_2d_array_with_nat(self): def test_nb_reduce_timedelta64_sum_axis_none(self, timedelta64_2d_array): """Test sum reduction on timedelta64 array with axis=None.""" - result, count = nb_reduce("sum", timedelta64_2d_array, axis=None, - multi_threading=True) + result, count = nb_reduce( + "sum", timedelta64_2d_array, axis=None, multi_threading=True + ) # Compare with numpy result expected = np.sum(timedelta64_2d_array) @@ -699,26 +723,30 @@ def test_nb_reduce_timedelta64_sum_axis_none(self, timedelta64_2d_array): def test_nb_reduce_timedelta64_sum_axis_0(self, timedelta64_2d_array): """Test sum reduction on timedelta64 array along axis 0.""" - result, count = nb_reduce("sum", timedelta64_2d_array, axis=0, - multi_threading=True) + result, count = nb_reduce( + "sum", timedelta64_2d_array, axis=0, multi_threading=True + ) # Compare with numpy result expected = np.sum(timedelta64_2d_array, axis=0) - expected_count = np.full(timedelta64_2d_array.shape[1], - timedelta64_2d_array.shape[0]) + expected_count = np.full( + timedelta64_2d_array.shape[1], timedelta64_2d_array.shape[0] + ) tm.assert_numpy_array_equal(result, expected) tm.assert_numpy_array_equal(count, expected_count) def test_nb_reduce_timedelta64_sum_axis_1(self, timedelta64_2d_array): """Test sum reduction on timedelta64 array along axis 1.""" - result, count = nb_reduce("sum", timedelta64_2d_array, axis=1, - multi_threading=True) + result, count = nb_reduce( + "sum", timedelta64_2d_array, axis=1, multi_threading=True + ) # Compare with numpy result expected = np.sum(timedelta64_2d_array, axis=1) - expected_count = np.full(timedelta64_2d_array.shape[0], - timedelta64_2d_array.shape[1]) + expected_count = np.full( + timedelta64_2d_array.shape[0], timedelta64_2d_array.shape[1] + ) tm.assert_numpy_array_equal(result, expected) tm.assert_numpy_array_equal(count, expected_count) @@ -726,15 +754,17 @@ def test_nb_reduce_timedelta64_sum_axis_1(self, timedelta64_2d_array): def test_nb_reduce_timedelta64_min_max(self, timedelta64_2d_array): """Test min/max reduction on timedelta64 array.""" # Test min - result_min, count_min = nb_reduce("min", timedelta64_2d_array, axis=None, - multi_threading=True) + result_min, count_min = nb_reduce( + "min", timedelta64_2d_array, axis=None, multi_threading=True + ) expected_min = np.min(timedelta64_2d_array) assert result_min == expected_min assert count_min == timedelta64_2d_array.size # Test max - result_max, count_max = nb_reduce("max", timedelta64_2d_array, axis=None, - multi_threading=True) + result_max, count_max = nb_reduce( + "max", timedelta64_2d_array, axis=None, multi_threading=True + ) expected_max = np.max(timedelta64_2d_array) assert result_max == expected_max assert count_max == timedelta64_2d_array.size @@ -743,8 +773,13 @@ def test_nb_reduce_timedelta64_with_nat_skipna_true( self, timedelta64_2d_array_with_nat ): """Test reduction on timedelta64 array with NaT values, skipna=True.""" - result, count = nb_reduce("sum", timedelta64_2d_array_with_nat, axis=None, - skipna=True, multi_threading=True) + result, count = nb_reduce( + "sum", + timedelta64_2d_array_with_nat, + axis=None, + skipna=True, + multi_threading=True, + ) # Compare with numpy result # For timedelta64 with NaT, we need to use nansum equivalent @@ -759,21 +794,28 @@ def test_nb_reduce_timedelta64_with_nat_skipna_false( self, timedelta64_2d_array_with_nat ): """Test reduction on timedelta64 array with NaT values, skipna=False.""" - result, count = nb_reduce("sum", timedelta64_2d_array_with_nat, axis=None, - skipna=False, multi_threading=True) + result, count = nb_reduce( + "sum", + timedelta64_2d_array_with_nat, + axis=None, + skipna=False, + multi_threading=True, + ) # When skipna=False and there are NaT values, result should be NaT assert np.isnat(result) def test_nb_reduce_timedelta64_mean_axis_0(self, timedelta64_2d_array): """Test mean reduction on timedelta64 array along axis 0.""" - result, count = nb_reduce("mean", timedelta64_2d_array, axis=0, - multi_threading=True) + result, count = nb_reduce( + "mean", timedelta64_2d_array, axis=0, multi_threading=True + ) # Compare with numpy result expected = np.mean(timedelta64_2d_array, axis=0) - expected_count = np.full(timedelta64_2d_array.shape[1], - timedelta64_2d_array.shape[0]) + expected_count = np.full( + timedelta64_2d_array.shape[1], timedelta64_2d_array.shape[0] + ) tm.assert_numpy_array_equal(result, expected) tm.assert_numpy_array_equal(count, expected_count) From a12aa7c59831a1ab466925d755d049a8f2ac0090 Mon Sep 17 00:00:00 2001 From: Eoin Condron Date: Fri, 26 Sep 2025 16:44:37 +0100 Subject: [PATCH 6/6] (minor fixes to examples in docstrings) --- pandas/core/nanops.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/core/nanops.py b/pandas/core/nanops.py index 437d5849d5dcf..73ae69ca680f4 100644 --- a/pandas/core/nanops.py +++ b/pandas/core/nanops.py @@ -981,7 +981,7 @@ def nanstd( >>> from pandas.core import nanops >>> s = pd.Series([1, np.nan, 2, 3]) >>> nanops.nanstd(s.values) - 1.0 + np.float64(1.0) """ if values.dtype == "M8[ns]": values = values.view("m8[ns]") @@ -1028,7 +1028,7 @@ def nanvar( >>> from pandas.core import nanops >>> s = pd.Series([1, np.nan, 2, 3]) >>> nanops.nanvar(s.values) - 1.0 + np.float64(1.0) """ dtype = values.dtype mask = _maybe_get_mask(values, skipna, mask)