
My code first extracts data using a regex and writes it to a text file as strings. I then tried creating a DataFrame out of the contents of the text file so that I could have separate columns, which led to an error. (Writing it to a CSV file puts everything into a single column.)

with open("C:\\Sample logs\\dataframe.txt", 'a') as f:
    f.write(str(time))
    f.write(" ")
    f.write(qtype)
    f.write(" ")
    f.write(rtype)
    f.write(" ")
    f.write(domain)
    f.write("\n")

new = sc.textFile("C:\\Sample logs\\dataframe.txt").cache()  # createDataFrame requires an RDD
lines1 = new.map(lambda x: (x, ))
df = sqlContext.createDataFrame(lines1)

But I get the following error:

TypeError: Can not infer schema for type: type 'unicode'

I tried some other approaches, but none helped. All I want is to create, after the write operation, a DataFrame with separate columns so that I can use groupBy().

The input in the text file:

1472128348.0 HTTP - tr.vwt.gsf.asfh
1472237494.63 HTTP - tr.sdf.sff.sdfg
1473297794.26 HTTP - tr.asfr.gdfg.sdf
1474589345.0 HTTP - tr.sdgf.gdfg.gdfg
1472038475.0 HTTP - tr.sdf.csgn.sdf

Expected output in csv format:

The same data as above, but separated into columns so I can perform groupBy operations.

  • Could you please do lines1.take(1) Commented Sep 1, 2016 at 11:44
  • please give example for input data, and expected dataframe structure Commented Sep 1, 2016 at 12:16
  • Is there a saveAsTextFile method for DataFrame class too? What is your spark version? Commented Sep 1, 2016 at 12:29
  • @AlbertoBonsanto: It returns the first line from the input Commented Sep 1, 2016 at 13:21
  • @Yaron: I've added it to the question Commented Sep 1, 2016 at 13:23

1 Answer


In order to split each space-separated line into a list of words, you'll need to replace:

lines1 = new.map(lambda x: (x, ))

with

lines1 = new.map(lambda line: line.split(' '))
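As a quick sanity check outside Spark, the split can be verified on one of the sample lines from the question with plain Python:

```python
# Plain-Python check of the split logic used in the map() above
line = "1472128348.0 HTTP - tr.vwt.gsf.asfh"  # sample line from the question's input
fields = line.split(' ')
print(fields)  # four string fields: timestamp, qtype, rtype, domain
```

Each line becomes a list of four strings, which is what lets createDataFrame infer four columns.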

I tried it on my machine; after executing the following:

df = sqlContext.createDataFrame(lines1)

a new DataFrame was created:

df.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)

df.show()
+-------------+----+---+-----------------+
|           _1|  _2| _3|               _4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP|  -|  tr.vwt.gsf.asfh|
|1472237494.63|HTTP|  -|  tr.sdf.sff.sdfg|
|1473297794.26|HTTP|  -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP|  -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP|  -|  tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+

You can execute groupBy:

>>> df2 = df.groupBy("_1")
>>> type(df2)
<class 'pyspark.sql.group.GroupedData'>
>>> 
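Note that groupBy() by itself only returns a GroupedData object, not a DataFrame; you still need an aggregation such as count() or agg() to get results back. The effect of grouping by the second column and counting can be sanity-checked outside Spark with plain Python on the question's sample rows:

```python
from collections import Counter

# Sample rows from the question, already split into fields
rows = [
    ["1472128348.0", "HTTP", "-", "tr.vwt.gsf.asfh"],
    ["1472237494.63", "HTTP", "-", "tr.sdf.sff.sdfg"],
    ["1473297794.26", "HTTP", "-", "tr.asfr.gdfg.sdf"],
    ["1474589345.0", "HTTP", "-", "tr.sdgf.gdfg.gdfg"],
    ["1472038475.0", "HTTP", "-", "tr.sdf.csgn.sdf"],
]

# Plain-Python equivalent of df.groupBy("_2").count() on this tiny sample
counts = Counter(row[1] for row in rows)
print(counts)  # Counter({'HTTP': 5})
```

In Spark itself the equivalent would be df.groupBy("_2").count().show(), which returns a DataFrame rather than GroupedData.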

In order to use a schema, you'll first need to define it; see: https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

A sample schema can be found below (you'll need to add fields and update the names and types to adapt it to your case):

from pyspark.sql.types import *

schema = StructType([
    StructField("F1", StringType(), True),
    StructField("F2", StringType(), True),
    StructField("F3", StringType(), True),
    StructField("F4", StringType(), True)])

Afterwards you'll be able to create the DataFrame with the schema:

df = sqlContext.createDataFrame(lines1,schema)

And now, you'll have names for the fields:

df.show()
+-------------+----+---+-----------------+
|           F1|  F2| F3|               F4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP|  -|  tr.vwt.gsf.asfh|
|1472237494.63|HTTP|  -|  tr.sdf.sff.sdfg|
|1473297794.26|HTTP|  -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP|  -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP|  -|  tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+

In order to save it to CSV, you can use toPandas() and then to_csv() (part of the Python pandas library):

http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html

df.toPandas().to_csv('mycsv.csv')

The content of the CSV file:

cat mycsv.csv

,F1,F2,F3,F4
0,1472128348.0,HTTP,-,tr.vwt.gsf.asfh
1,1472237494.63,HTTP,-,tr.sdf.sff.sdfg
2,1473297794.26,HTTP,-,tr.asfr.gdfg.sdf
3,1474589345.0,HTTP,-,tr.sdgf.gdfg.gdfg
4,1472038475.0,HTTP,-,tr.sdf.csgn.sdf
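Independent of Spark, the column separation itself can be demonstrated with Python's built-in csv module: each element of a row list becomes its own column, which is exactly the layout the question was missing:

```python
import csv
import io

# Two of the question's sample rows, already split into fields
rows = [
    ["1472128348.0", "HTTP", "-", "tr.vwt.gsf.asfh"],
    ["1472237494.63", "HTTP", "-", "tr.sdf.sff.sdfg"],
]

# Each list element becomes its own CSV column
buf = io.StringIO()
csv.writer(buf).writerows(rows)
print(buf.getvalue())
```

Writing the whole log line as one string, by contrast, is what produces a single-column CSV.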

Note that you can cast a column using .cast(); e.g., casting F1 to float adds a new column of type float and drops the old one:

df = df.withColumn("F1float", df["F1"].cast("float")).drop("F1")
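As a plain-Python sanity check, the per-value conversion that the cast performs corresponds roughly to calling float() on each string (values taken from column F1; Spark's FloatType is 32-bit, so its precision may differ from Python's double-precision float):

```python
# String-to-float conversion, one value per row of column F1
timestamps = ["1472128348.0", "1472237494.63", "1474589345.0"]
as_floats = [float(t) for t in timestamps]
print(as_floats)
```

If a value cannot be parsed as a number, Spark's cast yields null for that row, which is one way the all-null column described in the comments below can arise.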

5 Comments

Thanks a ton! This worked! BTW, when I use a schema with a StructField of FloatType, it writes only null values. StringType has no issues. Do you know why FloatType writes nulls to my Excel sheet?
When I updated my schema to use FloatType on F1, I got the following error: "TypeError: FloatType can not accept object in type <type 'unicode'>". I'm not sure why it was identified as unicode. See the update in my answer regarding ".cast()".
When I did the casting, no new column was added, but the old F1 was dropped.
Try the following: before the casting: df.show(); casting: df = df.withColumn("F1float", df["F1"].cast("float")); after the casting: df.show() - let me know if a new column was added.
Hi, I tried that. Now there is a new column added, but the new column has only one value repeated throughout, unlike F1 which had a variety of values. Seems like some conversion happened and only one value (which isn't present in F1) has been added.
