Subversion Repositories Projects

Rev

Rev 1563 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
1563 - 1
package dongfang.mkt.comm;
2
 
3
import java.io.IOException;
4
import java.io.InputStream;
5
import java.io.OutputStream;
6
 
7
import dongfang.mkt.frames.RequestFrame;
8
import dongfang.mkt.frames.ResponseFrame;
9
 
10
/**
11
 * Thread safe!
12
 *
13
 * @author dongfang
14
 */
15
public class FrameQueue {
16
        private final MKInputStream input;
17
        private final MKOutputStream output;
18
        // private List responseQueue;
19
        private ResponseFrame lastResponseFrame;
1698 - 20
        private volatile boolean doQueue = true;
1563 - 21
 
22
        class Receiver extends Thread {
23
                public void run() {
24
                        while (doQueue) {
25
                                try {
26
                                        ResponseFrame f = input.getNextFrame();
27
                                        synchronized (FrameQueue.this.input) {
28
                                                lastResponseFrame = f;
29
                                                FrameQueue.this.input.notifyAll();
30
                                        }
31
                                } catch (IOException ex) {
32
                                        System.err.println(ex);
33
                                }
34
                        }
35
                        System.out.println("Receiver terminated.");
36
                }
37
        }
38
 
39
        public FrameQueue(MKConnection port) throws IOException {
40
                super();
41
                this.input = new MKInputStream (port.getInputStream());
42
                this.output = new MKOutputStream(port.getOutputStream());
43
                new Receiver().start();
44
        }
45
 
46
        public FrameQueue(InputStream in, OutputStream out) throws IOException {
47
                super();
48
                this.input = new MKInputStream (in);
49
                this.output = new MKOutputStream(out);
1698 - 50
                Receiver r = new Receiver();
51
                r.setDaemon(false);
52
                r.start();
1563 - 53
        }
54
 
55
        public void sendRequest(RequestFrame f) throws IOException {
56
                synchronized (this.output) {
57
                        output.write(f);
58
                }
59
        }
60
 
61
        public ResponseFrame getResponseFor(RequestFrame f, int maxwait) throws IOException {
62
                ResponseFrame response;
63
                long timeout = System.currentTimeMillis() + maxwait;
64
                synchronized (input) {
65
                        while ((response = responseTo(f)) == null && System.currentTimeMillis() < timeout) {
66
                                try {
67
                                        input.wait(100);
68
                                } catch (InterruptedException ex) {
69
                                }
70
                        }
71
                }
72
                return response;
73
        }
74
 
75
        public void kill() {
76
                doQueue = false;
77
        }
78
 
79
        private ResponseFrame responseTo(RequestFrame f) {
80
                synchronized (this.input) {
81
                        if (lastResponseFrame != null && lastResponseFrame.isResponseTo(f)) {
82
                                ResponseFrame result = lastResponseFrame;
83
                                lastResponseFrame = null;
84
                                return result;
85
                        }
86
                        return null;
87
                }
88
        }
89
}