Skip to content

Cythonize Buffer and MemoryResource classes for performance optimization #876

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@

from __future__ import annotations

from libc.stdint cimport uintptr_t

from cuda.core.experimental._utils.cuda_utils cimport (
_check_driver_error as raise_if_driver_error,
)

import abc
import weakref
from typing import Tuple, TypeVar, Union

from cuda.core.experimental._dlpack import DLDeviceType, make_py_capsule
from cuda.core.experimental._stream import Stream, default_stream
from cuda.core.experimental._utils.cuda_utils import driver, handle_return
from cuda.core.experimental._utils.cuda_utils import driver

# TODO: define a memory property mixin class and make Buffer and
# MemoryResource both inherit from it
Expand All @@ -23,7 +28,7 @@
"""A type union of :obj:`~driver.CUdeviceptr`, `int` and `None` for hinting :attr:`Buffer.handle`."""


class Buffer:
cdef class Buffer:
"""Represent a handle to allocated memory.

This generic object provides a unified representation for how
Expand All @@ -33,34 +38,28 @@ class Buffer:
Support for data interchange mechanisms are provided by DLPack.
"""

class _MembersNeededForFinalize:
__slots__ = ("ptr", "size", "mr")

def __init__(self, buffer_obj, ptr, size, mr):
self.ptr = ptr
self.size = size
self.mr = mr
weakref.finalize(buffer_obj, self.close)

def close(self, stream=None):
if self.ptr and self.mr is not None:
self.mr.deallocate(self.ptr, self.size, stream)
self.ptr = 0
self.mr = None
cdef:
uintptr_t _ptr
size_t _size
object _mr
object _ptr_obj

# TODO: handle ownership? (_mr could be None)
__slots__ = ("__weakref__", "_mnff")

def __new__(self, *args, **kwargs):
def __init__(self, *args, **kwargs):
raise RuntimeError("Buffer objects cannot be instantiated directly. Please use MemoryResource APIs.")

@classmethod
def _init(cls, ptr: DevicePointerT, size: int, mr: MemoryResource | None = None):
self = super().__new__(cls)
self._mnff = Buffer._MembersNeededForFinalize(self, ptr, size, mr)
def _init(cls, ptr: DevicePointerT, size_t size, mr: MemoryResource | None = None):
cdef Buffer self = Buffer.__new__(cls)
self._ptr = <uintptr_t>(int(ptr))
self._ptr_obj = ptr
self._size = size
self._mr = mr
return self

def close(self, stream: Stream = None):
def __del__(self):
self.close()

cpdef close(self, stream: Stream = None):
"""Deallocate this buffer asynchronously on the given stream.

This buffer is released back to their memory resource
Expand All @@ -72,7 +71,11 @@ def close(self, stream: Stream = None):
The stream object to use for asynchronous deallocation. If None,
the behavior depends on the underlying memory resource.
"""
self._mnff.close(stream)
if self._ptr and self._mr is not None:
self._mr.deallocate(self._ptr, self._size, stream)
self._ptr = 0
self._mr = None
self._ptr_obj = None

