Is there an elegant way to register a callback that runs once the last element of a stream has been processed and the stream is fully consumed?

This matters especially when the stream source (such as a DB cursor, an Iterator<T>, or a custom Supplier<T>) is finite and knows explicitly when there is no more data.

Example:

public Stream<Row> query(String sql){
   Connection c = dataSource.openConnection();
   Stream<Row> rows = MyDB.query(sql).stream();
   c.close();
   return rows;
}

Closing the connection immediately, as above, is wrong, because the returned stream has not been consumed yet. It would be better to schedule the connection to close once the stream is fully consumed.

I know there is the onClose() API, but it relies on consumers explicitly calling close() on the stream.

3 Answers

4

Call onClose, and document that the returned stream must be closed by the caller.

/**
 * The returned stream must be closed.
 */
public Stream<Row> query(String sql){
    Connection c = dataSource.openConnection();
    return MyDB.query(sql).stream().onClose(() -> {
        try {
            c.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    });
}
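
For the caller's side, here is a minimal sketch of what "must be closed" looks like in practice. The names CloseOnTryDemo, openRows and readAll are made up for illustration, and an AtomicBoolean stands in for the real connection; the point is only that try-with-resources triggers the onClose handler once the block exits.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CloseOnTryDemo {
    // Hypothetical stand-in for query(): the flag plays the role of the connection.
    static Stream<String> openRows(AtomicBoolean closed) {
        return Stream.of("a", "b", "c").onClose(() -> closed.set(true));
    }

    // Stream is AutoCloseable, so try-with-resources calls close() on exit,
    // which in turn runs every handler registered via onClose().
    static List<String> readAll(AtomicBoolean closed) {
        try (Stream<String> rows = openRows(closed)) {
            return rows.collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false);
        List<String> rows = readAll(closed);
        System.out.println(rows + " closed=" + closed.get());
    }
}
```

The handler runs even if the terminal operation throws or only reads part of the stream, which is why this is more dependable than any "last element" hook.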
10 Comments

Does this imply that the consumer of the stream has to call close() on the stream explicitly? For example, say the stream is constructed with an Iterator as the source. Will onClose be called automatically when the source Iterator's hasNext() returns false?
@S.D. Yes, caller must call close, or use try-with-resources, same as caller would need to do on any other resource-bound stream, like those returned by Files.lines(). There is no automatic "stream used up" callback, because a stream might never be used up.
@ernest_k Because a terminal operation may not fully consume the stream. Say you have a stream named s, and the following code: s.limit(10).forEach(...). That only consumes the first 10 values, so you can then do something else to process the rest. Only you know for sure when you're done with the stream, so it's your responsibility to close the stream.
@Andreas do you know any actual stream source allowing to “process the rest”?
@Slaw I suppose the language is that weak because Stream is an interface. The reference implementation will throw an exception when you try to use it more than once. A different thing could be a source that can return a new stream; however, there is no guarantee that the stream fetches only those elements from the source that it conceptually consumes. Parallel streams in particular do prefetching. For example, BufferedReader explicitly states that it is in an undefined state once its lines() stream has been processed. So no “process the rest” here. I don’t know of any counter-example.
4

You want to register a stream close handler:

public Stream<Row> query(String sql){
    Connection c = dataSource.openConnection();
    return MyDB.query(sql).stream().onClose(() -> c.close());
}

A caller of this method must explicitly close the stream!

(Note: I left out exception handling here.)
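
One possible way to fill in the exception handling that was left out is a small adapter that turns any AutoCloseable into a Runnable. This is only a sketch; CloseHandlers and closing are hypothetical names, and wrapping in IllegalStateException is an arbitrary choice.

```java
class CloseHandlers {
    // Adapts close(), which may throw a checked exception, to the Runnable
    // that Stream.onClose() expects. Checked exceptions are wrapped.
    static Runnable closing(AutoCloseable resource) {
        return () -> {
            try {
                resource.close();
            } catch (RuntimeException e) {
                throw e; // already unchecked, rethrow unchanged
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close resource", e);
            }
        };
    }
}
```

With that in place the method body could read `return MyDB.query(sql).stream().onClose(CloseHandlers.closing(c));`.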

0

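A utility that mostly works, except when the returned stream is not iterated to the end:

@SuppressWarnings("unchecked")
public <T> Stream<T> wrap(Stream<T> stream, Runnable onEnd) {
    // Unique sentinel appended after the last real element.
    final Object endSignal = new Object();
    return Stream.concat(stream, Stream.of(endSignal))
            // Fire the callback when the sentinel passes through.
            .peek(i -> {
                if (i == endSignal) { onEnd.run(); }
            })
            // Hide the sentinel from the consumer.
            .filter(i -> i != endSignal)
            // Safe in practice: only elements of the original stream remain.
            .map(i -> (T) i);
}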
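
To make the caveat concrete, here is a small sketch (WrapDemo and its two helper methods are made-up names) that counts how often the callback fires, once for a fully consumed stream and once when limit() stops the pipeline early.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class WrapDemo {
    @SuppressWarnings("unchecked")
    static <T> Stream<T> wrap(Stream<T> stream, Runnable onEnd) {
        final Object endSignal = new Object();
        return Stream.concat(stream, Stream.of(endSignal))
                .peek(i -> { if (i == endSignal) { onEnd.run(); } })
                .filter(i -> i != endSignal)
                .map(i -> (T) i);
    }

    // Consumes every element, so the sentinel is reached and onEnd fires.
    static int endCallsWhenFullyConsumed() {
        AtomicInteger calls = new AtomicInteger();
        wrap(Stream.of(1, 2, 3, 4, 5), calls::incrementAndGet)
                .collect(Collectors.toList());
        return calls.get();
    }

    // limit(2) short-circuits before the sentinel, so onEnd never fires.
    static int endCallsWhenLimited() {
        AtomicInteger calls = new AtomicInteger();
        wrap(Stream.of(1, 2, 3, 4, 5), calls::incrementAndGet)
                .limit(2)
                .collect(Collectors.toList());
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("fully consumed: " + endCallsWhenFullyConsumed());
        System.out.println("limited:        " + endCallsWhenLimited());
    }
}
```

In the limited case the connection from the question would simply stay open, so the onClose approach is still needed as a fallback.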
