
I am trying to read data from a Kafka topic, do some processing, and dump the data into Elasticsearch. But I could not find an example in Python that uses Elasticsearch as a sink. Can anyone help me with a snippet for this?

    # add kafka connector dependency
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.0.jar')

    tbl_env.get_config()\
        .get_configuration()\
        .set_string("pipeline.jars", "file://{}".format(kafka_jar))
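For reference, 'pipeline.jars' takes a semicolon-separated list of jar URLs, so an Elasticsearch connector jar can sit alongside the Kafka one. A rough sketch; the Elasticsearch jar file name is an assumption for a Flink 1.14 / Scala 2.11 setup:

    # sketch: register both connector jars on 'pipeline.jars' (semicolon-separated);
    # the Elasticsearch jar name is an assumption for Flink 1.14 / Scala 2.11
    base_dir = os.path.abspath(os.path.dirname(__file__))
    kafka_jar = os.path.join(base_dir, 'flink-sql-connector-kafka_2.11-1.14.0.jar')
    es_jar = os.path.join(base_dir, 'flink-sql-connector-elasticsearch7_2.11-1.14.0.jar')

    tbl_env.get_config()\
        .get_configuration()\
        .set_string("pipeline.jars",
                    "file://{};file://{}".format(kafka_jar, es_jar))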

Following is the error:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:399)
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:583)
        ... 31 more
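This error means that no connector factory named 'kafka' was found on the job's classpath, i.e. the jar configured in 'pipeline.jars' never made it there. A quick sanity check (a sketch, reusing kafka_jar and tbl_env from the snippet above) is to confirm the jar file exists on disk and to print the configured value:

    # sanity check: the jar must exist and 'pipeline.jars' must hold a valid file URL
    print(os.path.exists(kafka_jar))
    print(tbl_env.get_config()
                 .get_configuration()
                 .get_string("pipeline.jars", "not set"))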
  • Were you able to find a solution? Commented Apr 6, 2022 at 5:39

1 Answer


Refer to https://nightlies.apache.org/flink/flink-docs-release-1.14/api/python/pyflink.datastream.html#pyflink.datastream.connectors.JdbcSink

Kafka to MySQL:

import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf, TableFunction, ScalarFunction


env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
   env,
   environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

# Kafka source table: reads JSON records from the 'pyflink_test' topic
sourceKafkaDdl = """
    create table sourceKafka(
        ID varchar comment '',
        TRUCK_ID varchar comment '',
        SPEED varchar comment '',
        GPS_TIME varchar comment ''
    )comment 'get from kafka' 
    with(
        'connector' = 'kafka',
        'topic' = 'pyflink_test',        
        'properties.bootstrap.servers' = '***:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
    """

# MySQL sink table via the JDBC connector (legacy 'connector.*' option style)
mysqlSinkDdl = """
    CREATE TABLE mysqlSink (
        id varchar, 
        truck_id varchar
    ) 
    with (
        'connector.type' = 'jdbc',  
        'connector.url' = 'jdbc:mysql://***:***/test?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false' ,
        'connector.username' = '**' ,
        'connector.password' = '**', 
        'connector.table' = 'mysqlsink' ,
        'connector.driver' = 'com.mysql.cj.jdbc.Driver' ,
        'connector.write.flush.interval' = '5s', 
        'connector.write.flush.max-rows' = '1'
    )
"""

# register both tables with the table environment
t_env.execute_sql(sourceKafkaDdl)
t_env.execute_sql(mysqlSinkDdl)

# read from Kafka, project the two columns, and write them into the MySQL sink
t_env.from_path('sourceKafka')\
    .select("ID,TRUCK_ID")\
    .execute_insert("mysqlSink")\
    .wait()
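The original question asked for Elasticsearch rather than MySQL. The same pattern should carry over to the SQL Elasticsearch connector; the following is an untested sketch that assumes the flink-sql-connector-elasticsearch7 jar has been added to 'pipeline.jars', with the host and index name as placeholders:

# sketch only: Elasticsearch 7 sink table; host and index are placeholders and the
# flink-sql-connector-elasticsearch7 jar must be on 'pipeline.jars'
esSinkDdl = """
    CREATE TABLE esSink (
        id varchar,
        truck_id varchar
    )
    with (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://localhost:9200',
        'index' = 'truck_events'
    )
"""

t_env.execute_sql(esSinkDdl)

# write the same projection into Elasticsearch instead of MySQL
t_env.from_path('sourceKafka')\
    .select("ID,TRUCK_ID")\
    .execute_insert("esSink")\
    .wait()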

1 Comment

Thanks for the help. I was able to get it working in my Ubuntu VM, but the Kafka connector jar is not recognised in my Python script when I run it on Windows. Any idea why that might be? I have edited my question to show how I'm adding the jar.
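One likely cause, though it is an assumption and not confirmed in this thread: on Windows, "file://{}".format(kafka_jar) produces something like file://C:\path\to\jar, which is not a valid file URL, so Flink never loads the jar. A sketch of a cross-platform way to build the URL with pathlib, reusing tbl_env from the question:

from pathlib import Path

# Path.as_uri() yields a valid file URL on both platforms,
# e.g. file:///C:/... on Windows and file:///home/... on Linux
kafka_jar = Path(__file__).resolve().parent / 'flink-sql-connector-kafka_2.11-1.14.0.jar'

tbl_env.get_config()\
    .get_configuration()\
    .set_string("pipeline.jars", kafka_jar.as_uri())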
