Is it possible to sink processed stream data into a database using PyFlink? The methods I have found for writing processed data are limited to text, CSV, or JSON formats, and I see no way to sink data into a database.
1 Answer
You could use SQL DDL within PyFlink to define a JDBC table sink that you can then insert into. That will look something like this:
my_sink_ddl = """
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
"""
t_env.sql_update(my_sink_ddl)
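Note that sql_update was deprecated in Flink 1.11 in favor of execute_sql. For completeness, here is a minimal end-to-end sketch assuming a recent PyFlink release (1.13 or later) and a hypothetical datagen-backed source table named source_users standing in for your processed stream; the JDBC connector jar and a MySQL driver also need to be on the classpath:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a streaming table environment.
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# The flink-connector-jdbc jar and a MySQL driver must be available, e.g.:
# t_env.get_config().get_configuration().set_string(
#     "pipeline.jars",
#     "file:///path/to/flink-connector-jdbc.jar;file:///path/to/mysql-connector-java.jar")

# Hypothetical source table; replace with whatever produces your processed data.
t_env.execute_sql("""
    CREATE TABLE source_users (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN
    ) WITH (
        'connector' = 'datagen'
    )
""")

# The JDBC sink table from the answer above.
t_env.execute_sql("""
    CREATE TABLE MyUserTable (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/mydatabase',
        'table-name' = 'users'
    )
""")

# Inserting into the sink table pushes the rows to the database.
t_env.execute_sql(
    "INSERT INTO MyUserTable SELECT id, name, age, status FROM source_users"
).wait()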
1 Comment
David Anderson
I don't understand the point about "where they should be placed". A sink is always a terminal node in the dataflow DAG. But no, there is no Flink connector for MongoDB.
write_to_socket() is a sink (ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/…). I am not sure which version you are using or whether this method has changed in your version.