
I'm reading a large number of files from an S3 bucket.

After reading those files, I want to perform a filter operation on the DataFrame.

But when the filter operation executes, the data gets downloaded from the S3 bucket again. How can I avoid reloading the DataFrame?

I have tried caching and persisting the DataFrame before the filter operation, but the data is somehow still pulled from the S3 bucket again.

import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

var df = spark.read.json("path_to_s3_bucket/*.json")
df.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
df = df.filter("filter condition").sort(col("columnName").asc)

If the DataFrame is cached, it should not be loaded from S3 again.

2 Comments
  • Can you show us the plans with explain? How are you sure this is reading from the bucket again? (See the sketch after these comments.) Commented Sep 5, 2019 at 7:46
  • If the concern is re-reading, do a distcp to local storage and read it from there. Commented Sep 5, 2019 at 7:54
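For reference, a minimal sketch of that check, assuming the df and the placeholder filter from the question. If the persisted data is actually being reused, the physical plan printed by explain should contain an InMemoryRelation / InMemoryTableScan node rather than a fresh scan of the JSON files.

import org.apache.spark.sql.functions.col

// Inspect the plans for the filtered DataFrame; "filter condition" and
// "columnName" are the placeholders from the question.
val filtered = df.filter("filter condition").sort(col("columnName").asc)
filtered.explain(true)   // prints the parsed, analyzed, optimized and physical plans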

1 Answer


When you call

var df = spark.read.json("path_to_s3_bucket/*.json")

what happens under the cover is that Spark does partition discovery, file listing and schema inference (this may run some jobs in the background to do the file listing in parallel if you have a lot of files).
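One way to avoid the inference pass is to provide the schema explicitly to the reader, so Spark does not have to scan the JSON files just to work out the columns. A minimal sketch, assuming hypothetical field names (replace them with the real ones from your data):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical schema; the field names and types must match the JSON files.
val jsonSchema = StructType(Seq(
  StructField("columnName", StringType, nullable = true),
  StructField("someValue", LongType, nullable = true)
))

// With an explicit schema, the reader skips schema inference over the files.
val df = spark.read.schema(jsonSchema).json("path_to_s3_bucket/*.json")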

Next, when you call

df.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

the only thing that happens is that the query plan is marked for persistence; the actual caching does not happen at this moment (it is a lazy operation).
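A minimal sketch of this laziness, assuming the df from the question:

import org.apache.spark.storage.StorageLevel

// Calling persist only marks the DataFrame for caching and returns immediately;
// no Spark job is launched and the Storage tab of the UI stays empty for now.
df.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// storageLevel reports the level that was requested above,
// not whether the data has actually been materialized yet.
println(df.storageLevel)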

Next, when you call

df = df.filter("filter condition").sort(col("columnName").asc)

again, only the query plan is updated; no data is read at this point.

Now, if you call an action such as show(), count() and so on, the query plan will be processed and a Spark job will be executed. So now the data will be loaded onto the cluster, written to memory (because of caching), then read back from the cache, filtered, sorted, and further processed according to your query plan.
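A minimal end-to-end sketch of that order of events, reusing the placeholders from the question; the first count() is the action that actually populates the cache, so the later filter and sort read from the cache instead of S3 (assuming the data fits within the configured memory and disk):

import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

val df = spark.read.json("path_to_s3_bucket/*.json")
df.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// First action: scans S3 once and writes the rows into the cache.
df.count()

// Subsequent actions on df, or on DataFrames derived from it, read from the cache.
val result = df.filter("filter condition").sort(col("columnName").asc)
result.show()

// When finished, release the cached data.
df.unpersist()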


5 Comments

Hi, thanks for your input. But when var df = spark.read.json("path_to_s3_bucket/*.json") is executed, it takes nearly two hours to load the DataFrame, and then when the filter operation is performed it takes nearly two hours again. So my guess is the DataFrame gets reloaded.
@ChinmayR How many files are you reading? The first two hours might really be just file listing and schema inference. Try providing a schema to the DataFrameReader and see if it speeds things up.
Around 35 files in total, approximately 2 GB altogether. I use printSchema and a show command after it, which gives me sample data. And the data is read again on the filter command.
@ChinmayR Data should not be read on the filter command; the operation is lazy, so it should only update the query plan. I do not quite understand how that is possible. Also, 35 files with 2 GB in total is not too much; it should not take 2 hours to read. What is the size of your cluster?
Maybe the reason is that my internet speed is too low. When I tried running this on an EC2 instance, it hardly took 5 minutes.
