Content Type Decoding

MLServer extends the V2 inference protocol by adding support for a content_type annotation. This annotation can be provided either through the model metadata parameters, or through the input parameters. By leveraging the content_type annotation, we can provide the necessary information to MLServer so that it can decode the input payload from the “wire” V2 protocol to something meaningful to the model / user (e.g. a NumPy array).

This example will walk you through some examples which illustrate how this works, and how it can be extended.

Echo Inference Runtime

To start with, we will write a dummy runtime which just prints the input, the decoded input and returns it. This will serve as a testbed to showcase how the content_type support works.

Later on, we will extend this runtime by adding custom codecs that will decode our V2 payload to custom types.

%%writefile runtime.py
import json

from mlserver import MLModel
from mlserver.types import InferenceRequest, InferenceResponse, ResponseOutput
from mlserver.codecs import DecodedParameterName

_to_exclude = {
    "parameters": {DecodedParameterName},
    'inputs': {"__all__": {"parameters": {DecodedParameterName}}}
}

class EchoRuntime(MLModel):
    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        outputs = []
        for request_input in payload.inputs:
            decoded_input = self.decode(request_input)
            print(f"------ Encoded Input ({request_input.name}) ------")
            as_dict = request_input.dict(exclude=_to_exclude)  # type: ignore
            print(json.dumps(as_dict, indent=2))
            print(f"------ Decoded input ({request_input.name}) ------")
            print(decoded_input)
            
            outputs.append(
                ResponseOutput(
                    name=request_input.name,
                    datatype=request_input.datatype,
                    shape=request_input.shape,
                    data=request_input.data
                )
            )
        
        return InferenceResponse(model_name=self.name, outputs=outputs)
        

As you can see above, this runtime will decode the incoming payloads by calling the self.decode() helper method. This method will check what’s the right content type for each input in the following order:

  1. Is there any content type defined in the inputs[].parameters.content_type field within the request payload?

  2. Is there any content type defined in the inputs[].parameters.content_type field within the model metadata?

  3. Is there any default content type that should be assumed?

Model Settings

In order to enable this runtime, we will also create a model-settings.json file. This file should be present (or accessible from) in the folder where we run mlserver start ..

%%writefile model-settings.json

{
    "name": "content-type-example",
    "implementation": "runtime.EchoRuntime"
}

Request Inputs

Our initial step will be to decide the content type based on the incoming inputs[].parameters field. For this, we will start our MLServer in the background (e.g. running mlserver start .)

import requests

payload = {
    "inputs": [
        {
            "name": "parameters-np",
            "datatype": "INT32",
            "shape": [2, 2],
            "data": [1, 2, 3, 4],
            "parameters": {
                "content_type": "np"
            }
        },
        {
            "name": "parameters-str",
            "datatype": "BYTES",
            "shape": [11],
            "data": "hello world 😁",
            "parameters": {
                "content_type": "str"
            }
        }
    ]
}

response = requests.post(
    "http://localhost:8080/v2/models/content-type-example/infer",
    json=payload
)

Model Metadata

Our next step will be to define the expected content type through the model metadata. This can be done by extending the model-settings.json file, and adding a section on inputs.

%%writefile model-settings.json

{
    "name": "content-type-example",
    "implementation": "runtime.EchoRuntime",
    "inputs": [
        {
            "name": "metadata-np",
            "datatype": "INT32",
            "shape": [2, 2],
            "parameters": {
                "content_type": "np"
            }
        },
        {
            "name": "metadata-str",
            "datatype": "BYTES",
            "shape": [11],
            "parameters": {
                "content_type": "str"
            }
        }
    ]
}

After adding this metadata, we will re-start MLServer (e.g. mlserver start .) and we will send a new request without any explicit parameters.

import requests

payload = {
    "inputs": [
        {
            "name": "metadata-np",
            "datatype": "INT32",
            "shape": [2, 2],
            "data": [1, 2, 3, 4],
        },
        {
            "name": "metadata-str",
            "datatype": "BYTES",
            "shape": [11],
            "data": "hello world 😁",
        }
    ]
}

response = requests.post(
    "http://localhost:8080/v2/models/content-type-example/infer",
    json=payload
)

As you should be able to see in the server logs, MLServer will cross-reference the input names against the model metadata to find the right content type.

Custom Codecs

There may be cases where a custom inference runtime may need to encode / decode to custom datatypes. As an example, we can think of computer vision models which may only operate with pillow image objects.

In these scenarios, it’s possible to extend the Codec interface to write our custom encoding logic. A Codec, is simply an object which defines a decode() and encode() methods. To illustrate how this would work, we will extend our custom runtime to add a custom PillowCodec.

%%writefile runtime.py
import io
import json

from PIL import Image

from mlserver import MLModel
from mlserver.types import (
    InferenceRequest,
    InferenceResponse, 
    RequestInput, 
    ResponseOutput
)
from mlserver.codecs import NumpyCodec, register_input_codec, DecodedParameterName


_to_exclude = {
    "parameters": {DecodedParameterName},
    'inputs': {"__all__": {"parameters": {DecodedParameterName}}}
}

