I am developing a FastAPI application that is supposed to perform some calculations based on a JSON request, send the response, and store it in several Databricks catalog tables.

So, in the API, I convert the response and also create the tables.

What I am struggling with is which Databricks API endpoint I should connect to.

As you can see from the code below, I defined:

url = f"{self.databricks_host}/api/2.0/sql/createTable"

but it is not working.

def send_to_dtb_catalog(self, df, table_name):
        # doing some stuff here ....

        # Prepare data payload for Databricks API
        data = {
            "tableName": f"my_database.my_schema.{table_name}",
            "data": df_json
        }   

        # Make HTTP request to Databricks REST API
        # suppose databricks_host and databricks_token are pre-defined 
        url = f"{self.databricks_host}/api/2.0/sql/createTable"

        headers = {
            "Authorization": f"Bearer {self.databricks_token}",
            "Content-Type": "application/json"
        }

        response = requests.post(url, headers=headers, json=data)

Then I use send_to_dtb_catalog to send the created tables to the Databricks catalog tables, something like this:

self.send_to_dtb_catalog(table1_df, "table1_databricks")
self.send_to_dtb_catalog(table2_df, "table2_databricks")

I appreciate any help as I am new to both Databricks and API development.

1 Answer

You can use the SQL Statement Execution API (the /api/2.0/sql/statements endpoint) to execute SQL statements.

Alter your function as shown below.

Code:

import requests

def send_to_dtb_catalog(df, table_name):
    # databricks_host and databricks_token are assumed to be defined elsewhere
    url = f"{databricks_host}/api/2.0/sql/statements/"

    headers = {
        "Authorization": f"Bearer {databricks_token}",
        "Content-Type": "application/json"
    }
    # Create the target table if it does not exist yet
    sql_q = f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
        id INT,
        name STRING
        )
    '''

    body = {
        "warehouse_id": "a415c87c62c279a5",  # replace with your SQL warehouse ID
        "statement": sql_q,
        "wait_timeout": "30s",
        "on_wait_timeout": "CANCEL"
    }
    response = requests.post(url, headers=headers, json=body)
    # Stop here if CREATE TABLE did not succeed, so the caller can inspect the error
    if response.json()['status']['state'] != 'SUCCEEDED':
        return response

    print("Inserting values....")

    # Collect the Spark DataFrame rows as tuples and render them as a VALUES list
    t = df.rdd.map(lambda row: tuple(row)).collect()
    insert_query = f'''
    INSERT INTO {table_name}
    VALUES
    {','.join(map(str, t))}
    '''

    # Reuse the same request body with the INSERT statement
    body['statement'] = insert_query
    return requests.post(url, headers=headers, json=body)

Next, call your function.
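Note that the INSERT above relies on Python's tuple repr to produce the VALUES list, which can break on None values or strings containing quotes. The following is a minimal sketch of a stricter formatter, not part of the original answer: the helper names sql_literal and build_insert are hypothetical, and the backslash escaping assumes the default Databricks SQL string-literal rules.

```python
def sql_literal(value):
    """Render one Python value as a SQL literal (illustrative, not exhaustive)."""
    if value is None:
        return "NULL"
    # Check bool before int, because bool is a subclass of int in Python
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return str(value)
    # Treat everything else as a string; escape backslashes first, then quotes
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return "'" + text + "'"


def build_insert(table_name, rows):
    """Build one INSERT statement from an iterable of row tuples."""
    values = ",\n".join(
        "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {table_name} VALUES\n{values}"
```

With this, insert_query in the function could be built as build_insert(table_name, t) instead of joining the raw tuples. The table name itself is still interpolated unchecked, so it should come from trusted code rather than from the request.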

Another way is to use a driver to connect to Databricks.

Refer to the driver documentation for how to connect to the server and execute queries.
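As a rough illustration of the driver route, here is a sketch that assumes the databricks-sql-connector package (imported as databricks.sql), which the original answer does not name explicitly. The function name run_statements and its connect parameter are hypothetical; the hostname, HTTP path, and token in the usage comment are placeholders you would take from your SQL warehouse's connection details.

```python
def run_statements(statements, connect=None, **connection_args):
    """Run SQL statements in order over one driver connection."""
    if connect is None:
        # Imported lazily so this module loads even without the connector installed
        from databricks import sql
        connect = sql.connect
    # Connection and cursor are both used as context managers so they get closed
    with connect(**connection_args) as connection:
        with connection.cursor() as cursor:
            for statement in statements:
                cursor.execute(statement)


# Example call (placeholder values, requires a reachable SQL warehouse):
# run_statements(
#     ["CREATE TABLE IF NOT EXISTS my_table (id INT, name STRING)"],
#     server_hostname="<workspace-hostname>",
#     http_path="<warehouse-http-path>",
#     access_token="<personal-access-token>",
# )
```

The connect argument exists only so the function can be exercised with a stand-in connection during testing; in normal use it is left as None.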

2 Comments

Yes, this was very helpful; using the /api/2.0/sql/statements/ endpoint worked. I had hoped, though, that I could avoid writing SQL statements in the code and instead use an endpoint for creating tables.
Glad it worked. I also searched for a direct endpoint to create a table, but there is no such endpoint. Maybe they will release one in the future.
