Blame |
Last modification |
View Log
| RSS feed
package dongfang.mkt.comm.tcp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import dongfang.mkt.comm.MKConnection;
import dongfang.mkt.comm.serial.RXTXSerialPort;
public class MKTCPServer
{
private MKConnection mkConnection
;
private ServerSocket serverSocket
;
private Map<SocketAddress,
Socket> connections =
new HashMap<SocketAddress,
Socket>();
private int port
;
public MKTCPServer
(MKConnection mkConnection,
int port
) throws IOException {
this.
mkConnection = mkConnection
;
serverSocket =
new ServerSocket(port
);
this.
port = port
;
}
public void start
() {
ClientConnectionServer svr =
new ClientConnectionServer
();
svr.
setDaemon(true);
svr.
start();
new Manifold
().
start();
System.
out.
println("Waiting for connections at port " + port
);
}
class ClientConnectionServer
extends Thread {
public void run
() {
while (true) {
try {
// accept() will block, preventing performance loss bc of inf looping.
Socket clientSocket = serverSocket.
accept();
System.
out.
println("Accepted a connection from " + clientSocket.
getRemoteSocketAddress());
connections.
put(clientSocket.
getRemoteSocketAddress(), clientSocket
);
new Mux
(clientSocket.
getInputStream()).
start();
} catch (IOException ex
) {
// ignore.
System.
err.
println(ex
);
}
}
}
}
/*
* This thread accepts new client connections but does not do anything for
* transferring data.
*/
void send
(byte[] buf,
int len
) throws IOException {
synchronized (mkConnection.
getOutputStream()) {
String snak =
new String(buf,
0, len
);
//System.out.println("Sending on behalf of client: " + snak);
mkConnection.
getOutputStream().
write(buf,
0, len
);
}
}
/*
* For each client, this takes the client's output and sends frames on
* (without mixing them up) to the common connection. An instance of this
* must be made and started for each client.
*/
class Mux
extends Thread {
InputStream input
;
byte[] buffer =
new byte[1024];
Mux
(InputStream input
) {
this.
input = input
;
}
public void run
() {
while (true) {
try {
int r
;
while ((r = input.
read()) !=
'#') {
// can't be! There should be only legal frames coming.
}
buffer
[0] =
'#';
int c =
1;
while ((r = input.
read()) !=
'\r') {
buffer
[c++
] =
(byte) r
;
}
buffer
[c++
] =
'\r';
send
(buffer, c
);
} catch (IOException ex
) {
}
}
}
}
class Manifold
extends Thread {
public void run
() {
byte[] buf =
new byte[1024];
int read
;
InputStream is =
null;
try {
is = mkConnection.
getInputStream();
} catch (IOException ex
) {
throw new RuntimeException(ex
);
}
while(true) {
try {
read = is.
read(buf,
0, buf.
length);
String snak =
new String(buf,
0, read
);
Set<SocketAddress> keys =
new HashSet<SocketAddress>(connections.
keySet());
// Will read() block or return zero when no data is available?
if (read
> 0) {
//System.out.println("Received: " + snak);
for(SocketAddress a: keys
) {
try {
Socket connection = connections.
get(a
);
OutputStream os = connection.
getOutputStream();
os.
write(buf,
0, read
);
} catch (IOException ex
) {
// ignore errors writing to each client.
}
}
} else {
try {
Thread.
sleep(10);
} catch (InterruptedException ex
) {}
}
} catch (IOException ex
) {
// ignore.
}
}
}
}
public static void main
(String[] args
) throws IOException {
int port =
Integer.
parseInt(args
[0]);
MKConnection c =
new RXTXSerialPort
();
c.
init(null);
MKTCPServer server =
new MKTCPServer
(c, port
);
server.
start();
}
}