
I'm using Scala and Flink 1.0-SNAPSHOT to perform a leftOuterJoin on a DataSet, and I get the following exception:

    11:54:15,921 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN DataSource (at com.mycompany.FlinkTest$.main(FlinkTest.scala:99) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at select('date as 'date,'dsCode as 'dsCode,'datatype as 'datatype,'quote as 'quote,'name as 'name)) (1/1) switched to FAILED with exception.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
    at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:241)
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:81)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)

I use a simple Scala case class as the type of the DataSet:

case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String)

I use the following method to generate the case class instances:

def getRawValuesFromZipFile(fileName: String) : Array[RawValue]
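
For reference, a minimal sketch of what such a method might look like (the file layout inside the zip is an assumption here — say, comma-separated lines in the order date,dsCode,datatype,quote,name):

```scala
import java.io.FileInputStream
import java.util.zip.ZipInputStream
import scala.io.Source

// Hypothetical sketch: assumes each zip entry holds CSV lines of the
// form date,dsCode,datatype,quote,name.
def getRawValuesFromZipFile(fileName: String): Array[RawValue] = {
  val zis = new ZipInputStream(new FileInputStream(fileName))
  val buf = scala.collection.mutable.ArrayBuffer[RawValue]()
  var entry = zis.getNextEntry
  while (entry != null) {
    if (!entry.isDirectory) {
      // ZipInputStream signals EOF at the end of each entry, so this
      // reads only the current entry's lines.
      for (line <- Source.fromInputStream(zis).getLines()) {
        val f = line.split(',')
        buf += RawValue(f(0), f(1), f(2), f(3).toDouble, f(4))
      }
    }
    entry = zis.getNextEntry
  }
  zis.close()
  buf.toArray
}
```

Note that this materialises all ~3 million records in the driver's heap before Flink ever sees them, which matters for the error below.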

I initialise the environment and create the DataSet[RawValue] the following way:

val env = ExecutionEnvironment.createLocalEnvironment(4)
val rawValues = env.fromCollection(getRawValuesFromZipFile("filename.zip"))
rawValues.print

I suspect a serialisation issue is causing the error. I'm using Scala 2.10.5 and Java 7 system libraries to compile the project. I'm using Eclipse, and the project was generated by the sample project generation script.

Any help or hints on resolving the issue would be greatly appreciated :-) Thanks, Daniel

  • How big is the data that you read from your zip file? Commented Oct 30, 2015 at 11:42
  • About 22Mb compressed, 90Mb uncompressed and 3065295 RawValue records. Thanks! Commented Oct 30, 2015 at 12:29
  • If I run it on a hard-coded sample Array[RawValue] of 4 elements, the join runs fine. Investigating further. Commented Oct 30, 2015 at 12:45
  • Also, if I do a take(20000), it also works. Probably I'm hitting heap limits with the full set of input data. Now I need to figure out how I can split up the input data in a meaningful and workable way :-) Thank you for the hint :-) Commented Oct 30, 2015 at 12:52

1 Answer


The env.fromCollection() call might not really be suited for your use case. It breaks if the data becomes too big, because the entire collection is serialised and shipped with the job rather than being read in parallel on the worker nodes.

You could look at reading compressed files directly: https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#read-compressed-files and see if it works for your case. It only supports gzip out of the box, but maybe you could compress your data with that format instead.
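
For example, if the raw values were first exported as a gzipped CSV file (filename.csv.gz is a hypothetical name here), the workers could read and parse the data themselves instead of receiving it serialised inside the job:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.createLocalEnvironment(4)

// Read the file on the worker side; Flink picks the decompressor
// from the .gz extension. Field order in the CSV must match the
// constructor order of the RawValue case class.
val rawValues: DataSet[RawValue] =
  env.readCsvFile[RawValue]("filename.csv.gz")

rawValues.print
```

This keeps the driver's heap out of the picture entirely, which should also avoid the "unread block data" deserialisation failure you hit with the full 3-million-record collection.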
