Subversion Repositories Projects

Rev

Blame | Last modification | View Log | RSS feed

package dongfang.mkt.comm.tcp;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import dongfang.mkt.comm.MKConnection;
import dongfang.mkt.comm.serial.RXTXSerialPort;

public class MKTCPServer {
        private MKConnection mkConnection;
        private ServerSocket serverSocket;
        private Map<SocketAddress, Socket> connections = new HashMap<SocketAddress, Socket>();
       
        private int port;

        public MKTCPServer(MKConnection mkConnection, int port) throws IOException {
                this.mkConnection = mkConnection;
                serverSocket = new ServerSocket(port);
                this.port = port;
        }
       
        public void start() {
                ClientConnectionServer svr = new ClientConnectionServer();
                svr.setDaemon(true);
                svr.start();
                new Manifold().start();
                System.out.println("Waiting for connections at port " + port);
        }

        class ClientConnectionServer extends Thread {
                public void run() {
                        while (true) {
                                try {
                                        // accept() will block, preventing performance loss bc of inf looping.
                                        Socket clientSocket = serverSocket.accept();
                                        System.out.println("Accepted a connection from " + clientSocket.getRemoteSocketAddress());
                                        connections.put(clientSocket.getRemoteSocketAddress(), clientSocket);
                                        new Mux(clientSocket.getInputStream()).start();
                                } catch (IOException ex) {
                                        // ignore.
                                        System.err.println(ex);
                                }
                        }
                }
        }

        /*
         * This thread accepts new client connections but does not do anything for
         * transferring data.
         */


        void send(byte[] buf, int len) throws IOException {
                synchronized (mkConnection.getOutputStream()) {
                        String snak = new String(buf, 0, len);
                        //System.out.println("Sending on behalf of client: " + snak);
                        mkConnection.getOutputStream().write(buf, 0, len);
                }
        }

        /*
         * For each client, this takes the client's output and sends frames on
         * (without mixing them up) to the common connection. An instance of this
         * must be made and started for each client.
         */

        class Mux extends Thread {
                InputStream input;
                byte[] buffer = new byte[1024];

                Mux(InputStream input) {
                        this.input = input;
                }

                public void run() {
                        while (true) {
                                try {
                                        int r;
                                        while ((r = input.read()) != '#') {
                                                // can't be! There should be only legal frames coming.
                                        }
                                        buffer[0] = '#';
                                        int c = 1;
                                        while ((r = input.read()) != '\r') {
                                                buffer[c++] = (byte) r;
                                        }
                                        buffer[c++] = '\r';
                                        send(buffer, c);
                                } catch (IOException ex) {
                                }
                        }
                }
        }

        class Manifold extends Thread {
                public void run() {
                        byte[] buf = new byte[1024];
                        int read;
                        InputStream is = null;
                        try {
                                is = mkConnection.getInputStream();
                        } catch (IOException ex) {
                                throw new RuntimeException(ex);
                        }
                        while(true) {
                                try {
                                read = is.read(buf, 0, buf.length);
                                String snak = new String(buf, 0, read);
                                Set<SocketAddress> keys = new HashSet<SocketAddress>(connections.keySet());

                                // Will read() block or return zero when no data is available?
                                if (read > 0) {
                                        //System.out.println("Received: " + snak);
                                        for(SocketAddress a: keys) {
                                                try {
                                                        Socket connection = connections.get(a);
                                                        OutputStream os = connection.getOutputStream();
                                                        os.write(buf, 0, read);
                                                } catch (IOException ex) {
                                                        // ignore errors writing to each client.
                                                }
                                        }
                                } else {
                                        try {
                                                Thread.sleep(10);
                                        } catch (InterruptedException ex) {}
                                }
                                } catch (IOException ex) {
                                        // ignore.
                                }
                        }
                }
        }
       
        public static void main(String[] args) throws IOException {
                int port = Integer.parseInt(args[0]);
                MKConnection c = new RXTXSerialPort();
                c.init(null);
                MKTCPServer server = new MKTCPServer(c, port);
                server.start();
        }
}