I am running Flink from within Eclipse, where the necessary JARs have been fetched by Maven. My machine has an eight-core processor, and the streaming application I have to write reads lines from its input and calculates some statistics.

When I run the program on my machine, I expect Flink to use all CPU cores, as well-threaded code would. However, when I watch the cores, I see that only one of them is being used. I tried many things; the code below contains my last attempt, which sets the parallelism on the environment. I also tried setting it on the stream alone, among other things.

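import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SemSeMi {

    public static void main(String[] args) throws Exception {
        System.out.println("Starting Main!");

        // Print the working directory of the local file system.
        System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
                .getLocalFileSystem().getWorkingDirectory());

        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // Last attempt: request one parallel instance per core.
        env.setParallelism(8);

        // Read lines from the socket and pass each one to SplitterX.
        env.socketTextStream("localhost", 9999).flatMap(new SplitterX());

        env.execute("Something");
    }

    public static class SplitterX implements
            FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // Do Nothing!
        }
    }
}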

I fed the program with data using netcat:

 nc -lk 9999 < fileName

How can I make the program scale locally and use all available cores?

1 Answer

You don't have to specify the degree of parallelism explicitly. A job that is started locally (for example, from the IDE) with the default settings automatically sets its parallelism to the number of available cores.

In your case, the source runs with a parallelism of 1, since reading from a socket cannot be distributed. For the flatMap operation, however, the system creates 8 instances; if you turn on logging, you will see this in the log output. The input data is distributed to the flatMap tasks in a round-robin fashion, and each flatMap task is executed by its own thread.
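To make the data flow described above more concrete, here is a minimal plain-Java analogy. It is not Flink code and does not claim to mirror Flink's internals: the class name RoundRobinSketch, the queue size, and the end-of-stream marker are all made up for illustration. A single source loop hands lines to N worker threads in turn, the way one socket source feeds several flatMap tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical illustration only: one source, N workers, round-robin hand-off.
class RoundRobinSketch {

    // End-of-stream marker, compared by identity so no input line can collide with it.
    private static final String END = new String("END");

    /** Sends each line to the next worker in turn; returns how many lines each worker handled. */
    static int[] distribute(List<String> lines, int workers) {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        AtomicIntegerArray handled = new AtomicIntegerArray(workers);
        Thread[] threads = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
            final int id = i;
            queues.add(queue);
            // Each worker runs in its own thread and drains only its own queue.
            threads[i] = new Thread(() -> {
                try {
                    while (queue.take() != END) {
                        handled.incrementAndGet(id);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-" + i);
            threads[i].start();
        }

        try {
            // The single "source": it alone reads the input and cycles through the workers.
            int next = 0;
            for (String line : lines) {
                queues.get(next).put(line);
                next = (next + 1) % workers;
            }
            for (BlockingQueue<String> queue : queues) {
                queue.put(END);
            }
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while distributing", e);
        }

        int[] result = new int[workers];
        for (int i = 0; i < workers; i++) {
            result[i] = handled.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 16; i++) {
            lines.add("line " + i);
        }
        System.out.println(Arrays.toString(distribute(lines, 8))); // prints [2, 2, 2, 2, 2, 2, 2, 2]
    }
}
```

Note that the source loop itself stays single-threaded in this sketch. If the per-line work in the workers is tiny, that one thread can end up doing most of the visible work.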

I suspect that the reason you only see load on a single core is that SplitterX does not do any work. Try the following code, which counts the number of characters in each String and prints the result to the console:

public static void main(String[] args) throws Exception {
    System.out.println("Starting Main!");

    System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
        .getLocalFileSystem().getWorkingDirectory());

    StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();

    env.socketTextStream("localhost", 9999).flatMap(new SplitterX()).print();

    env.execute("Something");
}

public static class SplitterX implements
    FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence,
                        Collector<Tuple2<String, Integer>> out) throws Exception {
        // Emit each line together with its length in characters.
        out.collect(Tuple2.of(sentence, sentence.length()));
    }
}

The number at the start of each output line (for example, the 3 in "3> ...") tells you which parallel task printed the result.
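If you capture the console output, you can tally how many records each task printed and check whether all tasks actually receive data. The following is a rough plain-Java sketch. It assumes every printed line has the shape "<task number>> <record>"; the class name PrintPrefixTally and the sample lines are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: counts printed records per task prefix.
class PrintPrefixTally {

    // Assumed line shape: "<task number>> <record>", e.g. "3> (hello,5)".
    private static final Pattern PREFIX = Pattern.compile("^(\\d+)> ");

    /** Returns a map from task number to the number of lines carrying that prefix. */
    static Map<Integer, Integer> tally(List<String> outputLines) {
        Map<Integer, Integer> perTask = new TreeMap<>();
        for (String line : outputLines) {
            Matcher matcher = PREFIX.matcher(line);
            if (matcher.find()) {
                perTask.merge(Integer.parseInt(matcher.group(1)), 1, Integer::sum);
            }
            // Lines without a numeric prefix (log output, etc.) are ignored.
        }
        return perTask;
    }

    public static void main(String[] args) {
        // Made-up sample lines, not real job output.
        List<String> sample = Arrays.asList(
                "1> (foo,3)", "2> (hello,5)", "1> (ab,2)", "not a record");
        System.out.println(tally(sample)); // prints {1=2, 2=1}
    }
}
```

A roughly even tally across all task numbers would suggest that the records are being spread over the tasks as described.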

1 Comment

I intentionally emptied the program to isolate the problem. My full version does much more processing. I am receiving the task numbers as expected, but the whole load is still on one core. To see it, you need to throttle the output by printing only once for every million lines or so!
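One way to throttle the output as the comment suggests is to count records and report only on every N-th one. Below is a minimal plain-Java sketch. The class name ThrottledReporter is hypothetical and not part of Flink; the sketch assumes each parallel task would use its own instance, so a plain, unsynchronized counter is enough.

```java
// Hypothetical helper, not part of Flink: reports once every `interval` records.
class ThrottledReporter {

    private final long interval;
    private long seen;

    ThrottledReporter(long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    /** Counts one record; prints a progress line and returns true on every interval-th record. */
    boolean record() {
        seen++;
        if (seen % interval != 0) {
            return false;
        }
        // The thread name shows which thread is doing the work.
        System.out.println(Thread.currentThread().getName()
                + " processed " + seen + " records");
        return true;
    }

    long seen() {
        return seen;
    }

    public static void main(String[] args) {
        ThrottledReporter reporter = new ThrottledReporter(1_000_000);
        for (int i = 0; i < 3_500_000; i++) {
            reporter.record(); // reports at 1, 2 and 3 million records
        }
    }
}
```

Calling record() once per input line from inside the processing function would keep console I/O from dominating the measurement, while still showing which threads are active.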