0

I'm trying to use FastAPI with SQLModel for writing to a Postgresql database. In this case I want to store the inference obtained from a transformer model for natural language processing into the database.

As seen below, I defined an SQLModel for the inference request which consists of a text (a string) along with some candidate labels (a list of strings) to be scored according to which label frames more the entered text. Then in the Inference object, the database table is defined itself, inference's results which correspond to a dictionary of string and float key-values elements, there is a foreign key for binding with a user entity.

from datetime import datetime
from sqlmodel import Field, SQLModel, Relationship
from typing import Optional

class InferenceBase(SQLModel):
    text: str = Field(nullable=False, index=True)
    candidate_labels: list[str] = Field(nullable=False, index=True)

class Inference(InferenceBase, table=True):
    id: Optional[int] = Field(default=None, nullable=False, primary_key=True)
    result: dict[str, float]
    created_at: Optional[datetime]
    updated_at: Optional[datetime]
    created_by_id: Optional[int] = Field(default=None, foreign_key="user.id")
    created_by: "User" = Relationship(
        sa_relationship_kwargs={
            "lazy": "selectin",
            "primaryjoin": "Inference.created_by_id == User.id",
        }
    )

The next file define some schemas for delivering or presenting the data when required fro the API.

from app.models.inference import InferenceBase
from app.models.user import UserBase
from pydantic import BaseModel
from typing import Optional

class IInferenceCreate(InferenceBase):
    result: dict[str, float]


class IInferenceRead(InferenceBase):
    id: int
    result: dict[str, float]


class IInferenceUpdate(BaseModel):
    text: Optional[str] = None
    candidate_labels: Optional[list[str]] = None


class IInferenceReadWithUsers(IInferenceRead):
    user: UserBase

The next file, define a CRUD for managing table's writing and reading.

from app.crud.base_sqlmodel import CRUDBase
from sqlmodel.ext.asyncio.session import AsyncSession

from app.schemas.inference import IInferenceCreate, IInferenceUpdate
from app.models.inference import Inference
from datetime import datetime


class CRUDInference(CRUDBase[Inference, IInferenceCreate, IInferenceUpdate]):
    async def create_inference(
        self, db_session: AsyncSession, *, obj_in: IInferenceCreate, user_id: int
    ) -> Inference:
        db_obj = Inference.from_orm(obj_in)
        db_obj.created_at = datetime.utcnow()
        db_obj.updated_at = datetime.utcnow()
        db_obj.created_by_id = user_id
        db_session.add(db_obj)
        await db_session.commit()
        await db_session.refresh(db_obj)
        return db_obj


inference = CRUDInference(Inference)

This file is in charged of managing endpoints for the inference object, when a prediction is requested and a text and cadidate labels are entered into the API, it is supposed to write the inference result along with data requested and some other data, for this first I comsumed data into a function that retrieves the inferece result and then I generate an SQLModel value for writing into the database, and it returns data written into the database.

from app.schemas.common import (
    IPostResponseBase,
)
from app.utils.nlp import analyze_text
from app.schemas.inference import (
    IInferenceCreate,
    IInferenceRead,
)
from sqlmodel.ext.asyncio.session import AsyncSession
from fastapi import APIRouter, Depends
from app.api import deps
from app import crud
from app.models import Inference
from app.models import InferenceBase
from app.models.user import User

router = APIRouter()


