
The initial data is in a Dataset<Row> and I am trying to write it to a pipe-delimited file. I want every non-empty, non-null cell to be placed in quotes; empty or null values should not be quoted.

result.coalesce(1).write()
            .option("delimiter", "|")
            .option("header", "true")
            .option("nullValue", "")
            .option("quoteAll", "false")
            .csv(Location);

Expected output:

"London"||"UK"
"Delhi"|"India"
"Moscow"|"Russia"

Current Output:

London||UK
Delhi|India
Moscow|Russia

If I change the "quoteAll" to "true", output I am getting is:

"London"|""|"UK"
"Delhi"|"India"
"Moscow"|"Russia"

The Spark version is 2.3 and the Java version is Java 8.

  • Usually, you don't need quotes around values that do not contain pipes or quotes; that is how CSV usually works. Why do you expect the values to be quoted? Commented Feb 26, 2020 at 16:43
  • With "quoteAll" set to "true" the output is correct: in London||UK, the text between the two pipes is a cell, so it has to be marked with "" to produce valid quoted CSV, which yields "London"|""|"UK". Commented Feb 26, 2020 at 16:45
  • @RealSkeptic The previous version of the code ran on Spark 1.6 and that is how it behaved. After upgrading to 2.3, it quotes either everything or nothing, but the business users want the previous format and do not want any changes. I want to check the possibility of adding quotes manually to all non-empty values and then setting "quoteAll" to "false". Commented Feb 26, 2020 at 16:50
  • @KunLun I don't want the empty cell to be in quotes. I want the empty cell without quotes and cells with values in quotes. Commented Feb 26, 2020 at 16:53
  • This post has everything you are looking for: stackoverflow.com/questions/36248206/… Commented Mar 3, 2020 at 18:53

3 Answers


A Java answer. CSV escaping is not just a matter of wrapping values in " characters; you also have to handle " characters inside the strings. So let's use StringEscapeUtils, define a UDF that calls it, and then apply the UDF to each column.

import org.apache.commons.text.StringEscapeUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import java.util.Arrays;

public class Test {

    void test(Dataset<Row> result, String Location) {
        // define a UDF that CSV-escapes non-empty values (null-safe: nulls become "")
        UserDefinedFunction escape = udf(
            (String str) -> str == null || str.isEmpty() ? "" : StringEscapeUtils.escapeCsv(str),
            DataTypes.StringType
        );
        // apply the UDF to every column, keeping the original column names
        Column[] columns = Arrays.stream(result.schema().fieldNames())
                .map(f -> escape.apply(col(f)).as(f))
                .toArray(Column[]::new);

        // save the result
        result.select(columns)
                .coalesce(1).write()
                .option("delimiter", "|")
                .option("header", "true")
                .option("nullValue", "")
                .option("quoteAll", "false")
                .csv(Location);
    }
}

Side note: coalesce(1) is a bad call. It collects all the data on one executor, so you can get an executor OOM in production for a huge dataset.
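
If a single output file is a hard requirement, one alternative is to let Spark write its part files in parallel and merge them afterwards. A minimal sketch, assuming a Hadoop 2.x client (where FileUtil.copyMerge still exists) and illustrative paths; note that with header=true every part file carries its own header line, so a naive merge repeats it:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Merge the part files Spark wrote, without using coalesce(1).
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileUtil.copyMerge(
        fs, new Path("/tmp/output_parts"), // directory written by df.write().csv(...)
        fs, new Path("/tmp/output.csv"),   // single merged target file
        false,                             // keep the source directory
        conf, null);                       // no separator string between files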


4 Comments

Thank you, I had just finished writing a similar UDF but was struggling with looping through each column; I was adding columns to a new dataset and performing a join operation. This is certainly more efficient and better. Just wanted to check if there is a way to get quotes for the header as well.
Change escape.apply(col(f)).as(f) to escape.apply(col(f)).as(StringEscapeUtils.escapeCsv(f)) to quote the header as well.
When the actual data contains the delimiter I still face an issue: the quotes do not come out as expected and additional null values are generated. Can you suggest how to handle it?
Can you post an example?
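
A note on the delimiter problem raised in the comments above (an editorial addition, not part of the original answer): StringEscapeUtils.escapeCsv only quotes values that contain a comma, a double quote, CR, or LF, so a value containing the pipe delimiter is left unquoted and Spark's own writer then steps in. A hedged sketch of a replacement UDF that quotes every non-empty value itself and doubles embedded quotes (the name quoteNonEmpty is illustrative); at write time it would be combined with .option("quote", "\u0000"), the same trick the last answer below uses, so Spark adds no second layer of quoting:

import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.udf;

// Quote every non-empty value and escape embedded quotes by doubling them.
UserDefinedFunction quoteNonEmpty = udf(
        (String str) -> str == null || str.isEmpty()
                ? ""
                : "\"" + str.replace("\"", "\"\"") + "\"",
        DataTypes.StringType
);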

EDIT & warning: I did not see the java tag. This is a Scala solution that uses foldLeft as a loop over all the columns. If that is replaced by a Java-friendly loop, everything should work as is. I will try to look back at this at a later time.

A programmatic solution could be

val columns = result.columns
val randomColumnName = "RND"

val result2 = columns.foldLeft(result) { (data, column) =>
  data
    .withColumnRenamed(column, randomColumnName)
    .withColumn(column,
      when(col(randomColumnName).isNull, "")
        .otherwise(concat(lit("\""), col(randomColumnName), lit("\"")))
    )
    .drop(randomColumnName)
}

This will produce strings wrapped in " characters and write empty strings in place of nulls. If you need to keep the nulls, just keep them.

Then just write it out:

result2.coalesce(1).write()
            .option("delimiter", "|")
            .option("header", "true")
            .option("quoteAll", "false")
            .csv(Location);
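
Since the question is tagged java, a rough Java equivalent of the foldLeft above might look like this (an untested sketch; a plain loop stands in for foldLeft):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

String randomColumnName = "RND"; // assumed not to clash with a real column name
Dataset<Row> result2 = result;
for (String column : result.columns()) {
    // rename the column away, rebuild it quoted (or empty when null), drop the temp
    result2 = result2
            .withColumnRenamed(column, randomColumnName)
            .withColumn(column,
                    when(col(randomColumnName).isNull(), lit(""))
                            .otherwise(concat(lit("\""), col(randomColumnName), lit("\""))))
            .drop(randomColumnName);
}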

2 Comments

I am particularly looking for a solution in Java, not Scala.
I have never used Java with Spark, but the only Scala-specific part should be the foldLeft, which could be replaced with a different kind of loop. The rest of the answer should be solid, or at least the logic.

This is certainly not an efficient answer, and I am modifying it based on the one given by Artem Aliev, but I thought it would be useful to a few people, so I am posting it.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

public class Quotes {

    private static final String DELIMITER = "|";
    private static final String Location = "Give location here";

    public static void main(String[] args) {

        SparkSession sparkSession = SparkSession.builder() 
                .master("local") 
                .appName("Spark Session") 
                .enableHiveSupport()
                .getOrCreate();

        Dataset<Row> result = sparkSession.read()
                .option("header", "true")
                .option("delimiter",DELIMITER)
                .csv("Sample file to read"); //Give the details of file to read here

      UserDefinedFunction udfQuotesNonNull = udf(
        (String abc) -> (abc!=null? "\""+abc+"\"":abc),DataTypes.StringType
      );

      result = result.withColumn("ind_val", monotonically_increasing_id()); //inducing a new column to be used for join as there is no identity column in source dataset


      Dataset<Row> dataset1 = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val"))); //Dataset used for storing temporary results
      Dataset<Row> dataset = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val")));  //Dataset used for storing output

      String[] str = result.schema().fieldNames();
      dataset1.show();
      for (int j = 0; j < str.length - 1; j++) { // skip the last field, which is ind_val
        dataset1 = result.select(
            udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val"),
            udfQuotesNonNull.apply(col(str[j]).cast("string")).alias("\"" + str[j] + "\""));
        dataset = dataset.join(dataset1, "ind_val"); // joining based on the induced column
      }
      result = dataset.drop("ind_val");

      result.coalesce(1).write()
          .option("delimiter", DELIMITER)
          .option("header", "true")
          .option("quoteAll", "false")
          .option("nullValue", null)
          .option("quote", "\u0000") // a quote character that never occurs in the data, so Spark adds no quoting of its own
          .option("spark.sql.sources.writeJobUUID", false)
          .csv(Location);
    }
}

