40

I'm interested in separating a stream into two or more substreams, and processing the elements in different ways. For example, a (large) text file might contain lines of type A and lines of type B, in which case I'd like to do something like:

Files.lines(path)
    .filter(line -> isTypeA(line))
    .forEachTrue(line -> processTypeA(line))
    .forEachFalse(line -> processTypeB(line))

The above is my attempt at abstracting the situation. In reality I have a very large text file where each line is tested against a regex; if the line passes, it is processed, whereas if it is rejected, I want to update a counter. This further processing of rejected lines is why I don't simply use filter.

Is there any reasonable way to do this with streams, or will I have to fall back to loops? (I would like this to run in parallel as well, so streams are my first choice.)

1 Comment

  • You can use a partitioningBy collector, but you'll have a temporary Map data holder. Commented Mar 7, 2016 at 21:39

8 Answers

28

Java 8 streams weren't designed to support this kind of operation. From the JDK documentation:

A stream should be operated on (invoking an intermediate or terminal stream operation) only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream.

If the data fits in memory, you can use Collectors.partitioningBy if you have just two types, and get a Map<Boolean, List<String>>. Otherwise use Collectors.groupingBy.
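For example, a minimal sketch of the partitioning approach (the PartitionDemo class and the isTypeA stub are hypothetical stand-ins for the real test):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartitionDemo {
    // Hypothetical type test; stands in for the real isTypeA
    static boolean isTypeA(String line) {
        return line.startsWith("A:");
    }

    // true -> lines of type A, false -> everything else
    static Map<Boolean, List<String>> partition(Stream<String> lines) {
        return lines.collect(Collectors.partitioningBy(PartitionDemo::isTypeA));
    }

    public static void main(String[] args) {
        Map<Boolean, List<String>> parts =
            partition(Stream.of("A:one", "B:two", "A:three"));
        System.out.println("type A: " + parts.get(true));
        System.out.println("type B: " + parts.get(false));
    }
}
```

Both lists are then materialized in memory, which is the trade-off the comment above alludes to.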



24

Simply test each element, and act accordingly.

lines.forEach(line -> {
    if (isTypeA(line)) processTypeA(line);
    else processTypeB(line);
});

This behavior could be hidden in a helper method:

public static <T> Consumer<T> branch(Predicate<? super T> test, 
                                     Consumer<? super T> t, 
                                     Consumer<? super T> f) {
    return o -> {
        if (test.test(o)) t.accept(o);
        else f.accept(o);
    };
}

Then the usage would look like this:

lines.forEach(branch(this::isTypeA, this::processTypeA, this::processTypeB));

Tangential Note

The stream returned by Files.lines() holds the underlying file open until the stream itself is closed, so you must use it in a try-with-resources:

try (Stream<String> lines = Files.lines(path, encoding)) {
  lines.forEach(...);
}

Variables of Stream type throw up a bit of a red flag for me, so I prefer to manage a BufferedReader directly:

try (BufferedReader lines = Files.newBufferedReader(path, encoding)) {
    lines.lines().forEach(...);
}

2 Comments

this won't work, because a lambda body must be a statement. You will get a compiler error here: boolean cannot be converted to void
@VolodymyrReda Thanks! I have updated the answer with a working solution.
6

While side effects in behavioral parameters are discouraged, they are not forbidden as long as there's no interference, so the simplest (though not cleanest) solution is to count right in the filter:

AtomicInteger rejected = new AtomicInteger();
Files.lines(path)
    .filter(line -> {
        boolean accepted = isTypeA(line);
        if (!accepted) rejected.incrementAndGet();
        return accepted;
    })
    // chain processing of matched lines

As long as you process all items, the result will be consistent. Only if you use a short-circuiting terminal operation (on a parallel stream) does the result become unpredictable.

Updating an atomic variable may not be the most efficient solution, but in the context of processing lines from a file, the overhead will likely be negligible.
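Put together with a terminal operation, the whole pipeline might look like this sketch (reading from an in-memory stream rather than a file; the CountInFilter class and the isTypeA stub are hypothetical stand-ins):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CountInFilter {
    // Hypothetical stand-in for the real regex test
    static boolean isTypeA(String line) {
        return line.startsWith("A");
    }

    // Returns (accepted lines, rejected count)
    static Map.Entry<List<String>, Integer> process(Stream<String> lines) {
        AtomicInteger rejected = new AtomicInteger();
        List<String> accepted = lines
            .filter(line -> {
                boolean ok = isTypeA(line);
                if (!ok) rejected.incrementAndGet(); // side effect: count rejects
                return ok;
            })
            .collect(Collectors.toList()); // terminal op drains the stream first
        return new SimpleEntry<>(accepted, rejected.get());
    }

    public static void main(String[] args) {
        System.out.println(process(Stream.of("A1", "B1", "A2", "B2", "B3")));
    }
}
```

The counter is only read after the terminal operation has completed, which is what makes the side effect safe here.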

If you want a clean, parallel-friendly solution, one general approach is to implement a Collector which combines the processing of two collect operations based on a condition. This requires that you are able to express the downstream operation as a collector, but most stream operations can be expressed as a collector (and the trend is toward making all operations expressible that way; e.g., Java 9 will add the currently missing filtering and flatMapping collectors).
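As an aside, the Java 9 filtering collector mentioned above pairs a predicate with a downstream collector; a toy sketch (requires Java 9+; the FilteringDemo class and its data are made up):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FilteringDemo {
    // Group strings by length, but keep only those not equal to "bb" (toy example)
    static Map<Integer, List<String>> demo(Stream<String> words) {
        return words.collect(Collectors.groupingBy(
            String::length,
            Collectors.filtering(w -> !w.equals("bb"), Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(demo(Stream.of("a", "bb", "cc", "ddd")));
    }
}
```

Unlike filtering the stream itself, the collector keeps a (possibly empty) entry for every group, including groups whose elements were all filtered out.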

You’ll need a pair type to hold two results, so assuming a sketch like

class Pair<A,B> {
    final A a;
    final B b;
    Pair(A a, B b) {
        this.a=a;
        this.b=b;
    }
}

the combining collector implementation will look like

public static <T, A1, A2, R1, R2> Collector<T, ?, Pair<R1,R2>> conditional(
        Predicate<? super T> predicate,
        Collector<T, A1, R1> whenTrue, Collector<T, A2, R2> whenFalse) {
    Supplier<A1> s1=whenTrue.supplier();
    Supplier<A2> s2=whenFalse.supplier();
    BiConsumer<A1, T> a1=whenTrue.accumulator();
    BiConsumer<A2, T> a2=whenFalse.accumulator();
    BinaryOperator<A1> c1=whenTrue.combiner();
    BinaryOperator<A2> c2=whenFalse.combiner();
    Function<A1,R1> f1=whenTrue.finisher();
    Function<A2,R2> f2=whenFalse.finisher();
    return Collector.of(
        ()->new Pair<>(s1.get(), s2.get()),
        (p,t)->{
            if(predicate.test(t)) a1.accept(p.a, t); else a2.accept(p.b, t);
        },
        (p1,p2)->new Pair<>(c1.apply(p1.a, p2.a), c2.apply(p1.b, p2.b)),
        p -> new Pair<>(f1.apply(p.a), f2.apply(p.b)));
}

and can be used, for example, for collecting matching items into a list and counting the non-matching ones, like this:

Pair<List<String>, Long> p = Files.lines(path)
  .collect(conditional(line -> isTypeA(line), Collectors.toList(), Collectors.counting()));
List<String> matching=p.a;
long nonMatching=p.b;

The collector is parallel friendly and allows arbitrarily complex delegate collectors, but note that with the current implementation, the stream returned by Files.lines might not perform so well with parallel processing; compare “Reader#lines() parallelizes badly due to nonconfigurable batch size policy in its spliterator”. Improvements are scheduled for the Java 9 release.


3

The way I'd deal with this is not to split this up at all, but rather, write

Files.lines(path)
   .map(line -> {
      if (condition(line)) {
        return doThingA(line);
      } else {
        return doThingB(line);
      }
   })...

Details vary depending on exactly what you want to do and how you plan to do it.

2 Comments

Only if doThing{A,B} are actually functions. If the intent is to have different side-effects for types A and B, this is not the way you want to do it.
@BrianGoetz Then I'd use peek, probably.
3

Here's an approach (which ignores the cautions about forcing conditional processing into a stream) that wraps a predicate and consumer into a single predicate-with-side-effect:

public static class StreamProc {

    public static <T> Predicate<T> process( Predicate<T> condition, Consumer<T> operation ) {
        return t -> {
            if ( condition.test(t) ) {
                operation.accept(t);
                return false; // handled: drop it from the stream
            }
            return true; // not handled: keep it for later stages
        };
    }

}

Then filter the stream:

someStream
    .filter( StreamProc.process( cond1, op1 ) )
    .filter( StreamProc.process( cond2, op2 ) )
    ...
    .collect( ... )

Elements remaining in the stream have not yet been processed.

For example, a typical filesystem traversal using external iteration looks like

File[] files = dir.listFiles();
for ( File f : files ) {
    if ( f.isDirectory() ) {
        this.processDir( f );
    } else if ( f.isFile() ) {
        this.processFile( f );
    } else {
        this.processErr( f );
    }
}

With streams and internal iteration this becomes

Arrays.stream( dir.listFiles() )
    .filter( StreamProc.process( f -> f.isDirectory(), this::processDir ) )
    .filter( StreamProc.process( f -> f.isFile(), this::processFile ) )
    .forEach( this::processErr );

I would like Stream to implement the process method directly. Then we could have

Arrays.stream( dir.listFiles() )
    .process( f -> f.isDirectory(), this::processDir )
    .process( f -> f.isFile(), this::processFile )
    .forEach( this::processErr );

Thoughts?


2

It seems that in reality you do want to process each line, just differently based on some condition (type).

I think this is, more or less, a functional way to implement it:

public static void main(String[] args) {
    Arrays.stream(new int[] {1,2,3,4}).map(i -> processor(i).get()).forEach(System.out::println);
}

static Supplier<Integer> processor(int i) {
    return tellType(i) ? () -> processTypeA(i) : () -> processTypeB(i);
}

static boolean tellType(int i) {
    return i % 2 == 0;
}

static int processTypeA(int i) {
    return i * 100;
}

static int processTypeB(int i) {
    return i * 10;
}


1

Well, you can simply do

Counter counter = new Counter();
Files.lines(path)
    .forEach(line -> {
        if (isTypeA(line)) {
            processTypeA(line);
        }
        else {
            counter.increment();
        }
    });

Not very functional-style, but it does it in a similar way to your example. Of course, if the stream is parallel, both Counter.increment() and processTypeA() have to be thread-safe.
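Counter is not a JDK class; a minimal thread-safe sketch (an assumed shape, backed by AtomicLong) could be:

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal thread-safe counter; safe to call from a parallel stream
public class Counter {
    private final AtomicLong count = new AtomicLong();

    public void increment() {
        count.incrementAndGet();
    }

    public long get() {
        return count.get();
    }
}
```

With AtomicLong underneath, increment() needs no extra synchronization even when the stream runs in parallel.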


0

@tom

What about this:

Arrays.stream( dir.listFiles() )
    .peek( f -> { if (f.isDirectory()) { processDir(f); } } )
    .peek( f -> { if (f.isFile()) { processFile(f); } } )
    .forEach( this::processErr );

