Merged

Changes from all commits (45 commits)
e98e2fe
Initial FLI-based implementation
al-rigazzi Jun 25, 2024
043f0e7
Add inference example stub
al-rigazzi Jun 25, 2024
efc9e83
Lint, style, black magic
al-rigazzi Jun 25, 2024
35ec45e
Merge branch 'mli-feature' of https://github.com/CrayLabs/SmartSim in…
al-rigazzi Jun 25, 2024
ed3c42a
Bring up to feature branch
al-rigazzi Jun 25, 2024
e5be26b
Update example
al-rigazzi Jun 25, 2024
a23010f
Change the changelog
al-rigazzi Jun 25, 2024
3c20f46
Make style
al-rigazzi Jun 25, 2024
b9ed5ba
Attempt to mitigate import dragon error
al-rigazzi Jun 26, 2024
0de06f3
Import dragon optional
al-rigazzi Jun 26, 2024
d051385
isort
al-rigazzi Jun 26, 2024
e77b1cd
Fix imports in dragon backend tests
al-rigazzi Jun 26, 2024
a90888d
Style
al-rigazzi Jun 26, 2024
b431221
Fix type
al-rigazzi Jun 26, 2024
23efebc
Rename examples dir
al-rigazzi Jun 26, 2024
09b9d24
Remove old dir
al-rigazzi Jun 26, 2024
56d8e50
Add tests for torch worker
al-rigazzi Jun 26, 2024
6cec83e
Switch to sender-supplied channels in app example
al-rigazzi Jun 27, 2024
3ad6d44
Add prototype client for mock app
al-rigazzi Jun 27, 2024
bd5f133
Update mock app
al-rigazzi Jun 28, 2024
3e343ee
Changes to feature store
al-rigazzi Jul 4, 2024
a0525e5
Merge upstream
al-rigazzi Jul 5, 2024
a2bed26
Make style
al-rigazzi Jul 5, 2024
36e92d9
Fix typing
al-rigazzi Jul 5, 2024
59840a3
Fix lint
al-rigazzi Jul 5, 2024
b35b37d
Remove duplicated/useless comments
al-rigazzi Jul 5, 2024
51e0b17
Bring up to date with new schema
al-rigazzi Jul 9, 2024
1fcf17d
Add feature store prototype caching
al-rigazzi Jul 10, 2024
d76f880
Add redis driver, fix FLI
al-rigazzi Jul 10, 2024
0564d01
Merge branch 'mli-feature' of https://github.com/CrayLabs/SmartSim in…
al-rigazzi Jul 11, 2024
3938ec8
Update post-merge
al-rigazzi Jul 11, 2024
273a7d9
Fix typing
al-rigazzi Jul 11, 2024
a12d923
isort
al-rigazzi Jul 11, 2024
38b0de1
Update envloader test
al-rigazzi Jul 11, 2024
53eb045
no more data blob
AlyssaCote Jul 11, 2024
e64532d
fixing up worker manager
AlyssaCote Jul 11, 2024
52f5e74
fixed tests, maybe fixed mock app?
AlyssaCote Jul 12, 2024
0e3bd61
mli driver runs all the way through
AlyssaCote Jul 13, 2024
e3f44a5
tweaks
AlyssaCote Jul 15, 2024
6a80895
merge
AlyssaCote Jul 15, 2024
b57fc8e
more clean up
AlyssaCote Jul 15, 2024
c1f856b
changelog, mypy
AlyssaCote Jul 15, 2024
d4b67dc
Merge branch 'mli-feature' into rework_schemas
AlyssaCote Jul 18, 2024
f1415f2
pr comments addressed
AlyssaCote Jul 18, 2024
dafb4df
style
AlyssaCote Jul 18, 2024
1 change: 1 addition & 0 deletions doc/changelog.md
@@ -13,6 +13,7 @@ Jump to:

 Description

+- Adjust schemas for better performance
 - Add TorchWorker first implementation and mock inference app example
 - Add error handling in Worker Manager pipeline
 - Add EnvironmentConfigLoader for ML Worker Manager
18 changes: 12 additions & 6 deletions ex/high_throughput_inference/mock_app.py
@@ -108,10 +108,11 @@ def print_timings(self, to_file: bool = False):


     def run_model(self, model: bytes | str, batch: torch.Tensor):
+        tensors = [batch.numpy()]
         self.start_timings(batch.shape[0])
-        built_tensor = MessageHandler.build_tensor(
-            batch.numpy(), "c", "float32", list(batch.shape))
-        self.measure_time("build_tensor")
+        built_tensor_desc = MessageHandler.build_tensor_descriptor(
+            "c", "float32", list(batch.shape))
+        self.measure_time("build_tensor_descriptor")
         built_model = None
         if isinstance(model, str):
             model_arg = MessageHandler.build_model_key(model)
@@ -120,7 +121,7 @@ def run_model(self, model: bytes | str, batch: torch.Tensor):
         request = MessageHandler.build_request(
             reply_channel=self._from_worker_ch_serialized,
             model=model_arg,
-            inputs=[built_tensor],
+            inputs=[built_tensor_desc],
             outputs=[],
             output_descriptors=[],
             custom_attributes=None,
@@ -130,6 +131,9 @@ def run_model(self, model: bytes | str, batch: torch.Tensor):
         self.measure_time("serialize_request")
         with self._to_worker_fli.sendh(timeout=None, stream_channel=self._to_worker_ch) as to_sendh:
             to_sendh.send_bytes(request_bytes)
+            for t in tensors:
+                to_sendh.send_bytes(t.tobytes())  #TODO NOT FAST ENOUGH!!!
+                # to_sendh.send_bytes(bytes(t.data))
         logger.info(f"Message size: {len(request_bytes)} bytes")

         self.measure_time("send")
@@ -138,10 +142,12 @@ def run_model(self, model: bytes | str, batch: torch.Tensor):
         self.measure_time("receive")
         response = MessageHandler.deserialize_response(resp)
         self.measure_time("deserialize_response")
+        # list of data blobs? recv depending on the len(response.result.descriptors)?
+        data_blob = from_recvh.recv_bytes(timeout=None)
         result = torch.from_numpy(
             numpy.frombuffer(
-                response.result.data[0].blob,
-                dtype=str(response.result.data[0].tensorDescriptor.dataType),
+                data_blob,
+                dtype=str(response.result.descriptors[0].dataType),
             )
         )
         self.measure_time("deserialize_tensor")
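The diff above changes the client's wire protocol: instead of embedding tensor blobs in the capnp request, the client puts a metadata-only `TensorDescriptor` in the envelope and streams each tensor as a separate raw-bytes message over the same send handle. A minimal sketch of that framing, with `send_frame` and `recv_frame` as hypothetical stand-ins for the FLI send and receive handles used in `mock_app.py`:

```python
import numpy as np

def send_inference_request(send_frame, request_bytes: bytes, tensors: list) -> None:
    """Frame 0 carries the serialized request; frames 1..n carry raw tensors."""
    send_frame(request_bytes)
    for tensor in tensors:
        # Payload only; shape and dtype travel in the TensorDescriptor.
        send_frame(tensor.tobytes())

def recv_inference_result(recv_frame, descriptors) -> list:
    """Rebuild one array per descriptor from the raw frames after the response."""
    results = []
    for desc in descriptors:
        blob = recv_frame()
        arr = np.frombuffer(blob, dtype=str(desc.dataType)).reshape(tuple(desc.dimensions))
        results.append(arr)
    return results
```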
2 changes: 1 addition & 1 deletion smartsim/_core/mli/comm/channel/channel.py
@@ -45,7 +45,7 @@ def send(self, value: bytes) -> None:
         :param value: The value to send"""

     @abstractmethod
-    def recv(self) -> bytes:
+    def recv(self) -> t.List[bytes]:
         """Receive a message through the underlying communication channel
         :returns: the received message"""
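With payloads now streamed separately, one logical `recv()` can correspond to several physical frames, hence the `bytes` to `t.List[bytes]` change in the base class. A sketch of the revised contract with an in-memory test double (`InMemoryChannel` is illustrative, not part of this PR):

```python
import typing as t
from abc import ABC, abstractmethod
from collections import deque

class CommChannelBase(ABC):
    @abstractmethod
    def send(self, value: bytes) -> None:
        """Send a message through the underlying communication channel"""

    @abstractmethod
    def recv(self) -> t.List[bytes]:
        """Receive all pending frames of one logical message as a list"""

class InMemoryChannel(CommChannelBase):
    def __init__(self) -> None:
        self._queue: t.Deque[bytes] = deque()

    def send(self, value: bytes) -> None:
        self._queue.append(value)

    def recv(self) -> t.List[bytes]:
        # Drain everything queued so far, mirroring the new list contract.
        frames = list(self._queue)
        self._queue.clear()
        return frames

channel = InMemoryChannel()
channel.send(b"request-envelope")
channel.send(b"tensor-blob")
assert channel.recv() == [b"request-envelope", b"tensor-blob"]
```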
5 changes: 3 additions & 2 deletions smartsim/_core/mli/comm/channel/dragonchannel.py
@@ -25,6 +25,7 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

 import sys
+import typing as t

 import smartsim._core.mli.comm.channel.channel as cch
 from smartsim.log import get_logger
@@ -52,9 +53,9 @@ def send(self, value: bytes) -> None:
         with self._channel.sendh(timeout=None) as sendh:
             sendh.send_bytes(value)

-    def recv(self) -> bytes:
+    def recv(self) -> t.List[bytes]:
         """Receive a message through the underlying communication channel
         :returns: the received message"""
         with self._channel.recvh(timeout=None) as recvh:
             message_bytes: bytes = recvh.recv_bytes(timeout=None)
-            return message_bytes
+            return [message_bytes]
17 changes: 10 additions & 7 deletions smartsim/_core/mli/comm/channel/dragonfli.py
@@ -57,13 +57,16 @@ def send(self, value: bytes) -> None:
         with self._fli.sendh(timeout=None, stream_channel=self._channel) as sendh:
             sendh.send_bytes(value)

-    def recv(self) -> bytes:
+    def recv(self) -> t.List[bytes]:
         """Receive a message through the underlying communication channel
         :returns: the received message"""
+        messages = []
+        eot = False
         with self._fli.recvh(timeout=None) as recvh:
-            try:
-                request_bytes: bytes
-                request_bytes, _ = recvh.recv_bytes(timeout=None)
-                return request_bytes
-            except fli.FLIEOT as exc:
-                return b""
+            while not eot:
+                try:
+                    message, _ = recvh.recv_bytes(timeout=None)
+                    messages.append(message)
+                except fli.FLIEOT as exc:
+                    eot = True
+        return messages
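The FLI channel now drains its receive handle until the library signals end-of-transmission, returning every frame of the exchange. The loop below reproduces that pattern against a fake handle, since `dragon.fli` is only importable on Dragon-enabled systems; `FLIEOT` and `FakeRecvHandle` are stand-ins for the real API:

```python
import typing as t

class FLIEOT(Exception):
    """Stand-in for fli.FLIEOT, raised when the stream is exhausted."""

class FakeRecvHandle:
    def __init__(self, frames: t.List[bytes]) -> None:
        self._frames = iter(frames)

    def recv_bytes(self, timeout=None) -> t.Tuple[bytes, int]:
        try:
            return next(self._frames), 0
        except StopIteration:
            raise FLIEOT from None

def drain(recvh: FakeRecvHandle) -> t.List[bytes]:
    """Collect frames until end-of-transmission, as DragonFLIChannel.recv now does."""
    messages: t.List[bytes] = []
    eot = False
    while not eot:
        try:
            message, _ = recvh.recv_bytes(timeout=None)
            messages.append(message)
        except FLIEOT:
            eot = True
    return messages

assert drain(FakeRecvHandle([b"req", b"t0", b"t1"])) == [b"req", b"t0", b"t1"]
```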
52 changes: 32 additions & 20 deletions smartsim/_core/mli/infrastructure/control/workermanager.py
@@ -58,6 +58,7 @@

     from smartsim._core.mli.mli_schemas.model.model_capnp import Model
     from smartsim._core.mli.mli_schemas.response.response_capnp import Status
+    from smartsim._core.mli.mli_schemas.tensor.tensor_capnp import TensorDescriptor

 logger = get_logger(__name__)

@@ -88,25 +89,23 @@ def deserialize_message(
     elif request.model.which() == "data":
         model_bytes = request.model.data

-    callback_key = request.replyChannel.reply
+    callback_key = request.replyChannel.descriptor

     # todo: shouldn't this be `CommChannel.find` instead of `DragonCommChannel`
     comm_channel = channel_type(callback_key)
     # comm_channel = DragonCommChannel(request.replyChannel)

     input_keys: t.Optional[t.List[str]] = None
-    input_bytes: t.Optional[t.List[bytes]] = (
-        None  # these will really be tensors already
-    )
+    input_bytes: t.Optional[t.List[bytes]] = None

     output_keys: t.Optional[t.List[str]] = None

-    input_meta: t.List[t.Any] = []
+    input_meta: t.Optional[t.List[TensorDescriptor]] = None

     if request.input.which() == "keys":
         input_keys = [input_key.key for input_key in request.input.keys]
-    elif request.input.which() == "data":
-        input_bytes = [data.blob for data in request.input.data]
-        input_meta = [data.tensorDescriptor for data in request.input.data]
+    elif request.input.which() == "descriptors":
+        input_meta = request.input.descriptors  # type: ignore

     if request.output:
         output_keys = [tensor_key.key for tensor_key in request.output]
@@ -142,20 +141,13 @@ def prepare_outputs(reply: InferenceReply) -> t.List[t.Any]:
             msg_key = MessageHandler.build_tensor_key(key)
             prepared_outputs.append(msg_key)
     elif reply.outputs:
-        arrays: t.List[np.ndarray[t.Any, np.dtype[t.Any]]] = [
-            output.numpy() for output in reply.outputs
-        ]
-        for tensor in arrays:
-            # todo: need to have the output attributes specified in the req?
-            # maybe, add `MessageHandler.dtype_of(tensor)`?
-            # can `build_tensor` do dtype and shape?
-            msg_tensor = MessageHandler.build_tensor(
-                tensor,
+        for _ in reply.outputs:
+            msg_tensor_desc = MessageHandler.build_tensor_descriptor(
                 "c",
                 "float32",
                 [1],
             )
-            prepared_outputs.append(msg_tensor)
+            prepared_outputs.append(msg_tensor_desc)
     return prepared_outputs

@@ -272,13 +264,28 @@ def _on_iteration(self) -> None:
             return

         timings = []  # timing
-        # perform default deserialization of the message envelope
-        request_bytes: bytes = self._task_queue.recv()

+        bytes_list: t.List[bytes] = self._task_queue.recv()
+
+        if not bytes_list:
+            exception_handler(
+                ValueError("No request data found"),
+                None,
+                "No request data found.",
+            )
+            return
+
+        request_bytes = bytes_list[0]
+        tensor_bytes_list = bytes_list[1:]
+
         interm = time.perf_counter()  # timing
         request = deserialize_message(
             request_bytes, self._comm_channel_type, self._device
         )
+
+        if request.input_meta and tensor_bytes_list:
+            request.raw_inputs = tensor_bytes_list
+
         if not self._validate_request(request):
             return

@@ -430,7 +437,12 @@ def _on_iteration(self) -> None:
         timings.append(time.perf_counter() - interm)  # timing
         interm = time.perf_counter()  # timing
         if request.callback:
+            # send serialized response
             request.callback.send(serialized_resp)
+            if reply.outputs:
+                # send tensor data after response
+                for output in reply.outputs:
+                    request.callback.send(output)

         timings.append(time.perf_counter() - interm)  # timing
         interm = time.perf_counter()  # timing
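On the worker side the same framing applies in reverse: the first received frame is the serialized request, any remaining frames are raw input tensors, and the reply sends the response envelope before the output blobs. A condensed sketch of that flow, with `deserialize`, `run_pipeline`, and `send` as hypothetical stand-ins for the pipeline pieces in the diff above:

```python
import typing as t

def handle_iteration(
    frames: t.List[bytes],
    deserialize: t.Callable[[bytes], t.Any],
    run_pipeline: t.Callable[[t.Any], t.Tuple[bytes, t.List[bytes]]],
    send: t.Callable[[bytes], None],
) -> None:
    if not frames:
        raise ValueError("No request data found")

    # Frame 0 is the envelope: model reference plus tensor descriptors.
    request = deserialize(frames[0])
    # Frames 1..n are the raw tensor payloads riding behind the envelope.
    request.raw_inputs = frames[1:]

    serialized_resp, output_blobs = run_pipeline(request)
    send(serialized_resp)          # response envelope first
    for blob in output_blobs:      # then one frame per output tensor
        send(blob)
```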
10 changes: 8 additions & 2 deletions smartsim/_core/mli/infrastructure/worker/torch_worker.py
@@ -110,10 +110,16 @@ def transform_output(
         result_device: str,
     ) -> TransformOutputResult:
         if result_device != "cpu":
-            transformed = [item.to("cpu") for item in execute_result.predictions]
+            transformed = [
+                item.to("cpu").numpy().tobytes() for item in execute_result.predictions
+            ]
+
             # todo: need the shape from latest schemas added here.
             return TransformOutputResult(transformed, None, "c", "float32")  # fixme

         return TransformOutputResult(
-            execute_result.predictions, None, "c", "float32"
+            [item.numpy().tobytes() for item in execute_result.predictions],
+            None,
+            "c",
+            "float32",
         )  # fixme
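`tobytes()` serializes only the raw buffer; shape and dtype are lost, which is why the result metadata (order, dtype, and eventually shape, per the todo) must be carried alongside the blob. A quick round-trip illustration in plain numpy (torch tensors convert via `.numpy()` first):

```python
import numpy as np

original = np.arange(6, dtype=np.float32).reshape(2, 3)
blob = original.tobytes()  # contiguous "c"-order payload, no metadata

# The receiver rebuilds the array from the blob plus descriptor metadata.
restored = np.frombuffer(blob, dtype="float32").reshape(2, 3)
assert np.array_equal(original, restored)
```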
2 changes: 1 addition & 1 deletion smartsim/_core/mli/infrastructure/worker/worker.py
@@ -59,7 +59,7 @@ def __init__(
         self.model_key = model_key
         self.raw_model = raw_model
         self.callback = callback
-        self.raw_inputs = raw_inputs
+        self.raw_inputs = raw_inputs or []
         self.input_keys = input_keys or []
         self.input_meta = input_meta or []
         self.output_keys = output_keys or []
46 changes: 21 additions & 25 deletions smartsim/_core/mli/message_handler.py
@@ -25,8 +25,6 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 import typing as t

-import numpy as np
-
 from .mli_schemas.data import data_references_capnp
 from .mli_schemas.model import model_capnp
 from .mli_schemas.request import request_capnp
@@ -38,17 +36,15 @@

 class MessageHandler:
     @staticmethod
-    def build_tensor(
-        tensor: np.ndarray[t.Any, np.dtype[t.Any]],
+    def build_tensor_descriptor(
         order: "tensor_capnp.Order",
         data_type: "tensor_capnp.NumericalType",
         dimensions: t.List[int],
-    ) -> tensor_capnp.Tensor:
+    ) -> tensor_capnp.TensorDescriptor:
         """
-        Builds a Tensor message using the provided data,
+        Builds a TensorDescriptor message using the provided
         order, data type, and dimensions.

-        :param tensor: Tensor to build the message around
         :param order: Order of the tensor, such as row-major (c) or column-major (f)
         :param data_type: Data type of the tensor
         :param dimensions: Dimensions of the tensor
@@ -59,15 +55,12 @@ def build_tensor_descriptor(
             description.order = order
             description.dataType = data_type
             description.dimensions = dimensions
-            built_tensor = tensor_capnp.Tensor.new_message()
-            built_tensor.blob = tensor.tobytes()  # tensor channel instead?
-            built_tensor.tensorDescriptor = description
         except Exception as e:
             raise ValueError(
-                "Error building tensor."
+                "Error building tensor descriptor."
             ) from e  # TODO: create custom exception

-        return built_tensor
+        return description

     @staticmethod
     def build_output_tensor_descriptor(
@@ -240,15 +233,16 @@ def _assign_reply_channel(
         :raises ValueError: if building fails
         """
         try:
-            request.replyChannel.reply = reply_channel
+            request.replyChannel.descriptor = reply_channel
         except Exception as e:
             raise ValueError("Error building reply channel portion of request.") from e

     @staticmethod
     def _assign_inputs(
         request: request_capnp.Request,
         inputs: t.Union[
-            t.List[data_references_capnp.TensorKey], t.List[tensor_capnp.Tensor]
+            t.List[data_references_capnp.TensorKey],
+            t.List[tensor_capnp.TensorDescriptor],
         ],
     ) -> None:
         """
@@ -262,14 +256,13 @@ def _assign_inputs(
             if inputs:
                 display_name = inputs[0].schema.node.displayName  # type: ignore
                 input_class_name = display_name.split(":")[-1]
-                if input_class_name == "Tensor":
-                    request.input.data = inputs  # type: ignore
+                if input_class_name == "TensorDescriptor":
+                    request.input.descriptors = inputs  # type: ignore
                 elif input_class_name == "TensorKey":
                     request.input.keys = inputs  # type: ignore
                 else:
-                    raise ValueError(
-                        "Invalid input class name. Expected 'Tensor' or 'TensorKey'."
-                    )
+                    raise ValueError("""Invalid input class name. Expected
+                        'TensorDescriptor' or 'TensorKey'.""")
         except Exception as e:
             raise ValueError("Error building inputs portion of request.") from e

@@ -351,7 +344,8 @@ def build_request(
         reply_channel: bytes,
         model: t.Union[data_references_capnp.ModelKey, model_capnp.Model],
         inputs: t.Union[
-            t.List[data_references_capnp.TensorKey], t.List[tensor_capnp.Tensor]
+            t.List[data_references_capnp.TensorKey],
+            t.List[tensor_capnp.TensorDescriptor],
        ],
         outputs: t.List[data_references_capnp.TensorKey],
         output_descriptors: t.List[tensor_capnp.OutputDescriptor],
@@ -437,7 +431,8 @@ def _assign_message(response: response_capnp.Response, message: str) -> None:
     def _assign_result(
         response: response_capnp.Response,
         result: t.Union[
-            t.List[tensor_capnp.Tensor], t.List[data_references_capnp.TensorKey]
+            t.List[tensor_capnp.TensorDescriptor],
+            t.List[data_references_capnp.TensorKey],
         ],
     ) -> None:
         """
@@ -452,13 +447,13 @@ def _assign_result(
             first_result = result[0]
             display_name = first_result.schema.node.displayName  # type: ignore
             result_class_name = display_name.split(":")[-1]
-            if result_class_name == "Tensor":
-                response.result.data = result  # type: ignore
+            if result_class_name == "TensorDescriptor":
+                response.result.descriptors = result  # type: ignore
             elif result_class_name == "TensorKey":
                 response.result.keys = result  # type: ignore
             else:
                 raise ValueError("""Invalid custom attribute class name.
-                    Expected 'Tensor' or 'TensorKey'.""")
+                    Expected 'TensorDescriptor' or 'TensorKey'.""")
         except Exception as e:
             raise ValueError("Error assigning result to response.") from e

@@ -501,7 +496,8 @@ def build_response(
         status: "response_capnp.Status",
         message: str,
         result: t.Union[
-            t.List[tensor_capnp.Tensor], t.List[data_references_capnp.TensorKey]
+            t.List[tensor_capnp.TensorDescriptor],
+            t.List[data_references_capnp.TensorKey],
         ],
         custom_attributes: t.Union[
             response_attributes_capnp.TorchResponseAttributes,
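Putting the reworked API together: a request now carries descriptors only and is serialized separately from its payloads. A usage sketch, assuming a SmartSim build where the compiled MLI schemas are importable and the `serialize_request` helper the mock app times; the model key and channel bytes are placeholders:

```python
from smartsim._core.mli.message_handler import MessageHandler

# Descriptors replace inline Tensor blobs: metadata only, no payload.
desc = MessageHandler.build_tensor_descriptor("c", "float32", [32, 3, 224, 224])
model_key = MessageHandler.build_model_key("resnet-50")  # hypothetical key

request = MessageHandler.build_request(
    reply_channel=b"reply-channel-descriptor",  # placeholder descriptor bytes
    model=model_key,
    inputs=[desc],  # routed to request.input.descriptors by _assign_inputs
    outputs=[],
    output_descriptors=[],
    custom_attributes=None,
)
request_bytes = MessageHandler.serialize_request(request)
# The tensor payloads themselves travel as separate frames (see mock_app.py).
```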
4 changes: 2 additions & 2 deletions smartsim/_core/mli/mli_schemas/request/request.capnp
@@ -32,7 +32,7 @@ using DataRef = import "../data/data_references.capnp";
 using Models = import "../model/model.capnp";

 struct ChannelDescriptor {
-  reply @0 :Data;
+  descriptor @0 :Data;
 }

 struct Request {
@@ -43,7 +43,7 @@ struct Request {
   }
   input :union {
     keys @3 :List(DataRef.TensorKey);
-    data @4 :List(Tensors.Tensor);
+    descriptors @4 :List(Tensors.TensorDescriptor);
   }
   output @5 :List(DataRef.TensorKey);
   outputDescriptors @6 :List(Tensors.OutputDescriptor);