0

My main class, generates multiple threads based on some rules. (20-40 threads live for long time). Each thread create several threads (short time ) --> I am using executer for this one. I need to work on Multi dimension arrays in the short time threads --> I wrote it like it is in the code below --> but I think that it is not efficient since I pass it so many times to so many threads / tasks --. I tried to access it directly from the threads (by declaring it as public --> no success) --> will be happy to get comments / advices on how to improve it. I also look at next step to return a 1 dimension array as a result (which might be better just to update it at the Assetfactory class ) --> and I am not sure how to. please see the code below. thanks Paz

import java.util.concurrent.*;
import java.util.logging.Level;

public class AssetFactory implements Runnable{
    private volatile boolean stop = false;
    private volatile String feed ;
    private double[][][] PeriodRates= new double[10][500][4];

    private String TimeStr,Bid,periodicalRateIndicator;
    private final BlockingQueue<String> workQueue;
    ExecutorService IndicatorPool = Executors.newCachedThreadPool();

    public AssetFactory(BlockingQueue<String> workQueue) {
      this.workQueue = workQueue;
    }   

    @Override
    public void run(){
      while (!stop) {
       try{   
          feed = workQueue.take(); 
            periodicalRateIndicator = CheckPeriod(TimeStr, Bid) ;
            if (periodicalRateIndicator.length() >0) {
                IndicatorPool.submit(new CalcMvg(periodicalRateIndicator,PeriodRates));
            } 

          } 

          if ("Stop".equals(feed)) {
              stop = true ;
          }

       } // try
       catch (InterruptedException ex) {
           logger.log(Level.SEVERE, null, ex);
           stop = true;
       }


      } // while
    } // run  

Here is the CalcMVG class

public class CalcMvg implements Runnable {
    private double [][][] PeriodRates = new double[10][500][4];

    public CalcMvg(String Periods, double[][][] PeriodRates) {
        System.out.println(Periods); 
        this.PeriodRates = PeriodRates ;
    }

    @Override
    public void run(){
       try{
          // do some work with the data of PeriodRates array e.g. print it (no changes to array
          System.out.println(PeriodRates[1][1][1]);
          }
       catch (Exception ex){
          System.out.println(Thread.currentThread().getName() + ex.getMessage());
          logger.log(Level.SEVERE, null, ex);
         }
      }//run

  } // mvg class
5
  • I don't see a specific question in your post above. Please clarify this for us. Commented Oct 13, 2013 at 11:35
  • There are two questions here, one is a code review question, and the other is a SO question but is unclear. These questions not only do not belong in one question on SO, they belong on different sites. Code review questions go here: codereview.stackexchange.com Commented Oct 13, 2013 at 11:38
  • thanks, My questions are: 1. I am looking to for a better way to access the array instead of send it as a parameter to the created thread. 2. how to access backward from the created task to a an array in the AssetFactory class. Commented Oct 13, 2013 at 11:44
  • 2
    Do you encounter performance problems with your current code? I'd say passing arrays around is not a problem at all, as their values are not copied. Commented Oct 13, 2013 at 11:57
  • not yet, but happens if the 40 major thread will create 10 new threads / task each --> and in each it will pass this 3D array. Might be not a problem at all --> so I just wanted more ideas on how can I implement it. I also placed it in code review as Robin suggested Commented Oct 13, 2013 at 12:11

2 Answers 2

4

There are several things going on here which seem to be wrong, but it is hard to give a good answer with the limited amount of code presented.

First the actual coding issues:

  • There is no need to define a variable as volatile if only one thread ever accesses it (stop, feed)

  • You should declare variables that are only used in a local context (run method) locally in that function and not globally for the whole instance (almost all variables). This allows the JIT to do various optimizations.

  • The InterruptedException should terminate the thread. Because it is thrown as a request to terminate the thread's work.

  • In your code example the workQueue doesn't seem to do anything but to put the threads to sleep or stop them. Why doesn't it just immediately feed the actual worker-threads with the required workload?

