
I have two threads working with Java NIO non-blocking sockets. This is what the threads are doing:

Thread 1: A loop that calls on the select() method of a selector. If any keys are available, they are processed accordingly.

Thread 2: Occasionally registers a SocketChannel to the selector by calling register().

The problem is that unless the timeout for select() is very small (around 100 ms), the call to register() blocks indefinitely. This happens even though the channel is configured to be non-blocking and the Javadocs state that the Selector object is thread-safe (though its selection keys are not, I know).

Does anyone have any idea what the issue could be? The application works perfectly if I put everything in one thread, but I'd really like to have separate threads. Any help is appreciated. I've posted my example code below:

Change select(1000) to select(100) and it works. Leave it as select() or select(1000) and it doesn't.


import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UDPSocket {
    private DatagramChannel clientChannel;
    private String dstHost;
    private int dstPort;
    private static Selector recvSelector;
    private static volatile boolean initialized;
    private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();

    public static void init() {
        initialized = true;

        try {
            recvSelector = Selector.open();
        } catch (IOException e) {
            System.err.println(e);
        }

        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                while (initialized) {
                    readData();
                    Thread.yield();
                }
            }
        });
        t.start();
    }

    public static void shutdown() {
        initialized = false;
    }

    private static void readData() {
        try {
            int numKeys = recvSelector.select(1000);

            if (numKeys > 0) {
                // typed iterator so that i.next() yields a SelectionKey without a cast
                Iterator<SelectionKey> i = recvSelector.selectedKeys().iterator();

                while (i.hasNext()) {
                    SelectionKey key = i.next();
                    i.remove();

                    if (key.isValid() && key.isReadable()) {
                        DatagramChannel channel = (DatagramChannel) key.channel();

                        // allocate every time we receive so that it's a copy that won't get erased
                        final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
                        channel.receive(buffer);
                        buffer.flip();
                        final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();

                        // let user handle event on a dedicated thread
                        eventQueue.execute(new Runnable() {
                            @Override
                            public void run() {
                                subscriber.onData(buffer);
                            }
                        });
                    }
                }
            }
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    public UDPSocket(String dstHost, int dstPort) {
        try {
            this.dstHost = dstHost;
            this.dstPort = dstPort;
            clientChannel = DatagramChannel.open();
            clientChannel.configureBlocking(false);
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    public void addListener(SocketSubscriber subscriber) {
        try {
            DatagramChannel serverChannel = DatagramChannel.open();
            serverChannel.configureBlocking(false);
            DatagramSocket socket = serverChannel.socket();
            socket.bind(new InetSocketAddress(dstPort));
            SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ);
            key.attach(subscriber);
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    public void send(ByteBuffer buffer) {
        try {
            clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort));
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    public void close() {
        try {
            clientChannel.close();
        } catch (IOException e) {
            System.err.println(e);
        }
    }
}


import java.nio.ByteBuffer;

public interface SocketSubscriber {
    public void onData(ByteBuffer data);
}

Example usage:


public class Test implements SocketSubscriber
{
 public static void main(String[] args) throws Exception
 {
  UDPSocket.init();
  UDPSocket test = new UDPSocket("localhost", 1234);
  test.addListener(new Test());
  UDPSocket test2 = new UDPSocket("localhost", 4321);
  test2.addListener(new Test());
  System.out.println("Listening...");
  ByteBuffer buffer = ByteBuffer.allocate(500);
  test.send(buffer);
  buffer.rewind();
  test2.send(buffer);
  System.out.println("Data sent...");
  Thread.sleep(5000);
  UDPSocket.shutdown();
 }

 @Override
 public void onData(ByteBuffer data)
 {
  System.out.println("Received " + data.limit() + " bytes of data.");
 }
}


2 Answers


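The Selector has several documented levels of internal synchronization, and you are running into them all: a selection operation synchronizes on the selector itself, on its key set, and on its selected-key set, and register() is documented to synchronize on the key set as well, so it waits while another thread is inside select(). Call wakeup() on the selector before you call register(). Make sure the select() loop works correctly when there are zero selected keys, which is what happens after wakeup().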
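A minimal sketch of this advice, under stated assumptions: the class name QueuedRegistrationDemo and its methods are hypothetical, and the sketch goes one step further than the answer by queueing the channel and letting the selector thread itself call register() after wakeup(). That queueing is a swapped-in variation, not something the answer prescribes; it is used here only so that no second thread ever competes with select() for the key set.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; none of these names come from the question's code.
final class QueuedRegistrationDemo {
    private final Selector selector;
    private final Queue<DatagramChannel> pending = new ConcurrentLinkedQueue<>();
    private final CountDownLatch received = new CountDownLatch(1);
    private volatile boolean running = true;

    QueuedRegistrationDemo() throws IOException {
        selector = Selector.open();
    }

    // Safe to call from any thread: queue the channel, then wake the selector thread.
    void requestRegistration(DatagramChannel channel) {
        pending.add(channel);
        selector.wakeup();
    }

    void stop() {
        running = false;
        selector.wakeup();
    }

    // Runs on the selector thread, the only thread that calls register().
    void selectLoop() {
        try {
            while (running) {
                selector.select(); // may return 0 after wakeup(); the loop tolerates that
                DatagramChannel ch;
                while ((ch = pending.poll()) != null) {
                    ch.register(selector, SelectionKey.OP_READ);
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        // receive() returns null if no datagram was actually available
                        if (((DatagramChannel) key.channel()).receive(buffer) != null) {
                            received.countDown();
                        }
                    }
                }
            }
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    // Registers one channel from the calling thread's point of view, sends it a
    // datagram over loopback, and reports whether the selector thread saw it.
    static boolean runDemo() {
        try {
            QueuedRegistrationDemo demo = new QueuedRegistrationDemo();
            Thread loop = new Thread(demo::selectLoop);
            loop.start();

            DatagramChannel server = DatagramChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            demo.requestRegistration(server);

            boolean ok;
            try (DatagramChannel client = DatagramChannel.open()) {
                client.send(ByteBuffer.wrap(new byte[] {1, 2, 3}), server.getLocalAddress());
                ok = demo.received.await(5, TimeUnit.SECONDS);
            }
            demo.stop();
            loop.join();
            server.close();
            demo.selector.close();
            return ok;
        } catch (Exception e) {
            return false;
        }
    }
}
```

Calling QueuedRegistrationDemo.runDemo() exercises the whole path once. The trade-off of this design is that registration becomes asynchronous: the caller does not get a SelectionKey back, so any attachment has to travel through the queue with the channel.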


7 Comments

I hit the same issue as the poster, but I'm not entirely happy with this solution. Because of thread scheduling, nothing prevents the selector thread from waking up and re-entering select() before register() has run.
@DavidEhrmann if you have control over the code of the thread calling select(), you could implement some kind of control to prevent immediate reselecting. An ugly example: if (select()==0) sleep(1); would probably yield long enough for the wakeup(); register(); pair to execute before the next select() happens. The solution still stands (+1); the problem comes from a "missing method" wakeupAndRegister() that would perform both steps atomically.
@DavidEhrmann I'm not happy with the whole design. Why three levels of synchronization are specified to occur, and hardwired into the abstract base classes, is a mystery to me, as is why the mechanics of registering from a separate thread are left almost entirely to us apart from this strange wakeup() method.
@DavidEhrmann see also stackoverflow.com/questions/12822298/… for your issue
Since the issue can cause deadlocks, doing things like "sleep(1)" is a very bad idea. Multithreading cannot be done "good" or "bad"... Just "right" or not. Every occurrence of "sleep" is potentially bad anyway.

I ran into the same issue today (that is "wakeupAndRegister" not being available). I hope my solution might be helpful:

Create a sync object:

Object registeringSync = new Object();

Register a channel by doing:

synchronized (registeringSync) {
  selector.wakeup();  // Wakes up a CURRENT or (important) NEXT select
  // !!! Might run into a deadlock "between" these lines if not using the lock !!!
  // To force it, insert Thread.sleep(1000); here
  channel.register(selector, ...);
}

The selector thread should do the following:

public void run() {    
  while (initialized) {
    if (selector.select() != 0) {  // Blocks until "wakeup"
      // Iterate through selected keys
    }
    synchronized (registeringSync) { }  // Cannot continue until "register" is complete
  }
}
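The two fragments above can be combined into a runnable sketch. The class name GuardedRegistrationDemo, the use of DatagramChannel bound to loopback, and the 200 ms pause are assumptions made for illustration; only the registeringSync lock, the wakeup() then register() order, and the empty synchronized block in the loop come from the answer.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class wrapping the answer's two fragments.
final class GuardedRegistrationDemo {
    private final Selector selector;
    private final Object registeringSync = new Object();
    private volatile boolean running = true;

    GuardedRegistrationDemo() throws IOException {
        selector = Selector.open();
    }

    // Called from any thread other than the selector thread.
    SelectionKey register(DatagramChannel channel) throws IOException {
        synchronized (registeringSync) {
            selector.wakeup(); // ends a current select() or makes the next one return at once
            return channel.register(selector, SelectionKey.OP_READ);
        }
    }

    // Runs on the selector thread.
    void selectLoop() {
        try {
            while (running) {
                if (selector.select() != 0) {
                    // Real code would iterate and handle the keys; the demo just clears them.
                    selector.selectedKeys().clear();
                }
                // Pause here until any in-progress register() has finished.
                synchronized (registeringSync) { }
            }
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    // Registers two channels while the selector thread is running and
    // returns how many valid keys came back.
    static int runDemo() {
        try {
            GuardedRegistrationDemo demo = new GuardedRegistrationDemo();
            Thread loop = new Thread(demo::selectLoop);
            loop.start();
            Thread.sleep(200); // assumption: long enough for the loop to block in select()

            int registered = 0;
            for (int i = 0; i < 2; i++) {
                DatagramChannel channel = DatagramChannel.open();
                channel.configureBlocking(false);
                channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
                if (demo.register(channel).isValid()) {
                    registered++;
                }
            }

            demo.running = false;
            demo.selector.wakeup();
            loop.join();
            for (SelectionKey key : demo.selector.keys()) {
                key.channel().close();
            }
            demo.selector.close();
            return registered;
        } catch (Exception e) {
            return -1;
        }
    }
}
```

Unlike a queue-based hand-off, register() here still returns the SelectionKey to the caller, at the cost of one extra lock acquisition on every pass through the select loop.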

3 Comments

There are already three levels of synchronization, and this is the problem. Adding a fourth doesn't solve anything.
It does. Without the additional lock, the code can run into a deadlock if the selector thread enters a new select() "between" wakeup() and register() (I added a comment there).
There is no deadlock. A deadlock would require that both threads are blocked waiting for each other's lock. Don't misuse standard terminology.
