Subversion Repositories Projects

Rev

Rev 1698 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
1563 - 1
package dongfang.mkt.comm;
2
import java.io.IOException;
3
import java.io.InputStream;
4
import java.io.OutputStream;
5
 
6
import dongfang.mkt.frames.RequestFrame;
7
import dongfang.mkt.frames.ResponseFrame;
8
 
9
/**
10
 * Thread safe!
11
 *
12
 * @author dongfang
13
 */
14
public class FrameQueue {
15
        private final MKInputStream input;
16
        private final MKOutputStream output;
17
        // private List responseQueue;
18
        private ResponseFrame lastResponseFrame;
1698 - 19
        private volatile boolean doQueue = true;
1563 - 20
 
21
        class Receiver extends Thread {
22
                public void run() {
23
                        while (doQueue) {
24
                                try {
25
                                        ResponseFrame f = input.getNextFrame();
26
                                        synchronized (FrameQueue.this.input) {
27
                                                lastResponseFrame = f;
28
                                                FrameQueue.this.input.notifyAll();
29
                                        }
30
                                } catch (IOException ex) {
31
                                        System.err.println(ex);
32
                                }
33
                        }
34
                        System.out.println("Receiver terminated.");
35
                }
36
        }
37
 
38
        public FrameQueue(MKConnection port) throws IOException {
39
                super();
40
                this.input = new MKInputStream (port.getInputStream());
41
                this.output = new MKOutputStream(port.getOutputStream());
42
                new Receiver().start();
43
        }
44
 
45
        public FrameQueue(InputStream in, OutputStream out) throws IOException {
46
                super();
47
                this.input = new MKInputStream (in);
48
                this.output = new MKOutputStream(out);
1698 - 49
                Receiver r = new Receiver();
50
                r.setDaemon(false);
51
                r.start();
1563 - 52
        }
53
 
54
        public void sendRequest(RequestFrame f) throws IOException {
1794 - 55
                synchronized(this.output) {
1563 - 56
                        output.write(f);
57
                }
58
        }
59
 
60
        public ResponseFrame getResponseFor(RequestFrame f, int maxwait) throws IOException {
61
                ResponseFrame response;
62
                long timeout = System.currentTimeMillis() + maxwait;
63
                synchronized (input) {
64
                        while ((response = responseTo(f)) == null && System.currentTimeMillis() < timeout) {
65
                                try {
66
                                        input.wait(100);
67
                                } catch (InterruptedException ex) {
68
                                }
69
                        }
70
                }
71
                return response;
72
        }
73
 
74
        public void kill() {
75
                doQueue = false;
76
        }
77
 
78
        private ResponseFrame responseTo(RequestFrame f) {
79
                synchronized (this.input) {
80
                        if (lastResponseFrame != null && lastResponseFrame.isResponseTo(f)) {
81
                                ResponseFrame result = lastResponseFrame;
82
                                lastResponseFrame = null;
83
                                return result;
84
                        }
85
                        return null;
86
                }
87
        }
88
}