
How do you write RDD[Array[Byte]] to a file using Apache Spark and read it back again?


Common problems seem to be a weird ClassCastException (BytesWritable cannot be cast to NullWritable) and BytesWritable.getBytes, which doesn't just get your bytes at all. getBytes returns the whole backing buffer, which can be longer than your data, so you get your bytes followed by padding (zeros, or leftover bytes from an earlier record). You have to use copyBytes, which returns exactly getLength() bytes:

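import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.spark.rdd.RDD

val rdd: RDD[Array[Byte]] = ???  // your data
// None writes uncompressed; e.g. Some(classOf[org.apache.hadoop.io.compress.DefaultCodec]) compresses
val codecOpt: Option[Class[_ <: CompressionCodec]] = None

// To write
rdd.map(bytesArray => (NullWritable.get(), new BytesWritable(bytesArray)))
  .saveAsSequenceFile("/output/path", codecOpt)

// To read (sc is the SparkContext, as in spark-shell); copyBytes() returns exactly getLength() bytes
val restored: RDD[Array[Byte]] = sc.sequenceFile[NullWritable, BytesWritable]("/input/path")
  .map(_._2.copyBytes())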
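To see the getBytes padding described above for yourself, here is a small standalone sketch. It assumes only hadoop-common on the classpath; the object GetBytesVsCopyBytes and its helper refill are hypothetical names for illustration. It simulates one BytesWritable being refilled with two records by calling set twice: after the shorter second record, getBytes.length is larger than getLength, while copyBytes gives back just "foo".

```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.hadoop.io.BytesWritable

object GetBytesVsCopyBytes {
  // Hypothetical helper: refills a single BytesWritable with each record in turn,
  // simulating (via set) one Writable instance being reused across records
  def refill(records: Seq[String]): BytesWritable = {
    val bw = new BytesWritable()
    records.foreach { r =>
      val bytes = r.getBytes(UTF_8)
      bw.set(bytes, 0, bytes.length)
    }
    bw
  }

  def main(args: Array[String]): Unit = {
    val bw = refill(Seq("a much longer record", "foo"))
    // getBytes exposes the whole backing buffer; only the first getLength bytes are data
    println(s"getLength = ${bw.getLength}, getBytes.length = ${bw.getBytes.length}")
    // copyBytes returns a fresh array of exactly getLength bytes
    println(new String(bw.copyBytes(), UTF_8))
  }
}
```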

Comments

This post is relatively old, so I just wanted to know whether the answer is still up to date. Is it still necessary to use copyBytes when reading?
@SamStoelinga Yes, I think so; it's a Hadoop API that is unlikely to change.
A more efficient alternative is to use <BytesWritableInstance>.getBytes() and process only up to <BytesWritableInstance>.getLength() bytes. Of course, if you strictly need an RDD[Array[Byte]], this approach won't work, but you could consider an RDD[(Array[Byte], Int)].
Can anyone post a complete working code snippet, including which packages to import? Thanks.
@Choix I had the same issue. I'm posting the snippet that solved my problem as a separate answer.
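The getBytes()/getLength() alternative suggested in the comments could look roughly like this. It's a sketch assuming hadoop-common and spark-core on the classpath; ChecksumWithoutCopy, crcOf and checksums are hypothetical names, and the CRC32 checksum is only a stand-in for whatever per-record processing you actually need. The bytes are consumed inside map rather than stored, because the Spark docs for sequenceFile warn that Hadoop reuses the same Writable object for each record, so keeping raw getBytes() arrays in an RDD[(Array[Byte], Int)] that you then cache or shuffle can leave you with many references to the same reused buffer.

```scala
import java.util.zip.CRC32
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object ChecksumWithoutCopy {
  // Reads only the first getLength bytes of the backing buffer; no copy is made
  def crcOf(bw: BytesWritable): Long = {
    val crc = new CRC32()
    crc.update(bw.getBytes, 0, bw.getLength)
    crc.getValue
  }

  // Hypothetical helper: one checksum per record, computed inside map while the
  // reused Writable still holds that record's bytes
  def checksums(sc: SparkContext, path: String): RDD[Long] =
    sc.sequenceFile[NullWritable, BytesWritable](path).map { case (_, bw) => crcOf(bw) }
}
```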

Here is a snippet with all the required imports that you can run from spark-shell, as requested by @Choix:

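import java.nio.charset.StandardCharsets.UTF_8
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable

val path = "/tmp/path"

val rdd = sc.parallelize(List("foo"))
// Encode explicitly as UTF-8 rather than relying on the platform default charset
val bytesRdd = rdd.map(str => (NullWritable.get, new BytesWritable(str.getBytes(UTF_8))))
bytesRdd.saveAsSequenceFile(path)  // fails if path already exists

// Read back from the same path; copyBytes() trims each value to getLength() bytes
val recovered = sc.sequenceFile[NullWritable, BytesWritable](path).map(_._2.copyBytes())
val recoveredAsString = recovered.map(new String(_, UTF_8))
recoveredAsString.collect()
// result is:  Array[String] = Array(foo)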
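The snippet above round-trips text. For arbitrary binary payloads (zero bytes, negative bytes, an empty array) with compression switched on, a standalone sketch might look like this. It assumes spark-core and hadoop-common on the classpath and runs Spark in local mode; BinaryRoundTrip and its methods are hypothetical names, and DefaultCodec is just one codec choice. Because Scala arrays compare by reference, the check converts each array to a Seq[Byte] and sorts them, so it also doesn't depend on the order the part files are read back in.

```scala
import java.nio.file.Files
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BinaryRoundTrip {
  // Write each payload as the value of a compressed SequenceFile record
  def write(rdd: RDD[Array[Byte]], path: String): Unit =
    rdd.map(bytes => (NullWritable.get, new BytesWritable(bytes)))
      .saveAsSequenceFile(path, Some(classOf[DefaultCodec]))

  // copyBytes() trims to getLength and detaches each value from the reused Writable
  def read(sc: SparkContext, path: String): RDD[Array[Byte]] =
    sc.sequenceFile[NullWritable, BytesWritable](path).map(_._2.copyBytes())

  // Arrays compare by reference, so compare sorted Seq[Byte] views instead
  private def normalize(xs: Seq[Array[Byte]]): Seq[Seq[Byte]] =
    xs.map(_.toSeq).sortBy(_.mkString(","))

  def roundTripMatches(sc: SparkContext, data: Seq[Array[Byte]], path: String): Boolean = {
    write(sc.parallelize(data, 2), path)
    normalize(read(sc, path).collect().toSeq) == normalize(data)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-roundtrip").setMaster("local[2]"))
    try {
      val data = Seq(Array[Byte](0, 1, 2, -1), Array.emptyByteArray, Array.fill(100)(0x7f.toByte))
      // saveAsSequenceFile refuses to overwrite, so write to a fresh subdirectory
      val path = Files.createTempDirectory("seqfile").resolve("out").toString
      println(roundTripMatches(sc, data, path))
    } finally sc.stop()
  }
}
```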
