0

I'm having trouble to create one instance and make it shared between all threads, here's my code:

Here's main method:

public static void main(String... args) throws IOException, ClassNotFoundException {
        MainApp mainApp = new MainApp();
        mainApp.init();
        mainApp.multiThread();
    }

here's init():

private void init() {
        HttpClient httpClient = HttpClientBuilder.create()
            .setMaxConnTotal(TOTAL_CONNECTION)
            .setMaxConnPerRoute(PER_ROUTE)
            .build();
        final HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(httpClient);
        restTemplate = new RestTemplate(requestFactory);
    }

TOTAL_CONNECTION is 100, and PER_ROUTE is 100

here's multithread():

private void multiThread() {
        MaterializationChecker materializationChecker = new MaterializationChecker(restTemplate, new TotalInfo(0, 0));
        materializationChecker.check();
    }

Here's TotalInfo class:

public class TotalInfo {
    @Getter private int total;
    @Getter private int aboveThreshold;

    public TotalInfo(int total, int aboveThreshold) {
        this.total = total;
        this.aboveThreshold = aboveThreshold;
    }

    protected synchronized void increaseAboveThreshold() {
        aboveThreshold++;
    }

    protected synchronized void increaseTotal() {
        total++;
    }
}

Here's materializationChecker.check() method: (threadCount is set to 10, taskCount is set to 100)

public boolean check() {

        try {
            executor = Executors.newFixedThreadPool(threadCount);
            completionService = new ExecutorCompletionService<>(executor);

            submit(taskCount);

            destroy();

            System.out.println("Check finished -> OK !!!");

        } catch (Exception e) {
            System.out.println("exception when process - {}" + e);
        }
        return true;
    }



    private void submit(int taskCount) throws InterruptedException, ExecutionException {
        for (int i = 0; i < taskCount; i++) {
            completionService.submit(new MaterializationCallable(totalInfo));
        }

        int doneNum = 0;
        MaterializationCallable materializationCallable;
        Future<MaterializationCallable> future;
        long averageLatencyOfAllAverages = 0L, minLatencyOfAllMins = Long.MAX_VALUE, maxLatencyOfAllMaxs = Long.MIN_VALUE;
        while ((future = this.completionService.take()) != null) {
            materializationCallable = future.get();
            doneNum++;
            System.out.println("Task " + doneNum + " done.");

            averageLatencyOfAllAverages += materializationCallable.getLatencies().get(0);
            minLatencyOfAllMins = Math.min(minLatencyOfAllMins, materializationCallable.getLatencies().get(1));
            maxLatencyOfAllMaxs = Math.max(maxLatencyOfAllMaxs, materializationCallable.getLatencies().get(2));
            if (doneNum >= taskCount) break;
        }
        System.out.println("----\naverageLatencyOfAllAverages = " + averageLatencyOfAllAverages/taskCount + " miiliseconds\nminLatencyOfAllMins = " + minLatencyOfAllMins
            + " ms\nmaxLatencyOfAllMaxs = " + maxLatencyOfAllMaxs + " ms");
        System.out.println("total requests: " + totalInfo.getTotal() + ", total aboveThreshold: " + totalInfo.getAboveThreshold() + ", ratio (aboveThreshold/total): " + (totalInfo.getAboveThreshold()/totalInfo.getTotal()));
        System.out.println("all tasks have been done.");
    }

    private void destroy() {
        if (this.executor != null && !executor.isShutdown()) {
            System.out.println("Shutdown and wait for all worker threads to be terminated.");
            this.executor.shutdownNow();
            while (!this.executor.isTerminated()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println("Occurred InterruptedException : {}" + e.getMessage());
                }
                System.out.println("Shutdown -> OK !!!");
            }
        }
    }

Here's code for MaterializationCallable class:

public class MaterializationCallable implements Callable<MaterializationCallable> {

    public static final int DURATION = 30;
    private final TotalInfo totalInfo;
    @Getter private List<Long> latencies;

    public MaterializationCallable(TotalInfo totalInfo) {
        this.latencies = new ArrayList<>();
        this.totalInfo = totalInfo;
    }

    @Override
    public MaterializationCallable call() throws Exception {
        long totalLatency = 0;
        long maxLatency = Long.MIN_VALUE;
        long minLatency = Long.MAX_VALUE;

        totalInfo.increaseTotal();

        for (int i = 0; i < itemIds.size(); i++){
            restTemplate.getForObject(endpoint, byte[].class);
            if (i != 0) {
                long oneLatency = receiveLatency + desiralizeLatency;
                totalLatency += oneLatency;

                if (minLatency > oneLatency) {
                    minLatency = oneLatency;
                }

                if (maxLatency < oneLatency) {
                    maxLatency = oneLatency;
                }

                long threshold = TimeUnit.MILLISECONDS.toMillis(DURATION);

                if (oneLatency > threshold) {

                    totalInfo.increaseAboveThreshold();

                    System.out.println("[] This request went over threshold: " + threshold + " ms, and took " + oneLatency  + " ms to finish, its endpoint = " + endpoint);
                }
            }
        }
        latencies.add(average);
        latencies.add(minLatency);
        latencies.add(maxLatency);
        System.out.println("Thread " + Thread.currentThread().getId() + " is done.");
        return this;
    }
}

My question is: at the end of materializationChecker.check() method, totalInfo.getTotal() is only 100 instead of 1000, I have initialized a 10 thread pool, and submitted the task 100 times, how come totalInfo object fields are not incremented 1000 times?

What went wrong? Please help me understand this.

Thanks a lot!

1 Answer 1

2

That's because you only submitted 100 tasks.

Your code is designed to increment the TotalInfo values by one for each task submitted. The fact your executor has 10 threads is irrelevant with how the values of TotalInfo are calculated.

The 10 threads simply allows the executor to perform 10 concurrent tasks and nothing more.

Sign up to request clarification or add additional context in comments.

6 Comments

Great! Thanks a lot @Naros for helping me understand it. But how can I achieve the effect to have a counter across all tasks and threads?
Are you trying to determine how many tasks did each thread actually process or something? I'm just not sure I understand the relevance of a thread here as its just a means for a task to be executed in parallel.
Sure. I wanted to have 10 threads, each thread to process 100 requests, how could I achieve that? Thanks.
The reality is you have no control over whether the threads will each process equal numbers of tasks or whether some threads will process more than others. Depending on your hardware, thread context swaps are expensive and could impair their ability to be scheduled. It sounds like if you have 10 threads, you basically want 1000 tasks submitted. You basically just need to take in your submit loop 100 * threadCount and submit that number of tasks. How they get divided among the threads in the executor is outside your control.
Thanks. Sure, that I totally understand. Sorry for the ambiguity. I have omitted some code in the OP, by 100 requests, I meant inside my MaterializationCallable class call() method, I'm sending one HTTP request 100 times. I was expecting 10 threads * 100 requests inside each call() method to end up in 1000 in TotalInfo class. How can I achieve that? Thanks.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.