2

I am new to pyspark and trying to parse telecom.value if telecom.system = "fax|phone" but getting below error. I understand that filter() would return me a struct and I am selecting a column from that. How do I select the column value after calling filter()?

File "", line 3, in raise_from pyspark.sql.utils.AnalysisException: Resolved attribute(s) telecom#27,telecom#33 missing from name#3,telecom#5,address#7 in operator !Project [name#3.family AS Practitioner_LastName#23, name#3.suffix AS Practitioner_NameSuffix#24, name#3.given[0] AS Practitioner_FirstName#25, telecom#27.value AS telecom.value#42, telecom#33.value AS telecom.value#43, address#7.city AS PractitionerCity#38, address#7.line[0] AS PractitionerAddress_1#39, address#7.postalCode AS PractitionerZip#40, address#7.state AS PractitionerState#41]. Attribute(s) with the same name appear in the operation: telecom,telecom. Please check if the right attribute(s) are used.

root
 |-- resource: struct (nullable = true)
 |    |-- address: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- line: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- postalCode: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- identifier: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- type: struct (nullable = true)
 |    |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- name: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- family: string (nullable = true)
 |    |    |    |-- given: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- suffix: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- use: string (nullable = true)
 |    |-- resourceType: string (nullable = true)
 |    |-- telecom: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- text: struct (nullable = true)
 |    |    |-- div: string (nullable = true)
 |    |    |-- status: string (nullable = true)

import sys
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

json_file_path = 'C:\\Users\\M\\Documents\\Practitioner.json'
source_df = spark.read.json(json_file_path, multiLine=True)

source_df.printSchema()
output = source_df.select(source_df["resource.name"][0].alias("name"), 
                        source_df["resource.telecom"].alias("telecom"),
                        source_df["resource.address"][0].alias("address"))
output.printSchema()

practitioner = output.select(
    output.name.family.alias("Practitioner_LastName"),
    output.name.suffix.alias("Practitioner_NameSuffix"),
    output.name.given[0].alias("Practitioner_FirstName"),
    output.withColumn("telecom", f.explode(f.col("telecom"))).filter(f.col("telecom.system") == "phone").telecom.value,
    output.withColumn("telecom", f.explode(f.col("telecom"))).filter(f.col("telecom.system") == "fax").telecom.value,
    output.address.city.alias("PractitionerCity"),
    output.address.line[0].alias("PractitionerAddress_1"), 
    output.address.postalCode.alias("PractitionerZip"), 
    output.address.state.alias("PractitionerState")
)                    
        
practitioner.printSchema()
practitioner.show()

My json is: {"resource":{"resourceType":"Practitioner","id":"scm-ambqa1821624401190","text":{"status":"generated","div":""},"identifier":[{"use":"official","type":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/v2\/0203","code":"NPI"}]},"value":"1548206097"},{"use":"official","type":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/v2\/0203","code":"DEA"}]},"value":"HB1548206"}],"name":[{"use":"official","family":"BERNSTEIN","given":["HELENE","B"],"suffix":["MD"]}],"telecom":[{"system":"phone","value":"6106547854","use":"work"},{"system":"email","value":"[email protected]","use":"work"},{"system":"fax","value":"7106547895","use":"work"}],"address":[{"use":"work","line":["West Street 1","West Street 2"],"city":"Michigan","state":"MI","postalCode":"49036","country":"USA"}]}}

1 Answer 1

1

The data structure is a bit complex, so I will use a UDF to parse it:

import pyspark.sql.functions as f
import pyspark.sql.types as t

@f.udf(t.StringType())
def phone_parser(row):
    for item in row:
        if item['system'] == 'phone':
            return item['value']
        
@f.udf(t.StringType())
def fax_parser(row):
    for item in row:
        if item['system'] == 'fax':
            return item['value']

output.select(phone_parser('telecom'), fax_parser('telecom'))
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.