// Rev 1563 (header text left over from the repository web viewer)
package dongfang.mkt.comm;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import dongfang.mkt.frames.RequestFrame;
import dongfang.mkt.frames.ResponseFrame;
/**
 * Sends request frames and hands back the matching response frames over a
 * single pair of streams.
 *
 * <p>A background {@link Receiver} thread, started by the constructors, reads
 * frames from the input stream one at a time and publishes each one in a
 * single slot ({@code lastResponseFrame}). Only the most recently received
 * frame is kept: a frame that nobody has claimed is overwritten by the next
 * one that arrives.
 *
 * <p>Locking: the slot is guarded by the monitor of {@code input}; writes are
 * serialized on the monitor of {@code output}.
 *
 * @author dongfang
 */
public class FrameQueue {
    /** Longest single wait, in milliseconds, before the deadline is re-checked. */
    private static final long POLL_INTERVAL_MS = 100;

    private final MKInputStream input;
    private final MKOutputStream output;

    /** Most recently received, not yet claimed frame; guarded by {@code input}. */
    private ResponseFrame lastResponseFrame;

    /** Run flag of the receiver loop; cleared by {@link #kill()}. */
    private volatile boolean doQueue = true;

    /**
     * Background reader: repeatedly fetches the next frame from the input
     * stream, stores it in the slot and wakes up all threads waiting on the
     * {@code input} monitor.
     */
    class Receiver extends Thread {
        @Override
        public void run() {
            while (doQueue) {
                try {
                    ResponseFrame f = input.getNextFrame();
                    synchronized (FrameQueue.this.input) {
                        lastResponseFrame = f;
                        FrameQueue.this.input.notifyAll();
                    }
                } catch (IOException ex) {
                    // Best effort: report the failed read and try the next frame.
                    System.err.println(ex);
                }
            }
            System.out.println("Receiver terminated.");
        }
    }

    /**
     * Creates a queue on the streams of the given connection and starts the
     * receiver thread. The thread's daemon status is left at its default.
     *
     * @param port connection supplying the input and output streams
     * @throws IOException if obtaining or wrapping the streams fails
     */
    public FrameQueue(MKConnection port) throws IOException {
        this.input = new MKInputStream(port.getInputStream());
        this.output = new MKOutputStream(port.getOutputStream());
        new Receiver().start();
    }

    /**
     * Creates a queue on the given raw streams and starts the receiver
     * thread, which is explicitly marked as a non-daemon thread.
     *
     * @param in  stream that frames are read from
     * @param out stream that frames are written to
     * @throws IOException if wrapping the streams fails
     */
    public FrameQueue(InputStream in, OutputStream out) throws IOException {
        this.input = new MKInputStream(in);
        this.output = new MKOutputStream(out);
        Receiver r = new Receiver();
        r.setDaemon(false);
        r.start();
    }

    /**
     * Writes a request frame to the output stream. Concurrent callers are
     * serialized on the output stream's monitor.
     *
     * @param f frame to write
     * @throws IOException if the write fails
     */
    public void sendRequest(RequestFrame f) throws IOException {
        synchronized (this.output) {
            output.write(f);
        }
    }

    /**
     * Waits up to {@code maxwait} milliseconds for a frame that
     * {@code isResponseTo} the given request, and removes it from the slot.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the method stops waiting immediately, returning
     * whatever a final check of the slot yields.
     *
     * @param f       request whose response is wanted
     * @param maxwait maximum time to wait, in milliseconds; zero or negative
     *                means a single check without waiting
     * @return the matching response, or {@code null} if none arrived in time
     *         or the wait was interrupted
     * @throws IOException declared for interface compatibility
     */
    public ResponseFrame getResponseFor(RequestFrame f, int maxwait) throws IOException {
        final long deadline = System.currentTimeMillis() + maxwait;
        synchronized (input) {
            ResponseFrame response = responseTo(f);
            while (response == null) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                try {
                    // Never sleep past the deadline; remaining is > 0 here, so
                    // this can not turn into the wait-forever case wait(0).
                    input.wait(Math.min(POLL_INTERVAL_MS, remaining));
                } catch (InterruptedException ex) {
                    // Keep the interrupt visible to the caller and give up waiting.
                    Thread.currentThread().interrupt();
                    response = responseTo(f);
                    break;
                }
                response = responseTo(f);
            }
            return response;
        }
    }

    /**
     * Asks the receiver thread to stop by clearing its run flag. The flag is
     * only checked between frames, so the thread exits after its current
     * {@code getNextFrame()} call has returned or thrown.
     */
    public void kill() {
        doQueue = false;
    }

    /**
     * Claims the frame in the slot if it answers the given request.
     *
     * @param f request to match against the stored frame
     * @return the stored frame (the slot is cleared), or {@code null} if the
     *         slot is empty or its frame does not match
     */
    private ResponseFrame responseTo(RequestFrame f) {
        synchronized (this.input) {
            if (lastResponseFrame != null && lastResponseFrame.isResponseTo(f)) {
                ResponseFrame result = lastResponseFrame;
                lastResponseFrame = null;
                return result;
            }
            return null;
        }
    }
}