
I have a spark dataframe with the following schema:

root
 |-- CONTRATO: long (nullable = true)
 |-- FECHA_FIN: date (nullable = true)
 |-- IMPORTE_FIN: double (nullable = true)
 |-- MOVIMIENTOS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- FECHA: date (nullable = true)
 |    |    |-- IMPORTE: double (nullable = true)

Example data is below:

[Row(CONTRATO=1, FECHA_FIN=datetime.date(2022, 10, 31), IMPORTE_FIN=895.83, MOVIMIENTOS=[Row(FECHA=datetime.date(2020, 9, 14), IMPORTE=10), Row(FECHA=datetime.date(2020, 9, 15), IMPORTE=20)])]

[Row(CONTRATO=2, FECHA_FIN=datetime.date(2022, 9, 30), IMPORTE_FIN=5.83, MOVIMIENTOS=[Row(FECHA=datetime.date(2021, 9, 14), IMPORTE=30), Row(FECHA=datetime.date(2020, 7, 15), IMPORTE=40)])]

I would like to access the items in 'FECHA' and 'IMPORTE', but I do not know how to do it. I am familiar with pandas DataFrames but new to Spark DataFrames. What I want would be something like:

df['MOVIMIENTOS'][df['CONTRATO'] == 1][0][0] --> 14/09/2020
df['MOVIMIENTOS'][df['CONTRATO'] == 1][0][1] --> 10
df['MOVIMIENTOS'][df['CONTRATO'] == 1][1][0] --> 15/09/2020
df['MOVIMIENTOS'][df['CONTRATO'] == 1][1][1] --> 20
df['MOVIMIENTOS'][df['CONTRATO'] == 2][0][0] --> 14/09/2021
df['MOVIMIENTOS'][df['CONTRATO'] == 2][0][1] --> 30

Thanks a lot in advance

I tried different combinations but had no luck.

1 Answer

You can use the explode function to get a new row for each element of the MOVIMIENTOS array, and then select the fields you want from the resulting struct, like so:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
from pyspark.sql.functions import explode, col

schema = StructType([
    StructField("CONTRATO", IntegerType(), True),
    StructField("FECHA_FIN", StringType(), True),
    StructField("IMPORTE_FIN", DoubleType(), True),
    StructField("MOVIMIENTOS", ArrayType(
        StructType([
            StructField("FECHA", StringType(), True),
            StructField("IMPORTE", DoubleType(), True)
        ])
    ), True),
])

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, "2022-10-31", 895.83, [("2022-9-14", 10.0), ("2020-9-15", 20.0)])], schema)

# Explode the array first, then pull the struct fields out in a second select,
# so the exploded column can be referenced by name on any Spark version.
df_exploded = df.select(
    "CONTRATO",
    "FECHA_FIN",
    "IMPORTE_FIN",
    explode("MOVIMIENTOS").alias("MOVIMIENTO_exploded"),
)

df_exploded.select(
    "*",
    col("MOVIMIENTO_exploded.FECHA").alias("FECHA"),
    col("MOVIMIENTO_exploded.IMPORTE").alias("IMPORTE"),
).show(truncate=False)

+--------+----------+-----------+-------------------+---------+-------+
|CONTRATO|FECHA_FIN |IMPORTE_FIN|MOVIMIENTO_exploded|FECHA    |IMPORTE|
+--------+----------+-----------+-------------------+---------+-------+
|1       |2022-10-31|895.83     |{2022-9-14, 10.0}  |2022-9-14|10.0   |
|1       |2022-10-31|895.83     |{2020-9-15, 20.0}  |2020-9-15|20.0   |
+--------+----------+-----------+-------------------+---------+-------+
