
I have a scenario where I have XML data in a DataFrame column.

+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+
|     county|created_at|first_name|                  id|meta|name_count|position|sex|               sid|updated_at|            visitors|year|
+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+
|      KINGS|1574264158|      ZOEY|00000000-0000-000...| { }|        11|       0|  F|row-r9pv-p86t.ifsp|1574264158|<?xml version="1....|2007|
+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+

I want to parse the nested XML in the visitors column into separate DataFrame columns, using a UDF.

Format of XML:

<?xml version="1.0" encoding="utf-8"?>
<visitors>
  <visitor id="9615" age="68" sex="F" />
  <visitor id="1882" age="34" sex="M" />
  <visitor id="5987" age="23" sex="M" />
</visitors>

2 Answers


There's a section on the Databricks spark-xml GitHub page that covers parsing nested XML. It provides a solution using the Scala API, as well as a couple of PySpark helper functions to work around the fact that there is no separate Python package for spark-xml. Using those helpers, here's one way you could solve the problem:

# 1. Copy helper functions from https://github.com/databricks/spark-xml#pyspark-notes

from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string
import pyspark.sql.functions as F


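# Applies spark-xml's from_xml (via the JVM gateway) to a column of XML
# strings, returning a parsed struct column.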
def ext_from_xml(xml_column, schema, options={}):
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

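# Infers the Spark schema of a single-column DataFrame of XML strings by
# calling spark-xml's schema_of_xml_df on the JVM side.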
def ext_schema_of_xml_df(df, options={}):
    assert len(df.columns) == 1

    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())
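
# NOTE: these helpers need the spark-xml jar on the cluster classpath
# (e.g. Maven coordinates com.databricks:spark-xml_2.12:0.14.0); without it,
# the JVM calls above fail with "TypeError: 'JavaPackage' object is not callable".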

# 2. Set up example dataframe

xml = '<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>'

df = spark.createDataFrame([('1',xml)],['id','visitors'])
df.show()

# +---+--------------------+
# | id|            visitors|
# +---+--------------------+
# |  1|<?xml version="1....|
# +---+--------------------+

# 3. Get xml schema and parse xml column

payloadSchema = ext_schema_of_xml_df(df.select("visitors"))
parsed = df.withColumn("parsed", ext_from_xml(F.col("visitors"), payloadSchema))
parsed.show()

# +---+--------------------+--------------------+
# | id|            visitors|              parsed|
# +---+--------------------+--------------------+
# |  1|<?xml version="1....|[[[, 68, 9615, F]...|
# +---+--------------------+--------------------+

# 4. Explode the 'visitor' array of structs into one row per visitor
df2 = parsed.select(*parsed.columns[:-1], F.explode(F.col('parsed').getItem('visitor')))
df2.show()

# +---+--------------------+---------------+
# | id|            visitors|            col|
# +---+--------------------+---------------+
# |  1|<?xml version="1....|[, 68, 9615, F]|
# |  1|<?xml version="1....|[, 34, 1882, M]|
# |  1|<?xml version="1....|[, 23, 5987, M]|
# +---+--------------------+---------------+

# 5. Get field names, which will become new columns
# (there's probably a much better way of doing this :D)
new_col_names = [s.split(':')[0] for s in payloadSchema['visitor'].simpleString().split('<')[-1].strip('>>').split(',')]

new_col_names

# ['_VALUE', '_age', '_id', '_sex']
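
# An arguably cleaner alternative (a sketch): 'visitor' is an array of structs
# in the parsed schema, so the field names can be read straight off it.
# (The leading underscores come from spark-xml's default attributePrefix, and
# _VALUE is its default valueTag for element text.)
new_col_names = payloadSchema['visitor'].dataType.elementType.fieldNames()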

# 6. Create new columns

for c in new_col_names:
    df2 = df2.withColumn(c, F.col('col').getItem(c))
    
df2 = df2.drop('col','_VALUE')

df2.show()

# +---+--------------------+----+----+----+
# | id|            visitors|_age| _id|_sex|
# +---+--------------------+----+----+----+
# |  1|<?xml version="1....|  68|9615|   F|
# |  1|<?xml version="1....|  34|1882|   M|
# |  1|<?xml version="1....|  23|5987|   M|
# +---+--------------------+----+----+----+
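
As an aside, steps 4-6 can be collapsed by star-expanding the exploded struct; a sketch under the same assumptions:

# Alternative to steps 4-6: explode, expand all struct fields, drop element text
df2_alt = (parsed
           .select('id', 'visitors',
                   F.explode(F.col('parsed').getItem('visitor')).alias('visitor'))
           .select('id', 'visitors', 'visitor.*')
           .drop('_VALUE'))

df2_alt.show()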

One thing to look out for is new column names duplicating existing column names. In this case the new names all start with an underscore, so there's no duplication, but it's good practice to check beforehand that the nested XML tags don't conflict with existing column names.
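
For example, a quick guard along those lines (a sketch reusing new_col_names from step 5):

conflicts = set(new_col_names) & set(df.columns)
assert not conflicts, f"nested XML fields clash with existing columns: {conflicts}"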


3 Comments

Hi @user6386471, I tried to use the above sample code in a Databricks notebook and hit the error "TypeError: 'JavaPackage' object is not callable" on the line java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options). Do you think it's because I'm missing spark-xml_2.12-0.14.0.jar from the classpath?
Hi @java_enthu, yep, you're most likely seeing that error because the spark-xml library isn't installed on your Databricks cluster. If you go to your cluster libraries and install it from Maven using the coordinates com.databricks:spark-xml_2.12:0.14.0 (ensuring your cluster's Scala version is 2.12 to match), it should work.
Thanks. It turns out I don't have privileges to add/install additional jars on the cluster, so I wasn't able to try it; in the meantime I got around it by using the XPath APIs.
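
For reference, an XPath-based workaround like the one mentioned above could look roughly like this, using Spark's built-in xpath SQL function on the example df from this answer (a sketch; no spark-xml jar needed):

# Pull each attribute out as an array of strings, zip them into structs,
# then explode into one row per visitor.
df_xpath = (df
    .withColumn('_id',  F.expr("xpath(visitors, '/visitors/visitor/@id')"))
    .withColumn('_age', F.expr("xpath(visitors, '/visitors/visitor/@age')"))
    .withColumn('_sex', F.expr("xpath(visitors, '/visitors/visitor/@sex')"))
    .withColumn('visitor', F.explode(F.arrays_zip('_id', '_age', '_sex')))
    .select('id', 'visitors', 'visitor._id', 'visitor._age', 'visitor._sex'))

df_xpath.show()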

Step 3 above is throwing the exception "Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063."

Any pointers?

