I am trying to calculate throughput for my Nvidia Triton Server. I want to send 10k requests from my client and let them pile up on the server; only after the client has sent all 10k requests should the Triton server script start processing them sequentially. I am calculating throughput as 10k / (end_time - start_time).
But I keep getting a broken pipe error: BrokenPipeError: [Errno 32] Broken pipe
I do notice that the count variable should be accessed under a lock, but I don't think that is what is causing the error.
I have removed all import statements to keep the code easy to read. I am using the nvcr.io/nvidia/tritonserver:24.02-py3 image for Triton.
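For reference, this is roughly what I mean by guarding the count variable with a lock (a minimal sketch assuming threading is imported; it is not part of the code below):

count = 0
count_lock = threading.Lock()

def record_success():
    """Increment the shared request counter under a lock."""
    global count
    with count_lock:
        count += 1
        if count % 200 == 0:
            print(f"Processed {count} latencies", flush=True)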
Model.py:
class TritonPythonModel:
    def initialize(self, args):
        """
        Initialize the ResNet50 model and ImageNet labels.
        """
        self.model = ResNet50(weights="imagenet")
        self.model.predict(np.zeros((1, 224, 224, 3), dtype=np.float32))  # Warm-up
        labels_path = os.path.join(os.path.dirname(__file__), "imagenet_labels.json")
        with open(labels_path) as f:
            class_idx = json.load(f)
        self.labels = {int(k): v[1] for k, v in class_idx.items()}
        print("SLEEPING...")
        time.sleep(1)
        print("STARTING...")

    def execute(self, requests):
        responses = []
        for request in requests:
            try:
                input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT_IMAGE")
                imgs = input_tensor.as_numpy()
                # Ensure the images are float32 (if not already)
                if imgs.dtype != np.float32:
                    imgs = imgs.astype(np.float32)
                preds = self.model.predict(imgs, verbose=0)
                # Process each image in the batch
                batch_labels = []
                for b in range(imgs.shape[0]):
                    top5 = np.argsort(preds[b])[::-1][:5]
                    labels = [self.labels[i].encode("utf-8") for i in top5]
                    batch_labels.append(labels)
                output_array = np.array(batch_labels, dtype=object)
                output_tensor = pb_utils.Tensor("OUTPUT_CLASS", output_array)
                responses.append(pb_utils.InferenceResponse(output_tensors=[output_tensor]))
            except Exception as e:
                print(e)
                responses.append(pb_utils.InferenceResponse(error=pb_utils.TritonError(str(e))))
        return responses
Client.py:
BASE_FILE_PATH = os.path.dirname(os.path.abspath(__file__))
CSV_HEADER = ['timestamp', 'latency']
image_pool = preload_images_parallel(pool_size=100, max_workers=100)
image_pool_size = len(image_pool)
def get_image(index):
    """Returns the image at index (circular access if needed)."""
    return image_pool[index % image_pool_size]
count = 0
def infer_request(url, image_input, model_name):
    """Perform inference on the Triton server and return latency in milliseconds."""
    global count
    client = httpclient.InferenceServerClient(url=url, network_timeout=2**31 - 1, connection_timeout=2**31 - 1)
    start_t = time.time()
    try:
        response = client.infer(model_name=model_name, inputs=[image_input])
        count += 1
        end_t = time.time()
        duration_ms = (end_t - start_t) * 1000
        print(f"{duration_ms} ms")
        if count % 200 == 0:
            print(f"Processed {count} latencies", flush=True)
        return duration_ms
    except InferenceServerException as e:
        print(f"Inference failed: {e}")
        return None
def send_10k_reqs(shared_data):
    NUM_REQS = 5000
    RPS = 250
    success = 0
    start_time = time.time()
    with ThreadPoolExecutor() as executor:
        futures = []
        # Use tqdm to wrap the loop and display progress
        for i in tqdm(range(NUM_REQS), desc="Sending requests"):
            image = get_image(i)
            image_input = httpclient.InferInput("INPUT_IMAGE", [1, 224, 224, 3], datatype="FP32")
            image_input.set_data_from_numpy(image, binary_data=True)
            with shared_data['lock']:
                url = shared_data['url']
                model_name = shared_data['model_name']
            future = executor.submit(infer_request, url, image_input, model_name)
            futures.append(future)
            # Uncomment below if you need to pace requests:
            sleep_time = np.random.exponential(1 / RPS)
            # time.sleep(sleep_time)
        for future in futures:
            result = future.result()
    end_time = time.time()
    total_time = end_time - start_time
    throughput = NUM_REQS / total_time
    print(f"Total Time: {total_time}, Throughput: {throughput}")
    return total_time, throughput
def main():
    parser = argparse.ArgumentParser(description="ML Inference Throughput & Latency Benchmark")
    parser.add_argument("-d", "--duration", type=int, required=False, help="Workload duration (in seconds)")
    parser.add_argument("-o", "--output", type=str, required=False, help="CSV file to store results")
    parser.add_argument("-m", "--mode", type=str, required=True, choices=["test_rps", "throughput"],
                        help="Mode: 'test_rps' for latency benchmarking across RPS, 'throughput' for sending 10K requests")
    args = parser.parse_args()
    server = "localhost"
    model_id = "resnet50_python"
    shared_data = {
        'url': f'{server}:8000',
        'model_name': f'{model_id}',
        'lock': threading.Lock()
    }
    if args.mode == "throughput":
        total_time, throughput = send_10k_reqs(shared_data)
        with open('output.csv', 'a', newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(["Total Time (sec)", "Throughput (req/sec)"])
            csvwriter.writerow([total_time, throughput])
        print("Throughput benchmarking completed.")
if __name__ == "__main__":
    main()
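For context, the "send everything first, collect results afterwards" behaviour I describe at the top is roughly what I would sketch with async_infer from the same tritonclient.http module, reusing one client with a connection pool. This is only an untested sketch (the function name and the concurrency value are made up, and it reuses get_image from above); it is not the code that produced the error:

def send_all_then_wait(url, model_name, num_reqs=10000):
    # One client reused for every request; 'concurrency' sets the connection pool size (assumed value)
    client = httpclient.InferenceServerClient(url=url, concurrency=32)
    start_time = time.time()
    pending = []
    for i in range(num_reqs):
        image = get_image(i)
        image_input = httpclient.InferInput("INPUT_IMAGE", [1, 224, 224, 3], datatype="FP32")
        image_input.set_data_from_numpy(image, binary_data=True)
        # Fire the request without waiting for the response
        pending.append(client.async_infer(model_name=model_name, inputs=[image_input]))
    # Only now wait for every response to come back
    for req in pending:
        req.get_result()
    total_time = time.time() - start_time
    return total_time, num_reqs / total_time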