Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python: ['3.9', '3.10', '3.11', '3.12']
python: ['3.8', '3.9', '3.10', '3.11', '3.12']
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python }}
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "dispatch-py"
description = "Develop reliable distributed systems on the Dispatch platform."
readme = "README.md"
dynamic = ["version"]
requires-python = ">= 3.9"
requires-python = ">= 3.8"
dependencies = [
"grpcio >= 1.60.0",
"protobuf >= 4.24.0",
Expand Down
18 changes: 9 additions & 9 deletions src/dispatch/coroutine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from types import coroutine
from typing import Any, Awaitable
from typing import Any, Awaitable, List, Tuple

from dispatch.experimental.durable import durable
from dispatch.proto import Call
Expand All @@ -16,14 +16,14 @@ def call(call: Call) -> Any:

@coroutine
@durable
def gather(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]
def gather(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc]
"""Alias for all."""
return all(*awaitables)


@coroutine
@durable
def all(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]
def all(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc]
"""Concurrently run a set of coroutines, blocking until all coroutines
return or any coroutine raises an error. If any coroutine fails with an
uncaught exception, the exception will be re-raised here."""
Expand All @@ -32,7 +32,7 @@ def all(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]

@coroutine
@durable
def any(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]
def any(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc]
"""Concurrently run a set of coroutines, blocking until any coroutine
returns or all coroutines raises an error. If all coroutines fail with
uncaught exceptions, the exception(s) will be re-raised here."""
Expand All @@ -41,7 +41,7 @@ def any(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]

@coroutine
@durable
def race(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]
def race(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc]
"""Concurrently run a set of coroutines, blocking until any coroutine
returns or raises an error. If any coroutine fails with an uncaught
exception, the exception will be re-raised here."""
Expand All @@ -50,17 +50,17 @@ def race(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]

@dataclass
class AllDirective:
awaitables: tuple[Awaitable[Any], ...]
awaitables: Tuple[Awaitable[Any], ...]


@dataclass
class AnyDirective:
awaitables: tuple[Awaitable[Any], ...]
awaitables: Tuple[Awaitable[Any], ...]


@dataclass
class RaceDirective:
awaitables: tuple[Awaitable[Any], ...]
awaitables: Tuple[Awaitable[Any], ...]


class AnyException(RuntimeError):
Expand All @@ -69,7 +69,7 @@ class AnyException(RuntimeError):

__slots__ = ("exceptions",)

def __init__(self, exceptions: list[Exception]):
def __init__(self, exceptions: List[Exception]):
self.exceptions = exceptions

def __str__(self):
Expand Down
12 changes: 8 additions & 4 deletions src/dispatch/experimental/durable/frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>

#if PY_MAJOR_VERSION != 3 || (PY_MINOR_VERSION < 9 || PY_MINOR_VERSION > 13)
# error Python 3.9-3.13 is required
#if PY_MAJOR_VERSION != 3 || (PY_MINOR_VERSION < 8 || PY_MINOR_VERSION > 13)
# error Python 3.8-3.13 is required
#endif

// This is a redefinition of the private PyTryBlock from <= 3.10.
// https://github.com/python/cpython/blob/3.8/Include/frameobject.h#L10
// https://github.com/python/cpython/blob/3.9/Include/cpython/frameobject.h#L11
// https://github.com/python/cpython/blob/3.10/Include/cpython/frameobject.h#L22
typedef struct {
Expand All @@ -19,7 +20,8 @@ typedef struct {
int b_level;
} PyTryBlock;

// This is a redefinition of the private PyCoroWrapper from 3.9-3.13.
// This is a redefinition of the private PyCoroWrapper from 3.8-3.13.
// https://github.com/python/cpython/blob/3.8/Objects/genobject.c#L840
// https://github.com/python/cpython/blob/3.9/Objects/genobject.c#L830
// https://github.com/python/cpython/blob/3.10/Objects/genobject.c#L884
// https://github.com/python/cpython/blob/3.11/Objects/genobject.c#L1016
Expand Down Expand Up @@ -53,7 +55,9 @@ static int get_frame_iblock(Frame *frame);
static void set_frame_iblock(Frame *frame, int iblock);
static PyTryBlock *get_frame_blockstack(Frame *frame);

#if PY_MINOR_VERSION == 9
#if PY_MINOR_VERSION == 8
#include "frame308.h"
#elif PY_MINOR_VERSION == 9
#include "frame309.h"
#elif PY_MINOR_VERSION == 10
#include "frame310.h"
Expand Down
145 changes: 145 additions & 0 deletions src/dispatch/experimental/durable/frame308.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// This is a redefinition of the private/opaque frame object.
//
// https://github.com/python/cpython/blob/3.8/Include/frameobject.h#L16
//
// In Python <= 3.10, `struct _frame` is both the PyFrameObject and
// PyInterpreterFrame. From Python 3.11 onwards, the two were split with the
// PyFrameObject (struct _frame) pointing to struct _PyInterpreterFrame.
struct Frame {
PyObject_VAR_HEAD
struct Frame *f_back; // struct _frame
PyCodeObject *f_code;
PyObject *f_builtins;
PyObject *f_globals;
PyObject *f_locals;
PyObject **f_valuestack;
PyObject **f_stacktop;
PyObject *f_trace;
char f_trace_lines;
char f_trace_opcodes;
PyObject *f_gen;
int f_lasti;
int f_lineno;
int f_iblock;
char f_executing;
PyTryBlock f_blockstack[CO_MAXBLOCKS];
PyObject *f_localsplus[1];
};

// Python 3.9 and prior didn't have an explicit enum of frame states,
// but we can derive them based on the presence of a frame, and other
// information found on the frame, for compatibility with later versions.
typedef enum _framestate {
FRAME_CREATED = -2,
FRAME_EXECUTING = 0,
FRAME_CLEARED = 4
} FrameState;

/*
// This is the definition of PyGenObject for reference to developers
// working on the extension.
//
// Note that PyCoroObject and PyAsyncGenObject have the same layout as
// PyGenObject, however the struct fields have a cr_ and ag_ prefix
// (respectively) rather than a gi_ prefix. In Python <= 3.10, PyCoroObject
// and PyAsyncGenObject have extra fields compared to PyGenObject. In Python
// 3.11 onwards, the three objects are identical (except for field name
// prefixes). The extra fields in Python <= 3.10 are not applicable to the
// extension at this time.
//
// https://github.com/python/cpython/blob/3.8/Include/genobject.h#L17
typedef struct {
PyObject_HEAD
PyFrameObject *gi_frame;
char gi_running;
PyObject *gi_code;
PyObject *gi_weakreflist;
PyObject *gi_name;
PyObject *gi_qualname;
_PyErr_StackItem gi_exc_state;
} PyGenObject;
*/

static Frame *get_frame(PyGenObject *gen_like) {
Frame *frame = (Frame *)(gen_like->gi_frame);
assert(frame);
return frame;
}

static PyCodeObject *get_frame_code(Frame *frame) {
PyCodeObject *code = frame->f_code;
assert(code);
return code;
}

static int get_frame_lasti(Frame *frame) {
return frame->f_lasti;
}

static void set_frame_lasti(Frame *frame, int lasti) {
frame->f_lasti = lasti;
}

static int get_frame_state(PyGenObject *gen_like) {
// Python 3.8 doesn't have frame states, but we can derive
// some for compatibility with later versions and to simplify
// the extension.
Frame *frame = (Frame *)(gen_like->gi_frame);
if (!frame) {
return FRAME_CLEARED;
}
return frame->f_executing ? FRAME_EXECUTING : FRAME_CREATED;
}

static void set_frame_state(PyGenObject *gen_like, int fs) {
Frame *frame = get_frame(gen_like);
frame->f_executing = (fs == FRAME_EXECUTING);
}

static int valid_frame_state(int fs) {
return fs == FRAME_CREATED || fs == FRAME_EXECUTING || fs == FRAME_CLEARED;
}

static int get_frame_stacktop_limit(Frame *frame) {
PyCodeObject *code = get_frame_code(frame);
return code->co_stacksize + code->co_nlocals;
}

static int get_frame_stacktop(Frame *frame) {
assert(frame->f_localsplus);
int stacktop = (int)(frame->f_stacktop - frame->f_localsplus);
assert(stacktop >= 0 && stacktop < get_frame_stacktop_limit(frame));
return stacktop;
}

static void set_frame_stacktop(Frame *frame, int stacktop) {
assert(stacktop >= 0 && stacktop < get_frame_stacktop_limit(frame));
assert(frame->f_localsplus);
frame->f_stacktop = frame->f_localsplus + stacktop;
}

static PyObject **get_frame_localsplus(Frame *frame) {
PyObject **localsplus = frame->f_localsplus;
assert(localsplus);
return localsplus;
}

static int get_frame_iblock_limit(Frame *frame) {
return CO_MAXBLOCKS;
}

static int get_frame_iblock(Frame *frame) {
return frame->f_iblock;
}

static void set_frame_iblock(Frame *frame, int iblock) {
assert(iblock >= 0 && iblock < get_frame_iblock_limit(frame));
frame->f_iblock = iblock;
}

static PyTryBlock *get_frame_blockstack(Frame *frame) {
PyTryBlock *blockstack = frame->f_blockstack;
assert(blockstack);
return blockstack;
}

19 changes: 15 additions & 4 deletions src/dispatch/experimental/durable/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@
MethodType,
TracebackType,
)
from typing import Any, Callable, Coroutine, Generator, Optional, TypeVar, Union, cast
from typing import (
Any,
Callable,
Coroutine,
Dict,
Generator,
Optional,
Tuple,
TypeVar,
Union,
cast,
)

from . import frame as ext
from .registry import RegisteredFunction, lookup_function, register_function
Expand Down Expand Up @@ -78,8 +89,8 @@ class Serializable:
g: Union[GeneratorType, CoroutineType]
registered_fn: RegisteredFunction
wrapped_coroutine: Union["DurableCoroutine", None]
args: tuple[Any, ...]
kwargs: dict[str, Any]
args: Tuple[Any, ...]
kwargs: Dict[str, Any]

def __init__(
self,
Expand Down Expand Up @@ -274,7 +285,7 @@ def cr_await(self) -> Any:
return self.coroutine.cr_await

@property
def cr_origin(self) -> Optional[tuple[tuple[str, int, str], ...]]:
def cr_origin(self) -> Optional[Tuple[Tuple[str, int, str], ...]]:
return self.coroutine.cr_origin

def __repr__(self) -> str:
Expand Down
3 changes: 2 additions & 1 deletion src/dispatch/experimental/durable/registry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import hashlib
from dataclasses import dataclass
from types import FunctionType
from typing import Dict


@dataclass
Expand Down Expand Up @@ -46,7 +47,7 @@ def __setstate__(self, state):
self.hash = code_hash


_REGISTRY: dict[str, RegisteredFunction] = {}
_REGISTRY: Dict[str, RegisteredFunction] = {}


def register_function(fn: FunctionType) -> RegisteredFunction:
Expand Down
7 changes: 4 additions & 3 deletions src/dispatch/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Dict,
Generic,
Iterable,
List,
Optional,
TypeVar,
overload,
Expand Down Expand Up @@ -329,7 +330,7 @@ def batch(self) -> Batch:
a set of calls to dispatch."""
return Batch(self)

def dispatch(self, calls: Iterable[Call]) -> list[DispatchID]:
def dispatch(self, calls: Iterable[Call]) -> List[DispatchID]:
"""Dispatch function calls.

Args:
Expand Down Expand Up @@ -369,7 +370,7 @@ class Batch:

def __init__(self, client: Client):
self.client = client
self.calls: list[Call] = []
self.calls: List[Call] = []

def add(self, func: Function[P, T], *args: P.args, **kwargs: P.kwargs) -> Batch:
"""Add a call to the specified function to the batch."""
Expand All @@ -380,7 +381,7 @@ def add_call(self, call: Call) -> Batch:
self.calls.append(call)
return self

def dispatch(self) -> list[DispatchID]:
def dispatch(self) -> List[DispatchID]:
"""Dispatch dispatches the calls asynchronously.

The batch is reset when the calls are dispatched successfully.
Expand Down
Loading