
I have a Java application with Spark Maven dependencies; when run, it launches Spark locally on the host machine. The server has 36 cores. I am building a SparkSession where I specify the number of cores and other config properties, but when I check the stats with htop, Spark doesn't seem to use all the cores, just 1.

    SparkSession spark = SparkSession
            .builder()
            .master("local")
            .appName("my-spark")
            .config("spark.driver.memory", "50g")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
            .config("spark.sql.shuffle.partitions", "400")
            .config("spark.eventLog.enabled", "true")
            .config("spark.eventLog.dir", "/dir1/dir2/logs")
            .config("spark.history.fs.logDirectory", "/dir1/dir2/logs")
            .config("spark.executor.cores", "36")
            .getOrCreate();

I also set these on the JavaSparkContext:

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY);
sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY);
sc.hadoopConfiguration().set("spark.driver.memory","50g");
sc.hadoopConfiguration().set("spark.eventLog.enabled", "true");
sc.hadoopConfiguration().set("spark.eventLog.dir", "/dir1/dir2/logs");
sc.hadoopConfiguration().set("spark.executor.cores", "36");

My task reads data from AWS S3 into a DataFrame and writes it to another bucket.

    Dataset<Row> df = spark.read().format("csv").option("header", "true").load("s3a://bucket/file.csv.gz");
    // df = df.repartition(200);

    df.withColumn("col_name", df.col("col_name"))
      .sort("col_name", "_id")
      .write().format("iceberg").mode("append").save(location);

2 Answers


.gz files are "unsplittable": to decompress them you have to start at byte 0 and read forward. As a result, Spark, Hive, MapReduce, etc. give the whole file to a single worker. If you want parallel processing, use a splittable compression format (e.g. snappy).
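The claim is easy to verify with plain `java.util.zip`, no Spark needed. This sketch (illustrative class and method names, not from the original post) shows that a gzip stream can only be opened from its first byte, which is exactly why a framework cannot hand the middle of a .gz file to a second worker:

```java
import java.io.*;
import java.util.zip.*;

public class GzipSplitDemo {
    // Compress a string to gzip bytes in memory.
    static byte[] gzip(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(s.getBytes("UTF-8"));
        }
        return bos.toByteArray();
    }

    // Reading from byte 0 works: the gzip header sits at the start.
    static String firstLineFromStart(byte[] compressed) throws IOException {
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(compressed))))) {
            return r.readLine();
        }
    }

    // Starting at an arbitrary offset fails: there is no gzip header there,
    // so a worker cannot begin decompressing at a split boundary.
    static boolean readableFromOffset(byte[] compressed, int offset) {
        byte[] tail = new byte[compressed.length - offset];
        System.arraycopy(compressed, offset, tail, 0, tail.length);
        try {
            new GZIPInputStream(new ByteArrayInputStream(tail)).read();
            return true;
        } catch (IOException e) {
            return false; // typically a ZipException: "Not in GZIP format"
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] c = gzip("line1\nline2\nline3\n");
        System.out.println(firstLineFromStart(c));     // line1
        System.out.println(readableFromOffset(c, 10)); // false
    }
}
```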


2 Comments

But if you have many such files, then you should be able to run partially in parallel. Or am I missing something?
Each file can be given to an individual worker, yes. But your example load("s3a://bucket/file.csv.gz") isn't doing that, so it gets a parallelism of 1.
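As the comment says, parallelism across whole files is still possible: in Spark that would mean loading several paths or a glob (e.g. something like load("s3a://bucket/dir/*.csv.gz"), assuming the files share a schema). The per-file behaviour can be sketched with plain Java streams standing in for workers (hypothetical helper names, not Spark API):

```java
import java.io.*;
import java.util.*;
import java.util.zip.*;
import java.util.stream.*;

public class ManyGzipFiles {
    static byte[] gzip(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(s.getBytes("UTF-8"));
        }
        return bos.toByteArray();
    }

    static String gunzip(byte[] b) throws IOException {
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(b))))) {
            return r.readLine();
        }
    }

    // Each .gz is unsplittable on its own, but separate files can be
    // handled by separate workers, simulated here with parallelStream().
    static List<String> decompressAll(List<byte[]> files) {
        return files.parallelStream().map(f -> {
            try { return gunzip(f); }
            catch (IOException e) { throw new UncheckedIOException(e); }
        }).collect(Collectors.toList()); // collect preserves encounter order
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> files = new ArrayList<>();
        for (String s : new String[]{"a", "b", "c"}) files.add(gzip(s));
        System.out.println(decompressAll(files)); // [a, b, c]
    }
}
```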

You are running Spark in local mode, so spark.executor.cores will not take effect; consider changing .master("local") to .master("local[*]").
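The reason is that in local mode the worker-thread count comes from the master string itself, not from executor settings. A minimal sketch (plain Java, no Spark dependency; the class and method names are illustrative) of what the master string means:

```java
public class MasterConfig {
    // "local"    -> a single worker thread, regardless of executor settings
    // "local[N]" -> N worker threads
    // "local[*]" -> one worker thread per available core, i.e. equivalent to:
    public static String allCoresMaster() {
        return "local[" + Runtime.getRuntime().availableProcessors() + "]";
    }
}
```

So on the 36-core box, .master("local[*]") or .master("local[36]") in the builder should let htop show activity across all cores, once a stage actually has enough partitions to fill them.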

Hope this helps

3 Comments

Thanks for your reply. Still, not all 36 of them are used. As for memory, I am specifying 50 GB because I have 60 GB in total, yet it is using only 30 GB. Does Spark take that config as an upper limit?
From what I know, when you create a Spark session with the builder, you create a 'global' session. You can then create new sessions with the spark.newSession() method. You may need this if you are reading multiple files simultaneously, or the same file repeatedly for different operations: for each file read you can create a new session with newSession(). Each call to newSession() creates a new thread.
@Atihska Yes, that config is an upper limit. For cores, can you check the vcores on the NodeManager UI?
