
Triton BLS model with dynamic batching does not execute expected batch size.


Description

When dynamic batching and a large max_batch_size are set in config.pbtxt and N individual inference requests are submitted, Triton delivers them to the BLS model's execute function in several sequential batches of n < N requests rather than as a single batch of N.

With max_queue_delay_microseconds set to 10 seconds:

  • When I submit 20 individual requests, Triton dynamically batches all 20 together, as expected.
  • When I submit 40 individual requests, Triton executes 4 sequential batches of 10 requests each, instead of one batch of 40.
  • When I submit 80 individual requests, Triton executes 16 sequential batches of 5 requests each, instead of one batch of 80.

Triton Information

Tested with nvcr.io/nvidia/tritonserver:23.12-py3 and nvcr.io/nvidia/tritonserver:24.04-py3.

To Reproduce

config.pbtxt:

name: "bls-test"
max_batch_size: 400
backend: "python"
input [
  {
    name: "NODE_CONTENT"
    data_type: TYPE_STRING
    dims: [ 1 ]
  }
]

output [
  {
    name: "CHUNK_EMBEDDINGS"
    data_type: TYPE_FP32
    dims: [ 1024 ]
  },
  {
    name: "CHUNK_INDICES"
    data_type: TYPE_INT64
    dims: [ -1, 2 ]
  }
]

instance_group [
  {
    count: 1
    kind: KIND_CPU
  }
]

dynamic_batching {
  max_queue_delay_microseconds: 10000000
}
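
For reference, dynamic_batching also accepts an explicit preferred_batch_size list. A sketch of the same block with it set is below (I have not verified whether this changes the behavior reported here):

dynamic_batching {
  preferred_batch_size: [ 400 ]
  max_queue_delay_microseconds: 10000000
}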

model.py:

import json
import os
import time

import torch
from torch.utils.dlpack import to_dlpack

import triton_python_backend_utils as pb_utils


class TritonPythonModel:

    def initialize(self, args):
        self.model_config = json.loads(args["model_config"])
        # Model directory; unused in this reprex but kept for completeness
        self.path: str = os.path.join(args["model_repository"], args["model_version"])

    def execute(self, requests):
        responses = []

        # len(requests) is the number of requests dynamically batched into this call
        print(f"[INFO]: Processing {len(requests)} requests")

        for request in requests:
            content = pb_utils.get_input_tensor_by_name(request, "NODE_CONTENT").as_numpy()[0][0]
            print(f"[INFO]: Handling a request: {content.decode('UTF-8')}")
            # time.sleep(1)

            # Dummy fixed-shape outputs, handed over zero-copy via DLPack
            inference_response = pb_utils.InferenceResponse(
                output_tensors=[
                    pb_utils.Tensor.from_dlpack("CHUNK_EMBEDDINGS", to_dlpack(torch.zeros((12, 1024)))),
                    pb_utils.Tensor.from_dlpack("CHUNK_INDICES", to_dlpack(torch.zeros((12, 2)))),
                ]
            )
            responses.append(inference_response)

        return responses

    def finalize(self):
        print("Cleaning up...")

Launch command:

tritonserver --model-repository=/triton/model_repo --disable-auto-complete-config
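
For completeness, a typical NGC container invocation exposing the gRPC port used by the client below would look roughly like this (the port mapping and volume mount here are illustrative, not the exact command I ran):

docker run --rm \
  -p 8000:8000 -p 8001:8001 -p 8002:8002 \
  -v /triton/model_repo:/triton/model_repo \
  nvcr.io/nvidia/tritonserver:24.04-py3 \
  tritonserver --model-repository=/triton/model_repo --disable-auto-complete-config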

async_infer.py:

import numpy as np
import tritonclient.grpc as grpcclient
from time import sleep
from functools import partial

URL = 'localhost:8001'

node_contents = ["A test string that is the input for a request."] * 40
num_node_contents = len(node_contents)

client = grpcclient.InferenceServerClient(url=URL)

def callback(user_data, request_id, result, error):
    # On failure `result` is None, so index by the request id captured at
    # submission time and store whichever of result/error came back.
    user_data[request_id] = error if error else result

# Note: this single [num_node_contents, 1] input is reused for every request
# below, so each request carries the full batch dimension.
np_input_data = np.array([[str.encode(node_content) for node_content in node_contents]])

inputs = []
inputs.append(grpcclient.InferInput('NODE_CONTENT', [num_node_contents, 1], "BYTES"))
inputs[0].set_data_from_numpy(np_input_data.reshape([num_node_contents, 1]))

outputs = []
outputs.append(grpcclient.InferRequestedOutput('CHUNK_EMBEDDINGS'))
outputs.append(grpcclient.InferRequestedOutput('CHUNK_INDICES'))

results = [None] * num_node_contents

print(f'[INFO]: Number of nodes: {num_node_contents}')

# Fire off all of the requests without waiting for responses
for i in range(num_node_contents):
    client.async_infer(model_name='bls-test',
                       inputs=inputs,
                       outputs=outputs,
                       request_id=str(i),
                       callback=partial(callback, results, i))

# Until every slot holds a result or an error, Triton hasn't finished all requests
while any(result is None for result in results):
    print(f'[INFO]: {sum(result is not None for result in results)}/{num_node_contents} requests completed.')
    sleep(2)

out = [(result.as_numpy('CHUNK_EMBEDDINGS'), result.as_numpy('CHUNK_INDICES')) for result in results]
print(out)
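
For comparison, a variant that builds a fresh [1, 1] input per request (instead of reusing the shared [num_node_contents, 1] input above) would look like this; this is a sketch only, and the numbers reported above were not measured with it:

# Hypothetical variant: one single-element input per request
for i in range(num_node_contents):
    single_input = grpcclient.InferInput('NODE_CONTENT', [1, 1], "BYTES")
    single_input.set_data_from_numpy(np.array([[node_contents[i].encode()]], dtype=np.object_))
    client.async_infer(model_name='bls-test',
                       inputs=[single_input],
                       outputs=outputs,
                       request_id=str(i),
                       callback=partial(callback, results, i))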

Expected behavior

Given that there are 40/80 requests and the maximum batch size is 400, I expected all 40/80 requests to be dispatched to the BLS model at the same time so that I can process one large batch, as sketched below. This is important for my use case because several steps of my pipeline (omitted from this reprex) benefit from larger batch sizes.
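
For context, the pattern I ultimately want inside execute is something like the following (a hypothetical pipeline step, assuming numpy is imported in model.py; the real steps are omitted from this reprex):

# Hypothetical: gather the inputs of all dynamically batched requests into
# one large array so downstream pipeline steps can run on a single batch.
contents = [pb_utils.get_input_tensor_by_name(r, "NODE_CONTENT").as_numpy() for r in requests]
full_batch = np.concatenate(contents, axis=0)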

Please let me know if any additional details would be helpful, and thank you in advance for your help looking into this issue!

njaramish · May 24 '24 21:05