@property
def handle(self) -> DevicePointerT:
Expand All @@ -83,37 +86,37 @@ def handle(self) -> DevicePointerT:
This handle is a Python object. To get the memory address of the underlying C
handle, call ``int(Buffer.handle)``.
"""
return self._mnff.ptr
return self._ptr_obj

@property
def size(self) -> int:
"""Return the memory size of this buffer."""
return self._mnff.size
return self._size

@property
def memory_resource(self) -> MemoryResource:
"""Return the memory resource associated with this buffer."""
return self._mnff.mr
return self._mr

@property
def is_device_accessible(self) -> bool:
"""Return True if this buffer can be accessed by the GPU, otherwise False."""
if self._mnff.mr is not None:
return self._mnff.mr.is_device_accessible
if self._mr is not None:
return self._mr.is_device_accessible
raise NotImplementedError("WIP: Currently this property only supports buffers with associated MemoryResource")

@property
def is_host_accessible(self) -> bool:
"""Return True if this buffer can be accessed by the CPU, otherwise False."""
if self._mnff.mr is not None:
return self._mnff.mr.is_host_accessible
if self._mr is not None:
return self._mr.is_host_accessible
raise NotImplementedError("WIP: Currently this property only supports buffers with associated MemoryResource")

@property
def device_id(self) -> int:
"""Return the device ordinal of this buffer."""
if self._mnff.mr is not None:
return self._mnff.mr.device_id
if self._mr is not None:
return self._mr.device_id
raise NotImplementedError("WIP: Currently this property only supports buffers with associated MemoryResource")

def copy_to(self, dst: Buffer = None, *, stream: Stream) -> Buffer:
Expand All @@ -134,15 +137,21 @@ def copy_to(self, dst: Buffer = None, *, stream: Stream) -> Buffer:
"""
if stream is None:
raise ValueError("stream must be provided")

cdef size_t src_size = self._size

if dst is None:
if self._mnff.mr is None:
if self._mr is None:
raise ValueError("a destination buffer must be provided (this buffer does not have a memory_resource)")
dst = self._mnff.mr.allocate(self._mnff.size, stream)
if dst._mnff.size != self._mnff.size:
dst = self._mr.allocate(src_size, stream)

cdef size_t dst_size = dst._size
if dst_size != src_size:
raise ValueError(
f"buffer sizes mismatch between src and dst (sizes are: src={self._mnff.size}, dst={dst._mnff.size})"
f"buffer sizes mismatch between src and dst (sizes are: src={src_size}, dst={dst_size})"
)
handle_return(driver.cuMemcpyAsync(dst._mnff.ptr, self._mnff.ptr, self._mnff.size, stream.handle))
err, = driver.cuMemcpyAsync(dst._ptr, self._ptr, src_size, stream.handle)
raise_if_driver_error(err)
return dst

def copy_from(self, src: Buffer, *, stream: Stream):
Expand All @@ -159,11 +168,16 @@ def copy_from(self, src: Buffer, *, stream: Stream):
"""
if stream is None:
raise ValueError("stream must be provided")
if src._mnff.size != self._mnff.size:

cdef size_t dst_size = self._size
cdef size_t src_size = src._size

if src_size != dst_size:
raise ValueError(
f"buffer sizes mismatch between src and dst (sizes are: src={src._mnff.size}, dst={self._mnff.size})"
f"buffer sizes mismatch between src and dst (sizes are: src={src_size}, dst={dst_size})"
)
handle_return(driver.cuMemcpyAsync(self._mnff.ptr, src._mnff.ptr, self._mnff.size, stream.handle))
err, = driver.cuMemcpyAsync(self._ptr, src._ptr, dst_size, stream.handle)
raise_if_driver_error(err)

def __dlpack__(
self,
Expand All @@ -189,13 +203,14 @@ def __dlpack__(
return capsule

def __dlpack_device__(self) -> Tuple[int, int]:
d_h = (bool(self.is_device_accessible), bool(self.is_host_accessible))
if d_h == (True, False):
cdef bint d = self.is_device_accessible
cdef bint h = self.is_host_accessible
if d and (not h):
return (DLDeviceType.kDLCUDA, self.device_id)
if d_h == (True, True):
if d and h:
# TODO: this can also be kDLCUDAManaged, we need more fine-grained checks
return (DLDeviceType.kDLCUDAHost, 0)
if d_h == (False, True):
if (not d) and h:
return (DLDeviceType.kDLCPU, 0)
raise BufferError("buffer is neither device-accessible nor host-accessible")

Expand All @@ -211,7 +226,7 @@ def __release_buffer__(self, buffer: memoryview, /):
raise NotImplementedError("WIP: Buffer.__release_buffer__ hasn't been implemented yet.")

@staticmethod
def from_handle(ptr: DevicePointerT, size: int, mr: MemoryResource | None = None) -> Buffer:
def from_handle(ptr: DevicePointerT, size_t size, mr: MemoryResource | None = None) -> Buffer:
"""Create a new :class:`Buffer` object from a pointer.

