
I wrote a SQL query that splits a row in a table: it deletes the row and inserts new ones whose TSRANGE values partition the original range at the given timestamps. It works correctly as far as I have tested it. Below is a demo to illustrate what I mean:

-- demo initialisation
CREATE TABLE random_table (
    uid VARCHAR(36) PRIMARY KEY,
    id VARCHAR(20),
    tsrange_field TSRANGE
);

CREATE EXTENSION pgcrypto;  -- needed for `gen_random_uuid` function

INSERT INTO public.random_table (uid, id, tsrange_field)
VALUES (gen_random_uuid(), 'random_id', tsrange('2000-01-01', '2020-01-01', '[)'));


-- actual query
WITH splitters AS (
    SELECT uid, datetime
    FROM random_table
        JOIN unnest(ARRAY['2015-04-15'::timestamp, '2016-04-15'::timestamp, '2017-01-01'::timestamp, '2017-04-15'::timestamp]) datetime
            ON tsrange_field @> datetime
    WHERE id = 'random_id'
        AND (lower(random_table.tsrange_field) IS NULL OR lower(random_table.tsrange_field) != datetime)
        AND (upper(random_table.tsrange_field) IS NULL OR upper(random_table.tsrange_field) != datetime)
), to_be_splitted AS (
    DELETE FROM random_table
    USING splitters
    WHERE splitters.uid = random_table.uid
    RETURNING random_table.uid, id, tsrange_field
)
INSERT INTO random_table (uid, id, tsrange_field)
SELECT DISTINCT ON (id, tsrange_field)
    gen_random_uuid() AS uid, id,
    unnest(ARRAY[
        tsrange(
            CASE
                WHEN LAG(splitters.datetime) OVER (PARTITION BY splitters.uid ORDER BY splitters.datetime) IS NOT NULL
                    THEN LAG(splitters.datetime) OVER (PARTITION BY splitters.uid ORDER BY splitters.datetime)
                ELSE lower(tsrange_field)
            END,
            splitters.datetime,
            '[)'
        ),
        tsrange(
            splitters.datetime,
            CASE
                WHEN LEAD(splitters.datetime) OVER (PARTITION BY splitters.uid ORDER BY splitters.datetime) IS NOT NULL
                    THEN LEAD(splitters.datetime) OVER (PARTITION BY splitters.uid ORDER BY splitters.datetime)
                ELSE upper(tsrange_field)
            END,
            '[)'
        )
    ]) AS tsrange_field
FROM to_be_splitted JOIN splitters ON to_be_splitted.uid = splitters.uid
ORDER BY tsrange_field
RETURNING *;
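
To make the intent of the query concrete, the same splitting logic can be sketched in plain Python. This is only an illustration under simplifying assumptions: the helper name split_range is hypothetical, both bounds are assumed to be finite (the SQL above also tolerates NULL bounds), and no database is involved.

```python
from datetime import datetime


def split_range(lower, upper, splitters):
    """Split the half-open range [lower, upper) at every splitter strictly inside it.

    Hypothetical helper: mirrors the SQL query's filtering (splitters equal to
    a bound or outside the range are ignored) and its DISTINCT step
    (duplicate splitters collapse into one cut).
    """
    cuts = sorted({s for s in splitters if lower < s < upper})
    bounds = [lower, *cuts, upper]
    # Pair each bound with its successor to get consecutive [start, end) pieces.
    return list(zip(bounds, bounds[1:]))


pieces = split_range(
    datetime(2000, 1, 1),
    datetime(2020, 1, 1),
    [
        datetime(2015, 4, 15),
        datetime(2016, 4, 15),
        datetime(2017, 1, 1),
        datetime(2017, 4, 15),
    ],
)
for start, end in pieces:
    print(f"[{start:%Y-%m-%d}, {end:%Y-%m-%d})")
```

Four splitters inside the range yield five consecutive pieces, which is what the INSERT in the query produces after DISTINCT ON removes the ranges that were generated twice.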

Now I want to translate it to SQLAlchemy, and this is where my problem arises. I wrote the following code:

# pip install psycopg2 sqlalchemy
from datetime import datetime

from sqlalchemy import (and_, case, cast, column, Column, create_engine, delete, func,
                        insert, MetaData, or_, select, Table, VARCHAR)
from sqlalchemy.dialects.postgresql import array, TSRANGE, ARRAY

METADATA = MetaData()

RANDOM_TABLE = Table(
    'random_table', METADATA,
    Column('uid', VARCHAR(36), primary_key=True),
    Column('id', VARCHAR(20)),
    Column('tsrange_field', TSRANGE)
)

engine = create_engine('postgresql://test:test@localhost:5432/test')

def split_row(id, *datetimes):
    # this function contains the translation attempt
    splits = func.unnest([dt for dt in datetimes]).alias('datetime')
    datetime_col = column('datetime')

    splitters = (
        select([RANDOM_TABLE.c.uid, datetime_col])
        .select_from(RANDOM_TABLE.join(
            splits,
            onclause=RANDOM_TABLE.c.tsrange_field.op('@>')(datetime_col)
        ))
        .where(and_(
            RANDOM_TABLE.c.id == id,
            or_(func.lower(RANDOM_TABLE.c.tsrange_field) == None,
                func.lower(RANDOM_TABLE.c.tsrange_field).op('!=')(datetime_col)),
            or_(func.upper(RANDOM_TABLE.c.tsrange_field) == None,
                func.upper(RANDOM_TABLE.c.tsrange_field).op('!=')(datetime_col)),
        ))
    ).cte('splitters')

    to_be_split = (
        delete(RANDOM_TABLE)
        .where(splitters.c.uid == RANDOM_TABLE.c.uid)
        .returning(RANDOM_TABLE.c.uid, RANDOM_TABLE.c.tsrange_field)
    ).cte('to_be_split')

    window_params = {'partition_by': column('uid'),
                     'order_by': datetime_col}
    previous_splitter = func.lag(datetime_col).over(**window_params)
    next_splitter = func.lead(datetime_col).over(**window_params)

    lower_bound_case = case(
        [(previous_splitter != None, previous_splitter)],
        else_=func.lower(column('tsrange_field'))
    )
    upper_bound_case = case(
        [(next_splitter != None, next_splitter)],
        else_=func.upper(column('tsrange_field'))
    )

    split_tsranges = [
        func.tsrange(lower_bound_case, datetime_col, '[)'),
        func.tsrange(datetime_col, upper_bound_case, '[)')
    ]

    split_query = select([
        func.gen_random_uuid().label('uid'),
        func.unnest(split_tsranges).alias('tsrange_field')  # does not work
    ]).distinct(
        column('tsrange_field')
    ).select_from(
        to_be_split
        .join(splitters, onclause=to_be_split.c.uid == splitters.c.uid)
    ).order_by(
        column('tsrange_field')
    )

    whole_query = (
        insert(RANDOM_TABLE)
        .from_select([column('uid'), column('tsrange_field')], split_query)
        .returning(column('uid'), column('tsrange_field'))
    )
    return whole_query


with engine.connect() as conn:
    query = split_row('random_id', datetime.now())
    import pdb; pdb.set_trace()
    print(conn.execute(query).fetchall())

Unfortunately, it fails with the following message:

Traceback (most recent call last):
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
    cursor, statement, parameters, context
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
    cursor.execute(statement, parameters)
psycopg2.ProgrammingError: can't adapt type 'Function'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "stuck.py", line 85, in <module>
    print(conn.execute(query).fetchall())
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 988, in execute
    return meth(self, multiparams, params)
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
    distilled_params,
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1466, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 383, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 128, in reraise
    raise value.with_traceback(tb)
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
    cursor, statement, parameters, context
  File "/home/tryph/sql_split/.env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'Function'
[SQL: WITH splitters AS 
(SELECT random_table.uid AS uid, datetime 
FROM random_table JOIN unnest(%(unnest_3)s) AS datetime ON random_table.tsrange_field @> datetime 
WHERE random_table.id = %(id_1)s AND (lower(random_table.tsrange_field) IS NULL OR (lower(random_table.tsrange_field) != datetime)) AND (upper(random_table.tsrange_field) IS NULL OR (upper(random_table.tsrange_field) != datetime))), 
to_be_split AS 
(DELETE FROM random_table USING splitters WHERE splitters.uid = random_table.uid RETURNING random_table.uid, random_table.tsrange_field)
 INSERT INTO random_table (uid, tsrange_field) SELECT DISTINCT ON (tsrange_field) gen_random_uuid() AS uid, tsrange_field.unnest_1 
FROM unnest(%(unnest_2)s) AS tsrange_field, to_be_split JOIN splitters ON to_be_split.uid = splitters.uid ORDER BY tsrange_field RETURNING uid, tsrange_field]
[parameters: {'unnest_2': [<sqlalchemy.sql.functions.Function at 0x7fba18954a20; tsrange>, <sqlalchemy.sql.functions.Function at 0x7fba18954b00; tsrange>], 'unnest_3': [datetime.datetime(2019, 5, 3, 23, 37, 1, 773118)], 'id_1': 'random_id'}]
(Background on this error at: http://sqlalche.me/e/f405)

Looking at the generated SQL, I noticed that the unnest(ARRAY[...]) AS tsrange_field from the SELECT list of my original query is rendered in the FROM clause instead, and I can't figure out why. In addition, the message psycopg2.ProgrammingError: can't adapt type 'Function' is not very helpful and does not seem to be related to the misplaced unnest.

Any hint on what happens and how to fix it would be greatly appreciated.

1 Answer

As you've noted, the problematic part is

split_query = select([
        func.gen_random_uuid().label('uid'),
        func.unnest(split_tsranges).alias('tsrange_field')  # does not work
    ])

and the problem is that FunctionElement.alias() produces a named alias suitable for the FROM clause, so SQLAlchemy moves the function call there. Use label('tsrange_field') instead to produce AS tsrange_field in the SELECT list.

The other problem is that SQLAlchemy passes the plain Python list as is to the DB-API driver as a bind parameter, and psycopg2 does not know how to adapt the SQLAlchemy constructs inside it. This is what the can't adapt type 'Function' error refers to. Wrap the list in a call to array() (already imported from sqlalchemy.dialects.postgresql in your code), so that SQLAlchemy renders an ARRAY literal with the nested expressions:

split_tsranges = array([
        func.tsrange(lower_bound_case, datetime_col, '[)'),
        func.tsrange(datetime_col, upper_bound_case, '[)')
    ])
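
The difference between a plain list and array() can be checked without a database by compiling the expressions against the PostgreSQL dialect. The following is a minimal sketch, not taken from the question's code: lower_bound and upper_bound are hypothetical stand-ins for the CASE expressions, and the render helper is an assumption introduced only for this illustration.

```python
from sqlalchemy import func, literal_column
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import array

# Stand-ins for the real bound expressions; these column names are hypothetical.
ranges = [
    func.tsrange(literal_column("lower_bound"), literal_column("datetime")),
    func.tsrange(literal_column("datetime"), literal_column("upper_bound")),
]


def render(expr):
    """Compile an expression to PostgreSQL SQL text; no connection is needed."""
    return str(expr.compile(dialect=postgresql.dialect()))


# A plain Python list becomes a single opaque bind parameter, so the nested
# tsrange() calls never reach the SQL text and are handed to the driver as is.
list_sql = render(func.unnest(ranges))
print(list_sql)

# array() renders an ARRAY[...] literal with the nested expressions inline.
array_sql = render(func.unnest(array(ranges)))
print(array_sql)
```

Only the second form contains the tsrange() calls in the SQL itself, which is why the driver no longer receives Function objects as parameter values.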

1 Comment

I should have spotted the alias problem myself... Thanks a lot for this very helpful answer. It now works perfectly.
