12 changes: 12 additions & 0 deletions asv_bench/benchmarks/groupby.py
@@ -438,6 +438,18 @@ def time_dtype_as_field(self, dtype, method, application):
self.as_field_method()


class GroupByWide:

params = [100, 1_000, 10_000]
param_names = ["nrows"]

def setup(self, nrows):
self.wide_grp = DataFrame(np.ones((nrows, 10_000))).groupby([1])

def time_wide(self, nrows):
self.wide_grp.sum()


class RankWithTies:
# GH 21237
param_names = ["dtype", "tie_method"]
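A quick sanity check of what the new GroupByWide benchmark exercises, as a standalone sketch (the row count here is illustrative, not the asv params): summing a very wide frame grouped by the column labeled 1, whose values are all equal, so every row falls into a single group.

    import numpy as np
    from pandas import DataFrame

    df = DataFrame(np.ones((1_000, 10_000)))
    grouped = df.groupby([1])   # groups by the column labeled 1, all ones
    result = grouped.sum()      # one group; the key column becomes the index
    assert result.shape == (1, 9_999)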
123 changes: 8 additions & 115 deletions pandas/core/groupby/generic.py
@@ -982,122 +982,15 @@ def _iterate_slices(self) -> Iterable[Series]:
def _cython_agg_general(
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
):
new_items, new_blocks = self._cython_agg_blocks(
how, alt=alt, numeric_only=numeric_only, min_count=min_count
)
return self._wrap_agged_blocks(new_items, new_blocks)

def _cython_agg_blocks(
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
):
# TODO: the actual managing of mgr_locs is a PITA
# here, it should happen via BlockManager.combine

data = self._get_data_to_aggregate()

if numeric_only:
data = data.get_numeric_data(copy=False)

new_blocks = []
new_items = []
deleted_items = []
no_result = object()
for block in data.blocks:
# Avoid inheriting result from earlier in the loop
result = no_result
locs = block.mgr_locs.as_array
try:
result, _ = self.grouper.aggregate(
block.values, how, axis=1, min_count=min_count
)
except NotImplementedError:
# generally if we have numeric_only=False
# and non-applicable functions
# try to python agg

if alt is None:
# we cannot perform the operation
# in an alternate way, exclude the block
assert how == "ohlc"
deleted_items.append(locs)
continue

# call our grouper again with only this block
obj = self.obj[data.items[locs]]
if obj.shape[1] == 1:
# Avoid call to self.values that can occur in DataFrame
# reductions; see GH#28949
obj = obj.iloc[:, 0]

s = get_groupby(obj, self.grouper)
try:
result = s.aggregate(lambda x: alt(x, axis=self.axis))
except TypeError:
# we may have an exception in trying to aggregate
# continue and exclude the block
deleted_items.append(locs)
continue
else:
result = cast(DataFrame, result)
# unwrap DataFrame to get array
assert len(result._data.blocks) == 1
result = result._data.blocks[0].values
if isinstance(result, np.ndarray) and result.ndim == 1:
result = result.reshape(1, -1)

finally:
assert not isinstance(result, DataFrame)

if result is not no_result:
# see if we can cast the block back to the original dtype
result = maybe_downcast_numeric(result, block.dtype)

if block.is_extension and isinstance(result, np.ndarray):
# e.g. block.values was an IntegerArray
# (1, N) case can occur if block.values was Categorical
# and result is ndarray[object]
assert result.ndim == 1 or result.shape[0] == 1
try:
# Cast back if feasible
result = type(block.values)._from_sequence(
result.ravel(), dtype=block.values.dtype
)
except ValueError:
# reshape to be valid for non-Extension Block
result = result.reshape(1, -1)

newb = block.make_block(result)

new_items.append(locs)
new_blocks.append(newb)

if len(new_blocks) == 0:
raise DataError("No numeric types to aggregate")

# reset the locs in the blocks to correspond to our
# current ordering
indexer = np.concatenate(new_items)
new_items = data.items.take(np.sort(indexer))

if len(deleted_items):

# we need to adjust the indexer to account for the
# items we have removed
# really should be done in internals :<

deleted = np.concatenate(deleted_items)
ai = np.arange(len(data))
mask = np.zeros(len(data))
mask[deleted] = 1
indexer = (ai - mask.cumsum())[indexer]

offset = 0
for b in new_blocks:
loc = len(b.mgr_locs)
b.mgr_locs = indexer[offset : (offset + loc)]
offset += loc
func = partial(self.grouper.aggregate, how=how, axis=1, min_count=min_count)
results = self._selected_obj._data.apply(func)
df = DataFrame(results)
if self.as_index:
df.index = self.grouper.result_index
else:
df.index = np.arange(len(df))  # plain integer index when as_index=False

return new_items, new_blocks
return df

def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame:
if self.grouper.nkeys != 1:
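The rewritten _cython_agg_general above replaces the hand-rolled per-block loop with a single BlockManager.apply call. A toy sketch of that shape (ToyBlock and ToyManager are made-up stand-ins, not the real internals API): a partially-applied aggregator is mapped over each block and the aggregated blocks are collected.

    from functools import partial
    import numpy as np

    class ToyBlock:
        # stands in for an internals Block: 2-D values, shape (n_items, n_positions)
        def __init__(self, values):
            self.values = values

    class ToyManager:
        # stands in for BlockManager.apply dispatching a callable over blocks
        def __init__(self, blocks):
            self.blocks = blocks

        def apply(self, func):
            return [ToyBlock(func(blk.values)) for blk in self.blocks]

    def agg_sum(values, axis):
        # stands in for grouper.aggregate(values, how, axis=1, min_count=...)
        return values.sum(axis=axis, keepdims=True)

    mgr = ToyManager([ToyBlock(np.arange(12.0).reshape(3, 4))])
    func = partial(agg_sum, axis=1)   # mirrors the partial built in _cython_agg_general
    aggregated = mgr.apply(func)      # one aggregated block per input block
    print(aggregated[0].values)       # [[ 6.], [22.], [38.]]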
21 changes: 16 additions & 5 deletions pandas/core/internals/managers.py
@@ -419,20 +419,31 @@ def apply(
and hasattr(kwargs[k], "values")
}

for b in self.blocks:
for blk in self.blocks:
if filter is not None:
if not b.mgr_locs.isin(filter_locs).any():
result_blocks.append(b)
if not blk.mgr_locs.isin(filter_locs).any():
result_blocks.append(blk)
continue

if aligned_args:
b_items = self.items[b.mgr_locs.indexer]
b_items = self.items[blk.mgr_locs.indexer]

for k, obj in aligned_args.items():
axis = obj._info_axis_number
kwargs[k] = obj.reindex(b_items, axis=axis, copy=align_copy)

applied = getattr(b, f)(**kwargs)
if isinstance(f, str):
applied = getattr(blk, f)(**kwargs)
else: # partial; specific to groupby
# TODO: func should only return one value; need to remove
# ohlc from groupby semantics to accomplish generically
result, _ = f(blk.values) # better way?
Review comment (Member), replying to "better way?": blk.apply
if result.ndim != 2: # hmm this is hacky
result = result.reshape(1, -1)  # single-item block: (1, n_groups), keeping shape[1] consistent across blocks

applied = type(blk)(result, placement=blk.mgr_locs, ndim=2)
axes = [self.axes[0], np.arange(result.shape[1])]

result_blocks = _extend_blocks(applied, result_blocks)

if len(result_blocks) == 0:
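The apply change above means f may now be either a method name or a callable. A self-contained sketch of that dispatch, using a made-up Block class rather than the real internals (the reshape mirrors the 1-D-to-2-D normalization in the diff):

    import numpy as np

    class Block:
        def __init__(self, values):
            self.values = values

        def fillna(self, value):
            out = np.where(np.isnan(self.values), value, self.values)
            return Block(out)

    def apply(block, f, **kwargs):
        if isinstance(f, str):
            # pre-existing path: dispatch to a method on the block
            return getattr(block, f)(**kwargs)
        # new path: call the function on the block's ndarray directly
        result = f(block.values)
        if result.ndim != 2:
            # normalize 1-D results to a single-row 2-D block
            result = result.reshape(1, -1)
        return Block(result)

    blk = Block(np.array([[1.0, np.nan], [3.0, 4.0]]))
    print(apply(blk, "fillna", value=0.0).values)             # string dispatch
    print(apply(blk, lambda v: np.nansum(v, axis=1)).values)  # callable path -> [[1., 7.]]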
3 changes: 3 additions & 0 deletions pandas/tests/groupby/test_whitelist.py
@@ -192,6 +192,9 @@ def test_regression_whitelist_methods(raw_frame, op, level, axis, skipna, sort):
# GH 17537
# explicitly test the whitelist methods

if op == "median":
pytest.skip("Currently segfaulting...")

if axis == 0:
frame = raw_frame
else: