
I have some Spark code that processes a CSV file and applies some transformations to it. I now want to save this RDD as a CSV file and add a header. Each line of the RDD is already formatted correctly.

I am not sure how to do it. I tried to union the header string with my RDD, but the header string is not an RDD, so it does not work.

5 Answers


You can make an RDD out of your header line and then union it, yes:

val rdd: RDD[String] = ...
val header: RDD[String] = sc.parallelize(Array("my,header,row"))
header.union(rdd).saveAsTextFile(...)

Then you end up with a bunch of part-xxxxx files that you merge.

The problem is that I don't think you're guaranteed that the header will be the first partition and therefore end up in part-00000 and at the top of your file. In practice, I'm pretty sure it will.
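If you do need a guarantee, one workaround (a sketch, not part of this answer) is to prepend the header inside the first partition with mapPartitionsWithIndex, so no cross-partition ordering is involved:

import org.apache.spark.rdd.RDD

// Prepend the header to partition 0 so it is always written at the top of part-00000.
def withHeader(rdd: RDD[String], header: String): RDD[String] =
  rdd.mapPartitionsWithIndex { (idx, iter) =>
    if (idx == 0) Iterator(header) ++ iter else iter
  }

withHeader(rdd, "my,header,row").saveAsTextFile("/data/output") // path is hypothetical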

More reliable still would be to use Hadoop commands like hdfs dfs -getmerge to combine the part-xxxxx files, and as part of that step, throw in the header line from a file.
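A minimal sketch of that merge using the Hadoop FileSystem API (the paths /data/output and /data/final.csv are hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

val conf = new Configuration()
val fs = FileSystem.get(conf)
val out = fs.create(new Path("/data/final.csv"))

// Write the header first.
out.write("my,header,row\n".getBytes("UTF-8"))

// Append every part-* file in name order, keeping `out` open between copies.
val parts = fs.listStatus(new Path("/data/output"))
  .map(_.getPath)
  .filter(_.getName.startsWith("part-"))
  .sortBy(_.getName)

for (part <- parts) {
  val in = fs.open(part)
  IOUtils.copyBytes(in, out, conf, false)
  in.close()
}
out.close()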


2 Comments

In Spark 1.6.2 running in distributed mode, union did not put the header on top for me. Here is my snippet: val header = sc.parallelize(Array("col1", "col2"), 1); header.union(rdd.map(_.toString)).repartition(1).saveAsTextFile(outputLocation)
Same issue for me... union() is not guaranteed to preserve order. Looking for a workaround now; it looks like sorting the RDD might help.

Some help on writing it without union (the header is supplied at the time of the merge):

import java.io.{ByteArrayInputStream, DataInputStream, InputStream}
import java.nio.charset.StandardCharsets
import org.apache.hadoop.io.IOUtils
val fileHeader = "This is header"
val fileHeaderStream: InputStream = new ByteArrayInputStream(fileHeader.getBytes(StandardCharsets.UTF_8))
IOUtils.copyBytes(fileHeaderStream, out, conf, false) // write the header first; out is the merged file's output stream

Now loop over your file parts to write the complete file, using:

val in: DataInputStream = ... // <data input stream from each part file>
IOUtils.copyBytes(in, out, conf, false) // keep `out` open between parts (close = false)

This made sure for me that the header always comes as the first line, even when you use coalesce/repartition for efficient writing.


import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def addHeaderToRdd(sparkCtx: SparkContext, lines: RDD[String], header: String): RDD[String] = {
  // Index the header with -1L so that sorting by key puts it on top;
  // zipWithIndex assigns 0-based indices to the data lines.
  val headerRDD = sparkCtx.parallelize(List((-1L, header)))
  val pairRDD = lines.zipWithIndex().map { case (line, index) => (index, line) }
  pairRDD.union(headerRDD).sortByKey().values
}
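Example usage (a sketch; sc and lines are assumed to be in scope, and the header string is hypothetical):

val withHeader = addHeaderToRdd(sc, lines, "col1,col2,col3")
withHeader.saveAsTextFile("/data/output") // the header sorts first, so it tops part-00000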



A slightly different approach, using Spark SQL

From the question: "I now want to save this RDD as a CSV file and add a header. Each line of this RDD is already formatted correctly."

With Spark 2.x you have several options to convert an RDD to a DataFrame:

val rdd = .... // assume an RDD properly formatted with a case class or tuple
val df = spark.createDataFrame(rdd).toDF("col1", "col2", ... "coln")

df.write
  .format("csv")
  .option("header", "true") // adds the header to the file
  .save("hdfs://location/to/save/csv")

Now we can use the Spark SQL DataFrame API to load, transform, and save the CSV file.
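For example, reading the saved file back with the header row interpreted as column names (path assumed from above):

val dfIn = spark.read
  .format("csv")
  .option("header", "true")
  .load("hdfs://location/to/save/csv")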


import org.apache.spark.sql.Row

spark.sparkContext
  .parallelize(Seq(SqlHelper.getARow(temRet.columns, temRet.columns.length)))
  .union(temRet.rdd)
  .map(x => x.mkString("\u0001")) // "\u0001" as the field separator ("\x01" is not valid Scala)
  .coalesce(1, shuffle = true)
  .saveAsTextFile(retPath)

object SqlHelper {
  // Build a single Row from the column names, to serve as the header row.
  def getARow(x: Array[String], size: Int): Row = {
    val columnArray = new Array[String](size)
    for (i <- 0 until size) {
      columnArray(i) = x(i)
    }
    Row.fromSeq(columnArray)
  }
}

1 Comment

Can someone write this in Java?
