5

Hi I'm dealing with a slightly difficult file format which I'm trying to clean for some future processing. I've been using Pyspark to process the data into a dataframe.

The file looks similar to this:

AA 1234  ZXYW
BB A 890
CC B 321
AA 1234  LMNO
BB D 123
CC E 321
AA 1234  ZXYW
CC E 456

Each 'AA' record defines the start of a logical group or records, and the data on each line is fixed length and has information encoded in it that I want to extract. There are at least 20-30 different record types. They are always identified with a two letter code at the start of each line. There can be 1 or many different record types in each group (i.e. not all record types are present for each group)

As a first stage, I've managed to group the records together in this format:

+----------------+---------------------------------+
|           index|                           result|
+----------------+---------------------------------+
|               1|[AA 1234  ZXYV,BB A 890,CC B 321]|
|               2|[AA 1234  LMNO,BB D 123,CC E 321]|
|               3|[AA 1234  ZXYV,CC B 321]         |
+----------------+---------------------------------+

And as a second stage I really want to get data into the following columns in a dataframe:

+----------------+---------------------------------+-------------+--------+--------+
|           index|                           result|           AA|      BB|      CC|
+----------------+---------------------------------+-------------+--------+--------+
|               1|[AA 1234  ZXYV,BB A 890,CC B 321]|AA 1234  ZXYV|BB A 890|CC B 321|
|               2|[AA 1234  LMNO,BB D 123,CC E 321]|AA 1234  LMNO|BB D 123|CC E 321|
|               3|[AA 1234  ZXYV,CC B 321]         |AA 1234  ZXYV|    Null|CC B 321|
+----------------+---------------------------------+-------------+--------+--------+

Because at that point extracting the information that I need should be trivial.

Does anyone have any suggestions as to how I might be able to do this?

Many Thanks.

3 Answers 3

7

Alternate way to explode array without converting to rdd,

from pyspark.sql import functions as F

udf1 = F.udf(lambda x : x.split()[0])
df.select('index',F.explode('result').alias('id'),udf1(F.col('id')).alias('idtype')).show()

+-----+-------------+------+
|index|           id|idtype|
+-----+-------------+------+
|    1|AA 1234  ZXYV|    AA|
|    1|     BB A 890|    BB|
|    1|     CC B 321|    CC|
|    2|AA 1234  LMNO|    AA|
|    2|     BB D 123|    BB|
|    2|     CC E 321|    CC|
|    3|AA 1234  ZXYV|    AA|
|    3|     CC B 321|    CC|
+-----+-------------+------+ 

df1.groupby('index').pivot('idtype').agg(F.first('id')).join(df,'index').show()
Sign up to request clarification or add additional context in comments.

Comments

2

You can use flatMap and pivot to achieve this. Starting from the results from the first stage:

rdd = sc.parallelize([(1,['AA 1234  ZXYV','BB A 890','CC B 321']),
                      (2,['AA 1234  LMNO','BB D 123','CC E 321']),
                      (3,['AA 1234  ZXYV','CC B 321'])])

df = rdd.toDF(['index', 'result'])

You can first explode the array into multiple rows using flatMap and extract the two letter identifier into a separate column.

df_flattened = df.rdd.flatMap(lambda x: [(x[0],y, y[0:2],y[3::]) for y in x[1]])\
               .toDF(['index','result', 'identifier','identifiertype'])

and use pivot to change the two letter identifier into column names:

df_result = df_flattened.groupby(df_flattened.index,)\
                        .pivot("identifier")\
                        .agg(first("identifiertype"))\
                        .join(df,'index')

I added the join to get the result column back

1 Comment

That worked absolutely perfectly, exactly what I needed. Thanks very much for your help.
0

Assuming you are using Spark 2.x, I think what you are looking for is the pivot operation on the spark dataframe.

First you could create a table with just 2 columns, the 2 letter encoding and the rest of the content in another column. Then you can use pivot on the dataframe to do this as can be seen below.

df.pivot("encoding_col",Seq("AA","BB"))

You can find some good examples for pivoting with dataframes here

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.