
I am trying to calculate throughput for my NVIDIA Triton Server. I want to send 10k requests from my client and have them pile up on the server; only after the client has sent all 10k requests should the Triton server script start processing them sequentially. I am calculating throughput as 10k / (end_time - start_time).

But I keep getting the broken pipe error: BrokenPipeError: [Errno 32] Broken pipe

I do notice that the count variable should be accessed under a lock, but I don't think that is what's causing the error.
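
For reference, this is the kind of lock-guarded counter I mean (a minimal sketch, assuming a module-level threading.Lock; the bump_count helper is just illustrative and is not in my actual script):

count = 0
count_lock = threading.Lock()

def bump_count():
    """Increment the shared counter under the lock and return the new value."""
    global count
    with count_lock:
        count += 1
        return count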

I have removed all import statements to make it easier to read. I am using the nvcr.io/nvidia/tritonserver:24.02-py3 image for Triton.

Model.py:

class TritonPythonModel:

    def initialize(self, args):
        """
        Initialize MobileNet model and ImageNet labels.
        """
        self.model = ResNet50(weights="imagenet")
        self.model.predict(np.zeros((1, 224, 224, 3), dtype=np.float32))  # Warm-up

        labels_path = os.path.join(os.path.dirname(__file__), "imagenet_labels.json")
        with open(labels_path) as f:
            class_idx = json.load(f)
            self.labels = {int(k): v[1] for k, v in class_idx.items()}

        print("SLEEPING...")
        time.sleep(1)
        print("STARTING...")

        
    def execute(self, requests):
        responses = []
        for request in requests:
            try:
                input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT_IMAGE")
               
                imgs = input_tensor.as_numpy()
                
                # Ensure the images are float32 (if not already)
                if imgs.dtype != np.float32:
                    imgs = imgs.astype(np.float32)
                
                preds = self.model.predict(imgs, verbose=0)
                
                # Process each image in the batch
                batch_labels = []
                for b in range(imgs.shape[0]):
                    top5 = np.argsort(preds[b])[::-1][:5]
                    labels = [self.labels[i].encode("utf-8") for i in top5]
                    batch_labels.append(labels)
                
                output_array = np.array(batch_labels, dtype=object)
                output_tensor = pb_utils.Tensor("OUTPUT_CLASS", output_array)
                responses.append(pb_utils.InferenceResponse(output_tensors=[output_tensor]))

            except Exception as e:
                print(e)
                responses.append(pb_utils.InferenceResponse(error=pb_utils.TritonError(str(e))))

        return responses
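
For context, since execute() receives a list of requests, I assume that with dynamic batching enabled I could also stack everything that has queued up into a single predict() call. A rough sketch of what I mean (execute_batched is a hypothetical name, and it assumes every request carries [N, 224, 224, 3] float image data):

    def execute_batched(self, requests):
        # Collect the input array from every queued request.
        arrays = [
            pb_utils.get_input_tensor_by_name(r, "INPUT_IMAGE").as_numpy().astype(np.float32)
            for r in requests
        ]
        stacked = np.concatenate(arrays, axis=0)  # shape: (total_images, 224, 224, 3)
        preds = self.model.predict(stacked, verbose=0)

        # Split the predictions back out, one response per request.
        responses, offset = [], 0
        for arr in arrays:
            n = arr.shape[0]
            batch_labels = []
            for b in range(n):
                top5 = np.argsort(preds[offset + b])[::-1][:5]
                batch_labels.append([self.labels[i].encode("utf-8") for i in top5])
            offset += n
            output_tensor = pb_utils.Tensor("OUTPUT_CLASS", np.array(batch_labels, dtype=object))
            responses.append(pb_utils.InferenceResponse(output_tensors=[output_tensor]))
        return responses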

Client.py:


BASE_FILE_PATH = os.path.dirname(os.path.abspath(__file__))
CSV_HEADER = ['timestamp', 'latency']

image_pool = preload_images_parallel(pool_size=100, max_workers=100)
image_pool_size = len(image_pool)

def get_image(index):
    """Returns the image at index (circular access if needed)."""
    return image_pool[index % image_pool_size]

count = 0

def infer_request(url, image_input, model_name):
    """Perform inference on a Triton server and return latency."""
    global count
    client = httpclient.InferenceServerClient(url=url, network_timeout=2**31 - 1, connection_timeout=2**31 - 1)
    start_t = time.time()
    try:
        response = client.infer(model_name=model_name, inputs=[image_input])
        count += 1
        end_t = time.time()
        duration_ms = (end_t - start_t) * 1000
        print(f"{duration_ms} ms")
        if count % 200 == 0:
            print(f"Processed {count} latencies", flush=True)
        return duration_ms 
    except InferenceServerException as e:
        print(f"Inference failed: {e}")
        return None
    
def send_10k_reqs(shared_data):
    NUM_REQS = 5000
    RPS = 250
    success = 0
    start_time = time.time()
    with ThreadPoolExecutor() as executor:
        futures = []
        # Use tqdm to wrap the loop and display progress
        for i in tqdm(range(NUM_REQS), desc="Sending requests"):
            image = get_image(i)
            image_input = httpclient.InferInput("INPUT_IMAGE", [1, 224, 224, 3], datatype="FP32")
            image_input.set_data_from_numpy(image, binary_data=True)

            with shared_data['lock']:
                url = shared_data['url']
                model_name = shared_data['model_name']

            future = executor.submit(infer_request, url, image_input, model_name)
            futures.append(future)
            # Uncomment below if you need to pace requests:
            sleep_time = np.random.exponential(1/RPS) 
            # time.sleep(sleep_time)

        for future in futures:
            result = future.result()
    end_time = time.time()

    total_time = end_time - start_time
    throughput = NUM_REQS / total_time
    print(f"Total Time: {total_time}, Throughput: {throughput}")
    return total_time, throughput
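
One variation I considered but have not verified fixes the error: creating the tritonclient HTTP client once per worker thread (via threading.local()) and reusing it, instead of opening a new connection for every request. A minimal sketch; _tls, get_client, and infer_request_reuse are just illustrative names:

_tls = threading.local()  # hypothetical per-thread storage for a reusable client

def get_client(url):
    """Create the Triton HTTP client once per thread and reuse it afterwards."""
    if not hasattr(_tls, "client"):
        _tls.client = httpclient.InferenceServerClient(url=url)
    return _tls.client

def infer_request_reuse(url, image_input, model_name):
    """Same as infer_request, but without opening a new connection every call."""
    client = get_client(url)
    start_t = time.time()
    try:
        client.infer(model_name=model_name, inputs=[image_input])
        return (time.time() - start_t) * 1000
    except InferenceServerException as e:
        print(f"Inference failed: {e}")
        return None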

def main():
    parser = argparse.ArgumentParser(description="ML Inference Throughput & Latency Benchmark")
    parser.add_argument("-d", "--duration", type=int, required=False, help="Workload duration (in seconds)")
    parser.add_argument("-o", "--output", type=str, required=False, help="CSV file to store results")
    parser.add_argument("-m", "--mode", type=str, required=True, choices=["test_rps", "throughput"], 
                        help="Mode: 'test_rps' for latency benchmarking across RPS, 'throughput' for sending 10K requests")
    args = parser.parse_args()

    server = "localhost"
    model_id = "resnet50_python" 

    shared_data = {
        'url': f'{server}:8000',
        'model_name': f'{model_id}',
        'lock': threading.Lock()
    }

    if args.mode == "throughput":
        total_time, throughput = send_10k_reqs(shared_data)
        with open('output.csv', 'a', newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(["Total Time (sec)", "Throughput (req/sec)"])
            csvwriter.writerow([total_time, throughput])
        print("Throughput benchmarking completed.")

if __name__ == "__main__":
    main()

Comments:
  • The peer has closed the connection but you are still sending.
  • This question is similar to: What causes the Broken Pipe Error?
