
I'm trying to ingest some MongoDB collections into BigQuery using PySpark. The schema looks like this:

root
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- my_field: struct (nullable = true)
 |    |    |    |-- { mongo id }: struct (nullable = true)
 |    |    |    |    |-- A: timestamp (nullable = true)
 |    |    |    |    |-- B: string (nullable = true)
 |    |    |    |    |-- C: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = true)
 |    |    |    |    |    |-- def: boolean (nullable = true)
 |    |    |    |    |    |-- ghi: boolean (nullable = true)
 |    |    |    |    |    |-- xyz: boolean (nullable = true)

The issue is that inside my_field we store the id; each group has its own id, so when I import everything into BigQuery I end up with a new column for each id. I want to convert my_field to a string and store all of the nested fields as JSON or something like that. But when I try to convert it, I get this error:

temp_df = temp_df.withColumn("groups.my_field", col("groups.my_field").cast('string'))

TypeError: Column is not iterable

What am I missing?

  • Have you considered using BQ's TO_JSON_STRING function? Commented Jul 30, 2020 at 18:23
  • Can you post a sample input row? Commented Jul 31, 2020 at 5:54
  • Is to_json not working? Commented Aug 2, 2020 at 6:47
  • It's not working for nested fields. I can create a new field with to_json, but only at the root level; I can't replace my_field or even put it at the same level (see the sketch below). Commented Aug 4, 2020 at 19:12
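
To make that last comment concrete, here is a small sketch of the two dead ends (a toy schema with made-up names standing in for the real one):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_json

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the real schema
df = spark.createDataFrame(
    [({"my_field": {"a": 1}},)],
    "groups struct<my_field: struct<a: int>>",
)

# to_json works, but only as a NEW column at the root level
df.withColumn("my_field_json", to_json(col("groups.my_field"))).printSchema()

# A dotted name in withColumn does NOT replace the nested field: it adds
# a top-level column literally named "groups.my_field"
df.withColumn("groups.my_field", to_json(col("groups.my_field"))).printSchema()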

2 Answers


So it turns out that in order to append/remove/rename a nested field you need to change the schema; I didn't know that. So here's my answer: I copied the code from https://stackoverflow.com/a/48906217/984114 and modified it to work with my schema.

Here's the modified version of exclude_nested_field:

from pyspark.sql.types import ArrayType, StringType, StructField, StructType


def change_nested_field_type(schema, fields_to_change, parent=""):
    """Rebuild `schema`, converting every field named in `fields_to_change`
    (dot notation relative to `schema`) to StringType."""
    new_schema = []

    if isinstance(schema, StringType):
        return schema

    for field in schema:
        full_field_name = field.name

        if parent:
            full_field_name = parent + "." + full_field_name

        if full_field_name not in fields_to_change:
            if isinstance(field.dataType, StructType):
                # Recurse into nested structs
                inner_schema = change_nested_field_type(field.dataType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, inner_schema))
            elif isinstance(field.dataType, ArrayType):
                # Recurse into the element type of arrays
                inner_schema = change_nested_field_type(field.dataType.elementType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, ArrayType(inner_schema)))
            else:
                # Atomic fields pass through unchanged
                new_schema.append(StructField(field.name, field.dataType))
        else:
            # Here we change the field type to String
            new_schema.append(StructField(field.name, StringType()))

    return StructType(new_schema)

And here's how I call the function:

from pyspark.sql.functions import from_json, to_json

# Rebuild the element schema of "groups" with my_field as a plain string,
# then round-trip the column through JSON so it is re-parsed with that schema
new_schema = ArrayType(change_nested_field_type(df.schema["groups"].dataType.elementType, ["my_field"]))
df = df.withColumn("json", to_json("groups")).drop("groups")
df = df.withColumn("groups", from_json("json", new_schema)).drop("json")



I needed a generic solution that could handle casting nested columns at an arbitrary depth. By extending the accepted answer, I came up with the following functions.

from typing import Dict

from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import col, from_json, to_json
# _all_atomic_types is a private mapping from DDL type names (e.g. "string",
# "long") to their PySpark type classes
from pyspark.sql.types import StructType, ArrayType, StringType, StructField, _all_atomic_types

def apply_nested_column_casts(
    schema: StructType, column_cast: Dict[str, str], parent: str
) -> StructType:
    """Rebuild `schema`, casting every field listed in `column_cast`
    (dot notation) to its target atomic type."""
    new_schema = []

    if isinstance(schema, StringType):
        return schema

    for field in schema:
        full_field_name = field.name

        if parent:
            full_field_name = parent + "." + full_field_name

        if full_field_name not in column_cast:
            if isinstance(field.dataType, StructType):
                inner_schema = apply_nested_column_casts(
                    field.dataType, column_cast, full_field_name
                )
                new_schema.append(StructField(field.name, inner_schema))
            elif isinstance(field.dataType, ArrayType):
                inner_schema = apply_nested_column_casts(
                    field.dataType.elementType, column_cast, full_field_name
                )
                new_schema.append(StructField(field.name, ArrayType(inner_schema)))
            else:
                new_schema.append(StructField(field.name, field.dataType))
        else:
            # Here we change the field type to the intended type
            cast_type = _all_atomic_types[column_cast[full_field_name]]
            new_schema.append(StructField(field.name, cast_type()))

    return StructType(new_schema)


def apply_column_casts(
    df: SparkDataFrame, column_casts: Dict[str, str]
) -> SparkDataFrame:
    """Apply each cast in `column_casts` ({"dotted.col.name": "type_name"}).
    Top-level columns are cast directly; nested ones go through a JSON round trip."""
    for col_name, cast_to in column_casts.items():
        splitted_col_name = col_name.split(".")

        if len(splitted_col_name) == 1:
            df = df.withColumn(col_name, col(col_name).cast(cast_to))
        else:
            nested_field_parent_field = splitted_col_name[0]
            nested_field_parent_type = df.schema[nested_field_parent_field].dataType
            column_cast = {col_name: cast_to}
            if isinstance(nested_field_parent_type, StructType):
                new_schema = apply_nested_column_casts(
                    nested_field_parent_type, column_cast, nested_field_parent_field
                )
            elif isinstance(nested_field_parent_type, ArrayType):
                new_schema = ArrayType(
                    apply_nested_column_casts(
                        nested_field_parent_type.elementType,
                        column_cast,
                        nested_field_parent_field,
                    )
                )
            else:
                # Fail fast: otherwise new_schema would be undefined (or stale
                # from a previous iteration) for non-struct/array parents
                raise TypeError(
                    f"Cannot cast into {nested_field_parent_field}: "
                    "expected a struct or array column"
                )

            # Round-trip through JSON so the column is re-parsed with the new schema
            tmp_json = f"{nested_field_parent_field}_json"

            df = df.withColumn(tmp_json, to_json(nested_field_parent_field)).drop(
                nested_field_parent_field
            )
            df = df.withColumn(
                nested_field_parent_field, from_json(tmp_json, new_schema)
            ).drop(tmp_json)
    return df

And you can call the functions as shown below, using dot notation for nested column casts:

column_casts = {
    "col_a": "string",
    "col_b.nested_col": "double",
    "col_b.nested_struct_col.some_col": "long", 
}

df = apply_column_casts(df, column_casts)
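
For reference, a minimal sketch of the functions above on a toy DataFrame (column names invented; note that the JSON round trip only coerces values Jackson can read as the target type, so the toy fields start out as ints):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, {"nested_col": 2, "nested_struct_col": {"some_col": 3}})],
    "col_a int, col_b struct<nested_col: int, nested_struct_col: struct<some_col: int>>",
)

df = apply_column_casts(
    df,
    {
        "col_a": "string",
        "col_b.nested_col": "double",
        "col_b.nested_struct_col.some_col": "long",
    },
)

df.printSchema()
# root
#  |-- col_a: string (nullable = true)
#  |-- col_b: struct (nullable = true)
#  |    |-- nested_col: double (nullable = true)
#  |    |-- nested_struct_col: struct (nullable = true)
#  |    |    |-- some_col: long (nullable = true)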