@register_input_codec
class PillowCodec(NumpyCodec):
    ContentType = "img"
    DefaultMode = "L"
    
    def encode(self, name: str, payload: Image) -> ResponseOutput:
        byte_array = io.BytesIO()
        payload.save(byte_array, mode=self.DefaultMode)
        
        return ResponseOutput(
            name=name,
            shape=payload.size,
            datatype="BYTES",
            data=byte_array.getvalue()
        )
    
    def decode(self, request_input: RequestInput) -> Image:
        if request_input.datatype != "BYTES":
            # If not bytes, assume it's an array
            image_array = super().decode(request_input)
            return Image.fromarray(image_array, mode=self.DefaultMode)
        
        encoded = request_input.data.__root__
        if isinstance(encoded, str):
            encoded = encoded.encode()

        return Image.frombytes(
            mode=self.DefaultMode,
            size=request_input.shape,
            data=encoded
        )

class EchoRuntime(MLModel):
    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        outputs = []
        for request_input in payload.inputs:
            decoded_input = self.decode(request_input)
            print(f"------ Encoded Input ({request_input.name}) ------")
            as_dict = request_input.dict(exclude=_to_exclude)  # type: ignore
            print(json.dumps(as_dict, indent=2))
            print(f"------ Decoded input ({request_input.name}) ------")
            print(decoded_input)
            
            outputs.append(
                ResponseOutput(
                    name=request_input.name,
                    datatype=request_input.datatype,
                    shape=request_input.shape,
                    data=request_input.data
                )
            )
        
        return InferenceResponse(model_name=self.name, outputs=outputs)
        

We should now be able to restart our instance of MLServer (i.e. with the mlserver start . command), to send a few test requests.

import requests

payload = {
    "inputs": [
        {
            "name": "image-int32",
            "datatype": "INT32",
            "shape": [8, 8],
            "data": [
                1, 0, 1, 0, 1, 0, 1, 0,
                1, 0, 1, 0, 1, 0, 1, 0,
                1, 0, 1, 0, 1, 0, 1, 0,
                1, 0, 1, 0, 1, 0, 1, 0,
                1, 0, 1, 0, 1, 0, 1, 0,
                1, 0, 1, 0, 1, 0, 1, 0,
                1, 0, 1, 0, 1, 0, 1, 0,
                1, 0, 1, 0, 1, 0, 1, 0
            ],
            "parameters": {
                "content_type": "img"
            }
        },
        {
            "name": "image-bytes",
            "datatype": "BYTES",
            "shape": [8, 8],
            "data": (
                "10101010"
                "10101010"
                "10101010"
                "10101010"
                "10101010"
                "10101010"
                "10101010"
                "10101010"
            ),
            "parameters": {
                "content_type": "img"
            }
        }
    ]
}

response = requests.post(
    "http://localhost:8080/v2/models/content-type-example/infer",
    json=payload
)

As you should be able to see in the MLServer logs, the server is now able to decode the payload into a Pillow image. This example also illustrates how Codec objects can be compatible with multiple datatype values (e.g. tensor and BYTES in this case).

Request Codecs

So far, we’ve seen how you can specify codecs so that they get applied at the input level. However, it is also possible to use request-wide codecs that aggregate multiple inputs to decode the payload. This is usually relevant for cases where the models expect a multi-column input type, like a Pandas DataFrame.

To illustrate this, we will first tweak our EchoRuntime so that it prints the decoded contents at the request level.

%%writefile runtime.py
import json

from mlserver import MLModel
from mlserver.types import InferenceRequest, InferenceResponse, ResponseOutput
from mlserver.codecs import DecodedParameterName

_to_exclude = {
    "parameters": {DecodedParameterName},
    'inputs': {"__all__": {"parameters": {DecodedParameterName}}}
}

class EchoRuntime(MLModel):
    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        print("------ Encoded Input (request) ------")
        as_dict = payload.dict(exclude=_to_exclude)  # type: ignore
        print(json.dumps(as_dict, indent=2))
        print("------ Decoded input (request) ------")
        decoded_request = None
        if payload.parameters:
            decoded_request = getattr(payload.parameters, DecodedParameterName)
        print(decoded_request)
            
        outputs = []
        for request_input in payload.inputs:
            outputs.append(
                ResponseOutput(
                    name=request_input.name,
                    datatype=request_input.datatype,
                    shape=request_input.shape,
                    data=request_input.data
                )
            )
        
        return InferenceResponse(model_name=self.name, outputs=outputs)
        

We should now be able to restart our instance of MLServer (i.e. with the mlserver start . command), to send a few test requests.

import requests

payload = {
    "inputs": [
        {
            "name": "parameters-np",
            "datatype": "INT32",
            "shape": [2, 2],
            "data": [1, 2, 3, 4],
            "parameters": {
                "content_type": "np"
            }
        },
        {
            "name": "parameters-str",
            "datatype": "BYTES",
            "shape": [2, 11],
            "data": ["hello world 😁", "bye bye 😁"],
            "parameters": {
                "content_type": "str"
            }
        }
    ],
    "parameters": {
        "content_type": "pd"
    }
}

response = requests.post(
    "http://localhost:8080/v2/models/content-type-example/infer",
    json=payload
)