
I have edited the question to give a better view of the problem.

There are two tables: STAGING and CORE.

I am having trouble copying the data from STAGING to CORE.

Conditions

  1. If id, year, and local_id match in both STAGING and CORE, the matching row in CORE's data array should be updated with the values from STAGING.
  2. If id does not match between STAGING and CORE, a new row should be inserted into CORE with the values from STAGING.
  3. If id matches but either local_id or year does not, a new row should be appended to the data array.

(A worked example of the expected result is shown after the sample data below.)

BigQuery schema for STAGING

[
    {
        "name": "id",
        "type": "STRING"
    },
    {
        "name": "content",
        "type": "STRING"
    },
    {
        "name": "createdAt",
        "type": "TIMESTAMP"
    },
    {
        "name": "sourceFileName",
        "type": "STRING"
    },
    {
        "name": "data",
        "type": "record",
        "fields": [
            {
                "name": "local_id",
                "type": "STRING",
                "mode": "NULLABLE"
            },
            {
                "name": "year",
                "type": "INTEGER",
                "mode": "NULLABLE"
            },
            {
                "name": "country",
                "type": "STRING",
                "mode": "NULLABLE"
            }
           ]
    }
]

BigQuery schema for CORE

[
    {
        "name": "id",
        "type": "STRING"
    },
    {
        "name": "content",
        "type": "STRING"
    },
    {
        "name": "createdAt",
        "type": "TIMESTAMP"
    },
    {
        "name": "data",
        "type": "record",
        "mode": "REPEATED",
        "fields": [
            {
                "name": "local_id",
                "type": "STRING",
                "mode": "NULLABLE"
            },
            {
                "name": "year",
                "type": "INTEGER",
                "mode": "NULLABLE"
            },
            {
                "name": "country",
                "type": "STRING",
                "mode": "NULLABLE"
            }
           ]
    }
]

BigQuery content for STAGING:

{"id":"1","content":"content1","createdAt":"2020-07-23 12:46:15.054410 UTC","sourceFileName":"abc.json","data":{"local_id":"123","year":2018,"country":"PL"}}
{"id":"1","content":"content3","createdAt":"2020-07-23 12:46:15.054410 UTC","sourceFileName":"abc.json","data":{"local_id":"123","year":2021,"country":"SE"}}
{"id":"2","content":"content4","createdAt":"2020-07-23 12:46:15.054410 UTC","sourceFileName":"abc.json","data":{"local_id":"334","year":2021,"country":"AZ"}}
{"id":"2","content":"content5","createdAt":"2020-07-23 12:46:15.054410 UTC","sourceFileName":"abc.json","data":{"local_id":"337","year":2021,"country":"NZ"}}


BigQuery content for CORE:

{"id":"1","content":"content1","createdAt":"2020-07-23 12:46:15.054410 UTC","data":[{"local_id":"123","year":2018,"country":"SE"},{"local_id":"33","year":2019,"country":"PL"},{"local_id":"123","year":2020,"country":"SE"}]}


2 Comments
  • Let's see if I understand it properly: your first query inserts into the destination table all the IDs from the source table that don't exist in the destination yet, and the second query replaces the data field in the destination table with the data field from the source table, based on the id field? Commented Jul 21, 2020 at 8:32
  • @rmesteves Yes, that is the way it worked for me. It could be a crude way of doing it. Commented Jul 21, 2020 at 11:00

3 Answers


I was finally able to nail the problem. To merge the two tables, I had to resort to subqueries to push some of the work down. I still think there is room for improvement in this code.

-- Insert IDs that exist in staging but not yet in the main table
INSERT `deep_test.main_table` (people_id)
(
  SELECT DISTINCT people_id
  FROM `deep_test.staging_test`
  WHERE people_id NOT IN (SELECT people_id FROM `deep_test.main_table`)
);

-- Update the talent record:
-- deduplicate staging rows, keeping the latest (by createdAt) per
-- (people_id, people_l_id, fiscalYear), re-aggregate them into an
-- array per people_id, and write that array back to the main table.
UPDATE
  `deep_test.main_table` gold
SET
  talent = B.talent
FROM
  (
    SELECT
      gold.people_id AS people_id,
      ARRAY_AGG(aggregated_stage.talent) AS talent
    FROM
      `deep_test.main_table` gold
      JOIN
        (
          SELECT
            A.people_id,
            A.talent
          FROM
            (
              -- Keep only the most recent staging row per group
              SELECT
                ARRAY_AGG(t ORDER BY t.createdAt DESC LIMIT 1)[OFFSET(0)] A
              FROM
                `deep_test.staging_test` t
              GROUP BY
                t.people_id,
                t.talent.people_l_id,
                t.talent.fiscalYear
            )
        ) AS aggregated_stage
        ON gold.people_id = aggregated_stage.people_id
    WHERE aggregated_stage.talent IS NOT NULL
    GROUP BY people_id
  ) B
WHERE
  B.people_id = gold.people_id;

-- Update the country code from the most recent talent entry
UPDATE `deep_test.core` core
SET core.country_code = countries.number
FROM
  (
    SELECT
      people_id,
      (SELECT country FROM UNNEST(talent) AS d ORDER BY d.fiscalYear DESC LIMIT 1) AS country
    FROM `deep_test.core`
  ) B,
  `deep_test.countries` countries
WHERE
  core.people_id = B.people_id
  AND countries.code = B.country;

This creates a subquery and assigns its result set to an alias (B above). That alias can then be used like a table, both for querying and for joining the results with another table.
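
The general pattern is an UPDATE with a FROM clause (a minimal sketch with hypothetical table and column names):

UPDATE `project.dataset.target` t
SET t.some_col = b.some_col
FROM
  (
    -- Any subquery works here; its result behaves like a table
    SELECT id, some_col FROM `project.dataset.other`
  ) b
WHERE t.id = b.id;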




Try using the MERGE statement:

MERGE `dataset.destination` D
USING (SELECT id, ARRAY(SELECT data) data FROM `dataset.source`) S
ON D.id = S.id
WHEN MATCHED THEN
  UPDATE SET data = S.data
WHEN NOT MATCHED THEN
  INSERT (id, data) VALUES (S.id, S.data)
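
If the destination's data array should be extended rather than replaced, a variant of the same MERGE (a sketch, not tested against the exact schemas above) can aggregate the source rows per id and concatenate on match:

MERGE `dataset.destination` D
USING (
  -- Collect all staging records for each id into one array
  SELECT id, ARRAY_AGG(data) AS data
  FROM `dataset.source`
  GROUP BY id
) S
ON D.id = S.id
WHEN MATCHED THEN
  UPDATE SET data = ARRAY_CONCAT(D.data, S.data)
WHEN NOT MATCHED THEN
  INSERT (id, data) VALUES (S.id, S.data)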

3 Comments

With the destination data being an array, this condition does not work!
@deep Updated the answer. Please let me know if it works now.
I have updated the question for clarity. Can you please check now?
  1. To create an array field, use the ARRAY() function.
  2. To append to an array field, use the ARRAY_CONCAT() function.

This query can be used for the "update if present" requirement:

UPDATE `destination` d
SET
  d.data = ARRAY_CONCAT(d.data, ARRAY(
    SELECT s.data
    FROM `source` s
    WHERE d.id = s.id))
WHERE d.id IN (SELECT id FROM `source`)
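
This only covers ids that already exist in the destination. Ids present only in the source still need a separate INSERT (a sketch with the same placeholder table names):

INSERT `destination` (id, data)
-- Aggregate each new id's source records into a fresh array
SELECT s.id, ARRAY_AGG(s.data)
FROM `source` s
WHERE s.id NOT IN (SELECT id FROM `destination`)
GROUP BY s.id;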

https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#update_using_joins

https://cloud.google.com/bigquery/docs/reference/standard-sql/arrays#creating_arrays_from_subqueries

https://cloud.google.com/bigquery/docs/reference/standard-sql/arrays#combining_arrays

1 Comment

This scenario fails to work when there are no records in the destination table.
