In order to turn each line of space-separated words into a list of words, you'll need to replace:
lines1 = new.map(lambda x: (x, ))
with
lines1 = new.map(lambda line: line.split(' '))
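For context, a minimal end-to-end sketch (assuming a pyspark 1.6 shell where sc and sqlContext already exist; "mylog.txt" is a hypothetical input file):
new = sc.textFile("mylog.txt")                  # one string per line
lines1 = new.map(lambda line: line.split(' '))  # each line becomes a list of tokens
lines1.take(1)
# e.g. [['1472128348.0', 'HTTP', '-', 'tr.vwt.gsf.asfh']]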
I tried it on my machine, and after executing the following:
df = sqlContext.createDataFrame(lines1)
a new DataFrame was created:
df.printSchema()
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: string (nullable = true)
|-- _4: string (nullable = true)
df.show()
+-------------+----+---+-----------------+
| _1| _2| _3| _4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP| -| tr.vwt.gsf.asfh|
|1472237494.63|HTTP| -| tr.sdf.sff.sdfg|
|1473297794.26|HTTP| -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP| -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP| -| tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+
You can execute groupBy:
>>> df2 = df.groupBy("_1")
>>> type(df2)
<class 'pyspark.sql.group.GroupedData'>
>>>
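A GroupedData object by itself is only an intermediate result; to get a DataFrame back, chain an aggregation onto it, e.g. count():
>>> df.groupBy("_1").count()
DataFrame[_1: string, count: bigint]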
In order to use a schema, you'll first need to define it;
see: https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
A schema sample can be found below (you'll need to add fields and update the names and types in order to adapt it to your case):
from pyspark.sql.types import *
schema = StructType([
    StructField("F1", StringType(), True),
    StructField("F2", StringType(), True),
    StructField("F3", StringType(), True),
    StructField("F4", StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)  # "rdd" stands for your RDD (lines1 in this case)
Afterwards you'll be able to create the DataFrame with the schema:
df = sqlContext.createDataFrame(lines1, schema)
And now, you'll have names for the fields:
df.show()
+-------------+----+---+-----------------+
| F1| F2| F3| F4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP| -| tr.vwt.gsf.asfh|
|1472237494.63|HTTP| -| tr.sdf.sff.sdfg|
|1473297794.26|HTTP| -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP| -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP| -| tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+
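Having real column names also lets you refer to them in queries, e.g. (a small sketch):
df.select("F1", "F4").show()           # project two columns
df.filter(df["F2"] == "HTTP").count()  # count matching rows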
In order to save it to CSV, you'll need to use toPandas() and then to_csv()
(part of python pandas); see:
http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html
df.toPandas().to_csv('mycsv.csv')
The content of the csv file:
cat mycsv.csv
,F1,F2,F3,F4
0,1472128348.0,HTTP,-,tr.vwt.gsf.asfh
1,1472237494.63,HTTP,-,tr.sdf.sff.sdfg
2,1473297794.26,HTTP,-,tr.asfr.gdfg.sdf
3,1474589345.0,HTTP,-,tr.sdgf.gdfg.gdfg
4,1472038475.0,HTTP,-,tr.sdf.csgn.sdf
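Note that toPandas() collects the whole DataFrame to the driver, so it only works when the data fits in the driver's memory. For larger data you can write directly from Spark instead; a sketch (on Spark 1.6 this needs the external spark-csv package, on Spark 2.0+ df.write.csv is built in):
df.write.format("com.databricks.spark.csv").option("header", "true").save("mycsv_dir")
# Spark 2.0+ equivalent:
# df.write.csv("mycsv_dir", header=True)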
Note that you can cast a column using ".cast()", e.g. casting F1 to type float (adding a new column of type float and dropping the old column):
df = df.withColumn("F1float", df["F1"].cast("float")).drop("F1")
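Once the column is numeric you can apply numeric operations to it; a small sketch (the threshold is just an example value):
df.printSchema()                                # F1float now shows as: float (nullable = true)
df.filter(df["F1float"] > 1472200000.0).show()  # numeric comparison on the cast column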