
I need to extract some data from a PipelinedRDD, but while converting it to a DataFrame I get the following error:

Traceback (most recent call last):
  File "/home/karan/Desktop/meds.py", line 42, in <module>
    relevantToSymEntered(newrdd)
  File "/home/karan/Desktop/meds.py", line 26, in relevantToSymEntered
    mat = spark.createDataFrame(self,StructType([StructField("Prescribed medicine",StringType), StructField(["Disease","ID","Symptoms Recorded","Severeness"],ArrayType)]))
  File "/home/karan/Downloads/spark-2.4.2-bin-hadoop2.7/python/pyspark/sql/types.py", line 409, in __init__
    "dataType %s should be an instance of %s" % (dataType, DataType)
AssertionError: dataType <class 'pyspark.sql.types.StringType'> should be an instance of <class 'pyspark.sql.types.DataType'>

This is not a duplicate of the question suggested in the comments:

1. The error there is a TypeError, while I am getting an AssertionError.
2. My problem has nothing to do with casting of data types.

I've already tried using toDF(), but it changes the column names, which is undesirable.
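A bare toDF() falls back to generated names like _1 and _2; as far as I know it can also take explicit column names, as in the sketch below (the "Details" name is made up), but I'd rather declare a proper schema:

# Hedged sketch: toDF with explicit names on a pair RDD of (medicine, details-list);
# "Details" is a hypothetical column name.
df = newrdd.toDF(["Prescribed medicine", "Details"])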

import findspark
findspark.init('/home/karan/Downloads/spark-2.4.2-bin-hadoop2.7')
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StringType, IntegerType, StructField, ArrayType
from pyspark import SparkConf, SparkContext
import pandas as pd

def reduceColumns(self):
    # Keep the medicine as key and bundle the remaining columns into a list.
    try:
        filtered = self.rdd.map(lambda x: (x["Prescribed medicine"], [x["Disease"], x["ID"], x["Symptoms Recorded"], x["Severeness"]]))
    except Exception as e:
        print("Error in reduceColumns:- ")
        print(e)
    return filtered

def cleanData(self, s):
    try:
        self.zipWithIndex  # bare attribute access; never called, so this is a no-op
    except Exception as e:
        print("Error in cleanData:- ")
        print(e)
    return self.filter(lambda x: x[1][0] == s)

def relevantToSymEntered(self):
    # meds.py line 26: this createDataFrame call raises the AssertionError above
    mat = spark.createDataFrame(self,StructType([StructField("Prescribed medicine",StringType), StructField(["Disease","ID","Symptoms Recorded","Severeness"],ArrayType)]))
    #mat = mat.rdd.map(lambda x: (x["Prescribed medicine"],list([x["ID"],x["Symptoms Recorded"],x["Severeness"]])))
    print(type(mat))


conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")
sc = SparkContext(conf=conf)
spark = SQLContext(sc)
# Note: spark.read.csv returns a DataFrame, despite the variable name.
rdd = spark.read.csv("/home/karan/Desktop/ExportExcel2.csv", header=True, sep=",", multiLine="True")

print(rdd)
newrdd = reduceColumns(rdd)
x = input("Enter the disease-")
newrdd = cleanData(newrdd, x)
relevantToSymEntered(newrdd)
  • You seem to have a class with a method called reduceColumns that doesn't take any input arguments, yet you give it one after your print(rdd) statement? Commented May 7, 2019 at 7:30
  • Possible duplicate of unexpected type: <class 'pyspark.sql.types.DataTypeSingleton'> when casting to Int on a ApacheSpark Dataframe Commented May 7, 2019 at 8:06
  • You are missing () from the data types - spark.createDataFrame(self,StructType([StructField("Prescribed medicine",StringType()), StructField(["Disease","ID","Symptoms Recorded","Severeness"],ArrayType(StringType()))])). Note the changed ArrayType as well. Commented May 8, 2019 at 7:29
  • @RakeshKumar Thanks for that, now it actually makes sense. Commented May 10, 2019 at 13:50

1 Answer


StructType([StructField("Prescribed medicine",StringType), StructField(["Disease","ID","Symptoms Recorded","Severeness"],ArrayType)])

replace with:

StructType([StructField("Prescribed medicine",StringType()), StructField(["Disease","ID","Symptoms Recorded","Severeness"],ArrayType(StringType()))])

You need to instantiate the types: StructField expects a DataType instance such as StringType(), not the class itself. ArrayType cannot be instantiated empty either; it requires an element type, e.g. ArrayType(StringType()).
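For completeness, StructField's first argument must be a single string, so the list of names passed as a field name above will still trip an assertion in types.py. Below is a minimal runnable sketch of a schema Spark accepts, assuming each record is a (medicine, details-list) pair; the combined "Details" column name and the sample row are hypothetical:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Both fields use type *instances*; the array declares its element type.
schema = StructType([
    StructField("Prescribed medicine", StringType()),
    StructField("Details", ArrayType(StringType())),  # "Details" is a hypothetical name
])

# Hypothetical (medicine, details-list) pair matching the RDD's shape.
pairs = spark.sparkContext.parallelize([
    ("DrugA", ["Flu", "1", "fever", "mild"]),
])

df = spark.createDataFrame(pairs, schema)
df.show(truncate=False)

Instantiating the atomic types is cheap: in pyspark they are singletons (note the DataTypeSingleton class in the linked duplicate), and calling StringType() gives you exactly the DataType instance that the assertion on types.py line 409 is checking for.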
