Subversion Repositories Projects

Rev

Rev 1563 | Go to most recent revision | Only display areas with differences | Ignore whitespace | Details | Blame | Last modification | View Log | RSS feed

Rev 1563 Rev 1698
1
package dongfang.mkt.comm;
1
package dongfang.mkt.comm;
2
 
2
 
3
import java.io.IOException;
3
import java.io.IOException;
4
import java.io.InputStream;
4
import java.io.InputStream;
5
import java.io.OutputStream;
5
import java.io.OutputStream;
6
 
6
 
7
import dongfang.mkt.frames.RequestFrame;
7
import dongfang.mkt.frames.RequestFrame;
8
import dongfang.mkt.frames.ResponseFrame;
8
import dongfang.mkt.frames.ResponseFrame;
9
 
9
 
10
/**
10
/**
11
 * Thread safe!
11
 * Thread safe!
12
 *
12
 *
13
 * @author dongfang
13
 * @author dongfang
14
 */
14
 */
15
public class FrameQueue {
15
public class FrameQueue {
16
        private final MKInputStream input;
16
        private final MKInputStream input;
17
        private final MKOutputStream output;
17
        private final MKOutputStream output;
18
        // private List responseQueue;
18
        // private List responseQueue;
19
        private ResponseFrame lastResponseFrame;
19
        private ResponseFrame lastResponseFrame;
20
        private boolean doQueue = true;
20
        private volatile boolean doQueue = true;
21
 
21
 
22
        class Receiver extends Thread {
22
        class Receiver extends Thread {
23
                public void run() {
23
                public void run() {
24
                        while (doQueue) {
24
                        while (doQueue) {
25
                                try {
25
                                try {
26
                                        ResponseFrame f = input.getNextFrame();
26
                                        ResponseFrame f = input.getNextFrame();
27
                                        synchronized (FrameQueue.this.input) {
27
                                        synchronized (FrameQueue.this.input) {
28
                                                lastResponseFrame = f;
28
                                                lastResponseFrame = f;
29
                                                FrameQueue.this.input.notifyAll();
29
                                                FrameQueue.this.input.notifyAll();
30
                                        }
30
                                        }
31
                                } catch (IOException ex) {
31
                                } catch (IOException ex) {
32
                                        System.err.println(ex);
32
                                        System.err.println(ex);
33
                                }
33
                                }
34
                        }
34
                        }
35
                        System.out.println("Receiver terminated.");
35
                        System.out.println("Receiver terminated.");
36
                }
36
                }
37
        }
37
        }
38
 
38
 
39
        public FrameQueue(MKConnection port) throws IOException {
39
        public FrameQueue(MKConnection port) throws IOException {
40
                super();
40
                super();
41
                this.input = new MKInputStream (port.getInputStream());
41
                this.input = new MKInputStream (port.getInputStream());
42
                this.output = new MKOutputStream(port.getOutputStream());
42
                this.output = new MKOutputStream(port.getOutputStream());
43
                new Receiver().start();
43
                new Receiver().start();
44
        }
44
        }
45
 
45
 
46
        public FrameQueue(InputStream in, OutputStream out) throws IOException {
46
        public FrameQueue(InputStream in, OutputStream out) throws IOException {
47
                super();
47
                super();
48
                this.input = new MKInputStream (in);
48
                this.input = new MKInputStream (in);
49
                this.output = new MKOutputStream(out);
49
                this.output = new MKOutputStream(out);
-
 
50
                Receiver r = new Receiver();
-
 
51
                r.setDaemon(false);
50
                new Receiver().start();
52
                r.start();
51
        }
53
        }
52
 
54
 
53
        public void sendRequest(RequestFrame f) throws IOException {
55
        public void sendRequest(RequestFrame f) throws IOException {
54
                synchronized (this.output) {
56
                synchronized (this.output) {
55
                        output.write(f);
57
                        output.write(f);
56
                }
58
                }
57
        }
59
        }
58
 
60
 
59
        public ResponseFrame getResponseFor(RequestFrame f, int maxwait) throws IOException {
61
        public ResponseFrame getResponseFor(RequestFrame f, int maxwait) throws IOException {
60
                ResponseFrame response;
62
                ResponseFrame response;
61
                long timeout = System.currentTimeMillis() + maxwait;
63
                long timeout = System.currentTimeMillis() + maxwait;
62
                synchronized (input) {
64
                synchronized (input) {
63
                        while ((response = responseTo(f)) == null && System.currentTimeMillis() < timeout) {
65
                        while ((response = responseTo(f)) == null && System.currentTimeMillis() < timeout) {
64
                                try {
66
                                try {
65
                                        input.wait(100);
67
                                        input.wait(100);
66
                                } catch (InterruptedException ex) {
68
                                } catch (InterruptedException ex) {
67
                                }
69
                                }
68
                        }
70
                        }
69
                }
71
                }
70
                return response;
72
                return response;
71
        }
73
        }
72
 
74
 
73
        public void kill() {
75
        public void kill() {
74
                doQueue = false;
76
                doQueue = false;
75
        }
77
        }
76
 
78
 
77
        private ResponseFrame responseTo(RequestFrame f) {
79
        private ResponseFrame responseTo(RequestFrame f) {
78
                synchronized (this.input) {
80
                synchronized (this.input) {
79
                        if (lastResponseFrame != null && lastResponseFrame.isResponseTo(f)) {
81
                        if (lastResponseFrame != null && lastResponseFrame.isResponseTo(f)) {
80
                                ResponseFrame result = lastResponseFrame;
82
                                ResponseFrame result = lastResponseFrame;
81
                                lastResponseFrame = null;
83
                                lastResponseFrame = null;
82
                                return result;
84
                                return result;
83
                        }
85
                        }
84
                        return null;
86
                        return null;
85
                }
87
                }
86
        }
88
        }
87
}
89
}
88
 
90