I'm reading in some JSON of the form:

{"a": [{"b": {"c": 1, "d": 2}}]}

That is, the array items are unnecessarily nested. Because this happens inside an array, the answers given in "How to flatten a struct in a Spark dataframe?" don't apply directly.

This is how the dataframe looks when parsed:

root
|-- a: array
|    |-- element: struct
|    |    |-- b: struct
|    |    |    |-- c: integer
|    |    |    |-- d: integer

I'm looking to transform the dataframe into this:

root
|-- a: array
|    |-- element: struct
|    |    |-- b_c: integer
|    |    |-- b_d: integer

How do I go about aliasing the columns inside the array to effectively unnest it?

3 Answers


You can use transform:

df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")
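
When the inner struct has more than a couple of fields, the expression string can be generated instead of typed out. A minimal sketch, assuming a hypothetical helper named build_transform_expr (not part of PySpark) and assuming every leaf field sits one level down under a single inner struct, as in the question. It only builds the string; it does not touch Spark:

```python
def build_transform_expr(array_col, inner, fields, var="x"):
    """Build a Spark SQL transform() expression that lifts `fields` out of
    the struct `inner` in every element of `array_col`.

    Hypothetical helper for illustration; it only returns a string.
    """
    parts = ", ".join(
        f"{var}.{inner}.{field} as {inner}_{field}" for field in fields
    )
    return f"transform({array_col}, {var} -> struct({parts})) as {array_col}"


expr = build_transform_expr("a", "b", ["c", "d"])
print(expr)
# With a DataFrame `df` shaped like the one in the question:
#     df2 = df.selectExpr(expr)
```

For the question's schema this produces the same string as the one-liner above. Note that the transform SQL function needs Spark 2.4 or later.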

Using the method presented in the accepted answer, I wrote a function to recursively unnest a dataframe (recursing into nested arrays as well):

from pyspark.sql.types import ArrayType, StructType

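def flatten(df, sentinel="x"):
    """Return df with nested structs flattened into underscore-joined columns.

    Arrays of structs are rewritten with transform(); `sentinel` is used as
    the lambda variable and as a temporary column name.
    """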
    def _gen_flatten_expr(schema, indent, parents, last, transform=False):
        def handle(field, last):
            path = parents + (field.name,)
            alias = (
                " as "
                + "_".join(path[1:] if transform else path)
                + ("," if not last else "")
            )
            if isinstance(field.dataType, StructType):
                yield from _gen_flatten_expr(
                    field.dataType, indent, path, last, transform
                )
            elif (
                isinstance(field.dataType, ArrayType) and
                isinstance(field.dataType.elementType, StructType)
            ):
                yield indent, "transform("
                yield indent + 1, ".".join(path) + ","
                yield indent + 1, sentinel + " -> struct("
                yield from _gen_flatten_expr(
                    field.dataType.elementType, 
                    indent + 2, 
                    (sentinel,), 
                    True, 
                    True
                )
                yield indent + 1, ")"
                yield indent, ")" + alias
            else:
                yield (indent, ".".join(path) + alias)

        try:
            *fields, last_field = schema.fields
        except ValueError:
            pass
        else:
            for field in fields:
                yield from handle(field, False)
            yield from handle(last_field, last)

    lines = []
    for indent, line in _gen_flatten_expr(df.schema, 0, (), True):
        spaces = " " * 4 * indent
        lines.append(spaces + line)

    expr = "struct(" + "\n".join(lines) + ") as " + sentinel
    return df.selectExpr(expr).select(sentinel + ".*")
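
To reason about which column names this should produce without starting a Spark session, the same renaming rule can be modelled on plain Python dicts and lists. This is only a sketch of the naming behaviour as I read it from the function above, not Spark code; flatten_record is a hypothetical name, and the extra field e is made up for the example:

```python
def flatten_record(record, parents=()):
    """Flatten nested dicts into underscore-joined keys.

    Lists of dicts keep their own key, and each element is flattened
    separately. Plain-Python model of the renaming, not Spark code.
    """
    flat = {}
    for key, value in record.items():
        path = parents + (key,)
        if isinstance(value, dict):
            # Struct: recurse, carrying the parent path into the names.
            flat.update(flatten_record(value, path))
        elif isinstance(value, list) and all(isinstance(v, dict) for v in value):
            # Array of structs: names inside restart at the element level.
            flat["_".join(path)] = [flatten_record(v) for v in value]
        else:
            flat["_".join(path)] = value
    return flat


row = {"a": [{"b": {"c": 1, "d": 2}}], "e": {"f": 3}}
print(flatten_record(row))
```

The array a keeps its name while its elements become {b_c, b_d}, and the top-level struct e becomes the column e_f.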

3 Comments

this does absolutely nothing for me
@Garglesoap can you reduce your problem to a short example that can be shared here?
Sorry, I was frustrated. I found something like this to work: newdf = result.withColumn("sentiment", explode("sentiment")).select("*", col("sentiment.*")).drop("document","sentence","tokens","word_embeddings","sentence_embeddings","sentiment")

Simplified Approach:

from pyspark.sql.functions import col

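def flatten_df(nested_df):
    """Flatten nested struct columns into top-level parent_child columns.

    Only struct columns are expanded; array columns are left as they are.
    """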
    stack = [((), nested_df)]
    columns = []

    while stack:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

ref: https://learn.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema
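
The prefix test on df.dtypes decides which columns get expanded. A small sketch of that test on hand-written type strings suggests that an array column like the question's a would be treated as flat and passed through unchanged. The type strings are assumed to match what Spark reports for such a schema, the columns e and g are made up, and split_columns is a hypothetical name:

```python
# Assumed (name, type-string) pairs in the format df.dtypes uses, for a
# frame with the question's column plus a plain struct and a string column.
dtypes = [
    ("a", "array<struct<b:struct<c:int,d:int>>>"),
    ("e", "struct<f:int>"),
    ("g", "string"),
]


def split_columns(dtypes):
    """Apply the same prefix test as flatten_df to (name, type) pairs."""
    flat = [name for name, dtype in dtypes if dtype[:6] != "struct"]
    nested = [name for name, dtype in dtypes if dtype[:6] == "struct"]
    return flat, nested


flat, nested = split_columns(dtypes)
print(flat)    # ['a', 'g']
print(nested)  # ['e']
```

So for structs nested inside arrays, this approach would need to be combined with transform or explode.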