Parameters
Expand Down Expand Up @@ -247,7 +262,7 @@ def __init__(self, *args, **kwargs):
...

@abc.abstractmethod
def allocate(self, size: int, stream: Stream = None) -> Buffer:
def allocate(self, size_t size, stream: Stream = None) -> Buffer:
"""Allocate a buffer of the requested size.

Parameters
Expand All @@ -268,7 +283,7 @@ def allocate(self, size: int, stream: Stream = None) -> Buffer:
...

@abc.abstractmethod
def deallocate(self, ptr: DevicePointerT, size: int, stream: Stream = None):
def deallocate(self, ptr: DevicePointerT, size_t size, stream: Stream = None):
"""Deallocate a buffer previously allocated by this resource.

Parameters
Expand Down Expand Up @@ -323,27 +338,28 @@ class DeviceMemoryResource(MemoryResource):
__slots__ = ("_dev_id",)

def __init__(self, device_id: int):
self._handle = handle_return(driver.cuDeviceGetMemPool(device_id))
err, self._handle = driver.cuDeviceGetMemPool(device_id)
raise_if_driver_error(err)
self._dev_id = device_id

# Set a higher release threshold to improve performance when there are no active allocations.
# By default, the release threshold is 0, which means memory is immediately released back
# to the OS when there are no active suballocations, causing performance issues.
# Check current release threshold
current_threshold = handle_return(
driver.cuMemPoolGetAttribute(self._handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD)
err, current_threshold = driver.cuMemPoolGetAttribute(
self._handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD
)
raise_if_driver_error(err)
# If threshold is 0 (default), set it to maximum to retain memory in the pool
if int(current_threshold) == 0:
handle_return(
driver.cuMemPoolSetAttribute(
self._handle,
driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD,
driver.cuuint64_t(0xFFFFFFFFFFFFFFFF),
)
err, = driver.cuMemPoolSetAttribute(
self._handle,
driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD,
driver.cuuint64_t(0xFFFFFFFFFFFFFFFF),
)
raise_if_driver_error(err)

def allocate(self, size: int, stream: Stream = None) -> Buffer:
def allocate(self, size_t size, stream: Stream = None) -> Buffer:
"""Allocate a buffer of the requested size.

Parameters
Expand All @@ -362,10 +378,11 @@ def allocate(self, size: int, stream: Stream = None) -> Buffer:
"""
if stream is None:
stream = default_stream()
ptr = handle_return(driver.cuMemAllocFromPoolAsync(size, self._handle, stream.handle))
err, ptr = driver.cuMemAllocFromPoolAsync(size, self._handle, stream.handle)
raise_if_driver_error(err)
return Buffer._init(ptr, size, self)

def deallocate(self, ptr: DevicePointerT, size: int, stream: Stream = None):
def deallocate(self, ptr: DevicePointerT, size_t size, stream: Stream = None):
"""Deallocate a buffer previously allocated by this resource.

Parameters
Expand All @@ -380,7 +397,8 @@ def deallocate(self, ptr: DevicePointerT, size: int, stream: Stream = None):
"""
if stream is None:
stream = default_stream()
handle_return(driver.cuMemFreeAsync(ptr, stream.handle))
err, = driver.cuMemFreeAsync(ptr, stream.handle)
raise_if_driver_error(err)

@property
def is_device_accessible(self) -> bool:
Expand All @@ -407,7 +425,7 @@ def __init__(self):
# TODO: support flags from cuMemHostAlloc?
self._handle = None

def allocate(self, size: int, stream: Stream = None) -> Buffer:
def allocate(self, size_t size, stream: Stream = None) -> Buffer:
"""Allocate a buffer of the requested size.

Parameters
Expand All @@ -422,10 +440,11 @@ def allocate(self, size: int, stream: Stream = None) -> Buffer:
Buffer
The allocated buffer object, which is accessible on both host and device.
"""
ptr = handle_return(driver.cuMemAllocHost(size))
err, ptr = driver.cuMemAllocHost(size)
raise_if_driver_error(err)
return Buffer._init(ptr, size, self)

def deallocate(self, ptr: DevicePointerT, size: int, stream: Stream = None):
def deallocate(self, ptr: DevicePointerT, size_t size, stream: Stream = None):
"""Deallocate a buffer previously allocated by this resource.

Parameters
Expand All @@ -440,7 +459,8 @@ def deallocate(self, ptr: DevicePointerT, size: int, stream: Stream = None):
"""
if stream:
stream.sync()
handle_return(driver.cuMemFreeHost(ptr))
err, = driver.cuMemFreeHost(ptr)
raise_if_driver_error(err)

@property
def is_device_accessible(self) -> bool:
Expand All @@ -466,14 +486,16 @@ def __init__(self, device_id):
self._dev_id = device_id

def allocate(self, size, stream=None) -> Buffer:
ptr = handle_return(driver.cuMemAlloc(size))
err, ptr = driver.cuMemAlloc(size)
raise_if_driver_error(err)
return Buffer._init(ptr, size, self)

def deallocate(self, ptr, size, stream=None):
if stream is None:
stream = default_stream()
stream.sync()
handle_return(driver.cuMemFree(ptr))
err, = driver.cuMemFree(ptr)
raise_if_driver_error(err)

@property
def is_device_accessible(self) -> bool:
Expand Down
4 changes: 3 additions & 1 deletion cuda_core/docs/source/release/0.X.Y-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Breaking Changes
----------------

- **LaunchConfig grid parameter interpretation**: When :attr:`LaunchConfig.cluster` is specified, the :attr:`LaunchConfig.grid` parameter now correctly represents the number of clusters instead of blocks. Previously, the grid parameter was incorrectly interpreted as blocks, causing a mismatch with the expected C++ behavior. This change ensures that ``LaunchConfig(grid=4, cluster=2, block=32)`` correctly produces 4 clusters × 2 blocks/cluster = 8 total blocks, matching the C++ equivalent ``cudax::make_hierarchy(cudax::grid_dims(4), cudax::cluster_dims(2), cudax::block_dims(32))``.
- When :class:`Buffer` is closed, :attr:`Buffer.handle` is now set to ``None``. It was previously set to ``0`` by accident.


New features
Expand All @@ -40,4 +41,5 @@ Fixes and enhancements
- Improved :class:`DeviceMemoryResource` allocation performance when there are no active allocations by setting a higher release threshold (addresses issue #771).
- Improved :class:`StridedMemoryView` creation time performance by optimizing shape and strides tuple creation using Python/C API (addresses issue #449).
- Fix :class:`LaunchConfig` grid unit conversion when cluster is set (addresses issue #867).
- Fixed a bug in :class:`GraphBuilder.add_child` where dependencies extracted from capturing stream were passed inconsistently with num_dependencies parameter (addresses issue #843).
- Fixed a bug in :class:`GraphBuilder.add_child` where dependencies extracted from capturing stream were passed inconsistently with num_dependencies parameter (addresses issue #843).
- Make :class:`Buffer` creation more performant.
6 changes: 3 additions & 3 deletions cuda_core/examples/memory_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@
cp.cuda.Stream.null.use() # reset CuPy's current stream to the null stream

# Verify buffers are properly closed
assert device_buffer.handle == 0, "Device buffer should be closed"
assert pinned_buffer.handle == 0, "Pinned buffer should be closed"
assert new_device_buffer.handle == 0, "New device buffer should be closed"
assert device_buffer.handle is None, "Device buffer should be closed"
assert pinned_buffer.handle is None, "Pinned buffer should be closed"
assert new_device_buffer.handle is None, "New device buffer should be closed"

print("Memory management example completed!")
Loading
Loading