Using Apache Flink 1.3.2 and Cassandra 3.11, I wrote a simple program that writes data to Cassandra through the Apache Flink Cassandra connector. Here is the code:

final Collection<String> collection = new ArrayList<>(50);
for (int i = 1; i <= 50; ++i) {
    collection.add("element " + i);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<UUID, String>> dataStream = env
        .fromCollection(collection)
        .map(new MapFunction<String, Tuple2<UUID, String>>() {

            final String mapped = " mapped ";

            @Override
            public Tuple2<UUID, String> map(String s) throws Exception {
                // "element 7" is split into ["element", "7"]
                String[] splitted = s.split("\\s+");
                // Diamond operator avoids the raw-type Tuple2 warning
                return new Tuple2<>(
                        UUID.randomUUID(),
                        splitted[0] + mapped + splitted[1]
                );
            }
        });
dataStream.print();
CassandraSink.addSink(dataStream)
        .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
        .setHost("127.0.0.1")
        .build();
env.execute();

When I compile the same code against Apache Flink 1.4.2 (1.4.x), I get this error:

Error:(36, 22) java: cannot access org.apache.flink.streaming.api.scala.DataStream
  class file for org.apache.flink.streaming.api.scala.DataStream not found

on the line

CassandraSink.addSink(dataStream)
        .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
        .setHost("127.0.0.1")
        .build();

I suspect that a dependency change in Apache Flink 1.4.2 is causing the problem.

I use the following imports in the code:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

How can I fix this error with Apache Flink 1.4.2?

Update: In Flink 1.3.2, the class org.apache.flink.streaming.api.scala.DataStream<T> appears in the Java API documentation, but in version 1.4.2 it does not.

I also tried the Cassandra connector example from the Flink 1.4.2 documentation and got the same error, although the example works with the Flink 1.3.2 dependencies.

1 Answer

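In addition to your other dependencies, make sure you have the Flink Scala streaming dependency. The error says the compiler needs org.apache.flink.streaming.api.scala.DataStream to compile the CassandraSink.addSink call, and that class ships in the flink-streaming-scala artifact: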

Maven

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.4.2</version>
</dependency>

Gradle

dependencies {
    compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.4.2'
    // .. (other dependencies)
}

I managed to get your example working with the following imports and Maven dependencies:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

Maven

<dependencies>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>

</dependencies>
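
If you want to confirm that flink-streaming-scala really ended up on the classpath, a small check like the one below may help. It is only a sketch: the class name ClasspathCheck and the helper isOnClasspath are made up for illustration and are not part of Flink. It looks the class up by name without initializing it.

```java
public class ClasspathCheck {

    // Hypothetical helper (not a Flink API): returns true if the named class
    // can be located by this class's class loader. The class is not
    // initialized, so none of its static initializers run.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class the compiler complained about in the question.
        String scalaDataStream = "org.apache.flink.streaming.api.scala.DataStream";
        System.out.println(scalaDataStream + " on classpath: " + isOnClasspath(scalaDataStream));
    }
}
```

Run it with the same classpath as your job. If it reports false, the Scala streaming artifact is still missing from the build.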
1 Comment

This dependency was the root of all evil: flink-streaming-scala_2.11