4

I am new to multithreading & socket programming in Java. I would like to know what is the best way to implement 2 threads - one for receiving a socket and one for sending a socket. If what I am trying to do sounds absurd, pls let me know why! The code is largely inspired from Sun's tutorials online.I want to use Multicast sockets so that I can work with a multicast group.

class Server extends Thread
{

    static protected MulticastSocket socket = null;
    protected BufferedReader in = null;
    public InetAddress group;

    private static class Receive implements Runnable
    {

        public void run()
        {
            try
            {
                byte[] buf = new byte[256];
                DatagramPacket pkt = new DatagramPacket(buf,buf.length);
                socket.receive(pkt);
                String received = new String(pkt.getData(),0,pkt.getLength());
                System.out.println("From server@" + received);          
                Thread.sleep(1000);
            }
            catch (IOException e)
            { 
                System.out.println("Error:"+e);
            }   
            catch (InterruptedException e)
            { 
                System.out.println("Error:"+e);
            }   

        }

    }


    public Server() throws IOException
    {
        super("server");
        socket = new MulticastSocket(4446);
        group = InetAddress.getByName("239.231.12.3");
        socket.joinGroup(group);
    }

    public void run()
    {

        while(1>0)
        {   
            try
            {
                byte[] buf = new byte[256];
                DatagramPacket pkt = new DatagramPacket(buf,buf.length);        
                //String msg = reader.readLine();
                String pid = ManagementFactory.getRuntimeMXBean().getName();
                buf = pid.getBytes();
                pkt = new DatagramPacket(buf,buf.length,group,4446);
                socket.send(pkt);
                Thread t = new Thread(new Receive());
                t.start();

                while(t.isAlive())
                { 
                    t.join(1000);
                }
                sleep(1);
            }
            catch (IOException e)
            { 
                System.out.println("Error:"+e);
            }   
            catch (InterruptedException e)
            { 
                System.out.println("Error:"+e);
            }   

        }
        //socket.close();
    }

    public static void main(String[] args) throws IOException
    {
        new Server().start();
        //System.out.println("Hello");
    }

}
3
  • @Ravi, I fixed your formatting, but you should edit the class names... make them start with a capital letter. It's painful to read your code when your class names start with lower case letters. Commented Apr 22, 2010 at 0:49
  • @Xepoch: My final goal is to implement certain protocols in a distributed system @Lirik: Am sorry about the class names! I have fixed them now. Commented Apr 22, 2010 at 12:44
  • Good to heed an answer that uses ExecuterService when dealing with udp sockets Commented May 29, 2014 at 5:12

4 Answers 4

9

First thing is first: your classes should start with a capital letter per the Java Naming Conventions:

Class names should be nouns, in mixed case with the first letter of each internal word capitalized. Try to keep your class names simple and descriptive. Use whole words-avoid acronyms and abbreviations (unless the abbreviation is much more widely used than the long form, such as URL or HTML).

Second: Try to break down the code into coherent sections and organize them around some common feature that you're dealing with... perhaps around the functionality or the model you're programming.

The (basic) model for the server is that the only thing it does is receive socket connections... the server relies on a handler to handle those connections and that's it. If you try to build that model it would look something like this:

class Server{
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    public Server(int port, int poolSize) throws IOException {
      serverSocket = new ServerSocket(port);
      pool = Executors.newFixedThreadPool(poolSize);
    }

    public void serve() {
      try {
        while(true) {
          pool.execute(new Handler(serverSocket.accept()));
        }
      } catch (IOException ex) {
        pool.shutdown();
      }
    }
  }

  class Handler implements Runnable {
    private final Socket socket;
    Handler(Socket socket) { this.socket = socket; }
    public void run() {
      // receive the datagram packets
    }
 }

Third: I would recommend that you look at some existing examples.