And then the code structure issues:

  • You use threads to feed threads with work. This is inefficient, as you only have a limited amount of cores that can actually do the work. As the execution order of threads is undefined, it is likely that the IndicatorPool is either mostly idle or overfilling with tasks that have not yet been done.

  • If you have a finite set of work to be done, the ExecutorCompletionService might be helpful for your task.

I think you will gain the best speed increase by redesigning the code structure. Imagine the following (assuming that I understood your question correctly):

  • There is a blocking queue of tasks that is fed by some data source (e.g. file-stream, network).

  • A set of worker-threads equal to the amount of cores is waiting on that data source for input, which is then processed and put into a completion queue.

  • A specific data set is the "terminator" for your work (e.g. "null"). If a thread encounters this terminator, it finishes it's loop and shuts down.

Now the following holds true for this construct:

Case 1: The data source is the bottle-neck. It cannot be speed-up by using multiple threads, as your harddisk/network won't work faster if you ask more often.

Case 2: The processing power on your machine is the bottle neck, as you cannot process more data than the worker threads/cores on your machine can handle.

In both cases the conclusion is, that the worker threads need to be the ones that seek for new data as soon as they are ready to process it. As either they need to be put on hold or they need to throttle the incoming data. This will ensure maximum throughput.

If all worker threads have terminated, the work is done. This can be i.E. tracked through the use of a CyclicBarrier or Phaser class.

Pseudo-code for the worker threads:

public void run() {
  DataType e;
  try {
    while ((e = dataSource.next()) != null) {
      process(e);
    }
    barrier.await();
  } catch (InterruptedException ex) {
  }
}

I hope this is helpful on your case.

Sign up to request clarification or add additional context in comments.

3 Comments

Would a fork-join approach fit his scenario? That would allow him to use the built-in Java ForkJoin classes instead of rolling his own (always tricky).
Depends on the amount of data and how it is generated. ForkJoin has two issues: one is that it requires the task to be dividable and second it eventually runs into a StackOverflowException.
thaks, about the first few comments that u wrote --> I just posted here parts of the code and other threads involved too (main etc..) so I do need the specific def. about the solution --> in the end I have implemented it--> using additional blockqueue that I pass as a reference to CalcMVG class(task) and it writes its result to this queue --> I am going to have a few more tasks that will write their results to this new queue --> in the assetfactory class that submit those tasks --> I have access to the queue and I am using those results.
0

Passing the array as an argument to the constructor is a reasonable approach, although unless you intend to copy the array it isn't necessary to initialize PeriodRates with a large array. It seems wasteful to allocate a large block of memory and then reassign its only reference straight away in the constructor. I would initialize it like this:

private final double [][][] PeriodRates;

public CalcMvg(String Periods, double[][][] PeriodRates) {
    System.out.println(Periods); 
    this.PeriodRates = PeriodRates;
}

The other option is to define CalcMvg as an inner class of AssetFactory and declare PeriodRate as final. This would allow instances of CalcMvg to access PeriodRate in the outer instance of AssetFactory.

Returning the result is more difficult since it involves publishing the result across threads. One way to do this is to use synchronized methods:

private double[] result = null;

private synchronized void setResult(double[] result) {
    this.result = result;
}

public synchronized double[] getResult() {
    if (result == null) {
        throw new RuntimeException("Result has not been initialized for this instance: " + this);
    }

    return result;
}  

There are more advanced multi-threading concepts available in the Java libraries, e.g. Future, that might be appropriate in this case.

Regarding your concerns about the number of threads, allowing a library class to manage the allocation of work to a thread pool might solve this concern. Something like an Executor might help with this.

1 Comment

I ccheck about Futures and callable tasks --> but since I am not sure if and how much it delays / stop the execution process --> I used additional blockingQueue and it looks fine, thanks

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.