
I am trying to use the Table and SQL API of Flink in a simple example where I read strings from a file, convert them to Tuple2, and try to turn them into a table. Here is my code.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class table_streaming_test {
    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
        env.setParallelism(1);

        DataStream<String> datastream_in =
            env.readTextFile("file:/home/rishikesh/new_workspace1/table_streaming/stocks.txt");
        // transformation: flatMap each line into a (symbol, price) tuple
        DataStream<Tuple2<String, Integer>> ds = datastream_in.flatMap(new Splitter());

        Table msg = tEnv.fromDataStream(ds).as("symbol, price");
        Table result = msg.select("symbol = 'A'");
        DataStream<String> ds2 = tEnv.toDataStream(result, String.class);
        ds2.print();
        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] token = sentence.split(",");
            out.collect(new Tuple2<String, Integer>(token[0], Integer.parseInt(token[1])));
        }
    }
}
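The Splitter's parsing step can be exercised in isolation as a plain-Java sketch, independent of Flink. The sample line "CSCO,22" is an assumption, since the question does not show the contents of stocks.txt; the code only assumes each line is a symbol and an integer price separated by a comma, matching what Splitter expects.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class SplitterCheck {
    // Mirrors Splitter.flatMap: split a "SYMBOL,PRICE" line into (symbol, price).
    static Map.Entry<String, Integer> parse(String line) {
        String[] token = line.split(",");
        return new SimpleEntry<>(token[0], Integer.parseInt(token[1]));
    }

    public static void main(String[] args) {
        // Hypothetical sample line; the real stocks.txt format is not shown in the question.
        Map.Entry<String, Integer> e = parse("CSCO,22");
        System.out.println(e.getKey() + " " + e.getValue()); // CSCO 22
    }
}
```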

The following error occurs at the line DataStream<String> ds2 = tEnv.toDataStream(result, String.class);

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.api.table.Table.<init>(Lorg/apache/flink/api/table/TableEnvironment;Lorg/apache/flink/api/table/plan/logical/LogicalNode;)V
    at org.apache.flink.api.table.StreamTableEnvironment.ingest(StreamTableEnvironment.scala:97)
    at org.apache.flink.api.java.table.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:61)
    at table_streaming_test.main(table_streaming_test.java:87)

Jar files included are:

  1. Flink-dist_2.10-1.1.3.jar
  2. flink-python_2.10-1.1.3.jar
  3. flink-table_2.10-1.1.3.jar
  4. log4j-1.2.17.jar
  5. slf4j-log4j12-1.7.7.jar
  6. JavaSE-1.7


  • Can you convert the code and the exception from the screenshots into text? There are some parts which are cut off and it would be easier to search and copy. Thanks Commented Dec 21, 2016 at 9:07
  • Sure, Fabian. I have edited the post. Commented Dec 21, 2016 at 10:01
  • Could you please give an example of stocks.txt ? Commented Aug 17, 2017 at 14:55
  • What build tool do you use ? Commented Oct 25, 2017 at 8:23
  • Possible duplicate of java.lang.NoSuchMethodError in Flink Commented Oct 26, 2017 at 12:58

2 Answers


One possible cause of a "java.lang.NoSuchMethodError" is that your project uses a different version of Flink than the one installed on your system.

In my case, Flink 1.4.2 was installed, but the version my project used was 1.3.2. I updated my pom file to use the same version, and it worked fine.
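For example, pinning every Flink artifact to one version via a property avoids the mismatch. This is a sketch: the artifact names below correspond to the Scala-2.10 jars listed in the question, and the version value is an assumption that you should replace with whatever your installed Flink reports.

```xml
<properties>
  <!-- Assumption: set this to the version of the Flink installation you run against -->
  <flink.version>1.1.3</flink.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```

Using a single `flink.version` property means all Flink modules are upgraded together, so a partial upgrade can never reintroduce the NoSuchMethodError.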




I think you need to use the Maven Shade plugin and follow these steps to resolve the issue:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html#resolving-dependency-conflicts-with-flink-using-the-maven-shade-plugin
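A minimal shade-plugin configuration along the lines of that guide might look like the sketch below. The relocation pattern shown (com.google.common) is only an example of the technique the guide describes; substitute whichever dependency actually conflicts in your build, and `myapp.shaded` is a hypothetical namespace.

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <!-- Relocate a conflicting dependency into your own namespace
                 so it cannot clash with the copy shipped by Flink -->
            <relocations>
              <relocation>
                <pattern>com.google.common</pattern>
                <shadedPattern>myapp.shaded.com.google.common</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```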

