This code converts a pandas DataFrame to a Flink table, applies a transformation, and then converts the result back to pandas. It works fine when I use filter, filter, and then select, but gives me an error when I add group_by and order_by.

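import pandas as pd
import numpy as np
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment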

f_s_env = StreamExecutionEnvironment.get_execution_environment()
f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)

df = pd.read_csv("dataBase/New/candidate.csv")

col = ['candidate_id', 'candidate_source_id', 'candidate_first_name',
       'candidate_middle_name', 'candidate_last_name', 'candidate_email',
       'created_date', 'last_modified_date', 'last_modified_by']

table = table_env.from_pandas(df, col)
table.filter("candidate_id > 322445")\
    .filter("candidate_first_name === 'Libby'")\
    .group_by("candidate_id, candidate_source_id")\
    .select("candidate_id, candidate_source_id")\
    .order_by("candidate_id").to_pandas()

My error is

Py4JJavaError: An error occurred while calling o3164.orderBy.
: org.apache.flink.table.api.ValidationException: A limit operation on unbounded tables is currently not supported.
    at org.apache.flink.table.operations.utils.SortOperationFactory.failIfStreaming(SortOperationFactory.java:131)
    at org.apache.flink.table.operations.utils.SortOperationFactory.createSort(SortOperationFactory.java:63)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.sort(OperationTreeBuilder.java:409)
    at org.apache.flink.table.api.internal.TableImpl.orderBy(TableImpl.java:401)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)
  • I'm pretty sure the SQL is not valid: you cannot do a groupBy without any aggregations (although the exception doesn't mention it). If you remove the groupBy (but keep the orderBy), does it work? Commented Jul 16, 2020 at 13:39
  • No, it doesn't work if I keep orderBy and remove groupBy. Commented Jul 16, 2020 at 14:12

1 Answer

If you look in the documentation, you will see that with the Table API, ORDER BY is only supported for batch queries. If you switch to SQL, then you can have streaming queries that sort on an ascending time attribute.

Sorting by anything else in an unbounded streaming query is simply impossible, since sorting requires full knowledge of the input.
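
To make the distinction concrete, here is a plain-Python sketch of the idea. It is not Flink code and does not reflect how Flink implements sorting. The function names, the (timestamp, payload) row shape, and the max_delay bound on out-of-order arrival are all assumptions made for illustration. The first function sorts by an arbitrary key and cannot emit anything until the input ends; the second sorts on an ascending time attribute and can emit rows early, because a later event can never have a timestamp older than the current bound.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[int, str]  # hypothetical row shape: (timestamp, payload)


def sort_bounded(rows: Iterable[Row]) -> List[Row]:
    """Batch-style sort: sorted() consumes the whole input before returning.

    On an unbounded stream this would never return, because a smaller key
    could still arrive at any point in the future.
    """
    return sorted(rows, key=lambda r: r[0])


def sort_by_time(events: Iterable[Row], max_delay: int) -> Iterator[Row]:
    """Streaming-style sort on a time attribute.

    Assumes (for this sketch) that events arrive at most `max_delay` time
    units out of order. Rows older than the current bound are safe to emit,
    since no later event can precede them.
    """
    buffer: List[Row] = []
    bound = float("-inf")
    for ts, payload in events:
        heapq.heappush(buffer, (ts, payload))
        bound = max(bound, ts - max_delay)
        # Emit everything that can no longer be overtaken by a late event.
        while buffer and buffer[0][0] <= bound:
            yield heapq.heappop(buffer)
    # Input ended (only possible for a bounded stream): flush the rest.
    while buffer:
        yield heapq.heappop(buffer)


if __name__ == "__main__":
    events = [(1, "a"), (3, "b"), (2, "c"), (5, "d"), (4, "e")]
    print(sort_bounded(events))
    print(list(sort_by_time(events, max_delay=2)))
```

Both calls produce the rows in ascending timestamp order, but only sort_by_time can start emitting before the input is exhausted, and only because the sort key is time with a known bound on lateness. For a key such as candidate_id there is no such bound, which is why the query has to run in batch mode.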
