
I have a for loop that should update a pandas DataFrame from a Postgres table that is updated by another thread every 5 seconds.

If I run the code without the for loop, I get what I want, which is just the latest update time. However, if I run the code with the for loop, the results do not update and remain stuck on the first result.

Why is this happening and how can I fix the problem?

import time
from datetime import datetime

import pandas as pd
from sqlalchemy import MetaData, Table, and_, select

# engine is assumed to be an existing SQLAlchemy engine created with create_engine(...)
metadata = MetaData(bind=None)
table = Table(
    'datastore',
    metadata,
    autoload=True,
    autoload_with=engine
)

# and_() takes each condition as a separate argument; joining them with the
# Python "and" keyword would silently drop the first condition.
stmt = select([
    table.columns.date,
    table.columns.open,
    table.columns.high,
    table.columns.low,
    table.columns.close
]).where(and_(
    table.columns.date_ == datetime.today().strftime('%Y-%m-%d'),
    table.columns.close != 0
))
# ]).where(and_(table.columns.date_ == '2023-01-12', table.columns.close != 0))

connection = engine.connect()

for x in range(1000000):
    data_from_db = pd.DataFrame(connection.execute(stmt).fetchall())
    data_from_db = data_from_db[data_from_db['close'] != 0]
    print(data_from_db.date.iloc[-1])
    time.sleep(5)

I've also tried the psycopg2 library, and the problem is still there:

import time

import psycopg2

for x in range(1000000):
    # Open a fresh connection on every iteration so each read starts a new transaction.
    conn = psycopg2.connect(
        host='localhost',
        database='ABC',
        user='postgres',
        password='*******')
    cur = conn.cursor()
    cur.execute("select max(date) from public.datastore")

    y = cur.fetchall()
    print(y)

    # Close the cursor and connection before sleeping to avoid leaking connections.
    cur.close()
    conn.close()
    time.sleep(5)

2 Answers


The issue can be caused by one of the following:

  1. the transaction isolation level (the other thread that updates your table may have a transaction that is not yet committed/closed, so your script keeps reading old data; see the sketch after this list)
  2. caching applied to the same statement
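
To illustrate the first point: rows written inside a transaction that has not been committed are invisible to other connections (and note that PostgreSQL treats READ UNCOMMITTED like READ COMMITTED, so the writer's commit is what ultimately matters). Below is a minimal psycopg2 sketch of a writer that commits after every insert; the inserted values are made up and the column list is only assumed from the question's SELECT, so adjust it to the real datastore schema:

import psycopg2

conn = psycopg2.connect(host='localhost', database='ABC',
                        user='postgres', password='*******')
cur = conn.cursor()

# Hypothetical insert; adjust the columns/values to the real schema.
cur.execute(
    "INSERT INTO public.datastore (date, open, high, low, close) "
    "VALUES (%s, %s, %s, %s, %s)",
    ('2023-01-12', 1.0, 2.0, 0.5, 1.5),
)
conn.commit()   # without this commit, other sessions keep reading the old data

cur.close()
conn.close()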

For the first factor, try setting isolation_level="READ UNCOMMITTED" to get "dirty"/fresh reads:

engine = create_engine(
    "your_dsn path",
    isolation_level="READ UNCOMMITTED"
)

with engine.connect() as conn:
    for x in range(1000000):
        data_from_db = pd.DataFrame(conn.execute(stmt).fetchall()) 
        print(data_from_db.date.iloc[-1])
        time.sleep(5)

For the second factor, you can try disabling the compiled statement cache for the connection:

# disable SQLAlchemy's compiled statement cache for this connection
with engine.connect().execution_options(compiled_cache=None) as conn:
    for x in range(1000000):
        data_from_db = pd.DataFrame(conn.execute(stmt).fetchall())
        print(data_from_db.date.iloc[-1])
        time.sleep(5)

1 Comment

Thank you Roman, I've tried your solution and rewritten the code with psycopg2, and the problem is still there.

I was using Jupyter notebooks.

One cell was inserting data from an API and another cell was reading from the same db using a different thread.

It looks like if I perform these two operations in two different notebooks, everything works.

Within a single notebook, I simply needed to use a thread:

import psycopg2
import time
from threading import Thread

def function_test():
    while True:
        # Open a fresh connection for every poll so each read starts a new
        # transaction and sees the latest committed rows.
        conn_ = psycopg2.connect(
            host='#####',
            database='####',
            user='####',
            password='#####')
        cur = conn_.cursor()
        cur.execute("SELECT date FROM public.datastore ORDER BY date DESC LIMIT 1")
        query_results = cur.fetchone()
        print(query_results)
        cur.close()
        conn_.close()
        time.sleep(5)

thread1 = Thread(target=function_test, args=())
thread1.start()
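
One follow-up, not from the original answer: as written, function_test loops forever, so the only way to stop the thread is to restart the kernel. A minimal sketch of a stoppable variant using threading.Event; the names poll_loop and stop_event are illustrative, and the print stands in for the query above:

import threading

stop_event = threading.Event()

def poll_loop():
    while not stop_event.is_set():
        # Run the SELECT from function_test here and print the latest date.
        print("polling...")
        stop_event.wait(5)   # sleeps up to 5 seconds, returns early once stopped

thread1 = threading.Thread(target=poll_loop, daemon=True)
thread1.start()

# Later, to stop the loop cleanly:
# stop_event.set()
# thread1.join()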

