
I'm hoping to get some help with an issue I'm having moving data from a JSON file to a BigQuery table using Spark.

The problem, in a nutshell:

Goal: Load a JSON file into a Spark DataFrame and persist it to a BigQuery table.

Problem: The skills column (an ARRAY) in the DataFrame is not being written to BigQuery. All other columns are fine.

What I've checked: The Spark DataFrame schema correctly identifies skills as array, and the BigQuery table's DDL also specifies the column as ARRAY.

Environment: Spark 3.4 on Serverless Dataproc, using the spark-bigquery-with-dependencies_2.12-0.28.1.jar connector.

Here's the code and data I'm using, along with the output from my DataFrame:

Sample JSON (data.json):

{
  "_id": {
    "$oid": "6894a88bd35f69f98c7e480f"
  },
  "employeeId": "E123452",
  "name": "John Doe 2",
  "age": 30,
  "department": "Engineering",
  "address": {
    "street": "123 Main St",
    "city": "San Francisco",
    "state": "CA",
    "zip": "94105"
  },
  "skills": ["Scala", "Spark", "BigQuery"],
  "isActive": true,
  "joiningDate": "2022-01-15T09:00:00Z"
}

Scala Code:

import org.apache.spark.sql.{SaveMode, SparkSession}

object JsonToBigQuery {
  def main(args: Array[String]): Unit = {
    // ... (SparkSession setup)
    val spark: SparkSession = SparkSession.builder
      .appName("JsonToBigQuery")
      .getOrCreate()

    // Load JSON file into a DataFrame
    val jsonPath = "gs://your-gcs-bucket/data/data.json"
    val df = spark.read
      .option("multiline", "true")
      .json(jsonPath)

    df.printSchema()
    df.show()

    // Replace with your project and table names
    val projectId = "your-gcp-project-id"
    val dataset = "your_dataset_name"
    val table = "your_table_name"

    val dfRenamed = df.withColumnRenamed("`_id`.`$oid`", "gid")

    dfRenamed.write
      .format("bigquery")
      .option("table", s"$projectId.$dataset.$table")
      .option("temporaryGcsBucket", "your-temp-gcs-bucket")
      .mode(SaveMode.Append)
      .save()
  }
}
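
Side note: as far as I can tell, withColumnRenamed only matches top-level column names, so the backticked nested path above may effectively be a no-op. If the struct is meant to match the DDL below (where the field is oid, without the $), rebuilding it would look roughly like this; a minimal sketch, with dfFixedId just an illustrative name:

import org.apache.spark.sql.functions.{col, struct}

// Rebuild the _id struct with its single field renamed from "$oid" to "oid";
// withColumnRenamed cannot rename a nested field, so the struct is reconstructed.
val dfFixedId = df.withColumn("_id", struct(col("_id.`$oid`").alias("oid")))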

BigQuery Table DDL: This is the schema of the destination table in BigQuery:

CREATE TABLE `your_dataset_name.your_table_name` (
  `_id` STRUCT<
    `oid` STRING
  >,
  `address` STRUCT<
    `city` STRING,
    `state` STRING,
    `street` STRING,
    `zip` STRING
  >,
  `age` INT64,
  `department` STRING,
  `employeeId` STRING,
  `isActive` BOOL,
  `joiningDate` STRING,
  `name` STRING,
  `skills` ARRAY<STRING>
)
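
To double-check what the connector sees on the BigQuery side, I can also read the destination table back into Spark and compare its printSchema() output with the DataFrame above. A minimal sketch, reusing the placeholder names from the code:

// Read the existing BigQuery table back through the connector and print its
// schema; skills should show up as array<string> here as well.
val bqDf = spark.read
  .format("bigquery")
  .option("table", s"$projectId.$dataset.$table")
  .load()

bqDf.printSchema()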

DataFrame Output (from df.printSchema() and df.show()):

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- age: long (nullable = true)
 |-- department: string (nullable = true)
 |-- employeeId: string (nullable = true)
 |-- isActive: boolean (nullable = true)
 |-- joiningDate: string (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+--------------------+---+-----------+----------+--------+--------------------+------------+--------------------+
|                 _id|             address|age| department|employeeId|isActive|         joiningDate|        name|              skills|
+--------------------+--------------------+---+-----------+----------+--------+--------------------+------------+--------------------+
|[6894a88bd35f69f9...|[San Francisco, C...| 30|Engineering|  E123452|    true|2022-01-15T09:00:00Z|  John Doe 2|[Scala, Spark, BigQuery]|
+--------------------+--------------------+---+-----------+----------+--------+--------------------+------------+--------------------+
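
To rule out display truncation, here is a quick check (debugging only, nothing connector-specific) that the array column is actually populated before the write:

import org.apache.spark.sql.functions.{col, size}

// Show the skills values untruncated, plus the element count per row.
df.select(col("skills"), size(col("skills")).alias("skills_count"))
  .show(truncate = false)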

For context, here are the versions I'm using:

  • Spark: 3.4
  • BigQuery Connector: spark-bigquery-with-dependencies_2.12-0.28.1.jar
  • Environment: Serverless Dataproc on GCP

Any ideas what I might be missing? Do I need to explicitly handle array types in some way when writing to BigQuery from Spark?
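
For reference, the explicit cast discussed in the comments below looked roughly like this; it made no difference with or without it:

import org.apache.spark.sql.functions.col

// Explicitly cast skills to array<string> before the write; the column is
// already inferred as array<string>, so this did not change the behaviour.
val dfCast = df.withColumn("skills", col("skills").cast("array<string>"))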
  • If printSchema and show already report skills as an array, why did you add the cast .withColumn("skills", col("skills").cast("array<string>"))? Did it not work without the cast so you added it, or have you been testing with the cast from the start? Commented Aug 13 at 3:49
  • It doesn’t work either with or without the cast; I tried both ways. Updated the post accordingly. Commented Aug 13 at 12:19
  • stackoverflow.com/questions/69468155/…. Check this one out. Several sources I found online suggest the connector's intermediate format needs to be set to ORC for complex data types, so setting it might help; it may be worth a try: spark.conf.set("spark.datasource.bigquery.intermediateFormat", "orc") (see the sketch after these comments). Commented Aug 13 at 12:43
  • @VindhyaG I had tried this earlier and tried it again; it didn't help: spark.conf.set("spark.datasource.bigquery.intermediateFormat", "orc") Commented Aug 13 at 14:31
  • This worked fine with dataproc serverless spark runtime 1.2 Commented Aug 18 at 15:56
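
For reference, the intermediate-format suggestion from the comments can also be passed as a write option on the connector (intermediateFormat is a documented connector option; this is a sketch using the same placeholder names as the question):

// Same write as in the question, but forcing the intermediate format to ORC,
// which some sources suggest for complex types such as arrays.
dfRenamed.write
  .format("bigquery")
  .option("table", s"$projectId.$dataset.$table")
  .option("temporaryGcsBucket", "your-temp-gcs-bucket")
  .option("intermediateFormat", "orc")
  .mode(SaveMode.Append)
  .save()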
