I am practising some entry-level Java 8 lambda functionality.

Given a list of messages, each containing a message offset, where all offsets must form a consecutive sequence of integers, I'm trying to find gaps to warn about. I have a feeling this should be doable with a nice lambda, but I can't get my head around it.

So, there's this working snippet:

private void warnAboutMessageGaps(final List<Message> messages) {

    final List<Long> offsets = messages.stream()
            .sorted(comparingLong(Message::getOffset))
            .map(Message::getOffset)
            .collect(toList());

    for (int i = 0; i < offsets.size() - 1; i++) {
        final long currentOffset = offsets.get(i);
        final long expectedNextOffset = currentOffset + 1;
        final long actualNextOffset = offsets.get(i + 1);
        // a gap exists when the next offset is not exactly currentOffset + 1
        if (actualNextOffset != expectedNextOffset) {
            LOG.error("Missing offset(s) found in messages: missing from {} to {}", currentOffset + 1, actualNextOffset - 1);
        }
    }
}

What I can't figure out is how to make it so that I can do the "compare with previous/next object" in the lambda. Any pointers would be appreciated.

/edit: Suggestions about StreamEx and other third-party solutions, while appreciated, are not what I was looking for.

  • This is something streams are not very good at, because it wouldn't parallelize very well if you had to look at previous or next objects in the stream. Commented Mar 15, 2017 at 13:26
  • Seems like you need the symmetric difference between two sets, and Guava can help with that: Set<Long> all = LongStream.rangeClosed(offsets.get(0), offsets.get(offsets.size() - 1)).boxed().collect(Collectors.toSet()); System.out.println(Sets.symmetricDifference(all, ImmutableSet.copyOf(offsets))); Commented Mar 15, 2017 at 13:35
  • Why .sorted(comparingLong(Message::getOffset)) .map(Message::getOffset) instead of .map(Message::getOffset).sorted()? Commented Mar 16, 2017 at 11:49
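Without Guava, the set-difference idea from the second comment can be sketched with plain JDK streams: put the present offsets into a Set, then walk the closed range from the smallest to the largest offset and keep the values that are absent. This is a minimal sketch under stated assumptions: the names MissingOffsets and missing are hypothetical, the offsets are passed as a List<Long>, and each missing offset is reported individually rather than as a range. Because it iterates over the whole value range, it only makes sense when max - min is reasonably small.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class MissingOffsets {

    // Returns each offset in [min, max] that does not occur in the input, in ascending order.
    static List<Long> missing(List<Long> offsets) {
        if (offsets.isEmpty()) {
            return Collections.emptyList();
        }
        Set<Long> present = new HashSet<>(offsets);
        LongSummaryStatistics stat = offsets.stream().mapToLong(Long::longValue).summaryStatistics();
        // Walk the complete closed range and keep the values that were not seen.
        return LongStream.rangeClosed(stat.getMin(), stat.getMax())
                .filter(offset -> !present.contains(offset))
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(missing(Arrays.asList(7L, 1L, 2L, 5L, 3L))); // prints [4, 6]
    }
}
```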

6 Answers

You can do it with StreamEx using a pairMap method:

StreamEx.of(messages)
        .sorted(Comparator.comparingLong(Message::getOffset))
        .pairMap((prev, next) -> new Message[] {prev, next})
        .forEach(prevNext -> {
            long currentOffset = prevNext[0].getOffset();
            long expectedNextOffset = currentOffset + 1;
            long actualNextOffset = prevNext[1].getOffset();
            // a gap exists when the next offset is not exactly currentOffset + 1
            if (actualNextOffset != expectedNextOffset) {
                LOG.error(
                    "Missing offset(s) found in messages: missing from {} to {}",
                    currentOffset + 1, actualNextOffset - 1);
            }
        });
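Since the question rules out third-party libraries, here is a rough JDK-only stand-in for pairMap: stream the indices 1..size-1 of an already sorted list and map each index to a (previous, next) pair. This is a sketch, not StreamEx's implementation; the names AdjacentPairs, pairs and gaps are hypothetical, java.util.AbstractMap.SimpleImmutableEntry is used only as a ready-made pair type, and gaps are reported as inclusive "first-last" strings.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class AdjacentPairs {

    // Streams (previous, next) pairs of neighbouring list elements by index.
    static <T> Stream<Map.Entry<T, T>> pairs(List<T> list) {
        return IntStream.range(1, list.size())
                .<Map.Entry<T, T>>mapToObj(i -> new SimpleImmutableEntry<>(list.get(i - 1), list.get(i)));
    }

    // Reports each gap between sorted offsets as an inclusive "first-last" string.
    static List<String> gaps(List<Long> sortedOffsets) {
        return pairs(sortedOffsets)
                .filter(p -> p.getValue() - p.getKey() > 1)
                .map(p -> (p.getKey() + 1) + "-" + (p.getValue() - 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(gaps(Arrays.asList(1L, 2L, 3L, 5L, 7L))); // prints [4-4, 6-6]
    }
}
```

Pairing by index assumes a random-access list such as an ArrayList; on a LinkedList every get(i) is linear.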

Sometimes, attempting to do everything with lambda expressions makes solutions more complicated. You can use:

messages.stream()
    .mapToLong(Message::getOffset)
    .sorted()
    .forEachOrdered(new LongConsumer() {
        boolean first=true;
        long expected;
        public void accept(long value) {
            if(first) first=false;
            else if(value!=expected)
                LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                          expected, value);
            expected=value+1;
        }
    });

But note that, however fluent the stream chain may look, sorted() is a stateful intermediate operation that creates and uses a backing array behind the scenes. You're not losing anything if you use that array explicitly:

long[] l = messages.stream().mapToLong(Message::getOffset).toArray();
Arrays.sort(l);
for(int ix=1; ix<l.length; ix++) {
    long value = l[ix], expected = l[ix-1]+1;
    if(value!=expected)
        LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                  expected, value);
}

It’s hard to find a simpler solution. But if you want to reduce the amount of memory needed, you can use a BitSet instead of an array:

OptionalLong optMin = messages.stream().mapToLong(Message::getOffset).min();
if(!optMin.isPresent()) return;
long min = optMin.getAsLong();
BitSet bset = messages.stream()
    .mapToLong(Message::getOffset)
    .collect(BitSet::new, (bs,l) -> bs.set((int)(l-min)), BitSet::or);
for(int set=0, clear; set>=0; ) {
    clear = bset.nextClearBit(set);
    set = bset.nextSetBit(clear);
    if(set >= 0)
        LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                  min+clear, min+set);
}

This significantly reduces the memory used when there are no gaps, or only gaps that are small compared to the value range of the offsets. It fails when the distance between the smallest and the largest offset is greater than Integer.MAX_VALUE.

You can check that beforehand, which also gives you the opportunity to short-circuit when there are no gaps at all:

LongSummaryStatistics stat = messages.stream()
    .mapToLong(Message::getOffset).summaryStatistics();
if(stat.getCount()==0 ||
   // all solutions assume that there are no duplicates; in that case,
   // the following test proves that there are no gaps:
   stat.getMax()-stat.getMin()==messages.size()-1) {
    return;
}

if(stat.getMax()-stat.getMin()>Integer.MAX_VALUE) {
    // proceed with array based test
    …
}
else {
    long min = stat.getMin();
    // proceed with BitSet based test
    …
}
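Assembled into one method, the pieces above might look like the following sketch. It assumes non-negative offsets without duplicates (the short-cut test relies on that), takes a long[] instead of a List<Message> so it can run standalone, and returns the gaps as inclusive "first-last" strings instead of logging them; the names GapFinder and findGaps are hypothetical. It uses a slightly more conservative threshold (>= instead of >) for the array fallback, to stay clear of edge cases at the very top of the int index range of BitSet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.LongSummaryStatistics;

public class GapFinder {

    // Returns missing ranges as inclusive "first-last" strings; assumes no duplicate offsets.
    static List<String> findGaps(long[] offsets) {
        List<String> gaps = new ArrayList<>();
        LongSummaryStatistics stat = Arrays.stream(offsets).summaryStatistics();
        // Without duplicates, max - min == count - 1 proves there are no gaps.
        if (stat.getCount() == 0 || stat.getMax() - stat.getMin() == stat.getCount() - 1) {
            return gaps;
        }
        long min = stat.getMin();
        if (stat.getMax() - min >= Integer.MAX_VALUE) {
            // Range too large for int BitSet indices: sort a copy and compare neighbours.
            long[] sorted = offsets.clone();
            Arrays.sort(sorted);
            for (int ix = 1; ix < sorted.length; ix++) {
                long expected = sorted[ix - 1] + 1;
                if (sorted[ix] != expected) {
                    gaps.add(expected + "-" + (sorted[ix] - 1));
                }
            }
        } else {
            // One bit per offset, relative to the smallest offset.
            BitSet bset = new BitSet();
            for (long offset : offsets) {
                bset.set((int) (offset - min));
            }
            // Each run of clear bits between two set bits is one gap.
            for (int set = 0, clear; set >= 0; ) {
                clear = bset.nextClearBit(set);
                set = bset.nextSetBit(clear);
                if (set >= 0) {
                    gaps.add((min + clear) + "-" + (min + set - 1));
                }
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println(findGaps(new long[] {7, 1, 2, 5, 3})); // prints [4-4, 6-6]
    }
}
```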

2 Comments

Thanks for that very complete answer. Yes, I know that I shouldn't make those lambda expressions my hammer and then expect every challenge to behave like a nail :) The thing is, this did look very much like a nail to me. But that's probably because I'm simply awestruck by the usefulness of this particular hammer. I will play with some of your suggestions to learn from, although I went with a non-lambda approach in my actual code.
"Sometimes, attempting to do everything with lambda expressions makes solutions more complicated." Obviously the problem can be solved with one sorted() and one reduce(), so no…

What about this:

        List<Long> offsets = messages.stream()
                .sorted(comparingLong(Message::getOffset))
                .map(Message::getOffset)
                .collect(toList());

        IntStream.range(1, offsets.size())
                .mapToObj(i -> new Pair<>(offsets.get(i - 1), offsets.get(i)))
                .forEach(pair -> {
                    final long currentOffset = pair.getKey();
                    final long expectedNextOffset = pair.getKey() + 1;
                    final long actualNextOffset = pair.getValue();
                    if (actualNextOffset != expectedNextOffset) {
                        LOG.error("Missing offset(s) found in messages: missing from {} to {}", currentOffset + 1, actualNextOffset - 1);
                    }
                });


For the present problem, this approach seems more suitable:

messages.stream().sorted( Comparator.comparingLong( Message::getOffset ) )
  .reduce( (m1, m2) -> {
    if( m1.getOffset() + 1 != m2.getOffset() )
      LOG.error( "Missing offset(s) found in messages: missing from {} to {}", m1.getOffset(), m2.getOffset() );
    return( m2 );
  } );

This solution uses reduce for something other than its intended purpose: it relies solely on the ability of reduce to go over all the pairs in a stream.
The result of reduce is not used. (It would be impossible to use the result any further, because that would require a mutable reduction.)

7 Comments

This is abusing reduce for a side effect and regarding the side effect, it’s violating the contract which requires it to be associative. For a function f and a sorted stream a b c, being associative means that it must produce the right result when being evaluated, e.g. as f(a, f(b, c)) rather than the f(f(a, b), c) you expect.
Interesting question. I thought != was associative.
The problem is not the use of !=. The problem is that you are returning the second argument, so in case of f(a, f(b, c)), you are comparing a and c rather than a and b. Obviously, returning the first argument wouldn’t work either. To use reduction for a check of consecutive elements, you would need to return a range, expressed by its first and last element, like map(element -> Range.of(element, element)) .reduce((r1,r2) -> Range.of(r1.first, r2.last))
I know what you mean. But isn't that only a problem when using a parallelStream, which is definitely not possible here? By the way, I don't use the result of reduce at all.
With more than two elements, your own reduction function will use the result of a previous evaluation. That’s what I meant with f(a, f(b, c)) vs. f(f(a, b), c). The specification never says that you can expect a particular evaluation order for a sequential stream (such a statement has been deliberately removed from the documentation before the release). So if it happens to do the desired thing, you have a solution that only works with a particular implementation for a sequential stream. Requiring the reader to understand these limitations by looking at it, makes it everything but “easy”.
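A minimal sketch of the range-based reduction described in these comments: each offset becomes a one-element span, and merging two spans only has to check the boundary between the left span's last offset and the right span's first offset. Because that merge is associative, the evaluation order no longer matters. The Span class, its fields and the RangeReduction wrapper are hypothetical (the Range.of in the comment is not a JDK class); gaps are collected as inclusive "first-last" strings, and the gap list is copied on every merge, which is fine for illustration but not tuned for large inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class RangeReduction {

    // Immutable partial result: first and last offset of a sorted segment plus the gaps inside it.
    static final class Span {
        final long first;
        final long last;
        final List<String> gaps;

        Span(long first, long last, List<String> gaps) {
            this.first = first;
            this.last = last;
            this.gaps = gaps;
        }

        static Span of(long offset) {
            return new Span(offset, offset, Collections.emptyList());
        }

        // Joins this segment with the segment that follows it; only the boundary is new.
        Span merge(Span next) {
            List<String> all = new ArrayList<>(gaps);
            if (next.first - last > 1) {
                all.add((last + 1) + "-" + (next.first - 1));
            }
            all.addAll(next.gaps);
            return new Span(first, next.last, all);
        }
    }

    static List<String> gaps(List<Long> offsets) {
        Optional<Span> result = offsets.stream()
                .sorted()
                .map(Span::of)
                .reduce(Span::merge);
        return result.map(span -> span.gaps).orElse(Collections.emptyList());
    }

    public static void main(String[] args) {
        System.out.println(gaps(Arrays.asList(7L, 1L, 2L, 5L, 3L))); // prints [4-4, 6-6]
    }
}
```

Since the merge is associative and sorted() keeps the stream ordered, the same reduction should also give the same result on parallelStream(), unlike the side-effecting reduce above.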

For the sake of learning the Java 8 API, you can use a Collector in which you essentially compare each member of the stream in turn, using the accumulator class BadPairs to keep track of any gaps in the sequence of offsets.

I have written this to be more verbose than it needs to be in order to help you understand the relationship between the supplier, accumulator, and combiner lambdas.

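import static java.util.Comparator.comparingLong;
import static org.junit.Assert.assertTrue;

import java.util.HashSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PairedStreamTest {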

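    // Merges the gaps of two partial results. It does not check the boundary
    // between them, so it is only safe for sequential streams.
    private BiConsumer<BadPairs,BadPairs> combiner = (bad1,bad2) -> bad1.add(bad2);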

    private Supplier<BadPairs> supplier = BadPairs::new;

    private BiConsumer<BadPairs,Message> accumulator = (bad,msg) -> bad.add(msg);

    @Test
    public void returnsTwoBadPairs_givenInputStreamIsMissingOffsets_forFourAndSix() throws Exception {

        BadPairs badPairs = Stream.of(new Message(1), new Message(2), new Message(3), new Message(5), new Message(7))
                .sorted(comparingLong(Message::getOffset))
                .collect(supplier, accumulator, combiner);

        badPairs.pairs.forEach(pair ->
                LOG.error("Missing offset(s) found in messages: missing from {} to {}", pair.first.offset, pair.second.offset));

        assertTrue(badPairs.pairs.size() == 2);
    }

    // supporting classes for the above test code

    private final Logger LOG = LoggerFactory.getLogger(PairedStreamTest.class);

    class Message {
        public int offset;
        public Message(int i) {
            this.offset = i;
        }
        public Integer getOffset() {
            return this.offset;
        }
    }

    class Pair {
        private Message first;
        private Message second;
        public Pair(Message smaller, Message larger) {
            this.first = smaller;
            this.second = larger;
        }
    }

    class BadPairs {
        public Message previous;
        public Set<Pair> pairs = new HashSet<>();
        public void add(BadPairs other) {
            this.pairs.addAll(other.pairs);
        }
        public void add(Message msg) {
            if(previous != null && previous.offset != msg.offset-1) {
                this.pairs.add(new Pair(previous, msg));
            }
            this.previous = msg;
        }
    }
}

Please excuse the poor use of public member variables and the layout of this test class. My intention is to focus the reader on the @Test case first rather than on the supporting classes.
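The BadPairs combiner above simply unions the two sets, so a gap that falls exactly between two partial results (which only happens with a parallel stream) would be missed. A sketch of a collector whose combiner also checks that boundary, built with Collector.of, might look like this. The names GapCollector, Acc and gapCollector are hypothetical; it expects an already sorted Stream<Long> and returns inclusive "first-last" strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

public class GapCollector {

    // Mutable container: first/last offset seen so far plus the gaps found between them.
    static final class Acc {
        boolean empty = true;
        long first;
        long last;
        final List<String> gaps = new ArrayList<>();

        void accept(long offset) {
            if (empty) {
                first = offset;
                empty = false;
            } else if (offset - last > 1) {
                gaps.add((last + 1) + "-" + (offset - 1));
            }
            last = offset;
        }

        // Appends a later segment to this one, checking the boundary between them first.
        Acc combine(Acc later) {
            if (later.empty) {
                return this;
            }
            if (empty) {
                return later;
            }
            if (later.first - last > 1) {
                gaps.add((last + 1) + "-" + (later.first - 1));
            }
            gaps.addAll(later.gaps);
            last = later.last;
            return this;
        }
    }

    // The collector does not sort; feed it a sorted stream.
    static Collector<Long, Acc, List<String>> gapCollector() {
        return Collector.of(Acc::new, Acc::accept, Acc::combine, acc -> acc.gaps);
    }

    public static void main(String[] args) {
        List<String> gaps = Arrays.asList(7L, 1L, 2L, 5L, 3L).stream().sorted().collect(gapCollector());
        System.out.println(gaps); // prints [4-4, 6-6]
    }
}
```

For an ordered stream the combiner is always given the left partial result first, which is what lets the boundary check compare this.last with later.first.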

1 Comment

I was afraid I was going to have to go to something like this, and as I wanted to learn more about custom collectors anyway, I'm going to give this a try. Thanks for the verbosity! :) However, it feels incredibly clunky to go through all that; for my production code I guess I'll just stick to the original code.

How about:

final List<Long> offsets = messages.stream().map(Message::getOffset).sorted().collect(toList());
IntStream.range(0, offsets.size() - 1).forEach(i -> {
    long currentOffset = offsets.get(i);
    if (offsets.get(i + 1) != currentOffset + 1) {
        LOG.error("Missing offset(s) found in messages: missing from {} to {}", currentOffset + 1, offsets.get(i + 1) - 1);
    }
});

Or all in one statement with StreamEx:

StreamEx.of(messages).mapToLong(Message::getOffset).sorted().boxed()
          .pairMap((i, j) -> new long[] { i, j }).filter(a -> a[1] - a[0] > 1)
          .forEach(a -> LOG.error("Missing offset(s) found in messages: missing from {} to {}", a[0] + 1, a[1] - 1));

Or all in one statement with abacus-common:

Stream.of(messages).mapToLong(Message::getOffset).sorted()
          .sliding0(2).filter(e -> e.size() == 2 && e.get(1) - e.get(0) > 1)
          .forEach(e -> LOG.error("Missing offset(s) found in messages: missing from {} to {}", e.get(0) + 1, e.get(1) - 1));

1 Comment

Thank you for all those examples! I'm sure they work, but the "native" example is almost the same as the existing code (just another way of writing a for loop). Thanks though, I hadn't seen that before. About the StreamEx: I saw that when I was googling before, but I wanted to learn Java. (I might have mentioned it though.)
