5

There are two operations in my RPC method:

async def my_rpc(self, data):
    async with self.Engine() as conn:
        await conn.execute("SELECT ... FROM MyTable")
        ...  # MyTable can be changed by another RPC at this point
        await conn.execute("UPDATE MyTable ...")

Another RPC method can change the DB before my_rpc finishes (between the two awaited SQL queries). How can I avoid this situation?
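
The interleaving can be reproduced without a database. The sketch below is only an illustration: the `table` dict is a hypothetical in-memory stand-in for MyTable, and `asyncio.sleep(0)` plays the role of awaiting a query. Both coroutines read the same value before either writes, so one update is lost.

```python
import asyncio

# Hypothetical in-memory stand-in for MyTable; no real database is involved.
table = {"counter": 0}


async def read_then_write():
    value = table["counter"]      # plays the role of the SELECT
    await asyncio.sleep(0)        # yields to the event loop, like awaiting a query
    table["counter"] = value + 1  # plays the role of the UPDATE


async def main():
    # Two "RPC calls" running concurrently on the same event loop.
    await asyncio.gather(read_then_write(), read_then_write())
    return table["counter"]


final = asyncio.run(main())
print(final)  # two increments were requested, but only one survives
```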

Code of self.Engine (it is called with an engine created by aiopg.sa.create_engine):

class ConnectionContextManager(object):
    def __init__(self, engine):
        self.conn = None
        self.engine = engine

    async def __aenter__(self):
        if self.engine:
            self.conn = await self.engine.acquire()
            return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        try:
            self.engine.release(self.conn)
            self.conn.close()
        finally:
            self.conn = None
            self.engine = None
  • Can you show your code for self.Engine? Commented Aug 1, 2016 at 16:19
  • @jsbueno I added the code. Commented Aug 1, 2016 at 16:53

2 Answers

5

First, aiopg works in autocommit mode, which means you have to manage the transaction manually. Read more details.

Second, you have to use SELECT FOR UPDATE to lock the rows that you read in the first statement. SELECT FOR UPDATE locks the selected rows until the transaction completes. Read more details.

async def my_rpc(self, data):
    async with self.Engine() as conn:
        await conn.execute("BEGIN")
        await conn.execute("SELECT ... FROM MyTable WHERE some_clause = some_value FOR UPDATE")
        ...  # It seems the table MyTable can be changed by another RPC
        await conn.execute("UPDATE MyTable SET some_clause=...")
        await conn.execute("COMMIT")
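
If the code between BEGIN and COMMIT raises, the transaction should be rolled back rather than left open. A minimal sketch of that idea follows, assuming only that the connection exposes an awaitable `execute(sql)` as in the snippet above. The `transaction` helper and `FakeConnection` are hypothetical names introduced for illustration and are not part of aiopg; the fake connection only records statements so the example runs without PostgreSQL.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def transaction(conn):
    """Hypothetical helper: BEGIN on entry, COMMIT on success, ROLLBACK on error."""
    await conn.execute("BEGIN")
    try:
        yield conn
    except BaseException:
        await conn.execute("ROLLBACK")
        raise
    else:
        await conn.execute("COMMIT")


class FakeConnection:
    """Stand-in that only records the statements it is asked to execute."""

    def __init__(self):
        self.statements = []

    async def execute(self, sql):
        self.statements.append(sql)


async def demo():
    ok, failed = FakeConnection(), FakeConnection()

    # Normal path: both statements run inside one transaction.
    async with transaction(ok):
        await ok.execute("SELECT ... FOR UPDATE")
        await ok.execute("UPDATE ...")

    # Failure path: an error between the two queries triggers ROLLBACK.
    try:
        async with transaction(failed):
            await failed.execute("SELECT ... FOR UPDATE")
            raise RuntimeError("simulated failure between the two queries")
    except RuntimeError:
        pass

    return ok.statements, failed.statements


ok_statements, failed_statements = asyncio.run(demo())
print(ok_statements)
print(failed_statements)
```

With a real connection, the row locks taken by SELECT FOR UPDATE are released on either COMMIT or ROLLBACK, so the failure path does not leave other callers blocked.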

2

It looks like the only way to avoid confusion is to have each transaction take place on a separate database connection (Python-side cursors won't do). The way to do that is to have a connection pool and have your Engine method deliver a different connection to each "async thread".

That would be easier if the PostgreSQL connector itself were async-aware (which driver are you using, by the way?), or if a database-wrapper layer above it were. If it is not, you will have to implement this connection pool yourself. I think SQLAlchemy connection pools will work just right for that case since, regardless of being used in a coroutine, a connection will only be freed at the end of the async with block.
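
As a rough sketch of the "one connection per coroutine" idea, here is a tiny pool built on asyncio.Queue. `TinyPool` is a hypothetical name, and the strings "conn-a" and "conn-b" stand in for real connections. It is not how any particular driver implements pooling; it only shows that a connection held inside an async with block is not handed to another coroutine until the block exits.

```python
import asyncio
from contextlib import asynccontextmanager


class TinyPool:
    """Hypothetical minimal pool, for illustration only."""

    def __init__(self, connections):
        self._free = asyncio.Queue()
        for conn in connections:
            self._free.put_nowait(conn)

    @asynccontextmanager
    async def acquire(self):
        conn = await self._free.get()  # waits if every connection is in use
        try:
            yield conn
        finally:
            self._free.put_nowait(conn)  # returned only when the block exits


async def worker(pool, seen):
    async with pool.acquire() as conn:
        seen.append(conn)
        await asyncio.sleep(0)  # let the other worker run while conn is held


async def main():
    pool = TinyPool(["conn-a", "conn-b"])
    seen = []
    await asyncio.gather(worker(pool, seen), worker(pool, seen))
    return seen


seen = asyncio.run(main())
print(seen)  # each worker held a different connection
```

Separate connections keep the two coroutines' statements from being mixed on one session, but they do not by themselves stop another connection from changing the same rows between the two queries.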

3 Comments

I have not tested my code yet, but it uses separate connections from "aiopg.sa". Maybe this is just what I need?
Yes. Since you are getting your connection from the call in async with and using a connector that is ready for asyncio, that should be all you need.
I don't think that is all: you also have to manage the transaction manually and use SELECT FOR UPDATE.
