How do you write RDD[Array[Byte]] to a file using Apache Spark and read it back again?
2 Answers
Common problems are a ClassCastException when casting BytesWritable to NullWritable, and confusion around BytesWritable.getBytes: it does not return just your bytes. getBytes returns the whole backing buffer, which may be padded with zeros on the end. You have to use copyBytes instead.
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.rdd.RDD

val rdd: RDD[Array[Byte]] = ???
// To write (codecOpt is an Option[Class[_ <: CompressionCodec]]; pass None for no compression)
rdd.map(bytesArray => (NullWritable.get(), new BytesWritable(bytesArray)))
  .saveAsSequenceFile("/output/path", codecOpt)
// To read; copyBytes trims the zero padding that getBytes would leave in place
val loaded: RDD[Array[Byte]] = sc.sequenceFile[NullWritable, BytesWritable]("/input/path")
  .map(_._2.copyBytes())
5 Comments
Sam Stoelinga
This post is relatively old, so I just wanted to check whether the answer is still up to date. Is it still necessary to use copyBytes when reading?
samthebest
@SamStoelinga Yes, I think so; it's the Hadoop API, which is unlikely to change.
user1609012
A more efficient alternative is to call <BytesWritableInstance>.getBytes() and process only the first <BytesWritableInstance>.getLength() bytes, which avoids the copy. Of course, if you strictly need an RDD[Array[Byte]], this approach won't work, but you could consider an RDD[(Array[Byte], Int)].
Choix
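A minimal sketch of the difference, using a plain byte array as a stand-in for a BytesWritable's backing buffer (the buffer contents here are an assumption for illustration; no Spark or Hadoop needed to see the point):

```scala
import java.util.Arrays

// Stand-in for BytesWritable's backing buffer: 3 valid bytes plus zero padding.
// getBytes() would hand you this whole 8-byte buffer; getLength() would report 3.
val backing: Array[Byte] = "foo".getBytes ++ Array.fill[Byte](5)(0)
val length = 3

// Processing only the first getLength() bytes is what copyBytes() does for you:
val trimmed: Array[Byte] = Arrays.copyOfRange(backing, 0, length)

println(new String(trimmed)) // prints "foo"
println(backing.length)      // 8: the padded buffer, not just your data
```

The getBytes/getLength route skips the extra allocation that copyBytes (effectively a copyOfRange) performs, at the cost of carrying the length around yourself.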
Can anyone post an entire working code snippet including what packages to be imported? Thanks.
Chris Bedford
@Choix - I had the same issue. Posting snippet that solved my problem as a separate answer.
Here is a snippet with all required imports that you can run from spark-shell, as requested by @Choix
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable

val path = "/tmp/path"
val rdd = sc.parallelize(List("foo"))
val bytesRdd = rdd.map { str => (NullWritable.get, new BytesWritable(str.getBytes)) }
bytesRdd.saveAsSequenceFile(path)

val recovered = sc.sequenceFile[NullWritable, BytesWritable](path).map(_._2.copyBytes())
val recoveredAsString = recovered.map(new String(_))
recoveredAsString.collect()
// result is: Array[String] = Array(foo)