I am trying to upload a JSON file to Azure Cosmos DB - Gremlin API. I created a partition key (PK) as /LOCATIONSTATE in Cosmos DB which is one of the tags in JSON. Below is the python script I am using to load the data. I have rechecked twice and there are no null values in the JSON data for PK. How can I fix this error?
I have tried changing the PK value to some other JSON tag like /LOCATIONZIP or /LOCATIONLOOKUPID but the error remains the same. Is Cosmos not able to read the data because of ""? Please help!
import json
import requests
from gremlin_python.driver import client, serializer
import uuid # To generate unique IDs if 'id' is missing
from azure.storage.filedatalake import DataLakeServiceClient
# Replace with your own details
endpoint = ""
database = ""
graph = ""
primary_key = ""
# ADLS connection details
adls_account_url = ""
adls_file_system = "json"
adls_file_path = ""
# Azure Data Lake Storage Client
def get_adls_client():
service_client = DataLakeServiceClient(account_url=adls_account_url, credential="")
return service_client
def read_json_file_from_adls(file_name):
service_client = get_adls_client()
file_system_client = service_client.get_file_system_client(file_system=adls_file_system)
file_client = file_system_client.get_file_client(file_name)
file_content = file_client.download_file()
return json.loads(file_content.readall().decode('utf-8'))
# Initialize the Gremlin client
gremlin_client = client.Client(
endpoint,
'g',
username=f"/dbs/{database}/colls/{graph}",
password=primary_key,
message_serializer=serializer.GraphSONSerializersV2d0()
)
# Function to process a single JSON file and create the graph
def process_json_file(file_name):
# Read the JSON file from ADLS
json_data = read_json_file_from_adls(file_name)
print(f"Processing file: {file_name}")
# Iterate over each record in the JSON array
for record in json_data["XYZ"]:
# Use LOCATIONSTATE as the partition key
location_state = record.get("LOCATIONSTATE", "").strip()
if not location_state:
location_state = str(uuid.uuid4()) # Fallback to a generated unique ID
record["LOCATIONSTATE"] = location_state # Optionally update the record with the new LOCATIONSTATE
location_name = record.get("LOCATIONNAME", "").replace("'", "\\'")
location_city = record.get("LOCATIONCITY", "").replace("'", "\\'")
location_state = record.get("LOCATIONSTATE", "").replace("'", "\\'")
# Create LOCATIONSTATE vertex
try:
create_location_state_query = (
f"g.addV('location_state').property('id', '{location_state}')"
f".property('zip', '{location_state}')"
f".property('partitionKey', '{location_state}')"
)
gremlin_client.submit(create_location_state_query).all().result()
print(f"Created location_state vertex: {location_state}")
except Exception as e:
print(f"Error creating location_state vertex {location_state}: {e}")
continue
# Create LOCATIONNAME vertex and add edge
location_name_id = str(uuid.uuid4())
try:
create_location_name_query = (
f"g.addV('location_name').property('id', '{location_name_id}')"
f".property('name', '{location_name}')"
f".property('partitionKey', '{location_state}')"
)
gremlin_client.submit(create_location_name_query).all().result()
edge_location_name_query = f"g.V('{location_state}').addE('name').to(g.V('{location_name_id}'))"
gremlin_client.submit(edge_location_name_query).all().result()
print(f"Created edge: {location_state} -> {location_name}")
except Exception as e:
print(f"Error creating location_name or edge for {location_state}: {e}")
# Create LOCATIONCITY vertex and add edge
location_city_id = str(uuid.uuid4())
try:
create_location_city_query = (
f"g.addV('location_city').property('id', '{location_city_id}')"
f".property('city', '{location_city}')"
f".property('partitionKey', '{location_state}')"
)
gremlin_client.submit(create_location_city_query).all().result()
edge_location_city_query = f"g.V('{location_state}').addE('city').to(g.V('{location_city_id}'))"
gremlin_client.submit(edge_location_city_query).all().result()
print(f"Created edge: {location_state} -> {location_city}")
except Exception as e:
print(f"Error creating location_city or edge for {location_state}: {e}")
# Create LOCATIONSTATE vertex and add edge
location_state_id = str(uuid.uuid4())
try:
create_location_state_query = (
f"g.addV('location_state').property('id', '{location_state_id}')"
f".property('state', '{location_state}')"
f".property('partitionKey', '{location_state}')"
)
gremlin_client.submit(create_location_state_query).all().result()
edge_location_state_query = f"g.V('{location_state}').addE('state').to(g.V('{location_state_id}'))"
gremlin_client.submit(edge_location_state_query).all().result()
print(f"Created edge: {location_state} -> {location_state}")
except Exception as e:
print(f"Error creating location_state or edge for {location_state}: {e}")
# Process the JSON file
try:
process_json_file(adls_file_path)
except Exception as e:
print(f"Error processing the file: {e}")
# Close the Gremlin client
gremlin_client.close()
Here is the sample JSON file:
{
"XYZ Grocery": [
{
"Section" : "XYZ",
"LOCATIONLOOKUPID" : "123",
"LOCATIONNAME" : "abc",
"LOCATIONSTREETADDRESS1" : "def",
"LOCATIONSTREETADDRESS2" : null,
"LOCATIONSTREETADDRESS3" : null,
"LOCATIONCITY" : "hunter",
"LOCATIONSTATE" : "IL",
"LOCATIONZIP" : "R34",
"LOCATIONCOUNTRY" : "USA",
"LOCATIONURL" : "",
"LOCATIONTYPE" : "",
"CONTACTTYPE" : "",
"CONTACTISPRIMARY" : "0",
"CONTACTJOBTITLE" : "",
"CONTACTTITLE" : "",
"CONTACTNICKNAME" : null
},
{
"Section" : "XYZ",
"LOCATIONLOOKUPID" : "456",
"LOCATIONNAME" : "mno",
"LOCATIONSTREETADDRESS1" : "stu",
"LOCATIONSTREETADDRESS2" : null,
"LOCATIONSTREETADDRESS3" : null,
"LOCATIONCITY" : "hunter",
"LOCATIONSTATE" : "IL",
"LOCATIONZIP" : "60511",
"LOCATIONCOUNTRY" : "USA",
"LOCATIONURL" : "",
"LOCATIONTYPE" : "",
"CONTACTTYPE" : "",
"CONTACTISPRIMARY" : "0",
"CONTACTJOBTITLE" : "",
"CONTACTTITLE" : "",
"CONTACTNICKNAME" : null
}

XYZ Groceryas the top-level key, but your script refers toABC.