
I'm using Google Cloud Platform to do image processing with Spark (2.0.2). When I execute my code (Java), I get this error:

[Stage 1:> (0 + 0) / 2]17/10/15 13:39:44 WARN org.apache.spark.scheduler.TaskSetManager: Stage 1 contains a task of very large size (165836 KB). The maximum recommended task size is 100 KB.

[Stage 1:> (0 + 1) / 2]Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1853)
    at java.io.ObjectOutputStream.write(ObjectOutputStream.java:709)
    at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
    at org.apache.spark.util.SerializableBuffer$$anonfun$writeObject$1.apply(SerializableBuffer.scala:49)
    at org.apache.spark.util.SerializableBuffer$$anonfun$writeObject$1.apply(SerializableBuffer.scala:47)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
    at org.apache.spark.util.SerializableBuffer.writeObject(SerializableBuffer.scala:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:250)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:249)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.launchTasks(CoarseGrainedSchedulerBackend.scala:249)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:220)

Where and how do I increase Java heap space?

My program:

public static void main(String[] args) {
  try{

          // Spark configuration .... 
          SparkSession spark = SparkSession
                .builder()
                .appName("Features")
                .getOrCreate();

          JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

          // HBase configuration  .... 
          String tableName = "Descripteurs";
          Configuration conf = HBaseConfiguration.create();
          conf.addResource(new Path("/home/ibtissam/hbase-1.2.5/conf/hbase-site.xml"));
          conf.addResource(new Path("/home/ibtissam/hbase-1.2.5/conf/core-site.xml"));
          conf.set(TableInputFormat.INPUT_TABLE, tableName);

          Connection connection = ConnectionFactory.createConnection(conf);
          Admin admin = connection.getAdmin(); 
          Table tab = connection.getTable(TableName.valueOf(tableName));

          for (int n=0; n<10; n++) {
              List<String> images = new ArrayList<>();
              String repertory_ = "/home/ibtissam/images-test-10000/images-"+n+"/"; 
              File repertory = new File(repertory_);
              String[] files = repertory.list(); 

              for(int k=0; k<10;k++){
                  ExecutorService executorService = Executors.newCachedThreadPool();
                  List<MyRunnable> runnableList = new ArrayList<>();

                  for(int i=k*100; i<(k+1)*100 ; i++){
                        MyRunnable runnable = new MyRunnable(repertory_+files[i]); 
                        runnableList.add(runnable);
                        executorService.execute(runnable);
                  }
                  executorService.shutdown();

                  // Block until all threads finish (requires java.util.concurrent.TimeUnit);
                  // this replaces a CPU-burning empty busy-wait loop
                  executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

                  for (int i=0; i<runnableList.size(); i++) {
                      images.add(runnableList.get(i).descripteurs_);
                  }
              }

              JavaRDD<String> rdd = jsc.parallelize(images, 2000);

              // Compute the descriptors
              JavaPairRDD<String,String> rdd_final = rdd.mapToPair(new PairFunction<String,String,String>() {
                @Override
                public Tuple2<String,String> call(String value) {

                  String[] strTab = value.split(","); 
                  int h = Integer.parseInt(strTab[1]);
                  int w = Integer.parseInt(strTab[2]);
                  String type = strTab[3]; 
                  String nom = strTab[0];
                  String key = nom+"-"+h+"-"+w+"-"+type;

                  // Convert String >> Mat
                  Mat image = new Mat(h, w, 16);
                  UByteRawIndexer idx = image.createIndexer();
                  int indice = 4;
                  for (int i = 0; i < h; i++) {
                    for (int j = 0; j < w; j++) {
                      idx.put(i, j, Integer.parseInt(strTab[indice]));
                      indice++; // advance to the next pixel value in strTab
                    }
                  }

                  // Compute the features 
                  SIFT sift = SIFT.create(); 
                  KeyPointVector keypoints = new KeyPointVector();
                  Mat descriptors = new Mat();

                  image.convertTo(image, CV_8UC3);

                  sift.detect(image, keypoints);

                  // sort(...) is a helper defined elsewhere in the class
                  KeyPointVector keypoints_sorted = sort(keypoints);
                  KeyPointVector keypoints_2 = new KeyPointVector(keypoints_sorted.size() / 4); 
                  for (int k = 0; k < keypoints_sorted.size() / 4; k++) {
                      keypoints_2.put(k, keypoints_sorted.get(k));  
                  }

                  sift.compute(image, keypoints_2, descriptors);

                  int hDes = descriptors.size().height();
                  int wDes = descriptors.size().width();

                  // Retry while SIFT produced no descriptors, refreshing the
                  // dimensions on each pass so the loop can terminate; the
                  // image is released only once the retries are done
                  while (hDes == 0 || wDes == 0) {
                      SIFT sift_ = SIFT.create(); 
                      KeyPointVector keypoints_ = new KeyPointVector();

                      sift_.detect(image, keypoints_);

                      KeyPointVector keypoints_sorted_ = sort(keypoints_);
                      KeyPointVector keypoints_2_ = new KeyPointVector(keypoints_sorted_.size() / 4); 
                      for (int k = 0; k < keypoints_sorted_.size() / 4; k++) {
                          keypoints_2_.put(k, keypoints_sorted_.get(k));  
                      }

                      sift_.compute(image, keypoints_2_, descriptors);

                      hDes = descriptors.size().height();
                      wDes = descriptors.size().width();
                  }
                  image.release(); 

                  key = key + "-" + hDes + "-" + wDes + "-" + descriptors.type();

                  // Convert the features => String 
                  StringBuilder featuresStr = new StringBuilder();
                  FloatRawIndexer idx_ = descriptors.createIndexer(); 
                  int total = descriptors.size().height() * descriptors.size().width();

                  // Join all descriptor values with commas (no trailing comma)
                  for (int position = 0; position < total; position++) {
                      featuresStr.append(idx_.get(position));
                      if (position < total - 1) {
                          featuresStr.append(',');
                      }
                  }
                  descriptors.release(); 
                  return new Tuple2<>(key, featuresStr.toString());
                } 
              });

              System.out.println("Fin de calcul des descripteurs  .... ");

              List<Tuple2<String,String>> liste = rdd_final.collect();

              System.out.println("Insertion dans hbase .... \n");
              for (int b=0; b<liste.size(); b++) {

                    String[] metadata = liste.get(b)._1().split("-"); 
                    String data = liste.get(b)._2();
                    // Row 
                    byte [] row = Bytes.toBytes(liste.get(b)._1());

                    // Family
                    byte [] family1 = Bytes.toBytes("Metadata");
                    byte [] family2 = Bytes.toBytes("Data");

                    // Qualifiers
                    byte [] height = Bytes.toBytes("height");
                    byte [] width = Bytes.toBytes("width");
                    byte [] colorSpace = Bytes.toBytes("colorSpace");
                    byte [] name = Bytes.toBytes("name");

                    byte [] features = Bytes.toBytes("features");

                    // Create Put
                    Put put = new Put(row);
                    put.addColumn(family1, height, Bytes.toBytes(metadata[5]));
                    put.addColumn(family1, width, Bytes.toBytes(metadata[6]));
                    put.addColumn(family1, name, Bytes.toBytes(metadata[0]+"-"+metadata[1]+"-"+metadata[2]+"-"+metadata[3]));
                    put.addColumn(family1, colorSpace, Bytes.toBytes(metadata[4]));
                    put.addColumn(family2, features, Bytes.toBytes(liste.get(b)._2()));
                    tab.put(put);
              }
            }
            jsc.close();

      } catch (Exception e) {
          e.printStackTrace(); // print the full stack trace, not just the message
      }
    }
  • You really need to focus on reducing your task size, not increasing your JVM size. Commented Oct 15, 2017 at 14:29
  • Why does your task need ~160MB heap space? Please add your program to your question. Commented Oct 15, 2017 at 14:54
  • Looks like broadcast could help here as there's a lookup table used inside (or something very similar). Commented Oct 15, 2017 at 15:05
  • @Progman I've added my program to the question. Commented Oct 15, 2017 at 15:22
  • @JoeC I have reduced my task size to 370 KB and I can't do better than that. Can you tell me how to increase the heap size on Google Cloud? Commented Oct 16, 2017 at 6:28
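
Following up on the comments about shrinking the task size rather than the heap: a minimal sketch, assuming the images live somewhere the executors can read (e.g. GCS or HDFS, not a driver-local directory), that parallelizes the file paths instead of the decoded pixel strings. The names describe and computeDescriptors are hypothetical stand-ins for the per-image pipeline in the question:

import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class DescriptorJob {

    // Ship only the small path strings to the executors; each task loads its
    // own images and computes the features locally, so no pixel data has to
    // travel inside the serialized task.
    public static JavaPairRDD<String, String> describe(JavaSparkContext jsc,
                                                       List<String> imagePaths) {
        JavaRDD<String> paths = jsc.parallelize(imagePaths, 2000);
        return paths.mapToPair(path -> new Tuple2<>(path, computeDescriptors(path)));
    }

    // Hypothetical stand-in for the per-image pipeline in the question
    // (decode the image, run SIFT, serialize the descriptors to a String).
    // It must read from storage the executors can see, e.g. GCS or HDFS.
    private static String computeDescriptors(String path) {
        return ""; // placeholder
    }
}

With this shape, each serialized task carries only a handful of path strings, which is what the 100 KB recommendation in the warning expects.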

1 Answer


Try increasing the driver heap with "--driver-memory XXXXm". The stack trace shows the OutOfMemoryError in the driver's "dispatcher-event-loop" thread while it serializes tasks, so it is the driver JVM, not the executors, that needs more heap.
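
For example, with a plain spark-submit, or through the --properties flag when submitting via gcloud on Dataproc; the 4g value, class name, cluster name, and jar name below are placeholders, not recommendations:

# Plain spark-submit
spark-submit --driver-memory 4g --class your.MainClass your-app.jar

# On Google Cloud Dataproc, pass the same setting as a Spark property
gcloud dataproc jobs submit spark \
    --cluster=your-cluster \
    --class=your.MainClass \
    --jars=your-app.jar \
    --properties=spark.driver.memory=4g

Note that spark.driver.memory has to be set at submit time; setting it from inside the program via SparkSession.builder().config(...) is too late in client mode, because the driver JVM has already started.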
