diff --git a/doc/changelog.md b/doc/changelog.md
index 1f201f3a8f..78d06663b5 100644
--- a/doc/changelog.md
+++ b/doc/changelog.md
@@ -9,6 +9,14 @@ Jump to:
 
 ## SmartSim
 
+### MLI branch
+
+Description
+
+- Added schemas and a MessageHandler class for serialization and
+  deserialization of inference request and response messages
+
+
 ### Development branch
 
 To be released at some future point in time
diff --git a/setup.py b/setup.py
index 96f98bc2cb..55a917e9c6 100644
--- a/setup.py
+++ b/setup.py
@@ -176,6 +176,7 @@ def has_ext_modules(_placeholder):
         "protobuf~=3.20",
         "jinja2>=3.1.2",
         "watchdog>=4.0.0",
+        "pycapnp==2.0.0",
         "pydantic==1.10.14",
         "pyzmq>=25.1.2",
         "pygithub>=2.3.0",
diff --git a/smartsim/_core/mli/__init__.py b/smartsim/_core/mli/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/smartsim/_core/mli/message_handler.py b/smartsim/_core/mli/message_handler.py
new file mode 100644
index 0000000000..733fa83d98
--- /dev/null
+++ b/smartsim/_core/mli/message_handler.py
@@ -0,0 +1,535 @@
+# BSD 2-Clause License
+#
+# Copyright (c) 2021-2024, Hewlett Packard Enterprise
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this
+#    list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+#    this list of conditions and the following disclaimer in the documentation
+#    and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import typing as t
+
+import numpy as np
+
+from .mli_schemas.data import data_references_capnp
+from .mli_schemas.request import request_capnp
+from .mli_schemas.request.request_attributes import request_attributes_capnp
+from .mli_schemas.response import response_capnp
+from .mli_schemas.response.response_attributes import response_attributes_capnp
+from .mli_schemas.tensor import tensor_capnp
+
+
+class MessageHandler:
+    @staticmethod
+    def build_tensor(
+        tensor: np.ndarray[t.Any, np.dtype[t.Any]],
+        order: "tensor_capnp.Order",
+        data_type: "tensor_capnp.NumericalType",
+        dimensions: t.List[int],
+    ) -> tensor_capnp.Tensor:
+        """
+        Builds a Tensor message using the provided data,
+        order, data type, and dimensions.
+
+        :param tensor: Tensor to build the message around
+        :param order: Order of the tensor, such as row-major (c) or column-major (f)
+        :param data_type: Data type of the tensor
+        :param dimensions: Dimensions of the tensor
+        :raises ValueError: if building fails
+        """
+        try:
+            description = tensor_capnp.TensorDescriptor.new_message()
+            description.order = order
+            description.dataType = data_type
+            description.dimensions = dimensions
+            built_tensor = tensor_capnp.Tensor.new_message()
+            built_tensor.blob = tensor.tobytes()  # tensor channel instead?
+            built_tensor.tensorDescriptor = description
+        except Exception as e:
+            raise ValueError(
+                "Error building tensor."
+            ) from e  # TODO: create custom exception
+
+        return built_tensor
+
+    @staticmethod
+    def build_output_tensor_descriptor(
+        order: "tensor_capnp.Order",
+        keys: t.List["data_references_capnp.TensorKey"],
+        data_type: "tensor_capnp.ReturnNumericalType",
+        dimensions: t.List[int],
+    ) -> tensor_capnp.OutputDescriptor:
+        """
+        Builds an OutputDescriptor message using the provided
+        order, keys, data type, and dimensions.
+
+        :param order: Order of the tensor, such as row-major (c) or column-major (f)
+        :param keys: List of TensorKeys to apply the transform descriptor to
+        :param data_type: Transform data type of the tensor
+        :param dimensions: Transform dimensions of the tensor
+        :raises ValueError: if building fails
+        """
+        try:
+            description = tensor_capnp.OutputDescriptor.new_message()
+            description.order = order
+            description.optionalKeys = keys
+            description.optionalDatatype = data_type
+            description.optionalDimension = dimensions
+
+        except Exception as e:
+            raise ValueError("Error building output tensor descriptor.") from e
+
+        return description
+
+    @staticmethod
+    def build_tensor_key(key: str) -> data_references_capnp.TensorKey:
+        """
+        Builds a new TensorKey message with the provided key.
+
+        :param key: String to set the TensorKey
+        :raises ValueError: if building fails
+        """
+        try:
+            tensor_key = data_references_capnp.TensorKey.new_message()
+            tensor_key.key = key
+        except Exception as e:
+            raise ValueError("Error building tensor key.") from e
+        return tensor_key
+
+    @staticmethod
+    def build_model_key(key: str) -> data_references_capnp.ModelKey:
+        """
+        Builds a new ModelKey message with the provided key.
+
+        :param key: String to set the ModelKey
+        :raises ValueError: if building fails
+        """
+        try:
+            model_key = data_references_capnp.ModelKey.new_message()
+            model_key.key = key
+        except Exception as e:
+            raise ValueError("Error building model key.") from e
+        return model_key
+
+    @staticmethod
+    def build_torch_request_attributes(
+        tensor_type: "request_attributes_capnp.TorchTensorType",
+    ) -> request_attributes_capnp.TorchRequestAttributes:
+        """
+        Builds a new TorchRequestAttributes message with the provided tensor type.
+
+        :param tensor_type: Type of the tensor passed in
+        :raises ValueError: if building fails
+        """
+        try:
+            attributes = request_attributes_capnp.TorchRequestAttributes.new_message()
+            attributes.tensorType = tensor_type
+        except Exception as e:
+            raise ValueError("Error building Torch request attributes.") from e
+        return attributes
+
+    @staticmethod
+    def build_tf_request_attributes(
+        name: str, tensor_type: "request_attributes_capnp.TFTensorType"
+    ) -> request_attributes_capnp.TensorFlowRequestAttributes:
+        """
+        Builds a new TensorFlowRequestAttributes message with
+        the provided name and tensor type.
+ + :param name: Name of the tensor + :param tensor_type: Type of the tensor passed in + :raises ValueError: if building fails + """ + try: + attributes = ( + request_attributes_capnp.TensorFlowRequestAttributes.new_message() + ) + attributes.name = name + attributes.tensorType = tensor_type + except Exception as e: + raise ValueError("Error building TensorFlow request attributes.") from e + return attributes + + @staticmethod + def build_torch_response_attributes() -> ( + response_attributes_capnp.TorchResponseAttributes + ): + """ + Builds a new TorchResponseAttributes message. + """ + return response_attributes_capnp.TorchResponseAttributes.new_message() + + @staticmethod + def build_tf_response_attributes() -> ( + response_attributes_capnp.TensorFlowResponseAttributes + ): + """ + Builds a new TensorFlowResponseAttributes message. + """ + return response_attributes_capnp.TensorFlowResponseAttributes.new_message() + + @staticmethod + def _assign_model( + request: request_capnp.Request, + model: t.Union[data_references_capnp.ModelKey, t.ByteString], + ) -> None: + """ + Assigns a model to the supplied request. + + :param request: Request being built + :param model: Model to be assigned + :raises ValueError: if building fails + """ + try: + if isinstance(model, bytes): + request.model.modelData = model + else: + request.model.modelKey = model # type: ignore + except Exception as e: + raise ValueError("Error building model portion of request.") from e + + @staticmethod + def _assign_reply_channel( + request: request_capnp.Request, reply_channel: t.ByteString + ) -> None: + """ + Assigns a reply channel to the supplied request. + + :param request: Request being built + :param reply_channel: Reply channel to be assigned + :raises ValueError: if building fails + """ + try: + request.replyChannel.reply = reply_channel + except Exception as e: + raise ValueError("Error building reply channel portion of request.") from e + + @staticmethod + def _assign_device( + request: request_capnp.Request, device: "request_capnp.Device" + ) -> None: + """ + Assigns a device to the supplied request. + + :param request: Request being built + :param device: Device to be assigned + :raises ValueError: if building fails + """ + try: + request.device = device + except Exception as e: + raise ValueError("Error building device portion of request.") from e + + @staticmethod + def _assign_inputs( + request: request_capnp.Request, + inputs: t.Union[ + t.List[data_references_capnp.TensorKey], t.List[tensor_capnp.Tensor] + ], + ) -> None: + """ + Assigns inputs to the supplied request. + + :param request: Request being built + :param inputs: Inputs to be assigned + :raises ValueError: if building fails + """ + try: + if inputs: + display_name = inputs[0].schema.node.displayName # type: ignore + input_class_name = display_name.split(":")[-1] + if input_class_name == "Tensor": + request.input.inputData = inputs # type: ignore + elif input_class_name == "TensorKey": + request.input.inputKeys = inputs # type: ignore + else: + raise ValueError( + "Invalid input class name. Expected 'Tensor' or 'TensorKey'." + ) + except Exception as e: + raise ValueError("Error building inputs portion of request.") from e + + @staticmethod + def _assign_outputs( + request: request_capnp.Request, + outputs: t.List[data_references_capnp.TensorKey], + ) -> None: + """ + Assigns outputs to the supplied request. 
+ + :param request: Request being built + :param outputs: Outputs to be assigned + :raises ValueError: if building fails + """ + try: + request.output = outputs + + except Exception as e: + raise ValueError("Error building outputs portion of request.") from e + + @staticmethod + def _assign_output_descriptors( + request: request_capnp.Request, + output_descriptors: t.List[tensor_capnp.OutputDescriptor], + ) -> None: + """ + Assigns a list of output tensor descriptors to the supplied request. + + :param request: Request being built + :param output_descriptors: Output descriptors to be assigned + :raises ValueError: if building fails + """ + try: + request.outputDescriptors = output_descriptors + except Exception as e: + raise ValueError( + "Error building the output descriptors portion of request." + ) from e + + @staticmethod + def _assign_custom_request_attributes( + request: request_capnp.Request, + custom_attrs: t.Union[ + request_attributes_capnp.TorchRequestAttributes, + request_attributes_capnp.TensorFlowRequestAttributes, + None, + ], + ) -> None: + """ + Assigns request attributes to the supplied request. + + :param request: Request being built + :param custom_attrs: Custom attributes to be assigned + :raises ValueError: if building fails + """ + try: + if custom_attrs is None: + request.customAttributes.none = custom_attrs + else: + custom_attribute_class_name = ( + custom_attrs.schema.node.displayName.split(":")[-1] # type: ignore + ) + if custom_attribute_class_name == "TorchRequestAttributes": + request.customAttributes.torch = custom_attrs # type: ignore + elif custom_attribute_class_name == "TensorFlowRequestAttributes": + request.customAttributes.tf = custom_attrs # type: ignore + else: + raise ValueError("""Invalid custom attribute class name. + Expected 'TensorFlowRequestAttributes' or + 'TorchRequestAttributes'.""") + except Exception as e: + raise ValueError( + "Error building custom attributes portion of request." + ) from e + + @staticmethod + def build_request( + reply_channel: t.ByteString, + model: t.Union[data_references_capnp.ModelKey, t.ByteString], + device: "request_capnp.Device", + inputs: t.Union[ + t.List[data_references_capnp.TensorKey], t.List[tensor_capnp.Tensor] + ], + outputs: t.List[data_references_capnp.TensorKey], + output_descriptors: t.List[tensor_capnp.OutputDescriptor], + custom_attributes: t.Union[ + request_attributes_capnp.TorchRequestAttributes, + request_attributes_capnp.TensorFlowRequestAttributes, + None, + ], + ) -> request_capnp.Request: + """ + Builds the request message. 
+
+        :param reply_channel: Reply channel to be assigned to request
+        :param model: Model to be assigned to request
+        :param device: Device to be assigned to request
+        :param inputs: Inputs to be assigned to request
+        :param outputs: Outputs to be assigned to request
+        :param output_descriptors: Output descriptors to be assigned to request
+        :param custom_attributes: Custom attributes to be assigned to request
+        """
+        request = request_capnp.Request.new_message()
+        MessageHandler._assign_reply_channel(request, reply_channel)
+        MessageHandler._assign_model(request, model)
+        MessageHandler._assign_device(request, device)
+        MessageHandler._assign_inputs(request, inputs)
+        MessageHandler._assign_outputs(request, outputs)
+        MessageHandler._assign_output_descriptors(request, output_descriptors)
+        MessageHandler._assign_custom_request_attributes(request, custom_attributes)
+        return request
+
+    @staticmethod
+    def serialize_request(request: request_capnp.RequestBuilder) -> t.ByteString:
+        """
+        Serializes a built request message.
+
+        :param request: Request to be serialized
+        """
+        return request.to_bytes()
+
+    @staticmethod
+    def deserialize_request(request_bytes: t.ByteString) -> request_capnp.Request:
+        """
+        Deserializes a serialized request message.
+
+        :param request_bytes: Bytes to be deserialized into a Request
+        """
+        bytes_message = request_capnp.Request.from_bytes(request_bytes)
+
+        with bytes_message as message:
+            return message
+
+    @staticmethod
+    def _assign_status(
+        response: response_capnp.Response, status: "response_capnp.StatusEnum"
+    ) -> None:
+        """
+        Assigns a status to the supplied response.
+
+        :param response: Response being built
+        :param status: Status to be assigned
+        :raises ValueError: if building fails
+        """
+        try:
+            response.status = status
+        except Exception as e:
+            raise ValueError("Error assigning status to response.") from e
+
+    @staticmethod
+    def _assign_message(response: response_capnp.Response, message: str) -> None:
+        """
+        Assigns a message to the supplied response.
+
+        :param response: Response being built
+        :param message: Message to be assigned
+        :raises ValueError: if building fails
+        """
+        try:
+            response.message = message
+        except Exception as e:
+            raise ValueError("Error assigning message to response.") from e
+
+    @staticmethod
+    def _assign_result(
+        response: response_capnp.Response,
+        result: t.Union[
+            t.List[tensor_capnp.Tensor], t.List[data_references_capnp.TensorKey]
+        ],
+    ) -> None:
+        """
+        Assigns a result to the supplied response.
+
+        :param response: Response being built
+        :param result: Result to be assigned
+        :raises ValueError: if building fails
+        """
+        try:
+            if result:
+                first_result = result[0]
+                display_name = first_result.schema.node.displayName  # type: ignore
+                result_class_name = display_name.split(":")[-1]
+                if result_class_name == "Tensor":
+                    response.result.data = result  # type: ignore
+                elif result_class_name == "TensorKey":
+                    response.result.keys = result  # type: ignore
+                else:
+                    raise ValueError("""Invalid result class name.
+                        Expected 'Tensor' or 'TensorKey'.""")
+        except Exception as e:
+            raise ValueError("Error assigning result to response.") from e
+
+    @staticmethod
+    def _assign_custom_response_attributes(
+        response: response_capnp.Response,
+        custom_attrs: t.Union[
+            response_attributes_capnp.TorchResponseAttributes,
+            response_attributes_capnp.TensorFlowResponseAttributes,
+            None,
+        ],
+    ) -> None:
+        """
+        Assigns custom attributes to the supplied response.
+ + :param response: Response being built + :param custom_attrs: Custom attributes to be assigned + :raises ValueError: if building fails + """ + try: + if custom_attrs is None: + response.customAttributes.none = custom_attrs + else: + custom_attribute_class_name = ( + custom_attrs.schema.node.displayName.split(":")[-1] # type: ignore + ) + if custom_attribute_class_name == "TorchResponseAttributes": + response.customAttributes.torch = custom_attrs # type: ignore + elif custom_attribute_class_name == "TensorFlowResponseAttributes": + response.customAttributes.tf = custom_attrs # type: ignore + else: + raise ValueError("""Invalid custom attribute class name. + Expected 'TensorFlowResponseAttributes' or + 'TorchResponseAttributes'.""") + except Exception as e: + raise ValueError("Error assigning custom attributes to response.") from e + + @staticmethod + def build_response( + status: "response_capnp.StatusEnum", + message: str, + result: t.Union[ + t.List[tensor_capnp.Tensor], t.List[data_references_capnp.TensorKey] + ], + custom_attributes: t.Union[ + response_attributes_capnp.TorchResponseAttributes, + response_attributes_capnp.TensorFlowResponseAttributes, + None, + ], + ) -> response_capnp.Response: + """ + Builds the response message. + + :param status: Status to be assigned to response + :param message: Message to be assigned to response + :param result: Result to be assigned to response + :param custom_attributes: Custom attributes to be assigned to response + """ + response = response_capnp.Response.new_message() + MessageHandler._assign_status(response, status) + MessageHandler._assign_message(response, message) + MessageHandler._assign_result(response, result) + MessageHandler._assign_custom_response_attributes(response, custom_attributes) + return response + + @staticmethod + def serialize_response(response: response_capnp.ResponseBuilder) -> t.ByteString: + """ + Serializes a built response message. + """ + return response.to_bytes() + + @staticmethod + def deserialize_response(response_bytes: t.ByteString) -> response_capnp.Response: + """ + Deserializes a serialized response message. + """ + bytes_message = response_capnp.Response.from_bytes(response_bytes) + + with bytes_message as message: + return message diff --git a/smartsim/_core/mli/mli_schemas/data/data_references.capnp b/smartsim/_core/mli/mli_schemas/data/data_references.capnp new file mode 100644 index 0000000000..fa35989b32 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/data/data_references.capnp @@ -0,0 +1,35 @@ +# BSD 2-Clause License + +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. + +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: + +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. + +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. + +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +@0x8ca69fd1aacb6668; + +struct ModelKey { + key @0 :Text; +} + +struct TensorKey { + key @0 :Text; +} \ No newline at end of file diff --git a/smartsim/_core/mli/mli_schemas/data/data_references_capnp.py b/smartsim/_core/mli/mli_schemas/data/data_references_capnp.py new file mode 100644 index 0000000000..de3f080116 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/data/data_references_capnp.py @@ -0,0 +1,15 @@ +"""This is an automatically generated stub for `data_references.capnp`.""" + +import os + +import capnp # type: ignore + +capnp.remove_import_hook() +here = os.path.dirname(os.path.abspath(__file__)) +module_file = os.path.abspath(os.path.join(here, "data_references.capnp")) +ModelKey = capnp.load(module_file).ModelKey +ModelKeyBuilder = ModelKey +ModelKeyReader = ModelKey +TensorKey = capnp.load(module_file).TensorKey +TensorKeyBuilder = TensorKey +TensorKeyReader = TensorKey diff --git a/smartsim/_core/mli/mli_schemas/data/data_references_capnp.pyi b/smartsim/_core/mli/mli_schemas/data/data_references_capnp.pyi new file mode 100644 index 0000000000..0e0edb8f99 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/data/data_references_capnp.pyi @@ -0,0 +1,79 @@ +"""This is an automatically generated stub for `data_references.capnp`.""" + +# mypy: ignore-errors + +from __future__ import annotations + +from contextlib import contextmanager +from io import BufferedWriter +from typing import Iterator + +class ModelKey: + key: str + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[ModelKeyReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> ModelKeyReader: ... + @staticmethod + def new_message() -> ModelKeyBuilder: ... + def to_dict(self) -> dict: ... + +class ModelKeyReader(ModelKey): + def as_builder(self) -> ModelKeyBuilder: ... + +class ModelKeyBuilder(ModelKey): + @staticmethod + def from_dict(dictionary: dict) -> ModelKeyBuilder: ... + def copy(self) -> ModelKeyBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> ModelKeyReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + +class TensorKey: + key: str + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[TensorKeyReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> TensorKeyReader: ... + @staticmethod + def new_message() -> TensorKeyBuilder: ... + def to_dict(self) -> dict: ... + +class TensorKeyReader(TensorKey): + def as_builder(self) -> TensorKeyBuilder: ... 
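A minimal usage sketch for the two key structs above and their `MessageHandler` helpers; the key strings are illustrative, not part of the changeset:

```python
# Sketch: building ModelKey / TensorKey messages via the new helpers.
from smartsim._core.mli.message_handler import MessageHandler

model_key = MessageHandler.build_model_key("my-model")   # ModelKey message
tensor_key = MessageHandler.build_tensor_key("input-0")  # TensorKey message

assert model_key.key == "my-model"
assert tensor_key.key == "input-0"
```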
+ +class TensorKeyBuilder(TensorKey): + @staticmethod + def from_dict(dictionary: dict) -> TensorKeyBuilder: ... + def copy(self) -> TensorKeyBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> TensorKeyReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... diff --git a/smartsim/_core/mli/mli_schemas/request/request.capnp b/smartsim/_core/mli/mli_schemas/request/request.capnp new file mode 100644 index 0000000000..446c628a4c --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/request/request.capnp @@ -0,0 +1,61 @@ +# BSD 2-Clause License + +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. + +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: + +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. + +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. + +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +@0xa27f0152c7bb299e; + +using Tensors = import "../tensor/tensor.capnp"; +using RequestAttributes = import "request_attributes/request_attributes.capnp"; +using DataRef = import "../data/data_references.capnp"; + +enum Device { + cpu @0; + gpu @1; + auto @2; +} + +struct ChannelDescriptor { + reply @0 :Data; +} + +struct Request { + replyChannel @0 :ChannelDescriptor; + model :union { + modelKey @1 :DataRef.ModelKey; + modelData @2 :Data; + } + device @3 :Device; + input :union { + inputKeys @4 :List(DataRef.TensorKey); + inputData @5 :List(Tensors.Tensor); + } + output @6 :List(DataRef.TensorKey); + outputDescriptors @7 :List(Tensors.OutputDescriptor); + customAttributes :union { + torch @8 :RequestAttributes.TorchRequestAttributes; + tf @9 :RequestAttributes.TensorFlowRequestAttributes; + none @10 :Void; + } +} \ No newline at end of file diff --git a/smartsim/_core/mli/mli_schemas/request/request_attributes/request_attributes.capnp b/smartsim/_core/mli/mli_schemas/request/request_attributes/request_attributes.capnp new file mode 100644 index 0000000000..bc1af14d12 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/request/request_attributes/request_attributes.capnp @@ -0,0 +1,49 @@ +# BSD 2-Clause License + +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. 
+ +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: + +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. + +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. + +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +@0xdd14d8ba5c06743f; + +enum TorchTensorType { + nested @0; # ragged + sparse @1; + tensor @2; # "normal" tensor +} + +enum TFTensorType { + ragged @0; + sparse @1; + variable @2; + constant @3; +} + +struct TorchRequestAttributes { + tensorType @0 :TorchTensorType; +} + +struct TensorFlowRequestAttributes { + name @0 :Text; + tensorType @1 :TFTensorType; +} \ No newline at end of file diff --git a/smartsim/_core/mli/mli_schemas/request/request_attributes/request_attributes_capnp.py b/smartsim/_core/mli/mli_schemas/request/request_attributes/request_attributes_capnp.py new file mode 100644 index 0000000000..446ee6541f --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/request/request_attributes/request_attributes_capnp.py @@ -0,0 +1,15 @@ +"""This is an automatically generated stub for `request_attributes.capnp`.""" + +import os + +import capnp # type: ignore + +capnp.remove_import_hook() +here = os.path.dirname(os.path.abspath(__file__)) +module_file = os.path.abspath(os.path.join(here, "request_attributes.capnp")) +TorchRequestAttributes = capnp.load(module_file).TorchRequestAttributes +TorchRequestAttributesBuilder = TorchRequestAttributes +TorchRequestAttributesReader = TorchRequestAttributes +TensorFlowRequestAttributes = capnp.load(module_file).TensorFlowRequestAttributes +TensorFlowRequestAttributesBuilder = TensorFlowRequestAttributes +TensorFlowRequestAttributesReader = TensorFlowRequestAttributes diff --git a/smartsim/_core/mli/mli_schemas/request/request_attributes/request_attributes_capnp.pyi b/smartsim/_core/mli/mli_schemas/request/request_attributes/request_attributes_capnp.pyi new file mode 100644 index 0000000000..977c3e6a09 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/request/request_attributes/request_attributes_capnp.pyi @@ -0,0 +1,83 @@ +"""This is an automatically generated stub for `request_attributes.capnp`.""" + +# mypy: ignore-errors + +from __future__ import annotations + +from contextlib import contextmanager +from io import BufferedWriter +from typing import Iterator, Literal + +TorchTensorType = Literal["nested", "sparse", "tensor"] +TFTensorType = Literal["ragged", "sparse", "variable", "constant"] + +class TorchRequestAttributes: + tensorType: TorchTensorType + @staticmethod + 
@contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[TorchRequestAttributesReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> TorchRequestAttributesReader: ... + @staticmethod + def new_message() -> TorchRequestAttributesBuilder: ... + def to_dict(self) -> dict: ... + +class TorchRequestAttributesReader(TorchRequestAttributes): + def as_builder(self) -> TorchRequestAttributesBuilder: ... + +class TorchRequestAttributesBuilder(TorchRequestAttributes): + @staticmethod + def from_dict(dictionary: dict) -> TorchRequestAttributesBuilder: ... + def copy(self) -> TorchRequestAttributesBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> TorchRequestAttributesReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + +class TensorFlowRequestAttributes: + name: str + tensorType: TFTensorType + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[TensorFlowRequestAttributesReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> TensorFlowRequestAttributesReader: ... + @staticmethod + def new_message() -> TensorFlowRequestAttributesBuilder: ... + def to_dict(self) -> dict: ... + +class TensorFlowRequestAttributesReader(TensorFlowRequestAttributes): + def as_builder(self) -> TensorFlowRequestAttributesBuilder: ... + +class TensorFlowRequestAttributesBuilder(TensorFlowRequestAttributes): + @staticmethod + def from_dict(dictionary: dict) -> TensorFlowRequestAttributesBuilder: ... + def copy(self) -> TensorFlowRequestAttributesBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> TensorFlowRequestAttributesReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... 
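A short sketch of the request-attribute helpers backed by these stubs; the enum strings come from the `TorchTensorType` and `TFTensorType` enums in `request_attributes.capnp`, and the tensor name is illustrative:

```python
# Sketch: framework-specific request attributes built via MessageHandler.
from smartsim._core.mli.message_handler import MessageHandler

# TorchTensorType accepts "nested", "sparse", or "tensor"
torch_attrs = MessageHandler.build_torch_request_attributes("sparse")

# TFTensorType accepts "ragged", "sparse", "variable", or "constant"
tf_attrs = MessageHandler.build_tf_request_attributes("logits", "ragged")

assert tf_attrs.name == "logits"
```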
diff --git a/smartsim/_core/mli/mli_schemas/request/request_capnp.py b/smartsim/_core/mli/mli_schemas/request/request_capnp.py new file mode 100644 index 0000000000..d8370b662d --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/request/request_capnp.py @@ -0,0 +1,15 @@ +"""This is an automatically generated stub for `request.capnp`.""" + +import os + +import capnp # type: ignore + +capnp.remove_import_hook() +here = os.path.dirname(os.path.abspath(__file__)) +module_file = os.path.abspath(os.path.join(here, "request.capnp")) +ChannelDescriptor = capnp.load(module_file).ChannelDescriptor +ChannelDescriptorBuilder = ChannelDescriptor +ChannelDescriptorReader = ChannelDescriptor +Request = capnp.load(module_file).Request +RequestBuilder = Request +RequestReader = Request diff --git a/smartsim/_core/mli/mli_schemas/request/request_capnp.pyi b/smartsim/_core/mli/mli_schemas/request/request_capnp.pyi new file mode 100644 index 0000000000..5d622d4e6d --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/request/request_capnp.pyi @@ -0,0 +1,286 @@ +"""This is an automatically generated stub for `request.capnp`.""" + +# mypy: ignore-errors + +from __future__ import annotations + +from contextlib import contextmanager +from io import BufferedWriter +from typing import Iterator, Literal, Sequence, overload + +from ..data.data_references_capnp import ( + ModelKey, + ModelKeyBuilder, + ModelKeyReader, + TensorKey, + TensorKeyBuilder, + TensorKeyReader, +) +from ..tensor.tensor_capnp import ( + OutputDescriptor, + OutputDescriptorBuilder, + OutputDescriptorReader, + Tensor, + TensorBuilder, + TensorReader, +) +from .request_attributes.request_attributes_capnp import ( + TensorFlowRequestAttributes, + TensorFlowRequestAttributesBuilder, + TensorFlowRequestAttributesReader, + TorchRequestAttributes, + TorchRequestAttributesBuilder, + TorchRequestAttributesReader, +) + +Device = Literal["cpu", "gpu", "auto"] + +class ChannelDescriptor: + reply: bytes + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[ChannelDescriptorReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> ChannelDescriptorReader: ... + @staticmethod + def new_message() -> ChannelDescriptorBuilder: ... + def to_dict(self) -> dict: ... + +class ChannelDescriptorReader(ChannelDescriptor): + def as_builder(self) -> ChannelDescriptorBuilder: ... + +class ChannelDescriptorBuilder(ChannelDescriptor): + @staticmethod + def from_dict(dictionary: dict) -> ChannelDescriptorBuilder: ... + def copy(self) -> ChannelDescriptorBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> ChannelDescriptorReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + +class Request: + class Model: + modelKey: ModelKey | ModelKeyBuilder | ModelKeyReader + modelData: bytes + def which(self) -> Literal["modelKey", "modelData"]: ... + def init(self, name: Literal["modelKey"]) -> ModelKey: ... + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[Request.ModelReader]: ... 
+ @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Request.ModelReader: ... + @staticmethod + def new_message() -> Request.ModelBuilder: ... + def to_dict(self) -> dict: ... + + class ModelReader(Request.Model): + modelKey: ModelKeyReader + def as_builder(self) -> Request.ModelBuilder: ... + + class ModelBuilder(Request.Model): + modelKey: ModelKey | ModelKeyBuilder | ModelKeyReader + @staticmethod + def from_dict(dictionary: dict) -> Request.ModelBuilder: ... + def copy(self) -> Request.ModelBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> Request.ModelReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + + class Input: + inputKeys: Sequence[TensorKey | TensorKeyBuilder | TensorKeyReader] + inputData: Sequence[Tensor | TensorBuilder | TensorReader] + def which(self) -> Literal["inputKeys", "inputData"]: ... + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[Request.InputReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Request.InputReader: ... + @staticmethod + def new_message() -> Request.InputBuilder: ... + def to_dict(self) -> dict: ... + + class InputReader(Request.Input): + inputKeys: Sequence[TensorKeyReader] + inputData: Sequence[TensorReader] + def as_builder(self) -> Request.InputBuilder: ... + + class InputBuilder(Request.Input): + inputKeys: Sequence[TensorKey | TensorKeyBuilder | TensorKeyReader] + inputData: Sequence[Tensor | TensorBuilder | TensorReader] + @staticmethod + def from_dict(dictionary: dict) -> Request.InputBuilder: ... + def copy(self) -> Request.InputBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> Request.InputReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + + class CustomAttributes: + torch: ( + TorchRequestAttributes + | TorchRequestAttributesBuilder + | TorchRequestAttributesReader + ) + tf: ( + TensorFlowRequestAttributes + | TensorFlowRequestAttributesBuilder + | TensorFlowRequestAttributesReader + ) + none: None + def which(self) -> Literal["torch", "tf", "none"]: ... + @overload + def init(self, name: Literal["torch"]) -> TorchRequestAttributes: ... + @overload + def init(self, name: Literal["tf"]) -> TensorFlowRequestAttributes: ... + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[Request.CustomAttributesReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Request.CustomAttributesReader: ... + @staticmethod + def new_message() -> Request.CustomAttributesBuilder: ... + def to_dict(self) -> dict: ... + + class CustomAttributesReader(Request.CustomAttributes): + torch: TorchRequestAttributesReader + tf: TensorFlowRequestAttributesReader + def as_builder(self) -> Request.CustomAttributesBuilder: ... 
+ + class CustomAttributesBuilder(Request.CustomAttributes): + torch: ( + TorchRequestAttributes + | TorchRequestAttributesBuilder + | TorchRequestAttributesReader + ) + tf: ( + TensorFlowRequestAttributes + | TensorFlowRequestAttributesBuilder + | TensorFlowRequestAttributesReader + ) + @staticmethod + def from_dict(dictionary: dict) -> Request.CustomAttributesBuilder: ... + def copy(self) -> Request.CustomAttributesBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> Request.CustomAttributesReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + replyChannel: ChannelDescriptor | ChannelDescriptorBuilder | ChannelDescriptorReader + model: Request.Model | Request.ModelBuilder | Request.ModelReader + device: Device + input: Request.Input | Request.InputBuilder | Request.InputReader + output: Sequence[TensorKey | TensorKeyBuilder | TensorKeyReader] + outputDescriptors: Sequence[ + OutputDescriptor | OutputDescriptorBuilder | OutputDescriptorReader + ] + customAttributes: ( + Request.CustomAttributes + | Request.CustomAttributesBuilder + | Request.CustomAttributesReader + ) + @overload + def init(self, name: Literal["replyChannel"]) -> ChannelDescriptor: ... + @overload + def init(self, name: Literal["model"]) -> Model: ... + @overload + def init(self, name: Literal["input"]) -> Input: ... + @overload + def init(self, name: Literal["customAttributes"]) -> CustomAttributes: ... + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[RequestReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> RequestReader: ... + @staticmethod + def new_message() -> RequestBuilder: ... + def to_dict(self) -> dict: ... + +class RequestReader(Request): + replyChannel: ChannelDescriptorReader + model: Request.ModelReader + input: Request.InputReader + output: Sequence[TensorKeyReader] + outputDescriptors: Sequence[OutputDescriptorReader] + customAttributes: Request.CustomAttributesReader + def as_builder(self) -> RequestBuilder: ... + +class RequestBuilder(Request): + replyChannel: ChannelDescriptor | ChannelDescriptorBuilder | ChannelDescriptorReader + model: Request.Model | Request.ModelBuilder | Request.ModelReader + input: Request.Input | Request.InputBuilder | Request.InputReader + output: Sequence[TensorKey | TensorKeyBuilder | TensorKeyReader] + outputDescriptors: Sequence[ + OutputDescriptor | OutputDescriptorBuilder | OutputDescriptorReader + ] + customAttributes: ( + Request.CustomAttributes + | Request.CustomAttributesBuilder + | Request.CustomAttributesReader + ) + @staticmethod + def from_dict(dictionary: dict) -> RequestBuilder: ... + def copy(self) -> RequestBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> RequestReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... 
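Putting the request schema together with the `MessageHandler` helpers, a round-trip looks roughly like the sketch below; the channel bytes and key names are illustrative placeholders:

```python
# Sketch: build, serialize, and deserialize a Request.
import numpy as np

from smartsim._core.mli.message_handler import MessageHandler

# a small row-major int32 tensor as the request payload
arr = np.arange(4, dtype=np.int32).reshape(2, 2)
tensor = MessageHandler.build_tensor(arr, "c", "int32", [2, 2])

request = MessageHandler.build_request(
    reply_channel=b"reply-channel",                    # ChannelDescriptor.reply
    model=MessageHandler.build_model_key("my-model"),  # model union -> modelKey
    device="cpu",                                      # Device enum
    inputs=[tensor],                                   # input union -> inputData
    outputs=[MessageHandler.build_tensor_key("output-0")],
    output_descriptors=[
        MessageHandler.build_output_tensor_descriptor("c", [], "auto", [])
    ],
    custom_attributes=None,                            # customAttributes -> none
)

request_bytes = MessageHandler.serialize_request(request)
deserialized = MessageHandler.deserialize_request(request_bytes)
assert deserialized.replyChannel.reply == b"reply-channel"
```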
diff --git a/smartsim/_core/mli/mli_schemas/response/response.capnp b/smartsim/_core/mli/mli_schemas/response/response.capnp new file mode 100644 index 0000000000..0c5cee1a1c --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/response/response.capnp @@ -0,0 +1,51 @@ +# BSD 2-Clause License + +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. + +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: + +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. + +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. + +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +@0xa05dcb4444780705; + +using Tensors = import "../tensor/tensor.capnp"; +using ResponseAttributes = import "response_attributes/response_attributes.capnp"; +using DataRef = import "../data/data_references.capnp"; + +enum StatusEnum { + complete @0; + fail @1; + timeout @2; +} + +struct Response { + status @0 :StatusEnum; + message @1 :Text; + result :union { + keys @2 :List(DataRef.TensorKey); + data @3 :List(Tensors.Tensor); + } + customAttributes :union { + torch @4 :ResponseAttributes.TorchResponseAttributes; + tf @5 :ResponseAttributes.TensorFlowResponseAttributes; + none @6 :Void; + } +} \ No newline at end of file diff --git a/smartsim/_core/mli/mli_schemas/response/response_attributes/response_attributes.capnp b/smartsim/_core/mli/mli_schemas/response/response_attributes/response_attributes.capnp new file mode 100644 index 0000000000..59acd60312 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/response/response_attributes/response_attributes.capnp @@ -0,0 +1,33 @@ +# BSD 2-Clause License + +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. + +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: + +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. + +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. + +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +@0xee59c60fccbb1bf9; + +struct TorchResponseAttributes { +} + +struct TensorFlowResponseAttributes { +} \ No newline at end of file diff --git a/smartsim/_core/mli/mli_schemas/response/response_attributes/response_attributes_capnp.py b/smartsim/_core/mli/mli_schemas/response/response_attributes/response_attributes_capnp.py new file mode 100644 index 0000000000..3df1115b47 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/response/response_attributes/response_attributes_capnp.py @@ -0,0 +1,15 @@ +"""This is an automatically generated stub for `response_attributes.capnp`.""" + +import os + +import capnp # type: ignore + +capnp.remove_import_hook() +here = os.path.dirname(os.path.abspath(__file__)) +module_file = os.path.abspath(os.path.join(here, "response_attributes.capnp")) +TorchResponseAttributes = capnp.load(module_file).TorchResponseAttributes +TorchResponseAttributesBuilder = TorchResponseAttributes +TorchResponseAttributesReader = TorchResponseAttributes +TensorFlowResponseAttributes = capnp.load(module_file).TensorFlowResponseAttributes +TensorFlowResponseAttributesBuilder = TensorFlowResponseAttributes +TensorFlowResponseAttributesReader = TensorFlowResponseAttributes diff --git a/smartsim/_core/mli/mli_schemas/response/response_attributes/response_attributes_capnp.pyi b/smartsim/_core/mli/mli_schemas/response/response_attributes/response_attributes_capnp.pyi new file mode 100644 index 0000000000..63c2218ff4 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/response/response_attributes/response_attributes_capnp.pyi @@ -0,0 +1,77 @@ +"""This is an automatically generated stub for `response_attributes.capnp`.""" + +# mypy: ignore-errors + +from __future__ import annotations + +from contextlib import contextmanager +from io import BufferedWriter +from typing import Iterator + +class TorchResponseAttributes: + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[TorchResponseAttributesReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> TorchResponseAttributesReader: ... + @staticmethod + def new_message() -> TorchResponseAttributesBuilder: ... + def to_dict(self) -> dict: ... + +class TorchResponseAttributesReader(TorchResponseAttributes): + def as_builder(self) -> TorchResponseAttributesBuilder: ... + +class TorchResponseAttributesBuilder(TorchResponseAttributes): + @staticmethod + def from_dict(dictionary: dict) -> TorchResponseAttributesBuilder: ... + def copy(self) -> TorchResponseAttributesBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> TorchResponseAttributesReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... 
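The two response-attribute structs are currently empty placeholders reserved for future framework-specific fields, so their builders take no arguments:

```python
# Sketch: response attributes are built without arguments for now.
from smartsim._core.mli.message_handler import MessageHandler

torch_attrs = MessageHandler.build_torch_response_attributes()
tf_attrs = MessageHandler.build_tf_response_attributes()
```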
+ +class TensorFlowResponseAttributes: + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[TensorFlowResponseAttributesReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> TensorFlowResponseAttributesReader: ... + @staticmethod + def new_message() -> TensorFlowResponseAttributesBuilder: ... + def to_dict(self) -> dict: ... + +class TensorFlowResponseAttributesReader(TensorFlowResponseAttributes): + def as_builder(self) -> TensorFlowResponseAttributesBuilder: ... + +class TensorFlowResponseAttributesBuilder(TensorFlowResponseAttributes): + @staticmethod + def from_dict(dictionary: dict) -> TensorFlowResponseAttributesBuilder: ... + def copy(self) -> TensorFlowResponseAttributesBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> TensorFlowResponseAttributesReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... diff --git a/smartsim/_core/mli/mli_schemas/response/response_capnp.py b/smartsim/_core/mli/mli_schemas/response/response_capnp.py new file mode 100644 index 0000000000..5762408272 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/response/response_capnp.py @@ -0,0 +1,12 @@ +"""This is an automatically generated stub for `response.capnp`.""" + +import os + +import capnp # type: ignore + +capnp.remove_import_hook() +here = os.path.dirname(os.path.abspath(__file__)) +module_file = os.path.abspath(os.path.join(here, "response.capnp")) +Response = capnp.load(module_file).Response +ResponseBuilder = Response +ResponseReader = Response diff --git a/smartsim/_core/mli/mli_schemas/response/response_capnp.pyi b/smartsim/_core/mli/mli_schemas/response/response_capnp.pyi new file mode 100644 index 0000000000..194c50d1c5 --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/response/response_capnp.pyi @@ -0,0 +1,178 @@ +"""This is an automatically generated stub for `response.capnp`.""" + +# mypy: ignore-errors + +from __future__ import annotations + +from contextlib import contextmanager +from io import BufferedWriter +from typing import Iterator, Literal, Sequence, overload + +from ..data.data_references_capnp import TensorKey, TensorKeyBuilder, TensorKeyReader +from ..tensor.tensor_capnp import Tensor, TensorBuilder, TensorReader +from .response_attributes.response_attributes_capnp import ( + TensorFlowResponseAttributes, + TensorFlowResponseAttributesBuilder, + TensorFlowResponseAttributesReader, + TorchResponseAttributes, + TorchResponseAttributesBuilder, + TorchResponseAttributesReader, +) + +StatusEnum = Literal["complete", "fail", "timeout"] + +class Response: + class Result: + keys: Sequence[TensorKey | TensorKeyBuilder | TensorKeyReader] + data: Sequence[Tensor | TensorBuilder | TensorReader] + def which(self) -> Literal["keys", "data"]: ... + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[Response.ResultReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Response.ResultReader: ... + @staticmethod + def new_message() -> Response.ResultBuilder: ... 
+ def to_dict(self) -> dict: ... + + class ResultReader(Response.Result): + keys: Sequence[TensorKeyReader] + data: Sequence[TensorReader] + def as_builder(self) -> Response.ResultBuilder: ... + + class ResultBuilder(Response.Result): + keys: Sequence[TensorKey | TensorKeyBuilder | TensorKeyReader] + data: Sequence[Tensor | TensorBuilder | TensorReader] + @staticmethod + def from_dict(dictionary: dict) -> Response.ResultBuilder: ... + def copy(self) -> Response.ResultBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> Response.ResultReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + + class CustomAttributes: + torch: ( + TorchResponseAttributes + | TorchResponseAttributesBuilder + | TorchResponseAttributesReader + ) + tf: ( + TensorFlowResponseAttributes + | TensorFlowResponseAttributesBuilder + | TensorFlowResponseAttributesReader + ) + none: None + def which(self) -> Literal["torch", "tf", "none"]: ... + @overload + def init(self, name: Literal["torch"]) -> TorchResponseAttributes: ... + @overload + def init(self, name: Literal["tf"]) -> TensorFlowResponseAttributes: ... + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[Response.CustomAttributesReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Response.CustomAttributesReader: ... + @staticmethod + def new_message() -> Response.CustomAttributesBuilder: ... + def to_dict(self) -> dict: ... + + class CustomAttributesReader(Response.CustomAttributes): + torch: TorchResponseAttributesReader + tf: TensorFlowResponseAttributesReader + def as_builder(self) -> Response.CustomAttributesBuilder: ... + + class CustomAttributesBuilder(Response.CustomAttributes): + torch: ( + TorchResponseAttributes + | TorchResponseAttributesBuilder + | TorchResponseAttributesReader + ) + tf: ( + TensorFlowResponseAttributes + | TensorFlowResponseAttributesBuilder + | TensorFlowResponseAttributesReader + ) + @staticmethod + def from_dict(dictionary: dict) -> Response.CustomAttributesBuilder: ... + def copy(self) -> Response.CustomAttributesBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> Response.CustomAttributesReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + status: StatusEnum + message: str + result: Response.Result | Response.ResultBuilder | Response.ResultReader + customAttributes: ( + Response.CustomAttributes + | Response.CustomAttributesBuilder + | Response.CustomAttributesReader + ) + @overload + def init(self, name: Literal["result"]) -> Result: ... + @overload + def init(self, name: Literal["customAttributes"]) -> CustomAttributes: ... + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[ResponseReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> ResponseReader: ... + @staticmethod + def new_message() -> ResponseBuilder: ... 
+ def to_dict(self) -> dict: ... + +class ResponseReader(Response): + result: Response.ResultReader + customAttributes: Response.CustomAttributesReader + def as_builder(self) -> ResponseBuilder: ... + +class ResponseBuilder(Response): + result: Response.Result | Response.ResultBuilder | Response.ResultReader + customAttributes: ( + Response.CustomAttributes + | Response.CustomAttributesBuilder + | Response.CustomAttributesReader + ) + @staticmethod + def from_dict(dictionary: dict) -> ResponseBuilder: ... + def copy(self) -> ResponseBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> ResponseReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... diff --git a/smartsim/_core/mli/mli_schemas/tensor/tensor.capnp b/smartsim/_core/mli/mli_schemas/tensor/tensor.capnp new file mode 100644 index 0000000000..0097a0f9bb --- /dev/null +++ b/smartsim/_core/mli/mli_schemas/tensor/tensor.capnp @@ -0,0 +1,80 @@ +# BSD 2-Clause License + +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. + +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: + +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. + +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. + +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+
+@0x9a0aeb2e04838fb1;
+
+using DataRef = import "../data/data_references.capnp";
+
+enum Order {
+  c @0; # row major (contiguous layout)
+  f @1; # column major (fortran contiguous layout)
+}
+
+enum NumericalType {
+  int8 @0;
+  int16 @1;
+  int32 @2;
+  int64 @3;
+  uInt8 @4;
+  uInt16 @5;
+  uInt32 @6;
+  uInt64 @7;
+  float32 @8;
+  float64 @9;
+}
+
+enum ReturnNumericalType {
+  int8 @0;
+  int16 @1;
+  int32 @2;
+  int64 @3;
+  uInt8 @4;
+  uInt16 @5;
+  uInt32 @6;
+  uInt64 @7;
+  float32 @8;
+  float64 @9;
+  none @10;
+  auto @11;
+}
+
+struct Tensor {
+  blob @0 :Data;
+  tensorDescriptor @1 :TensorDescriptor;
+}
+
+struct TensorDescriptor {
+  dimensions @0 :List(Int32);
+  order @1 :Order;
+  dataType @2 :NumericalType;
+}
+
+struct OutputDescriptor {
+  order @0 :Order;
+  optionalKeys @1 :List(DataRef.TensorKey);
+  optionalDimension @2 :List(Int32);
+  optionalDatatype @3 :ReturnNumericalType;
+}
\ No newline at end of file
diff --git a/smartsim/_core/mli/mli_schemas/tensor/tensor_capnp.py b/smartsim/_core/mli/mli_schemas/tensor/tensor_capnp.py
new file mode 100644
index 0000000000..a3938bda53
--- /dev/null
+++ b/smartsim/_core/mli/mli_schemas/tensor/tensor_capnp.py
@@ -0,0 +1,18 @@
+"""This is an automatically generated stub for `tensor.capnp`."""
+
+import os
+
+import capnp  # type: ignore
+
+capnp.remove_import_hook()
+here = os.path.dirname(os.path.abspath(__file__))
+module_file = os.path.abspath(os.path.join(here, "tensor.capnp"))
+Tensor = capnp.load(module_file).Tensor
+TensorBuilder = Tensor
+TensorReader = Tensor
+TensorDescriptor = capnp.load(module_file).TensorDescriptor
+TensorDescriptorBuilder = TensorDescriptor
+TensorDescriptorReader = TensorDescriptor
+OutputDescriptor = capnp.load(module_file).OutputDescriptor
+OutputDescriptorBuilder = OutputDescriptor
+OutputDescriptorReader = OutputDescriptor
diff --git a/smartsim/_core/mli/mli_schemas/tensor/tensor_capnp.pyi b/smartsim/_core/mli/mli_schemas/tensor/tensor_capnp.pyi
new file mode 100644
index 0000000000..462911afdf
--- /dev/null
+++ b/smartsim/_core/mli/mli_schemas/tensor/tensor_capnp.pyi
@@ -0,0 +1,159 @@
+"""This is an automatically generated stub for `tensor.capnp`."""
+
+# mypy: ignore-errors
+
+from __future__ import annotations
+
+from contextlib import contextmanager
+from io import BufferedWriter
+from typing import Iterator, Literal, Sequence
+
+from ..data.data_references_capnp import TensorKey, TensorKeyBuilder, TensorKeyReader
+
+Order = Literal["c", "f"]
+NumericalType = Literal[
+    "int8",
+    "int16",
+    "int32",
+    "int64",
+    "uInt8",
+    "uInt16",
+    "uInt32",
+    "uInt64",
+    "float32",
+    "float64",
+]
+ReturnNumericalType = Literal[
+    "int8",
+    "int16",
+    "int32",
+    "int64",
+    "uInt8",
+    "uInt16",
+    "uInt32",
+    "uInt64",
+    "float32",
+    "float64",
+    "none",
+    "auto",
+]
+
+class TensorDescriptor:
+    dimensions: Sequence[int]
+    order: Order
+    dataType: NumericalType
+    @staticmethod
+    @contextmanager
+    def from_bytes(
+        data: bytes,
+        traversal_limit_in_words: int | None = ...,
+        nesting_limit: int | None = ...,
+    ) -> Iterator[TensorDescriptorReader]: ...
+    @staticmethod
+    def from_bytes_packed(
+        data: bytes,
+        traversal_limit_in_words: int | None = ...,
+        nesting_limit: int | None = ...,
+    ) -> TensorDescriptorReader: ...
+    @staticmethod
+    def new_message() -> TensorDescriptorBuilder: ...
+    def to_dict(self) -> dict: ...
+
+class TensorDescriptorReader(TensorDescriptor):
+    def as_builder(self) -> TensorDescriptorBuilder: ...
+ +class TensorDescriptorBuilder(TensorDescriptor): + @staticmethod + def from_dict(dictionary: dict) -> TensorDescriptorBuilder: ... + def copy(self) -> TensorDescriptorBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> TensorDescriptorReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + +class Tensor: + blob: bytes + tensorDescriptor: ( + TensorDescriptor | TensorDescriptorBuilder | TensorDescriptorReader + ) + def init(self, name: Literal["tensorDescriptor"]) -> TensorDescriptor: ... + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[TensorReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> TensorReader: ... + @staticmethod + def new_message() -> TensorBuilder: ... + def to_dict(self) -> dict: ... + +class TensorReader(Tensor): + tensorDescriptor: TensorDescriptorReader + def as_builder(self) -> TensorBuilder: ... + +class TensorBuilder(Tensor): + tensorDescriptor: ( + TensorDescriptor | TensorDescriptorBuilder | TensorDescriptorReader + ) + @staticmethod + def from_dict(dictionary: dict) -> TensorBuilder: ... + def copy(self) -> TensorBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> TensorReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... + +class OutputDescriptor: + order: Order + optionalKeys: Sequence[TensorKey | TensorKeyBuilder | TensorKeyReader] + optionalDimension: Sequence[int] + optionalDatatype: ReturnNumericalType + @staticmethod + @contextmanager + def from_bytes( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> Iterator[OutputDescriptorReader]: ... + @staticmethod + def from_bytes_packed( + data: bytes, + traversal_limit_in_words: int | None = ..., + nesting_limit: int | None = ..., + ) -> OutputDescriptorReader: ... + @staticmethod + def new_message() -> OutputDescriptorBuilder: ... + def to_dict(self) -> dict: ... + +class OutputDescriptorReader(OutputDescriptor): + optionalKeys: Sequence[TensorKeyReader] + def as_builder(self) -> OutputDescriptorBuilder: ... + +class OutputDescriptorBuilder(OutputDescriptor): + optionalKeys: Sequence[TensorKey | TensorKeyBuilder | TensorKeyReader] + @staticmethod + def from_dict(dictionary: dict) -> OutputDescriptorBuilder: ... + def copy(self) -> OutputDescriptorBuilder: ... + def to_bytes(self) -> bytes: ... + def to_bytes_packed(self) -> bytes: ... + def to_segments(self) -> list[bytes]: ... + def as_reader(self) -> OutputDescriptorReader: ... + @staticmethod + def write(file: BufferedWriter) -> None: ... + @staticmethod + def write_packed(file: BufferedWriter) -> None: ... 
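
The stub above only declares types; at runtime `tensor_capnp.py` loads the schema with pycapnp and the messages are built imperatively. A minimal usage sketch of the generated bindings (illustrative, not part of the patch itself; assumes pycapnp and numpy are installed and the package paths added in this diff are importable):

    import numpy as np

    from smartsim._core.mli.mli_schemas.tensor import tensor_capnp

    # Describe a small int8 array: layout, element type, and shape.
    arr = np.zeros((3, 2, 5), dtype=np.int8)
    descriptor = tensor_capnp.TensorDescriptor.new_message()
    descriptor.order = "c"  # row major, per the Order enum
    descriptor.dataType = "int8"  # a NumericalType enum value
    descriptor.dimensions = list(arr.shape)

    # Pair the raw bytes with their descriptor.
    tensor = tensor_capnp.Tensor.new_message()
    tensor.blob = arr.tobytes()
    tensor.tensorDescriptor = descriptor

    # Round-trip through the Cap'n Proto wire format.
    payload = tensor.to_bytes()
    with tensor_capnp.Tensor.from_bytes(payload) as reader:
        shape = tuple(reader.tensorDescriptor.dimensions)
        restored = np.frombuffer(reader.blob, dtype=np.int8).reshape(shape)
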
diff --git a/tests/test_message_handler/__init__.py b/tests/test_message_handler/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/test_message_handler/test_build_model_key.py b/tests/test_message_handler/test_build_model_key.py
new file mode 100644
index 0000000000..135e967983
--- /dev/null
+++ b/tests/test_message_handler/test_build_model_key.py
@@ -0,0 +1,44 @@
+# BSD 2-Clause License
+#
+# Copyright (c) 2021-2024, Hewlett Packard Enterprise
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this
+# list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import pytest
+
+from smartsim._core.mli.message_handler import MessageHandler
+
+# The tests in this file belong to the group_a group
+pytestmark = pytest.mark.group_a
+
+handler = MessageHandler()
+
+
+def test_build_model_key_successful():
+    model_key = handler.build_model_key("model_key")
+    assert model_key.key == "model_key"
+
+
+def test_build_model_key_unsuccessful():
+    with pytest.raises(ValueError):
+        model_key = handler.build_model_key(100)
diff --git a/tests/test_message_handler/test_build_request_attributes.py b/tests/test_message_handler/test_build_request_attributes.py
new file mode 100644
index 0000000000..5b1e09b0aa
--- /dev/null
+++ b/tests/test_message_handler/test_build_request_attributes.py
@@ -0,0 +1,55 @@
+# BSD 2-Clause License
+#
+# Copyright (c) 2021-2024, Hewlett Packard Enterprise
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this
+# list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest + +from smartsim._core.mli.message_handler import MessageHandler + +# The tests in this file belong to the group_a group +pytestmark = pytest.mark.group_a + +handler = MessageHandler() + + +def test_build_torch_request_attributes_successful(): + attribute = handler.build_torch_request_attributes("sparse") + assert attribute.tensorType == "sparse" + + +def test_build_torch_request_attributes_unsuccessful(): + with pytest.raises(ValueError): + attribute = handler.build_torch_request_attributes("invalid!") + + +def test_build_tf_request_attributes_successful(): + attribute = handler.build_tf_request_attributes(name="tfcnn", tensor_type="sparse") + assert attribute.tensorType == "sparse" + assert attribute.name == "tfcnn" + + +def test_build_tf_request_attributes_unsuccessful(): + with pytest.raises(ValueError): + attribute = handler.build_tf_request_attributes("tf_fail", "invalid!") diff --git a/tests/test_message_handler/test_build_tensor.py b/tests/test_message_handler/test_build_tensor.py new file mode 100644 index 0000000000..aa7bd4e6e2 --- /dev/null +++ b/tests/test_message_handler/test_build_tensor.py @@ -0,0 +1,185 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+
+import pytest
+
+try:
+    import tensorflow as tf
+except ImportError:
+    should_run_tf = False
+else:
+    should_run_tf = True
+
+    small_tf_tensor = tf.zeros((3, 2, 5), dtype=tf.int8)
+    small_tf_tensor = small_tf_tensor.numpy()
+    medium_tf_tensor = tf.ones((1040, 1040, 3), dtype=tf.int64)
+    medium_tf_tensor = medium_tf_tensor.numpy()
+
+
+try:
+    import torch
+except ImportError:
+    should_run_torch = False
+else:
+    should_run_torch = True
+
+    small_torch_tensor = torch.zeros((3, 2, 5), dtype=torch.int8)
+    small_torch_tensor = small_torch_tensor.numpy()
+    medium_torch_tensor = torch.ones((1040, 1040, 3), dtype=torch.int64)
+    medium_torch_tensor = medium_torch_tensor.numpy()
+
+from smartsim._core.mli.message_handler import MessageHandler
+
+# The tests in this file belong to the group_a group
+pytestmark = pytest.mark.group_a
+
+handler = MessageHandler()
+
+
+@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run")
+@pytest.mark.parametrize(
+    "tensor, dtype, order, dimension",
+    [
+        pytest.param(
+            small_torch_tensor,
+            "int8",
+            "c",
+            [3, 2, 5],
+            id="small torch tensor",
+        ),
+        pytest.param(
+            medium_torch_tensor,
+            "int64",
+            "c",
+            [1040, 1040, 3],
+            id="medium torch tensor",
+        ),
+    ],
+)
+def test_build_torch_tensor_successful(tensor, dtype, order, dimension):
+    built_tensor = handler.build_tensor(tensor, order, dtype, dimension)
+    assert built_tensor is not None
+    assert type(built_tensor.blob) == bytes
+    assert built_tensor.tensorDescriptor.order == order
+    assert built_tensor.tensorDescriptor.dataType == dtype
+    for i, j in zip(built_tensor.tensorDescriptor.dimensions, dimension):
+        assert i == j
+
+
+@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run")
+@pytest.mark.parametrize(
+    "tensor, dtype, order, dimension",
+    [
+        pytest.param(
+            small_tf_tensor,
+            "int8",
+            "c",
+            [3, 2, 5],
+            id="small tf tensor",
+        ),
+        pytest.param(
+            medium_tf_tensor,
+            "int64",
+            "c",
+            [1040, 1040, 3],
+            id="medium tf tensor",
+        ),
+    ],
+)
+def test_build_tf_tensor_successful(tensor, dtype, order, dimension):
+    built_tensor = handler.build_tensor(tensor, order, dtype, dimension)
+    assert built_tensor is not None
+    assert type(built_tensor.blob) == bytes
+    assert built_tensor.tensorDescriptor.order == order
+    assert built_tensor.tensorDescriptor.dataType == dtype
+    for i, j in zip(built_tensor.tensorDescriptor.dimensions, dimension):
+        assert i == j
+
+
+@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run")
+@pytest.mark.parametrize(
+    "tensor, order, dtype, dimension",
+    [
+        pytest.param([1, 2, 4], "c", "int8", [1, 2, 3], id="bad tensor type"),
+        pytest.param(
+            small_torch_tensor,
+            "bad_order",
+            "int8",
+            [3, 2, 5],
+            id="bad order type",
+        ),
+        pytest.param(
+            small_torch_tensor,
+            "f",
+            "bad_num_type",
+            [3, 2, 5],
+            id="bad numerical type",
+        ),
+        pytest.param(
+            small_torch_tensor,
+            "f",
+            "int8",
+            "bad shape type",
+            id="bad shape type",
+        ),
+    ],
+)
+def test_build_torch_tensor_bad_input(tensor, dtype, order, dimension):
+    with pytest.raises(ValueError):
+        built_tensor = handler.build_tensor(tensor, order, dtype, dimension)
+
+
+@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run")
+@pytest.mark.parametrize(
+    "tensor, order, dtype, dimension",
+    [
+        pytest.param([1, 2, 4], "c", "int8", [1, 2, 3], id="bad tensor type"),
+        pytest.param(
+            small_tf_tensor,
+            "bad_order",
+            "int8",
+            [3, 2, 5],
+            id="bad order type",
+        ),
+        pytest.param(
+            small_tf_tensor,
+            "f",
+            "bad_num_type",
+            [3, 2, 5],
+            id="bad numerical 
type", + ), + pytest.param( + small_tf_tensor, + "f", + "int8", + "bad shape type", + id="bad shape type", + ), + ], +) +def test_build_tf_tensor_bad_input(tensor, dtype, order, dimension): + with pytest.raises(ValueError): + built_tensor = handler.build_tensor(tensor, order, dtype, dimension) diff --git a/tests/test_message_handler/test_build_tensor_key.py b/tests/test_message_handler/test_build_tensor_key.py new file mode 100644 index 0000000000..7abe9e853d --- /dev/null +++ b/tests/test_message_handler/test_build_tensor_key.py @@ -0,0 +1,44 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest + +from smartsim._core.mli.message_handler import MessageHandler + +# The tests in this file belong to the group_a group +pytestmark = pytest.mark.group_a + +handler = MessageHandler() + + +def test_build_tensor_key_successful(): + tensor_key = handler.build_tensor_key("tensor_key") + assert tensor_key.key == "tensor_key" + + +def test_build_tensor_key_unsuccessful(): + with pytest.raises(ValueError): + tensor_key = handler.build_tensor_key(100) diff --git a/tests/test_message_handler/test_output_descriptor.py b/tests/test_message_handler/test_output_descriptor.py new file mode 100644 index 0000000000..fd21eeb0d5 --- /dev/null +++ b/tests/test_message_handler/test_output_descriptor.py @@ -0,0 +1,77 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest + +from smartsim._core.mli.message_handler import MessageHandler + +# The tests in this file belong to the group_a group +pytestmark = pytest.mark.group_a + +handler = MessageHandler() + +tensor_key = handler.build_tensor_key("key") + + +@pytest.mark.parametrize( + "order, keys, dtype, dimension", + [ + pytest.param("c", [tensor_key], "int8", [1, 2, 3, 4], id="all specified"), + pytest.param( + "c", [tensor_key, tensor_key], "none", [1, 2, 3, 4], id="none dtype" + ), + pytest.param("c", [tensor_key], "int8", [], id="empty dimensions"), + pytest.param("c", [], "int8", [1, 2, 3, 4], id="empty keys"), + ], +) +def test_build_output_tensor_descriptor_successful(dtype, keys, order, dimension): + built_descriptor = handler.build_output_tensor_descriptor( + order, keys, dtype, dimension + ) + assert built_descriptor is not None + assert built_descriptor.order == order + assert len(built_descriptor.optionalKeys) == len(keys) + assert built_descriptor.optionalDatatype == dtype + for i, j in zip(built_descriptor.optionalDimension, dimension): + assert i == j + + +@pytest.mark.parametrize( + "order, keys, dtype, dimension", + [ + pytest.param("bad_order", [], "int8", [3, 2, 5], id="bad order type"), + pytest.param( + "f", [tensor_key], "bad_num_type", [3, 2, 5], id="bad numerical type" + ), + pytest.param("f", [tensor_key], "int8", "bad shape type", id="bad shape type"), + pytest.param("f", ["tensor_key"], "int8", [3, 2, 5], id="bad key type"), + ], +) +def test_build_output_tensor_descriptor_unsuccessful(order, keys, dtype, dimension): + with pytest.raises(ValueError): + built_tensor = handler.build_output_tensor_descriptor( + order, keys, dtype, dimension + ) diff --git a/tests/test_message_handler/test_request.py b/tests/test_message_handler/test_request.py new file mode 100644 index 0000000000..d33a0376a8 --- /dev/null +++ b/tests/test_message_handler/test_request.py @@ -0,0 +1,906 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest + +from smartsim._core.mli.message_handler import MessageHandler + +try: + import tensorflow as tf +except ImportError: + should_run_tf = False +else: + should_run_tf = True + tflow1 = tf.zeros((3, 2, 5), dtype=tf.int8) + tflow2 = tf.ones((10, 10, 3), dtype=tf.int64) + + tensor_3 = MessageHandler.build_tensor( + tflow1.numpy(), "c", "int8", list(tflow1.shape) + ) + tensor_4 = MessageHandler.build_tensor( + tflow2.numpy(), "c", "int64", list(tflow2.shape) + ) + + tf_attributes = MessageHandler.build_tf_request_attributes( + name="tf", tensor_type="sparse" + ) + + +try: + import torch +except ImportError: + should_run_torch = False +else: + should_run_torch = True + + torch1 = torch.zeros((3, 2, 5), dtype=torch.int8) + torch2 = torch.ones((10, 10, 3), dtype=torch.int64) + + tensor_1 = MessageHandler.build_tensor( + torch1.numpy(), "c", "int8", list(torch1.shape) + ) + tensor_2 = MessageHandler.build_tensor( + torch2.numpy(), "c", "int64", list(torch2.shape) + ) + + torch_attributes = MessageHandler.build_torch_request_attributes("sparse") + +# The tests in this file belong to the group_a group +pytestmark = pytest.mark.group_a + +model_key = MessageHandler.build_model_key("model_key") + +input_key1 = MessageHandler.build_tensor_key("input_key1") +input_key2 = MessageHandler.build_tensor_key("input_key2") + +output_key1 = MessageHandler.build_tensor_key("output_key1") +output_key2 = MessageHandler.build_tensor_key("output_key2") + +output_descriptor1 = MessageHandler.build_output_tensor_descriptor( + "c", [output_key1, output_key2], "int64", [] +) +output_descriptor2 = MessageHandler.build_output_tensor_descriptor("f", [], "auto", []) +output_descriptor3 = MessageHandler.build_output_tensor_descriptor( + "c", [output_key1], "none", [1, 2, 3] +) + + +if should_run_tf: + tf_indirect_request = MessageHandler.build_request( + b"reply", + b"model", + "cpu", + [input_key1, input_key2], + [output_key1, output_key2], + [output_descriptor1, output_descriptor2, output_descriptor3], + tf_attributes, + ) + + tf_direct_request = MessageHandler.build_request( + b"reply", + b"model", + "cpu", + [tensor_3, tensor_4], + [], + [output_descriptor1, output_descriptor2], + tf_attributes, + ) + +if should_run_torch: + torch_indirect_request = MessageHandler.build_request( + b"reply", + b"model", + "cpu", + [input_key1, input_key2], + [output_key1, output_key2], + [output_descriptor1, output_descriptor2, output_descriptor3], + torch_attributes, + ) + torch_direct_request = MessageHandler.build_request( + b"reply", + b"model", + "cpu", + [tensor_1, tensor_2], + [], + [output_descriptor1, output_descriptor2], + torch_attributes, + ) + + +@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run") +@pytest.mark.parametrize( + "reply_channel, model, device, input, output, output_descriptors, custom_attributes", + [ + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1, input_key2], + [output_key1, output_key2], + [output_descriptor1], + tf_attributes, 
+ ), + pytest.param( + b"another reply channel", + b"model data", + "gpu", + [input_key1], + [output_key2], + [output_descriptor1], + tf_attributes, + ), + pytest.param( + b"another reply channel", + b"model data", + "auto", + [input_key1], + [output_key2], + [output_descriptor1], + tf_attributes, + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [output_key1], + [output_descriptor1], + None, + ), + ], +) +def test_build_request_indirect_tf_successful( + reply_channel, model, device, input, output, output_descriptors, custom_attributes +): + built_request = MessageHandler.build_request( + reply_channel, + model, + device, + input, + output, + output_descriptors, + custom_attributes, + ) + assert built_request is not None + assert built_request.replyChannel.reply == reply_channel + if built_request.model.which() == "modelKey": + assert built_request.model.modelKey.key == model.key + else: + assert built_request.model.modelData == model + assert built_request.device == device + assert built_request.input.which() == "inputKeys" + assert built_request.input.inputKeys[0].key == input[0].key + assert len(built_request.input.inputKeys) == len(input) + assert len(built_request.output) == len(output) + for i, j in zip(built_request.outputDescriptors, output_descriptors): + assert i.order == j.order + if built_request.customAttributes.which() == "tf": + assert ( + built_request.customAttributes.tf.tensorType == custom_attributes.tensorType + ) + elif built_request.customAttributes.which() == "torch": + assert ( + built_request.customAttributes.torch.tensorType + == custom_attributes.tensorType + ) + else: + assert built_request.customAttributes.none == custom_attributes + + +@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run") +@pytest.mark.parametrize( + "reply_channel, model, device, input, output, output_descriptors, custom_attributes", + [ + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1, input_key2], + [output_key1, output_key2], + [output_descriptor1], + torch_attributes, + ), + pytest.param( + b"another reply channel", + b"model data", + "gpu", + [input_key1], + [output_key2], + [output_descriptor1], + torch_attributes, + ), + pytest.param( + b"another reply channel", + b"model data", + "auto", + [input_key1], + [output_key2], + [output_descriptor1], + torch_attributes, + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [output_key1], + [output_descriptor1], + None, + ), + ], +) +def test_build_request_indirect_torch_successful( + reply_channel, model, device, input, output, output_descriptors, custom_attributes +): + built_request = MessageHandler.build_request( + reply_channel, + model, + device, + input, + output, + output_descriptors, + custom_attributes, + ) + assert built_request is not None + assert built_request.replyChannel.reply == reply_channel + if built_request.model.which() == "modelKey": + assert built_request.model.modelKey.key == model.key + else: + assert built_request.model.modelData == model + assert built_request.device == device + assert built_request.input.which() == "inputKeys" + assert built_request.input.inputKeys[0].key == input[0].key + assert len(built_request.input.inputKeys) == len(input) + assert len(built_request.output) == len(output) + for i, j in zip(built_request.outputDescriptors, output_descriptors): + assert i.order == j.order + if built_request.customAttributes.which() == "tf": + assert ( + built_request.customAttributes.tf.tensorType == 
custom_attributes.tensorType + ) + elif built_request.customAttributes.which() == "torch": + assert ( + built_request.customAttributes.torch.tensorType + == custom_attributes.tensorType + ) + else: + assert built_request.customAttributes.none == custom_attributes + + +@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run") +@pytest.mark.parametrize( + "reply_channel, model, device, input, output, output_descriptors, custom_attributes", + [ + pytest.param( + [], + model_key, + "cpu", + [input_key1, input_key2], + [output_key1, output_key2], + [output_descriptor1], + torch_attributes, + id="bad channel", + ), + pytest.param( + b"reply channel", + "bad model", + "gpu", + [input_key1], + [output_key2], + [output_descriptor1], + torch_attributes, + id="bad model", + ), + pytest.param( + b"reply channel", + model_key, + "bad device", + [input_key1], + [output_key2], + [output_descriptor1], + torch_attributes, + id="bad device", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + ["input_key1", "input_key2"], + [output_key1, output_key2], + [output_descriptor1], + torch_attributes, + id="bad inputs", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [model_key], + [output_key1, output_key2], + [output_descriptor1], + torch_attributes, + id="bad input schema type", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + ["output_key1", "output_key2"], + [output_descriptor1], + torch_attributes, + id="bad outputs", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [model_key], + [output_descriptor1], + torch_attributes, + id="bad output schema type", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [output_key1, output_key2], + [output_descriptor1], + "bad attributes", + id="bad custom attributes", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [output_key1, output_key2], + [output_descriptor1], + model_key, + id="bad custom attributes schema type", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [output_key1, output_key2], + "bad descriptors", + torch_attributes, + id="bad output descriptors", + ), + ], +) +def test_build_request_indirect_torch_unsuccessful( + reply_channel, model, device, input, output, output_descriptors, custom_attributes +): + with pytest.raises(ValueError): + built_request = MessageHandler.build_request( + reply_channel, + model, + device, + input, + output, + output_descriptors, + custom_attributes, + ) + + +@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run") +@pytest.mark.parametrize( + "reply_channel, model, device, input, output, output_descriptors, custom_attributes", + [ + pytest.param( + [], + model_key, + "cpu", + [input_key1, input_key2], + [output_key1, output_key2], + [output_descriptor1], + tf_attributes, + id="bad channel", + ), + pytest.param( + b"reply channel", + "bad model", + "gpu", + [input_key1], + [output_key2], + [output_descriptor1], + tf_attributes, + id="bad model", + ), + pytest.param( + b"reply channel", + model_key, + "bad device", + [input_key1], + [output_key2], + [output_descriptor1], + tf_attributes, + id="bad device", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + ["input_key1", "input_key2"], + [output_key1, output_key2], + [output_descriptor1], + tf_attributes, + id="bad inputs", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [model_key], + [output_key1, output_key2], + 
[output_descriptor1], + tf_attributes, + id="bad input schema type", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + ["output_key1", "output_key2"], + [output_descriptor1], + tf_attributes, + id="bad outputs", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [model_key], + [output_descriptor1], + tf_attributes, + id="bad output schema type", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [output_key1, output_key2], + [output_descriptor1], + "bad attributes", + id="bad custom attributes", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [output_key1, output_key2], + [output_descriptor1], + model_key, + id="bad custom attributes schema type", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [input_key1], + [output_key1, output_key2], + "bad descriptors", + tf_attributes, + id="bad output descriptors", + ), + ], +) +def test_build_request_indirect_tf_unsuccessful( + reply_channel, model, device, input, output, output_descriptors, custom_attributes +): + with pytest.raises(ValueError): + built_request = MessageHandler.build_request( + reply_channel, + model, + device, + input, + output, + output_descriptors, + custom_attributes, + ) + + +@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run") +@pytest.mark.parametrize( + "reply_channel, model, device, input, output, output_descriptors, custom_attributes", + [ + pytest.param( + b"reply channel", + model_key, + "cpu", + [tensor_1, tensor_2], + [], + [output_descriptor2], + torch_attributes, + ), + pytest.param( + b"another reply channel", + b"model data", + "gpu", + [tensor_1], + [], + [output_descriptor3], + torch_attributes, + ), + pytest.param( + b"another reply channel", + b"model data", + "auto", + [tensor_2], + [], + [output_descriptor1], + torch_attributes, + ), + pytest.param( + b"another reply channel", + b"model data", + "auto", + [tensor_1], + [], + [output_descriptor1], + None, + ), + ], +) +def test_build_request_direct_torch_successful( + reply_channel, model, device, input, output, output_descriptors, custom_attributes +): + built_request = MessageHandler.build_request( + reply_channel, + model, + device, + input, + output, + output_descriptors, + custom_attributes, + ) + assert built_request is not None + assert built_request.replyChannel.reply == reply_channel + if built_request.model.which() == "modelKey": + assert built_request.model.modelKey.key == model.key + else: + assert built_request.model.modelData == model + assert built_request.device == device + assert built_request.input.which() == "inputData" + assert built_request.input.inputData[0].blob == input[0].blob + assert len(built_request.input.inputData) == len(input) + assert len(built_request.output) == len(output) + for i, j in zip(built_request.outputDescriptors, output_descriptors): + assert i.order == j.order + if built_request.customAttributes.which() == "tf": + assert ( + built_request.customAttributes.tf.tensorType == custom_attributes.tensorType + ) + elif built_request.customAttributes.which() == "torch": + assert ( + built_request.customAttributes.torch.tensorType + == custom_attributes.tensorType + ) + else: + assert built_request.customAttributes.none == custom_attributes + + +@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run") +@pytest.mark.parametrize( + "reply_channel, model, device, input, output, output_descriptors, custom_attributes", + [ + pytest.param( + b"reply 
channel", + model_key, + "cpu", + [tensor_3, tensor_4], + [], + [output_descriptor2], + tf_attributes, + ), + pytest.param( + b"another reply channel", + b"model data", + "gpu", + [tensor_4], + [], + [output_descriptor3], + tf_attributes, + ), + pytest.param( + b"another reply channel", + b"model data", + "auto", + [tensor_4], + [], + [output_descriptor1], + tf_attributes, + ), + pytest.param( + b"another reply channel", + b"model data", + "auto", + [tensor_3], + [], + [output_descriptor1], + None, + ), + ], +) +def test_build_request_direct_tf_successful( + reply_channel, model, device, input, output, output_descriptors, custom_attributes +): + built_request = MessageHandler.build_request( + reply_channel, + model, + device, + input, + output, + output_descriptors, + custom_attributes, + ) + assert built_request is not None + assert built_request.replyChannel.reply == reply_channel + if built_request.model.which() == "modelKey": + assert built_request.model.modelKey.key == model.key + else: + assert built_request.model.modelData == model + assert built_request.device == device + assert built_request.input.which() == "inputData" + assert built_request.input.inputData[0].blob == input[0].blob + assert len(built_request.input.inputData) == len(input) + assert len(built_request.output) == len(output) + for i, j in zip(built_request.outputDescriptors, output_descriptors): + assert i.order == j.order + if built_request.customAttributes.which() == "tf": + assert ( + built_request.customAttributes.tf.tensorType == custom_attributes.tensorType + ) + elif built_request.customAttributes.which() == "torch": + assert ( + built_request.customAttributes.torch.tensorType + == custom_attributes.tensorType + ) + else: + assert built_request.customAttributes.none == custom_attributes + + +@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run") +@pytest.mark.parametrize( + "reply_channel, model, device, input, output, output_descriptors, custom_attributes", + [ + pytest.param( + [], + model_key, + "cpu", + [tensor_1, tensor_2], + [], + [output_descriptor2], + torch_attributes, + id="bad channel", + ), + pytest.param( + b"reply channel", + "bad model", + "gpu", + [tensor_1], + [], + [output_descriptor2], + torch_attributes, + id="bad model", + ), + pytest.param( + b"reply channel", + model_key, + "bad device", + [tensor_2], + [], + [output_descriptor2], + torch_attributes, + id="bad device", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + ["input_key1", "input_key2"], + [], + [output_descriptor2], + torch_attributes, + id="bad inputs", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [], + ["output_key1", "output_key2"], + [output_descriptor2], + torch_attributes, + id="bad outputs", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [tensor_1], + [], + [output_descriptor2], + "bad attributes", + id="bad custom attributes", + ), + pytest.param( + b"reply_channel", + model_key, + "cpu", + [tensor_1, tensor_2], + [], + ["output_descriptor2"], + torch_attributes, + id="bad output descriptors", + ), + ], +) +def test_build_torch_request_direct_unsuccessful( + reply_channel, model, device, input, output, output_descriptors, custom_attributes +): + with pytest.raises(ValueError): + built_request = MessageHandler.build_request( + reply_channel, + model, + device, + input, + output, + output_descriptors, + custom_attributes, + ) + + +@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run") +@pytest.mark.parametrize( + "reply_channel, 
model, device, input, output, output_descriptors, custom_attributes", + [ + pytest.param( + [], + model_key, + "cpu", + [tensor_3, tensor_4], + [], + [output_descriptor2], + tf_attributes, + id="bad channel", + ), + pytest.param( + b"reply channel", + "bad model", + "gpu", + [tensor_4], + [], + [output_descriptor2], + tf_attributes, + id="bad model", + ), + pytest.param( + b"reply channel", + model_key, + "bad device", + [tensor_3], + [], + [output_descriptor2], + tf_attributes, + id="bad device", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + ["input_key1", "input_key2"], + [], + [output_descriptor2], + tf_attributes, + id="bad inputs", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [], + ["output_key1", "output_key2"], + [output_descriptor2], + tf_attributes, + id="bad outputs", + ), + pytest.param( + b"reply channel", + model_key, + "cpu", + [tensor_4], + [], + [output_descriptor2], + "bad attributes", + id="bad custom attributes", + ), + pytest.param( + b"reply_channel", + model_key, + "cpu", + [tensor_3, tensor_4], + [], + ["output_descriptor2"], + tf_attributes, + id="bad output descriptors", + ), + ], +) +def test_build_tf_request_direct_unsuccessful( + reply_channel, model, device, input, output, output_descriptors, custom_attributes +): + with pytest.raises(ValueError): + built_request = MessageHandler.build_request( + reply_channel, + model, + device, + input, + output, + output_descriptors, + custom_attributes, + ) + + +@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run") +@pytest.mark.parametrize( + "req", + [ + pytest.param(torch_indirect_request, id="indirect"), + pytest.param(torch_direct_request, id="direct"), + ], +) +def test_serialize_torch_request_successful(req): + serialized = MessageHandler.serialize_request(req) + assert type(serialized) == bytes + + deserialized = MessageHandler.deserialize_request(serialized) + assert deserialized.to_dict() == req.to_dict() + + +@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run") +@pytest.mark.parametrize( + "req", + [ + pytest.param(tf_indirect_request, id="indirect"), + pytest.param(tf_direct_request, id="direct"), + ], +) +def test_serialize_tf_request_successful(req): + serialized = MessageHandler.serialize_request(req) + assert type(serialized) == bytes + + deserialized = MessageHandler.deserialize_request(serialized) + assert deserialized.to_dict() == req.to_dict() diff --git a/tests/test_message_handler/test_response.py b/tests/test_message_handler/test_response.py new file mode 100644 index 0000000000..9d59a18793 --- /dev/null +++ b/tests/test_message_handler/test_response.py @@ -0,0 +1,341 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest + +from smartsim._core.mli.message_handler import MessageHandler + +try: + import tensorflow as tf +except ImportError: + should_run_tf = False +else: + should_run_tf = True + + tflow1 = tf.zeros((3, 2, 5), dtype=tf.int8) + tflow2 = tf.ones((1040, 1040, 3), dtype=tf.int64) + + small_tf_tensor = MessageHandler.build_tensor( + tflow1.numpy(), "c", "int8", list(tflow1.shape) + ) + medium_tf_tensor = MessageHandler.build_tensor( + tflow2.numpy(), "c", "int64", list(tflow2.shape) + ) + + tf_attributes = MessageHandler.build_tf_response_attributes() + + tf_direct_response = MessageHandler.build_response( + "complete", + "Success again!", + [small_tf_tensor, medium_tf_tensor], + tf_attributes, + ) + + +try: + import torch +except ImportError: + should_run_torch = False +else: + should_run_torch = True + + torch1 = torch.zeros((3, 2, 5), dtype=torch.int8) + torch2 = torch.ones((1040, 1040, 3), dtype=torch.int64) + + small_torch_tensor = MessageHandler.build_tensor( + torch1.numpy(), "c", "int8", list(torch1.shape) + ) + medium_torch_tensor = MessageHandler.build_tensor( + torch2.numpy(), "c", "int64", list(torch2.shape) + ) + + torch_attributes = MessageHandler.build_torch_response_attributes() + + torch_direct_response = MessageHandler.build_response( + "complete", + "Success again!", + [small_torch_tensor, medium_torch_tensor], + torch_attributes, + ) + + +# The tests in this file belong to the group_a group +pytestmark = pytest.mark.group_a + + +result_key1 = MessageHandler.build_tensor_key("result_key1") +result_key2 = MessageHandler.build_tensor_key("result_key2") + + +if should_run_tf: + tf_indirect_response = MessageHandler.build_response( + "complete", + "Success!", + [result_key1, result_key2], + tf_attributes, + ) + +if should_run_torch: + torch_indirect_response = MessageHandler.build_response( + "complete", + "Success!", + [result_key1, result_key2], + torch_attributes, + ) + + +@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run") +@pytest.mark.parametrize( + "status, status_message, result, custom_attribute", + [ + pytest.param( + 200, + "Yay, it worked!", + [small_torch_tensor, medium_torch_tensor], + None, + id="tensor list", + ), + pytest.param( + 200, + "Yay, it worked!", + [small_torch_tensor], + torch_attributes, + id="small tensor", + ), + pytest.param( + 200, + "Yay, it worked!", + [result_key1, result_key2], + torch_attributes, + id="tensor key list", + ), + ], +) +def test_build_torch_response_successful( + status, status_message, result, custom_attribute +): + response = MessageHandler.build_response( + status=status, + message=status_message, + result=result, + custom_attributes=custom_attribute, + ) + assert response is not None + assert response.status == status + assert response.message == status_message + if response.result.which() == "keys": + assert response.result.keys[0].to_dict() == result[0].to_dict() + else: + assert response.result.data[0].to_dict() == result[0].to_dict() + + 
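+def _sketch_consume_response(serialized):
+    # Illustrative helper, not used by the tests below: Result is a Cap'n
+    # Proto union, so consumers branch on result.which() exactly as the
+    # assertions above do. Responses built from tensor keys report "keys";
+    # responses carrying tensors report "data". `serialized` stands in for a
+    # received bytes payload.
+    response = MessageHandler.deserialize_response(serialized)
+    if response.result.which() == "keys":
+        return [k.key for k in response.result.keys]
+    return [t.blob for t in response.result.data]
+
+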
+@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run") +@pytest.mark.parametrize( + "status, status_message, result, custom_attribute", + [ + pytest.param( + 200, + "Yay, it worked!", + [small_tf_tensor, medium_tf_tensor], + None, + id="tensor list", + ), + pytest.param( + 200, + "Yay, it worked!", + [small_tf_tensor], + tf_attributes, + id="small tensor", + ), + pytest.param( + 200, + "Yay, it worked!", + [result_key1, result_key2], + tf_attributes, + id="tensor key list", + ), + ], +) +def test_build_tf_response_successful(status, status_message, result, custom_attribute): + response = MessageHandler.build_response( + status=status, + message=status_message, + result=result, + custom_attributes=custom_attribute, + ) + assert response is not None + assert response.status == status + assert response.message == status_message + if response.result.which() == "keys": + assert response.result.keys[0].to_dict() == result[0].to_dict() + else: + assert response.result.data[0].to_dict() == result[0].to_dict() + + +@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run") +@pytest.mark.parametrize( + "status, status_message, result, custom_attribute", + [ + pytest.param( + "bad status", + "Yay, it worked!", + [small_tf_tensor, medium_tf_tensor], + None, + id="bad status", + ), + pytest.param( + "complete", + 200, + [small_tf_tensor], + tf_attributes, + id="bad status message", + ), + pytest.param( + "complete", + "Yay, it worked!", + ["result_key1", "result_key2"], + tf_attributes, + id="bad result", + ), + pytest.param( + "complete", + "Yay, it worked!", + [tf_attributes], + tf_attributes, + id="bad result type", + ), + pytest.param( + "complete", + "Yay, it worked!", + [small_tf_tensor, medium_tf_tensor], + "custom attributes", + id="bad custom attributes", + ), + pytest.param( + "complete", + "Yay, it worked!", + [small_tf_tensor, medium_tf_tensor], + result_key1, + id="bad custom attributes type", + ), + ], +) +def test_build_tf_response_unsuccessful( + status, status_message, result, custom_attribute +): + with pytest.raises(ValueError): + response = MessageHandler.build_response( + status, status_message, result, custom_attribute + ) + + +@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run") +@pytest.mark.parametrize( + "status, status_message, result, custom_attribute", + [ + pytest.param( + "bad status", + "Yay, it worked!", + [small_torch_tensor, medium_torch_tensor], + None, + id="bad status", + ), + pytest.param( + "complete", + 200, + [small_torch_tensor], + torch_attributes, + id="bad status message", + ), + pytest.param( + "complete", + "Yay, it worked!", + ["result_key1", "result_key2"], + torch_attributes, + id="bad result", + ), + pytest.param( + "complete", + "Yay, it worked!", + [torch_attributes], + torch_attributes, + id="bad result type", + ), + pytest.param( + "complete", + "Yay, it worked!", + [small_torch_tensor, medium_torch_tensor], + "custom attributes", + id="bad custom attributes", + ), + pytest.param( + "complete", + "Yay, it worked!", + [small_torch_tensor, medium_torch_tensor], + result_key1, + id="bad custom attributes type", + ), + ], +) +def test_build_torch_response_unsuccessful( + status, status_message, result, custom_attribute +): + with pytest.raises(ValueError): + response = MessageHandler.build_response( + status, status_message, result, custom_attribute + ) + + +@pytest.mark.skipif(not should_run_torch, reason="Test needs Torch to run") +@pytest.mark.parametrize( + "response", + [ + 
pytest.param(torch_indirect_response, id="indirect"), + pytest.param(torch_direct_response, id="direct"), + ], +) +def test_torch_serialize_response(response): + serialized = MessageHandler.serialize_response(response) + assert type(serialized) == bytes + + deserialized = MessageHandler.deserialize_response(serialized) + assert deserialized.to_dict() == response.to_dict() + + +@pytest.mark.skipif(not should_run_tf, reason="Test needs TF to run") +@pytest.mark.parametrize( + "response", + [ + pytest.param(tf_indirect_response, id="indirect"), + pytest.param(tf_direct_response, id="direct"), + ], +) +def test_tf_serialize_response(response): + serialized = MessageHandler.serialize_response(response) + assert type(serialized) == bytes + + deserialized = MessageHandler.deserialize_response(serialized) + assert deserialized.to_dict() == response.to_dict()
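
Taken together, these tests exercise one round-trip pattern end to end: build typed messages with the MessageHandler helpers, serialize to bytes, and deserialize back to an equivalent message. A condensed sketch (illustrative only; the channel bytes, model bytes, and key names are placeholders, and numpy is assumed to be installed):

    import numpy as np

    from smartsim._core.mli.message_handler import MessageHandler

    arr = np.ones((10, 10, 3), dtype=np.int64)
    tensor = MessageHandler.build_tensor(arr, "c", "int64", list(arr.shape))
    output_key = MessageHandler.build_tensor_key("output_key")
    descriptor = MessageHandler.build_output_tensor_descriptor(
        "c", [output_key], "none", [1, 2, 3]
    )
    attributes = MessageHandler.build_torch_request_attributes("sparse")

    request = MessageHandler.build_request(
        b"reply channel",  # opaque reply channel descriptor
        b"model data",     # raw model bytes, or a key from build_model_key
        "cpu",
        [tensor],          # direct input tensors (or TensorKeys for indirect)
        [],                # output tensor keys (empty for direct requests)
        [descriptor],
        attributes,
    )

    wire = MessageHandler.serialize_request(request)
    assert MessageHandler.deserialize_request(wire).to_dict() == request.to_dict()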