@router.post("/predict/", response_model=IPostResponseBase[IInferenceRead])
async def predict(
    request: InferenceBase,
    db_session: AsyncSession = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    text = request.text
    labels = request.candidate_labels

    result = await analyze_text(text, labels)
    text = result[0]
    candidate_labels = result[1]
    res = result[2]

    inference = IInferenceCreate(
        text=text, candidate_labels=candidate_labels, result=res
    )

    my_inference_on_db = await crud.inference.create_inference(
        db_session, obj_in=inference, user_id=current_user.id
    )

    return IPostResponseBase(data=my_inference_on_db)

The thing is that I realized database is not beeing written and I got a server error. This is part of the error log I got:

zero_shot_classification_fastapi_server  | text='Last week I upgraded my iOS version and ever since then my phone has been overheating whenever I use your app.' candidate_labels=['mobile', 'website', 'billing', 'account access'] result={'mobile': 0.946 'billing': 0.011, 'account access': 0.022, 'website': 0.021}
zero_shot_classification_fastapi_server  | <class 'app.schemas.inference.IInferenceCreate'>
zero_shot_classification_fastapi_server  | 2022-03-15 07:13:10,157 INFO sqlalchemy.engine.Engine INSERT INTO inference (text, candidate_labels, result, created_at, updated_at, created_by_id) VALUES (%s, %s, %s, %s, %s, %s) RETURNING inference.id
zero_shot_classification_fastapi_server  | 2022-03-15 07:13:10,157 INFO sqlalchemy.engine.Engine [cached since 1287s ago] ('Last week I upgraded my iOS version and ever since then my phone has been overheating whenever I use your app.', ['mobile', 'website', 'billing', 'account access'], {'mobile': 0.946, 'billing': 0.011, 'account access': 0.022, 'website': 0.021}, datetime.datetime(2022, 3, 15, 7, 13, 10, 156895), datetime.datetime(2022, 3, 15, 7, 13, 10, 156909), 2)
zero_shot_classification_fastapi_server  | 2022-03-15 07:13:10,158 INFO sqlalchemy.engine.Engine ROLLBACK
zero_shot_classification_fastapi_server  | INFO:     172.20.0.1:56274 - "POST /api/v1/predict/ HTTP/1.1" 500 Internal Server Error
zero_shot_classification_fastapi_server  | ERROR:    Exception in ASGI application
zero_shot_classification_fastapi_server  | Traceback (most recent call last):
zero_shot_classification_fastapi_server  |   File "asyncpg/protocol/prepared_stmt.pyx", line 168, in asyncpg.protocol.protocol.PreparedStatementState._encode_bind_msg
zero_shot_classification_fastapi_server  |   File "asyncpg/protocol/codecs/base.pyx", line 206, in asyncpg.protocol.protocol.Codec.encode
zero_shot_classification_fastapi_server  |   File "asyncpg/protocol/codecs/base.pyx", line 111, in asyncpg.protocol.protocol.Codec.encode_scalar
zero_shot_classification_fastapi_server  |   File "asyncpg/pgproto/./codecs/text.pyx", line 29, in asyncpg.pgproto.pgproto.text_encode
zero_shot_classification_fastapi_server  |   File "asyncpg/pgproto/./codecs/text.pyx", line 12, in asyncpg.pgproto.pgproto.as_pg_string_and_size
zero_shot_classification_fastapi_server  | TypeError: expected str, got list
zero_shot_classification_fastapi_server  | 
zero_shot_classification_fastapi_server  | The above exception was the direct cause of the following exception:
zero_shot_classification_fastapi_server  | 
zero_shot_classification_fastapi_server  | Traceback (most recent call last):
zero_shot_classification_fastapi_server  |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 442, in _prepare_and_execute
zero_shot_classification_fastapi_server  |     self._rows = await prepared_stmt.fetch(*parameters)
zero_shot_classification_fastapi_server  |   File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
zero_shot_classification_fastapi_server  |     data = await self.__bind_execute(args, 0, timeout)
zero_shot_classification_fastapi_server  |   File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
zero_shot_classification_fastapi_server  |     data, status, _ = await self.__do_execute(
zero_shot_classification_fastapi_server  |   File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
zero_shot_classification_fastapi_server  |     return await executor(protocol)
zero_shot_classification_fastapi_server  |   File "asyncpg/protocol/protocol.pyx", line 183, in bind_execute
zero_shot_classification_fastapi_server  |   File "asyncpg/protocol/prepared_stmt.pyx", line 197, in asyncpg.protocol.protocol.PreparedStatementState._encode_bind_msg
zero_shot_classification_fastapi_server  | asyncpg.exceptions.DataError: invalid input for query argument $2: ['mobile', 'website', 'billing', 'accoun... (expected str, got list)
zero_shot_classification_fastapi_server  | 
zero_shot_classification_fastapi_server  | The above exception was the direct cause of the following exception:
zero_shot_classification_fastapi_server  |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
zero_shot_classification_fastapi_server  |     value = await result
zero_shot_classification_fastapi_server  |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 454, in _prepare_and_execute
zero_shot_classification_fastapi_server  |     self._handle_exception(error)
zero_shot_classification_fastapi_server  |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 389, in _handle_exception
zero_shot_classification_fastapi_server  |     self._adapt_connection._handle_exception(error)
zero_shot_classification_fastapi_server  |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
zero_shot_classification_fastapi_server  |     raise translated_error from error
zero_shot_classification_fastapi_server  | sqlalchemy.exc.DBAPIError: (sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.DataError'>: invalid input for query argument $2: ['mobile', 'website', 'billing', 'accoun... (expected str, got list)
zero_shot_classification_fastapi_server  | [SQL: INSERT INTO inference (text, candidate_labels, result, created_at, updated_at, created_by_id) VALUES (%s, %s, %s, %s, %s, %s) RETURNING inference.id]
zero_shot_classification_fastapi_server  | [parameters: ('Last week I upgraded my iOS version and ever since then my phone has been overheating whenever I use your app.', ['mobile', 'website', 'billing', 'account access'], {'mobile': 0.946, 'billing': 0.011, 'account access': 0.022, 'website': 0.021}, datetime.datetime(2022, 3, 15, 7, 13, 10, 156895), datetime.datetime(2022, 3, 15, 7, 13, 10, 156909), 2)]
zero_shot_classification_fastapi_server  | (Background on this error at: https://sqlalche.me/e/14/dbapi)

From the error log I understand the problem is that database can not storage data types like lists or dictionaries, how can I be abled to storage this data types into the postgres database, based on SQLModel?

1 Answer 1

2

Try using JSON dialect as the field type

from sqlalchemy.dialects.postgresql import JSON
Sign up to request clarification or add additional context in comments.

1 Comment

It works!, I tried this by assigning Field(sa_column=Column(JSON)) to the list and dict attributes.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.