1

I am currently trying to operate a SocketChannel from one thread (I previously achieved what I wanted to do with two thread and regular sockets, but two threads per client seemed a little excessive). I want to be able to read when there is data to read (selector works fine for that). I only want to write when there are items in a blocking queue (in my example, I have frame queue).

        @Override
        public void run() {
            super.run();

            SelectionKey readKey = null;
            try {
                final int interests = SelectionKey.OP_READ;
                socketChannel.configureBlocking(false);
                readKey = socketChannel.register(selector, interests);
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    close();
                } catch (Exception e1) {
                    throw new RuntimeException("FAILURE");
                }
            }

            if (null != readKey) {
                while (running) {
                    try {
                        System.out.println("LOOP ENTRY");
                        selector.select();

                        if (readKey.isReadable()) {
                            System.out.println("IS READABLE");
                        }

                        if (readKey.isWritable() && (null != framesQueues.peek())) {
                            System.out.println("IS WRITEABLE");
                        }

                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

Now, what it does is, it loops furiously without stopping, and that's obviously bad. I'm looking for a way I could have my selector wakeup when there is an item in my blocking queue, or when there are bytes to read. Is there a tooling in NIO that allows that?

If not, what could I implement? Am I doomed of using two threads per client? It's for a hobby project, but I'm trying to achieve as low latency as possible, so looping with a sleep is not what I want.

4
  • if it is a hobby project then why would two threads per client be excessive? Commented May 12, 2017 at 2:14
  • Because it seems inefficient, and I'm trying to get the most performance out of it. I probably won't need to scale, but I want a solution that would allow me to scale (for the challenge). However, if two threads per client is the most efficient solution, then that's what it is! It seems the most natural to me at least, and it allows me to have a good performance for each client, that needs to be able to read and write asap. Commented May 12, 2017 at 2:18
  • ThreadPool ? Commented May 12, 2017 at 2:20
  • I would still need two threads per client no? Commented May 12, 2017 at 2:22

1 Answer 1

2

I was messing around with nio sockets, and I threw something together which is hopefully easy enough to understand. All you need to do is telnet localhost 5050. I don't have access to the rest of your code, so I have no idea what you're missing. I would assume that you aren't clearing the selected keys from the selector though, or possible not changing the interest ops to (READ) only once you are finished writing.

public static void main(String... args) throws IOException {
    final Selector selector = Selector.open();

    //every 10 seconds this thread will go through all the connections and
    //send "(x times) (date) to every client
    new Thread() {
        public void run() {
            for (int i = 0; selector.isOpen(); i++) {
                for (SelectionKey key : selector.keys()) {
                    if (key.channel() instanceof SocketChannel) {
                        ((Queue<ByteBuffer>) key.attachment()).add(ByteBuffer.wrap((i + " - " + new Date() + "\n").getBytes()));
                        key.interestOps(OP_READ | OP_WRITE); //enable write flag
                    }
                }

                selector.wakeup(); //wakeup so it can get to work and begin writing
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                }
            }
        }
    }.start();


    //create server on port 5050
    ServerSocketChannel server = ServerSocketChannel.open();
    server.configureBlocking(false);
    server.bind(new InetSocketAddress(5050));
    server.register(selector, OP_ACCEPT);

    //reusable buffer
    final ByteBuffer readBuffer = ByteBuffer.allocate(0x1000);

    while (selector.isOpen()) {
        int selected = selector.select();
        System.out.println("Selected " + selected + (selected == 1 ? " key." : " keys."));
        if (selected > 0) {
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isValid() && key.isReadable()) {
                    System.out.println("Readable: " + key.channel());
                    SocketChannel socket = ((SocketChannel) key.channel());
                    readBuffer.clear();
                    int read = socket.read(readBuffer);
                    if (read == -1) {
                        System.out.println("Socket Closed " + key.channel());
                        socket.close();
                        continue; //socket is closed. continue loop
                    }

                    //we will add what the client sent to the queue to echo it back
                    if (read > 0) {
                        readBuffer.flip();
                        ByteBuffer buffer = ByteBuffer.allocate(readBuffer.remaining());
                        ((Queue<Buffer>) key.attachment()).add(buffer.put(readBuffer).flip());
                        key.interestOps(OP_WRITE | OP_READ); //enable write flag
                    }
                }

                if (key.isValid() && key.isWritable()) {
                    System.out.println("Writable: " + key.channel());
                    SocketChannel socket = (SocketChannel) key.channel();

                    //retrieve attachment(ArrayBlockingQueue<ByteBuffer>)
                    Queue<Buffer> dataToWrite = (Queue<Buffer>) key.attachment();

                    //only remove from queue once we have completely written
                    //this is why we call peek first, and only remove once (buffer.remaining() == 0)
                    for (ByteBuffer buffer; (buffer = (ByteBuffer) dataToWrite.peek()) != null;) {
                        socket.write(buffer);
                        if (buffer.remaining() == 0) dataToWrite.remove();
                        else break; //can not write anymore. Wait for next write event
                    }

                    //once queue is empty we need to stop watching for write events
                    if (dataToWrite.isEmpty()) key.interestOps(OP_READ);
                }

                if (key.isValid() && key.isAcceptable()) {
                    System.out.println("Acceptable: " + key.channel());
                    SocketChannel socket = ((ServerSocketChannel) key.channel()).accept();
                    socket.configureBlocking(false);

                    //add a ArrayBlockingQueue<ByteBuffer> as an attachment for the socket
                    socket.register(selector, OP_READ, new ArrayBlockingQueue<ByteBuffer>(1000));
                }
            }
            selector.selectedKeys().clear(); //must clear all when finished or loop will continue selecting nothing
        }
    }
}
Sign up to request clarification or add additional context in comments.

1 Comment

This is exactly what I was looking for. I was not clearing the keys, therefore it was always looping, and I did not know I could change the interest keys. Now I don't need to block on the blocking queue, I can just change the interest whenever I add an item to queue. Since a duplex socket is always writable I found it a little useless to have a write interest set, but with manual triggers like that it's perfect. Thanks a lot!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.