2

I have a usecase, where I have a source of data, let's say: every second, a new string is coming from that data source.

I want to create a pipeline, where if a new string arrives, it's pushed through that pipeline for processing.

I'd guess the Java 8 introduced Stream API could do exactly that, since it has convenient functions for processing data of an arbitrary collection, however I'd like to skip the part where I collect my data to a separate collection and dispatch the arriving data straight to the Stream I just created.

Is there any way to do that?

9
  • 6
    Java 8 streams are not for that, have a look at Reactive frameworks. Commented Jul 15, 2018 at 14:40
  • That should be possible. If you want to try, implement the Spliterator interface and use StreamSupport.stream(Spliterator, boolean) to create your stream. Commented Jul 15, 2018 at 14:44
  • You could do that with java streams, but it would be a hack. Commented Jul 15, 2018 at 14:44
  • 3
    The problems start with the part you have omitted from your question. What are you gonna do with your stream? “dispatch the arriving data straight to the Stream” is not an end in itself. Start discussing about the actual goal and you’ll learn why the stream is not the best choice. And no, reactive programming hasn’t to be “big”. Tasks like “whenever a new item arrives, do xyz” are the simplest example of it. Commented Jul 16, 2018 at 9:42
  • 2
    See java.util.concurrent.Flow Commented Jul 16, 2018 at 13:45

2 Answers 2

6

In order to do what you described you'd need some kind of blocking. I'd use a BlockingQueue (any kind would do - if you want to avoid collections, use SynchronousQueue, which has no internal state at all), and create an infinite Stream from it using Stream.generate.

Example:

class StreamableQueue<T> {

    private BlockingQueue<T> dataSource;

    Stream<T> asStream() {
        return Stream.generate(this::takeFromDataSource);
    }

    private T takeFromDataSource() {
        try {
            return dataSource.take();
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }
}

Of course, the BlockingQueue provided as dataSource to this class would need to be fed elements from a different thread.


EDIT: A minor addition - instead of using try-catch, you can use:

Sign up to request clarification or add additional context in comments.

Comments

0

Okay, so the answer to this particular case has become a bit different.

While the answer of Tomasz Linkowski seemed like a really nice solution, the main problem with it was the fact that my function is more sequential then BlockingQueues would suggest, which was detrimental for readability.

So I came up with Stream.Builder, which is actually just what I need and nothing more.

2 Comments

It is not as much about the data source being sequential as about being finite, because the only way to create a Stream using Stream.Builder is to know all data it should contain. This is something you have misspecified in your question, apparently. But I'm glad you've found a solution to your problem! :)
I'm sorry if I wasn't upfront enough. The I'd like to skip the part where I collect my data to a separate collection part made the solution easy to find, since almost the exact same words are used in the Stream Builder documentation

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.