I would like to convert an Iterator of Strings to an InputStream of bytes. Usually, I can do this by appending all the strings to a StringBuilder and doing: InputStream is = new ByteArrayInputStream(sb.toString().getBytes());

But I want to do it lazily because my iterator is provided by Spark and could be very large. I found this example that does it in Scala:

import java.io.InputStream

def rowsToInputStream(rows: Iterator[String], delimiter: String): InputStream = {
  // append the delimiter to each row, then flatten the byte arrays lazily
  val bytes: Iterator[Byte] = rows.map { row =>
    (row + delimiter).getBytes
  }.flatten

  new InputStream {
    override def read(): Int = if (bytes.hasNext) {
      bytes.next() & 0xff // bitwise AND - make the signed byte an unsigned int from 0-255
    } else {
      -1
    }
  }
}

But I could not find an easy way to convert this to Java. I converted the iterator to a stream using Spliterators.spliteratorUnknownSize, but then getBytes returns an array, which I could not flatten easily. Overall it became pretty messy.

Is there an elegant way to do this in Java?

    I'd probably do this by extending ByteArrayInputStream to take in Iterator<String> (and Charset) and handle the array switching internally. Commented Jun 1, 2020 at 18:01

2 Answers

If you want an InputStream that supports fast bulk operations, you should implement the int read(byte[] b, int off, int len) method. It can be called directly by the code reading the InputStream, and it is also the backend for the inherited methods

  • int read(byte[] b)
  • long skip(long n)
  • byte[] readAllBytes() (JDK 9)
  • int readNBytes(byte[] b, int off, int len) (JDK 9)
  • long transferTo(OutputStream out) (JDK 9)
  • byte[] readNBytes(int len) (JDK 11)
  • void skipNBytes(long n) (JDK 12)

which will work more efficiently when said method has an efficient implementation.
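To illustrate the delegation, here is a minimal sketch with a hypothetical CountingStream (not part of the answer's class) that counts how often each read variant is invoked when only inherited methods are called. The class and method names are made up for this demonstration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class BulkReadDemo {
    /** Hypothetical stream of zero bytes that counts calls to each read variant. */
    static class CountingStream extends InputStream {
        int singleReads;
        int bulkReads;
        private int remaining;

        CountingStream(int size) { remaining = size; }

        @Override
        public int read() {
            singleReads++;
            if (remaining == 0) return -1;
            remaining--;
            return 0;
        }

        @Override
        public int read(byte[] b, int off, int len) {
            bulkReads++;
            if (len == 0) return 0;
            if (remaining == 0) return -1;
            int n = Math.min(len, remaining);
            Arrays.fill(b, off, off + n, (byte) 0);
            remaining -= n;
            return n;
        }
    }

    /** Returns {read() calls, read(byte[], int, int) calls} after using two inherited methods. */
    static int[] countCalls() {
        CountingStream in = new CountingStream(100);
        try {
            in.read(new byte[10]); // inherited, delegates to read(b, 0, b.length)
            in.skip(20);           // inherited, reads into a scratch buffer
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new int[] { in.singleReads, in.bulkReads };
    }

    public static void main(String[] args) {
        int[] calls = countCalls();
        System.out.println("read() calls: " + calls[0]
            + ", read(byte[], int, int) calls: " + calls[1]);
    }
}
```

Neither inherited method should touch the single-byte read() here, so a slow bulk method slows down all of them.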

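import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Iterator;
import java.util.Objects;

public class StringIteratorInputStream extends InputStream {
    private final CharsetEncoder encoder;
    private final Iterator<String> strings;
    private CharBuffer current;
    private ByteBuffer pending;

    public StringIteratorInputStream(Iterator<String> it) {
        this(it, Charset.defaultCharset());
    }
    public StringIteratorInputStream(Iterator<String> it, Charset cs) {
        // replace malformed or unmappable characters, like String.getBytes does,
        // so that the encoder never stalls on them
        encoder = cs.newEncoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);
        strings = Objects.requireNonNull(it);
    }

    @Override
    public int read() throws IOException {
        for(;;) {
            if(pending != null && pending.hasRemaining())
                return pending.get() & 0xff;
            if(!ensureCurrent()) return -1;
            if(pending == null) pending = ByteBuffer.allocate(4096);
            else pending.compact();
            encodeInto(pending);
            pending.flip();
        }
    }

    // makes current point to a string with remaining chars; false at the end
    private boolean ensureCurrent() {
        while(current == null || !current.hasRemaining()) {
            if(!strings.hasNext()) return false;
            current = CharBuffer.wrap(strings.next());
        }
        return true;
    }

    // encodes as much of the current string as fits into the target
    private CoderResult encodeInto(ByteBuffer target) {
        boolean last = !strings.hasNext();
        CoderResult result = encoder.encode(current, target, last);
        if(!last && result.isUnderflow() && current.hasRemaining()) {
            // a high surrogate ended this string; join it with the next string
            current = CharBuffer.wrap(current + strings.next());
        }
        return result;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        // Objects.checkFromIndexSize(off, len, b.length); // JDK 9
        if(len == 0) return 0; // required by the InputStream contract
        int transferred = 0;
        if(pending != null && pending.hasRemaining()) {
            transferred = Math.min(pending.remaining(), len);
            pending.get(b, off, transferred);
            if(transferred == len) return transferred;
            len -= transferred;
            off += transferred;
        }
        ByteBuffer bb = ByteBuffer.wrap(b, off, len);
        while(bb.hasRemaining() && ensureCurrent()) {
            int r = bb.remaining();
            CoderResult result = encodeInto(bb);
            transferred += r - bb.remaining();
            if(result.isOverflow()) break; // the next character does not fit
        }
        if(transferred > 0) return transferred;
        // either the end is reached or the target is too small for the next
        // character; let read() encode into the pending buffer instead
        int first = read();
        if(first < 0) return -1;
        b[off] = (byte)first;
        return 1;
    }
}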

A ByteBuffer basically is the combination of the byte buf[];, int pos;, and int count; variables of your solution. However, the pending buffer is only initialized when it is needed, typically because the caller uses the int read() method to read single bytes. Otherwise, the code creates a ByteBuffer that wraps the caller-provided target buffer, to encode the strings directly into it.

The CharBuffer follows the same concept, just for char sequences. In this code, it will always be a wrapper around one of the strings, rather than a buffer with a storage of its own. So in the best case, this InputStream implementation will encode all iterator provided strings into caller provided buffer(s), without intermediate storage.

This design already implies lazy processing: without intermediate storage, only as many strings are fetched from the iterator as fit into the caller-provided buffer, in other words, as much as the caller requested.
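The core idea, encoding directly into a caller-provided array, can be sketched in isolation. The helper name encodeInto and the class DirectEncodeDemo are hypothetical, and UTF-8 is an assumption made for the example. The sketch treats a single string as the complete input, which the stream class above does not.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;

public class DirectEncodeDemo {
    /** Encodes as much of source as fits into target[off, off+len); returns the byte count. */
    static int encodeInto(CharsetEncoder encoder, CharBuffer source,
                          byte[] target, int off, int len) {
        // wrap creates a view of the caller's array, no bytes are copied
        ByteBuffer bb = ByteBuffer.wrap(target, off, len);
        // 'true' because this sketch treats the single string as the complete input
        encoder.encode(source, bb, true);
        return len - bb.remaining();
    }

    public static void main(String[] args) {
        CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder();
        CharBuffer source = CharBuffer.wrap("hello world"); // wraps the string, no copy
        byte[] target = new byte[8];                        // smaller than the input

        // the first call stops when the target is full, the source keeps its position
        int first = encodeInto(encoder, source, target, 0, target.length);
        System.out.println(first + " bytes written, " + source.remaining() + " chars left");

        // a second call continues where the first one stopped
        int second = encodeInto(encoder, source, target, 0, target.length);
        System.out.println(second + " bytes written, " + source.remaining() + " chars left");
    }
}
```

The CharBuffer remembers how far encoding got, so nothing beyond the caller's buffer size is ever materialized.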


1 Comment

Great answer! I didn't see any performance improvement, but that might be because I'm passing this to CopyManager, which copies it to the DB and might not be consuming fast enough. For 2M records it still takes 15 sec. But it might help others, so I'll accept it. Thank you for your time.

As per @Kayaman's suggestion, I took a page from ByteArrayInputStream and handled the switching of byte arrays using Iterator<String> manually. This one turned out to be much more performant than the streams approach:

import java.io.InputStream;
import java.util.Iterator;

public class StringIteratorInputStream extends InputStream {
    protected byte[] buf;
    protected int pos;
    protected int count;
    private final Iterator<String> rows;

    public StringIteratorInputStream(Iterator<String> rows) {
        this.rows = rows;
        this.count = -1; // no buffer loaded yet
    }

    // switches to the bytes of the next string
    private void init(byte[] buf) {
        this.buf = buf;
        this.pos = 0;
        this.count = buf.length;
    }

    @Override
    public int read() {
        // loop instead of a single check, so that empty strings are skipped
        while (pos >= count) {
            if (!rows.hasNext()) {
                return -1;
            }
            init(rows.next().getBytes()); // uses the platform default charset
        }
        return buf[pos++] & 0xff;
    }

}

I did not extend ByteArrayInputStream because its read method is synchronized and I didn't need that.

3 Comments

If you care for performance, you should override the read(byte[] b, int off, int len) method.
Could you explain how? I want to do it lazily and I'm not the one reading the input-stream, how will I make sure this method is used?
There is a high likelihood that the code using the InputStream will use this method (or one of the others that delegate to this method). But the int read() method has to be kept anyway. The implementation stays as lazy as your already existing method, just copy more than one byte at once into the target buffer. When the caller provided buffer is large enough, you could let the string write into that buffer directly.
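A minimal sketch of what that could look like for the array-switching approach follows. The class name BulkStringIteratorInputStream, the fill helper, and the choice of UTF-8 are assumptions made for illustration, not part of the posted answer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;

public class BulkStringIteratorInputStream extends InputStream {
    private final Iterator<String> rows;
    private byte[] buf = new byte[0];
    private int pos;

    public BulkStringIteratorInputStream(Iterator<String> rows) {
        this.rows = rows;
    }

    /** Loads the next non-empty string if needed; returns false at the end of the iterator. */
    private boolean fill() {
        while (pos >= buf.length) {
            if (!rows.hasNext()) return false;
            buf = rows.next().getBytes(StandardCharsets.UTF_8); // charset is an assumption
            pos = 0;
        }
        return true;
    }

    @Override
    public int read() {
        return fill() ? buf[pos++] & 0xff : -1;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || len > b.length - off) throw new IndexOutOfBoundsException();
        if (len == 0) return 0;
        int copied = 0;
        // copy chunk by chunk until the target is full or the iterator is exhausted;
        // no string beyond what the caller asked for is fetched
        while (copied < len && fill()) {
            int n = Math.min(len - copied, buf.length - pos);
            System.arraycopy(buf, pos, b, off + copied, n);
            pos += n;
            copied += n;
        }
        return copied == 0 ? -1 : copied;
    }

    public static void main(String[] args) throws IOException {
        Iterator<String> rows = Arrays.asList("a,1\n", "", "b,2\n").iterator();
        try (InputStream in = new BulkStringIteratorInputStream(rows)) {
            byte[] target = new byte[16];
            int n = in.read(target); // inherited, ends up in the bulk method
            System.out.print(new String(target, 0, n, StandardCharsets.UTF_8));
        }
    }
}
```

This still keeps one byte array per string as intermediate storage; it only replaces the per-byte calls with System.arraycopy.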
