Subversion Repositories Projects

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
1568 - 1
package dongfang.mkt.main;
2
 
3
import java.io.IOException;
4
import java.io.InputStream;
5
import java.io.OutputStream;
6
import java.net.ServerSocket;
7
import java.net.Socket;
8
import java.net.SocketAddress;
9
import java.util.HashMap;
10
import java.util.HashSet;
11
import java.util.Map;
12
import java.util.Set;
13
 
14
import dongfang.mkt.comm.MKConnection;
15
import dongfang.mkt.comm.serial.RXTXSerialPort;
16
 
17
public class MKTCPServer {
18
        private MKConnection mkConnection;
19
        private ServerSocket serverSocket;
20
        private Map<SocketAddress, Socket> connections = new HashMap<SocketAddress, Socket>();
21
 
22
        private int port;
23
 
24
        public MKTCPServer(MKConnection mkConnection, int port) throws IOException {
25
                this.mkConnection = mkConnection;
26
                serverSocket = new ServerSocket(port);
27
                this.port = port;
28
        }
29
 
30
        public void start() {
31
                ClientConnectionServer svr = new ClientConnectionServer();
32
                svr.setDaemon(true);
33
                svr.start();
34
                new Manifold().start();
35
                System.out.println("Waiting for connections at port " + port);
36
        }
37
 
38
        class ClientConnectionServer extends Thread {
39
                public void run() {
40
                        while (true) {
41
                                try {
42
                                        // accept() will block, preventing performance loss bc of inf looping.
43
                                        Socket clientSocket = serverSocket.accept();
44
                                        System.out.println("Accepted a connection from " + clientSocket.getRemoteSocketAddress());
45
                                        connections.put(clientSocket.getRemoteSocketAddress(), clientSocket);
46
                                        new Mux(clientSocket.getInputStream()).start();
47
                                } catch (IOException ex) {
48
                                        // ignore.
49
                                        System.err.println(ex);
50
                                }
51
                        }
52
                }
53
        }
54
 
55
        /*
56
         * This thread accepts new client connections but does not do anything for
57
         * transferring data.
58
         */
59
 
60
        void send(byte[] buf, int len) throws IOException {
61
                synchronized (mkConnection.getOutputStream()) {
62
                        String snak = new String(buf, 0, len);
63
                        //System.out.println("Sending on behalf of client: " + snak);
64
                        mkConnection.getOutputStream().write(buf, 0, len);
65
                }
66
        }
67
 
68
        /*
69
         * For each client, this takes the client's output and sends frames on
70
         * (without mixing them up) to the common connection. An instance of this
71
         * must be made and started for each client.
72
         */
73
        class Mux extends Thread {
74
                InputStream input;
75
                byte[] buffer = new byte[1024];
76
 
77
                Mux(InputStream input) {
78
                        this.input = input;
79
                }
80
 
81
                public void run() {
82
                        while (true) {
83
                                try {
84
                                        int r;
85
                                        while ((r = input.read()) != '#') {
86
                                                // can't be! There should be only legal frames coming.
87
                                        }
88
                                        buffer[0] = '#';
89
                                        int c = 1;
90
                                        while ((r = input.read()) != '\r') {
91
                                                buffer[c++] = (byte) r;
92
                                        }
93
                                        buffer[c++] = '\r';
94
                                        send(buffer, c);
95
                                } catch (IOException ex) {
96
                                }
97
                        }
98
                }
99
        }
100
 
101
        class Manifold extends Thread {
102
                public void run() {
103
                        byte[] buf = new byte[1024];
104
                        int read;
105
                        InputStream is = null;
106
                        try {
107
                                is = mkConnection.getInputStream();
108
                        } catch (IOException ex) {
109
                                throw new RuntimeException(ex);
110
                        }
111
                        while(true) {
112
                                try {
113
                                read = is.read(buf, 0, buf.length);
114
                                String snak = new String(buf, 0, read);
115
                                Set<SocketAddress> keys = new HashSet<SocketAddress>(connections.keySet());
116
 
117
                                // Will read() block or return zero when no data is available?
118
                                if (read > 0) {
119
                                        //System.out.println("Received: " + snak);
120
                                        for(SocketAddress a: keys) {
121
                                                try {
122
                                                        Socket connection = connections.get(a);
123
                                                        OutputStream os = connection.getOutputStream();
124
                                                        os.write(buf, 0, read);
125
                                                } catch (IOException ex) {
126
                                                        // ignore errors writing to each client. 
127
                                                }
128
                                        }
129
                                } else {
130
                                        try {
131
                                                Thread.sleep(10);
132
                                        } catch (InterruptedException ex) {}
133
                                }
134
                                } catch (IOException ex) {
135
                                        // ignore.
136
                                }
137
                        }
138
                }
139
        }
140
 
141
        public static void main(String[] args) throws IOException {
142
                int port = Integer.parseInt(args[0]);
143
                MKConnection c = new RXTXSerialPort();
144
                c.init(null);
145
                MKTCPServer server = new MKTCPServer(c, port);
146
                server.start();
147
        }
148
}