13

I read many documents about google bigquery-python, but I can't understand how to manage bigquery data by python code.

At first, I make a new table as below.

credentials = GoogleCredentials.get_application_default()
service = build('bigquery', 'v2', credentials = credentials)

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

project_ref = {'projectId': project_id}
dataset_ref = {'datasetId': dataset_id,
               'projectId': project_id}
table_ref = {'tableId': table_id,
             'datasetId': dataset_id,
             'projectId': project_id}

dataset = {'datasetReference': dataset_ref}
table = {'tableReference': table_ref}
table['schema'] = {'fields': [
    {'name': 'id', 'type': 'string'},
...
]}

table = service.tables().insert(body = table, **dataset_ref).execute()

And then I want to insert a data into this table, so I tried to do like below.

fetch_list = []
patch = {'key': 'value'}
fetch_list.append(patch)

table = service.tables().patch(body = fetch_list, **table_ref).execute()

But nothing happened.

How can I update new data into bigquery table?

Please show me some example codes.

2 Answers 2

27

EDIT Nov 2018:

The answer of this question is outdated already as the google cloud client has evolved considerably since this last post.

The official docs contains all information needed already; here you can find everything needed for streaming insert and this one has a complete overview of all methods available so far (you'll also find Python code examples on each page and each method).

Original Answer:

There are a few different ways that you can use to insert data to BQ.

For a deeper understanding of how the python-api works, here's everything you'll need: bq-python-api (at first the docs are somewhat scary but after you get a hang of it it's rather quite simple).

There are 2 main methods that I use to insert data to BQ. The first one is data streaming and it's supposed to be used when you can insert row by row in a real time fashion. Code example:

import uuid
def stream_data(self, table, data, schema):
    # first checks if table already exists. If it doesn't, then create it
    r = self.service.tables().list(projectId=your_project_id,
                                     datasetId=your_dataset_id).execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r['tables'] if
                    row['tableReference']['tableId'] == table]
    if not table_exists:
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': your_project_id,
                'datasetId': your_dataset_id
            },
            'schema': schema
        }
        self.service.tables().insert(projectId=your_project_id,
                                     datasetId=your_dataset_id,
                                     body=body).execute()

    # with table created, now we can stream the data
    # to do so we'll use the tabledata().insertall() function.
    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.service.tabledata().insertAll(projectId=your_project_id),
                                       datasetId=your_dataset_id,
                                       tableId=table,
                                         body=body).execute(num_retries=5)

Here my self.service is correspondent to your service object.

An example of input data that we have in our project:

data = {u'days_validated': '20', u'days_trained': '80', u'navigated_score': '1', u'description': 'First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5', u'init_cv_date': '2016-03-06', u'metric': 'rank', u'unix_date': '1461610020241117', u'purchased_score': '10', u'result': '0.32677139316724546', u'date': '2016-04-25', u'carted_score': '3', u'end_cv_date': '2016-03-25'}

And its correspondent schema:

schema = {u'fields': [{u'type': u'STRING', u'name': u'date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'unix_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'init_cv_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'end_cv_date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_trained', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_validated', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'navigated_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'carted_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'purchased_score', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'description', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'metric', u'mode': u'NULLABLE'}, {u'type': u'FLOAT', u'name': u'result', u'mode': u'NULLABLE'}]}

The other way to insert data is to use the job insert function. As you can see in the documentation, it accepts several sources for your data. I have an example of how you can do so by loading the results of a query into another table:

def create_table_from_query(self,
                            query,
                            dest_table,
                            how):
    body = {
        'configuration': {
            'query': {
                'destinationTable': {
                    'projectId': your_project_id,
                    'tableId': dest_table,
                    'datasetId': your_dataset_id
                },
                'writeDisposition': how,
                'query': query,
            },
        }
    }

    response = self.connector.jobs().insert(projectId=self._project_id,
                                            body=body).execute()
    self.wait_job_completion(response['jobReference']['jobId'])

def wait_job_completion(self, job_id):
    while True:
        response = self.connector.jobs().get(projectId=self._project_id,
                                             jobId=job_id).execute()
        if response['status']['state'] == 'DONE':
            return

The how input accepts the available options for this field in the documentation (such as "WRITE_TRUNCATE", or "WRITE_APPEND").

You can load the data from a csv file for instance, in this case, the configuration variable would be defined something along the lines:

"configuration": {
  "load": {
    "fieldDelimiter": "\t"
    "sourceFormat": "CSV"
    "destinationTable": {
      "projectId": your_project_id,
      "tableId": table_id,
      "datasetId": your_dataset_id
    },
    "writeDisposition": "WRITE_TRUNCATE"
    "schema": schema,
    "sourceUris": file_location_in_google_cloud_storage
  },
}

(Using as example a csv file delimited by tabs. It could be a json file as well, the documentation will walk you through the available options).

Running jobs() require some time for it to complete (that's why we created the wait_job_completion method). It should be cheaper though as compared to real time streaming.

Any questions let us know,

Sign up to request clarification or add additional context in comments.

6 Comments

Hi from the documentation I only see you can upload CSV or Query results to table, but I'm wondering how can I append a row (python list) to a table using insert() method?
Maybe this one will help you: googlecloudplatform.github.io/google-cloud-python/stable/…. The python google client has evolved a lot since I last posted this answer, it might be worth learning to use it as well: googlecloudplatform.github.io/google-cloud-python/stable/… Notice that in your case it might be better to use the 'live stream' method as you just want to insert a few lists to BQ.
Links above are broken @WillianFuks
@skeller88 you're right. Here's the correct link for live stream: gcloud-python.readthedocs.io/en/latest/bigquery/…. For reading a CSV from storage, here's an example: gcloud-python.readthedocs.io/en/latest/bigquery/…. I'll update my answer with these links. Thanks for the message :)
@WillianFuks Both, links above and the ones on the answer are broken
|
0

From the GCP Documentation, the easiest way to just insert a couple of rows is the following:

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

1 Comment

403 POST: Access Denied: BigQuery BigQuery: Streaming insert is not allowed in the free tier

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.