
I have created a PySpark RDD (converted from XML to CSV) that does not have headers. I need to convert it to a DataFrame with headers to perform some SparkSQL queries on it. I cannot seem to find a simple way to add headers. Most examples start with a dataset that already has headers.

    df = spark.read.csv('some.csv', header=True, schema=schema)

However, my data has no header row, so I need to attach the headers myself:

    headers = ['a', 'b', 'c', 'd']

This seems like a trivial problem; I am not sure why I cannot find a working solution. Thank you.

3 Comments
  • If the headers are not there, you can specify a schema, which gives each column a name, datatype, and nullability. Then you can use SparkSQL (see the sketch after these comments). Commented May 11, 2019 at 7:44
  • Great, thank you. I will try that. I'm very new to Spark, and sometimes it's the trivial syntax type things that I get stuck on. Commented May 11, 2019 at 17:30
  • @Annabanana This should work: df = spark.read.csv(filename).toDF("col1","col2","col3") Commented Jun 12, 2022 at 0:52
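
A minimal sketch of the schema approach from the first comment, assuming an active SparkSession named spark, plus the headerless some.csv and the header names from the question (StringType is used for every column only for brevity; substitute the real datatypes, and the view name my_table is just for illustration):

    from pyspark.sql.types import StructType, StructField, StringType

    # Each StructField is (column name, datatype, nullable).
    schema = StructType([
        StructField("a", StringType(), True),
        StructField("b", StringType(), True),
        StructField("c", StringType(), True),
        StructField("d", StringType(), True)])

    # header=False: treat the first row as data, not column names.
    df = spark.read.csv('some.csv', header=False, schema=schema)

    # The named columns are now queryable from SparkSQL.
    df.createOrReplaceTempView("my_table")
    spark.sql("SELECT a, b FROM my_table").show()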

2 Answers


Like this ... you need to specify a schema and set .option("header", "false") if your CSV does not contain a header row:

spark.version
'2.3.2'

! cat sample.csv

1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"

PATH = "sample.csv"

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", FloatType(), True),
    StructField("col3", StringType(), True)])

csvFile = spark.read.format("csv")\
.option("header", "false")\
.schema(schema)\
.load(PATH)

csvFile.show()

+----+----+---------------+
|col1|col2|           col3|
+----+----+---------------+
|   1| 2.0|          hello|
|   3| 4.0|        "there"|
|   5| 6.0| "how are you?"|
+----+----+---------------+

# if you have rdd and want to convert straight to df
rdd = sc.textFile(PATH)

# just showing rows
for i in rdd.collect(): print(i)
1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"

# use Row to construct a schema from rdd
from pyspark.sql import Row

csvDF = rdd\
    .map(lambda x: Row(col1=int(x.split(",")[0]),
                       col2=float(x.split(",")[1]),
                       col3=str(x.split(",")[2]))).toDF()

csvDF.show()
+----+----+---------------+
|col1|col2|           col3|
+----+----+---------------+
|   1| 2.0|        "hello"|
|   3| 4.0|        "there"|
|   5| 6.0| "how are you?"|
+----+----+---------------+

csvDF.printSchema()
root
 |-- col1: long (nullable = true)
 |-- col2: double (nullable = true)
 |-- col3: string (nullable = true)
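
(Note: when building Rows from Python values, Spark infers a Python int as long and a Python float as double, which is why this printed schema differs from the IntegerType/FloatType declared in the explicit schema above.)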


4 Comments

Thank you @thePurplePython. What I have is an RDD that is a comma-delimited text file without headers. When I save it to my hard drive it is separated into 100 partitions. I want to skip the saving part and create a DF from a comma-delimited RDD. So I need to add headers and convert an RDD to a DF. How would I do it?
What I have is an RDD of comma-delimited text files: "1,2.0,hello/3,4.0,there/5,6.0,how are you?" and I need to convert it to a DF as you kindly show above. Any suggestions are appreciated. Thank you.
Thank you. I tried but it gave me an error. I checked my RDD type and type(RDD) = pyspark.rdd.PipelinedRDD. How do I convert a PipelinedRDD to a dataframe?
I am not sure what your rdd looks like ... try this => stackoverflow.com/questions/48111066/…
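
For that case, a minimal sketch, assuming rdd is an RDD whose elements are comma-delimited strings (a PipelinedRDD is still an RDD, so the same call works), with one header name per field:

    headers = ['a', 'b', 'c', 'd']  # must match the number of fields per line

    # Split each line into a list of fields; an RDD of lists can be
    # converted directly, with the header names as column names.
    df = rdd.map(lambda line: line.split(",")).toDF(headers)
    df.show()

Note that every column comes out as a string this way; cast afterwards (e.g. df.withColumn("b", df["b"].cast("double"))) if you need numeric types.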

rdd.toDF(schema=['a', 'b', 'c', 'd'])
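
This is the shortest route, but it assumes the RDD's elements are already tuples, lists, or Rows rather than raw strings. A minimal sketch, assuming a SparkContext named sc:

    # An RDD of tuples converts directly; the list supplies the headers.
    rdd = sc.parallelize([(1, 2.0, 'hello', 'x'), (3, 4.0, 'there', 'y')])
    df = rdd.toDF(schema=['a', 'b', 'c', 'd'])
    df.show()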

