
I am new to Flink and I was following the SocketWindowWordCount example.

I am using Scala 2.11.8 and Flink 1.3.2 and tried to run it on EMR. When I run the following code, it throws this error:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeinfo.TypeInformation

The main class looks like this:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkStreamingPOC {

  def main(args: Array[String]) : Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.readTextFile("s3a://somebucket/prefix")
    val counts = stream.flatMap{ _.split("\\W+") }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}

build.sbt looks like this:

scalaVersion := "2.11.8"

val flinkVersion = "1.3.2"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)

I tried importing both org.apache.flink.api.scala._ and org.apache.flink.streaming.api.scala._, but I still get the same error message. Any suggestions? Thanks!

Comment:
  • Try adding <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.3.2</version></dependency>; TypeInformation actually lives in that artifact. Commented Oct 27, 2017 at 6:41
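For the sbt build shown in the question, a rough equivalent of that Maven snippet would be the line below. This is only a sketch of the commenter's suggestion; flink-core is normally pulled in transitively by flink-scala, so adding it explicitly only helps if the jar is missing from the runtime classpath.

// build.sbt addition sketching the comment above. flink-core is not
// Scala-versioned, so it uses a single % rather than %%.
libraryDependencies += "org.apache.flink" % "flink-core" % flinkVersion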

5 Answers


If you use IntelliJ IDEA, you can enable the run configuration option to include dependencies with "Provided" scope, so that the provided Flink jars are on the classpath when you run the job from the IDE.





Open the build.sbt file and remove % "provided" from the Flink dependencies.
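For instance, if your actual build.sbt marks the Flink dependencies as provided (as the Flink quickstart templates do), the change would look roughly like this sketch:

// Before: the Flink jars are expected to come from the cluster, so the built
// jar fails with ClassNotFoundException when run outside a Flink environment.
// libraryDependencies ++= Seq(
//   "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
//   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
// )

// After: the Flink classes are on the compile and runtime classpath.
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)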



You might be having the same issue I had, which basically comes down to adding the missing jar to Flink's /lib folder; please see here for more details. On Amazon EMR (where you use the Flink dashboard), the /opt directory contains the required jars, which you need to copy into the lib folder.




I encountered the same problem with the AverageSensorReadings class in this project: https://github.com/streaming-with-flink/examples-scala. It is a Maven project, so I commented out every <scope>provided</scope> entry in the pom file, and it works.



The fat jar you build from a Flink project is supposed to run inside a Flink cluster environment, so all Flink-related dependencies are provided by that environment.

Other answers suggest simply removing the provided scope from the dependencies, thereby including them in the fat jar. This may work, but it is not the correct approach.

If you are running the jar with a command like java -classpath target/your-project-jar.jar your.package.SocketWindowWordCount, then you are outside the Flink cluster environment.

The correct way is to use the flink run command, for example ./bin/flink run -c your.package.SocketWindowWordCount target/your-project-jar.jar.

Try ./bin/flink run --help for details.
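For completeness, a common sbt setup that keeps Flink in provided scope and still produces a submittable fat jar looks roughly like the sketch below. The sbt-assembly plugin and its version are assumptions on my part, not something from the question.

// project/plugins.sbt (assumed; version is only an example):
// addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

scalaVersion := "2.11.8"

val flinkVersion = "1.3.2"

// Marked provided: the cluster supplies these jars at runtime, and
// sbt-assembly leaves provided dependencies out of the fat jar.
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)

Build with sbt assembly and submit the resulting jar with ./bin/flink run. Note that with provided scope, running the main class directly from the IDE fails with exactly this ClassNotFoundException unless the IDE puts provided dependencies on the run classpath, which is what the IDEA option in the first answer does.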