Updated per comments:
OK Ravi, there are some big issues with your code and some minor issues with it:

  1. I assume that the Receive class is your client... you should pull that out as a separate program (with its own main class) and run your server and multiple clients at the same time. Spawning a new "client thread" from your server for every new UDP package you send is a disturbing idea (big issue).

  2. When you make your client application, you should make it run the receiving code in its own while loop (minor issue), e.g.:

    public class Client extends Thread
    {
        public Client(/*..*/)
        {
            // initialize your client
        }
    
        public void run()
        {
            while(true)
            {
                // receive UDP packets
                // process the UDP packets
            }
        }
    
        public static void main(String[] args) throws IOException
        {
            // start your client
            new Client().start();
        }
    }
    
  3. You should only need just one thread per client and one thread per server (you technically don't even a separate thread in there since main has its own thread), so you might not find the ExecutorService that useful.

Otherwise your approach is correct... but I would still recommend that you check out some of examples.

Sign up to request clarification or add additional context in comments.

10 Comments

the start() method on the server class in Ravi's example is part of the Thread class, which the server class extends. Other than that, this is a great answer.
@Lirik & others: Thanks, but really all I am looking for is if the approach I have taken is the right one (or an acceptable one) - since I have already looked at many tutorials for multithreading...
@Lirik: The first thing I did was to build a simple Client-Server model using UDP and Datagrams. I understand what you are trying to say, but what if I have multiple nodes and each node needs to send & receive packets? I am basically trying to model a distributed system where each node can behave either as a server or a client depending on the circumstance..so a message can arrive any time at a node and depending on the message I need to send a multicast...
@Ravi, ok... then I don't see anything particularly wrong with your implementation. Is there a specific reason that you need to create multiple new threads to receive packets? I would pull that out into a separate thread altogether, but I don't know what's your motivation for doing it that way.
|
2

Wanting to create threads in an application is not absurd! You won't need exactly 2 threads, but I think you're talking about 2 classes that implement the Runnable interface.

The threading API has gotten better since Java 1.5 and you don't need to mess with java.lang.Thread anymore. You can simply create a java.util.concurrent.Executor and submit Runnable instances to it.

The book Java Concurrency in Practice uses that exact problem - creating a threaded socket server - and walks through several iterations of the code to show the best way to do it. Check out the free sample chapter, which is great. I won't copy/paste the code here, but look specifically at listing 6.8.

3 Comments

Thanks Drew, I didn't know you could do that! I am gonna take a look at the concurrentExecutor right away
Careful!!! While it is always perfectly okay to perform a blocking operation in a spawned thread (which will then simply block for a while), it can be deadly to do that in a Runnable instance passed to java.util.concurrent.Executor. Why? Because Executor does not guarantee to run the code on another thread. It may as well run the code on the calling thread. From the documentation: "However, the Executor interface does not strictly require that execution be asynchronous.". So you may as well block your main thread that way and you can easily dead lock the whole program.
Good call out that the implementation matters. An example of a synchronous impl. would be Spring's SyncTaskExecutor
1

It's a good thing Eclipse's history works even for a day back :) Thanks to that, I am able to give both Ravi a working example and Lirik his answer on leakage.

Let me first start of by stating that I have no clue what is causing this leak, but if I leave it long enough, it will fail on a OutOfMemoryError.

Second, I left the working code commented out for Ravi for a working basic example of my UDP server. The timeout was there to test how long my firewall would kill the receivers end (30 seconds). Just remove anything with the pool, and you're good to go.

So here is, a working but leaking version of my example threaded UDP server.

public class TestServer {

private static Integer TIMEOUT = 30;
private final static int MAX_BUFFER_SIZE = 8192;
private final static int MAX_LISTENER_THREADS = 5;
private final static SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-dd-MM HH:mm:ss.SSSZ");

private int mPort;
private DatagramSocket mSocket;

// You can remove this for a working version
private ExecutorService mPool;

public TestServer(int port) {
    mPort = port;
    try {
        mSocket = new DatagramSocket(mPort);
        mSocket.setReceiveBufferSize(MAX_BUFFER_SIZE);
        mSocket.setSendBufferSize(MAX_BUFFER_SIZE);
        mSocket.setSoTimeout(0);

        // You can uncomment this for a working version
        //for (int i = 0; i < MAX_LISTENER_THREADS; i++) {
        //  new Thread(new Listener(mSocket)).start();
        //}

        // You can remove this for a working version
        mPool = Executors.newFixedThreadPool(MAX_LISTENER_THREADS);

    } catch (IOException e) {
        e.printStackTrace();
    }
}

// You can remove this for a working version
public void start() {
    try {
        try {
            while (true) {
                mPool.execute(new Listener(mSocket));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    } finally {
        mPool.shutdown();
    }
}

private class Listener implements Runnable {

    private final DatagramSocket socket;

    public Listener(DatagramSocket serverSocket) {
        socket = serverSocket;
    }

    private String readLn(DatagramPacket packet) throws IOException {
        socket.receive(packet);
        return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(packet.getData())), MAX_BUFFER_SIZE).readLine();
    }

    private void writeLn(DatagramPacket packet, String string) throws IOException {
        packet.setData(string.concat("\r\n").getBytes());
        socket.send(packet);
    }

    @Override
    public void run() {
        DatagramPacket packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE);
        String s;
        while (true) {
            try {
                packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE);
                s = readLn(packet);
                System.out.println(DateFormat.format(new Date()) + " Received: " + s);
                Thread.sleep(TIMEOUT * 1000);
                writeLn(packet, s);
                System.out.println(DateFormat.format(new Date()) + " Sent: " + s);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public static void main(String[] args) {
    if (args.length == 1) {
        try {
            TIMEOUT = Integer.parseInt(args[0]);
        } catch (Exception e) {
            TIMEOUT = 30;
        }
    }
    System.out.println(DateFormat.format(new Date()) + " Timeout: " + TIMEOUT);
    //new TestServer(4444);
    new TestServer(4444).start();
}
}

btw. @Lirik, I witnessed this behavior first in Eclipse, after which I tested it from the command line. And again, I have NO clue what is causing it ;) sorry...

Comments

0

2 threads is fine. One reader another writer. Remember that with UDP you should not spawn new handler threads (unless what you're doing takes a long time), I recommend throwing the incoming messages into a processing Queue. The same for the send, have a send thread that blocks on an incoming Queue for UDP send.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.