
I am trying to parse multiple xml files with pyspark. All xml files have the same known schema.
First I load all the files as text into a Spark DataFrame:

path = 'c:\\path\\to\\xml\\files\\*.xml'
df = spark.read.text(path)

At this point my DF looks like this:

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| value                                                                                     
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>|
|<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The xml file schema is as follows:

df.printSchema()
root
 |-- Header: struct (nullable = false)
 |    |-- tag1: string (nullable = false)
 |    |-- tag2: integer (nullable = false)
 |    |-- tag3: timestamp (nullable = false)
 |-- Body: struct (nullable = false)
 |    |-- Pair: array (nullable = false)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- N: string (nullable = false)
 |    |    |    |-- V: string (nullable = false)

So the final output after parsing should look like this:

+---------+-----+------------------------+---+--+
|tag1     | tag2| tag3                   | N |V |
+---------+-----+------------------------+---+--+
|some str1| 2   |2022-02-16 10:39:26.730 |N1 |V1|
|some str1| 2   |2022-02-16 10:39:26.730 |N2 |V2|
|some str1| 2   |2022-02-16 10:39:26.730 |N3 |V3|
|some str2| 5   |2022-02-17 10:39:26.730 |N4 |V4|
|some str2| 5   |2022-02-17 10:39:26.730 |N5 |V5|
+---------+-----+------------------------+---+--+

Meaning "Header" element should repeat itself for all NV pairs that come from the same xml string.
So I think I found a way to extract all the header tags with xpath or xml.etree.ElementTree but my problem is that I don't really understand how to extract my NV pairs to something that I can explode later.
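
A rough, untested sketch of the kind of thing I have in mind, using Spark's built-in xpath SQL functions on the text DataFrame (the column and variable names here are just illustrative):

from pyspark.sql.functions import arrays_zip, col, explode, to_timestamp

# Sketch: header tags come out as scalars, the N and V lists as two arrays per row
parsed = df.selectExpr(
    "xpath_string(value, '/Msg/Header/tag1') as tag1",
    "xpath_int(value, '/Msg/Header/tag2') as tag2",
    "xpath_string(value, '/Msg/Header/tag3') as tag3",
    "xpath(value, '/Msg/Body/Pair/N/text()') as N",
    "xpath(value, '/Msg/Body/Pair/V/text()') as V",
)

# Zip the two arrays element-wise and explode, so the header repeats for every pair
result = (
    parsed
    .withColumn("pair", explode(arrays_zip("N", "V")))
    .select(
        "tag1",
        "tag2",
        to_timestamp("tag3").alias("tag3"),
        col("pair.N").alias("N"),
        col("pair.V").alias("V"),
    )
)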

What am I missing?

---- Clarification ----
I tried to load my xml files with

path = 'c:\\path\\to\\xml\\files\\*.xml'
df = spark.read.format('xml').option('rowTag','Msg').schema(schema).load(path)

But this option did not let me provide a wildcard *.xml path, which is why I read my files as text instead.
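
Here schema is just the layout from the printSchema() above, written out roughly as:

from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, TimestampType, ArrayType)

schema = StructType([
    StructField('Header', StructType([
        StructField('tag1', StringType(), False),
        StructField('tag2', IntegerType(), False),
        StructField('tag3', TimestampType(), False),
    ]), False),
    StructField('Body', StructType([
        StructField('Pair', ArrayType(StructType([
            StructField('N', StringType(), False),
            StructField('V', StringType(), False),
        ])), False),
    ]), False),
])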

2 Comments
  • I think it was related to my local Spark version. I've updated it and also added HADOOP_HOME to my PATH, so now path = 'c:\\path\\to\\xml\\files\\*.xml' with df = spark.read.format('xml').option('rowTag','Msg').schema(schema).load(path) works perfectly! Commented Mar 3, 2022 at 6:51
  • P.S. No need for all the double backslashes in the path. "Both string and bytes literals may optionally be prefixed with a letter 'r' or 'R'; such strings are called raw strings and treat backslashes as literal characters. As a result, in string literals, '\U' and '\u' escapes in raw strings are not treated specially" docs.python.org/3/reference/lexical_analysis.html Commented Mar 3, 2022 at 7:04

3 Answers


Give this a try:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.format('xml').options(rowTag='book').load('books.xml')

See https://github.com/databricks/spark-xml#python-api


3 Comments

I tried this one, but here I cannot provide a path with *.xml, but have to specify the exact xml file name. In my real-world use case I have millions of xml files that I need to parse, so my input path, as I mentioned, will be path = "s3://bucket/*.xml". That's why I am reading all the files as text instead of using the format('xml') option.
@NinaVolfenzon ok, spark-xml can also parse XML in a string-valued column in an existing DataFrame with from_xml; see github.com/databricks/spark-xml#parsing-nested-xml
I looked through this documentation before I posted my question but didn't come away with any relevant solution for my case.

Depending on your Spark version, you may have to add the spark-xml package to the environment. I am using Spark 2.4.0, and this databricks spark-xml version worked for me:

import os

# Must be set before the SparkSession is created
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.7.0 pyspark-shell'

The input file, input.xml, looks like the following:

<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>
<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>

input_path = 'src/input/input.xml'

xmlDF = spark.read.format('xml').option('rowTag', 'Msg').load(input_path)

xmlDF.printSchema()
root
 |-- Body: struct (nullable = true)
 |    |-- Pair: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- N: string (nullable = true)
 |    |    |    |-- V: string (nullable = true)
 |-- Header: struct (nullable = true)
 |    |-- tag1: string (nullable = true)
 |    |-- tag2: long (nullable = true)
 |    |-- tag3: timestamp (nullable = true)

Since you can't explode two arrays in the same select, you can split it into two steps:

from pyspark.sql.functions import col, explode

xmlDF.select(
        '*',
        explode("Body.Pair.N").alias('N')
    ).select(
        'N',
        explode("Body.Pair.V").alias('V'),
        col("Header.tag1").alias('tag1'),
        col("Header.tag2").alias('tag2'),
        col("Header.tag3").alias('tag3'),
    ) \
        .dropDuplicates() \
        .show(truncate=False)

It will give the following result based on your input:

+---+---+---------+----+----------------------+
|N  |V  |tag1     |tag2|tag3                  |
+---+---+---------+----+----------------------+
|N2 |V1 |some str1|2   |2022-02-16 10:39:26.73|
|N4 |V5 |some str2|5   |2022-02-17 10:39:26.73|
|N1 |V3 |some str1|2   |2022-02-16 10:39:26.73|
|N5 |V5 |some str2|5   |2022-02-17 10:39:26.73|
|N5 |V4 |some str2|5   |2022-02-17 10:39:26.73|
|N4 |V4 |some str2|5   |2022-02-17 10:39:26.73|
|N1 |V1 |some str1|2   |2022-02-16 10:39:26.73|
|N3 |V3 |some str1|2   |2022-02-16 10:39:26.73|
|N2 |V2 |some str1|2   |2022-02-16 10:39:26.73|
|N3 |V2 |some str1|2   |2022-02-16 10:39:26.73|
|N1 |V2 |some str1|2   |2022-02-16 10:39:26.73|
|N3 |V1 |some str1|2   |2022-02-16 10:39:26.73|
|N2 |V3 |some str1|2   |2022-02-16 10:39:26.73|
+---+---+---------+----+----------------------+
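
Note that exploding N and V in two separate passes cross-joins the two arrays, which is where mixed rows such as N2|V1 above come from, and dropDuplicates does not filter them out. A rough alternative sketch (reusing the same xmlDF) is to explode Body.Pair once as an array of structs, so each N stays with its own V:

from pyspark.sql.functions import col, explode

xmlDF.select(
    col("Header.tag1").alias('tag1'),
    col("Header.tag2").alias('tag2'),
    col("Header.tag3").alias('tag3'),
    explode("Body.Pair").alias('pair'),
).select(
    'tag1', 'tag2', 'tag3',
    col("pair.N").alias('N'),
    col("pair.V").alias('V'),
).show(truncate=False)

This yields exactly one row per original Pair element.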

1 Comment

I tried this one, but here I cannot provide a path with *.xml, but have to specify the exact xml file name. In my real-world use case I have millions of xml files that I need to parse, so my input path, as I mentioned, will be path = "s3://bucket/*.xml". That's why I am reading all the files as text instead of using the format('xml') option.

It seems it was related to my local spark version. I've updated it and also added HADOOP_HOME to my PATH. So now:

path = 'c:\\path\\to\\xml\\files\\*.xml'
df = spark.read.format('xml').option('rowTag','Msg').schema(schema).load(path)

Works perfectly!

