Subversion Repositories Projects

Rev

Rev 1563 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | RSS feed

package dongfang.mkt.comm;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import dongfang.mkt.frames.RequestFrame;
import dongfang.mkt.frames.ResponseFrame;

/**
 * Thread safe!
 *
 * @author dongfang
 */

public class FrameQueue {
        private final MKInputStream input;
        private final MKOutputStream output;
        // private List responseQueue;
        private ResponseFrame lastResponseFrame;
        private volatile boolean doQueue = true;

        class Receiver extends Thread {
                public void run() {
                        while (doQueue) {
                                try {
                                        ResponseFrame f = input.getNextFrame();
                                        synchronized (FrameQueue.this.input) {
                                                lastResponseFrame = f;
                                                FrameQueue.this.input.notifyAll();
                                        }
                                } catch (IOException ex) {
                                        System.err.println(ex);
                                }
                        }
                        System.out.println("Receiver terminated.");
                }
        }

        public FrameQueue(MKConnection port) throws IOException {
                super();
                this.input = new MKInputStream (port.getInputStream());
                this.output = new MKOutputStream(port.getOutputStream());
                new Receiver().start();
        }

        public FrameQueue(InputStream in, OutputStream out) throws IOException {
                super();
                this.input = new MKInputStream (in);
                this.output = new MKOutputStream(out);
                Receiver r = new Receiver();
                r.setDaemon(false);
                r.start();
        }

        public void sendRequest(RequestFrame f) throws IOException {
                synchronized (this.output) {
                        output.write(f);
                }
        }

        public ResponseFrame getResponseFor(RequestFrame f, int maxwait) throws IOException {
                ResponseFrame response;
                long timeout = System.currentTimeMillis() + maxwait;
                synchronized (input) {
                        while ((response = responseTo(f)) == null && System.currentTimeMillis() < timeout) {
                                try {
                                        input.wait(100);
                                } catch (InterruptedException ex) {
                                }
                        }
                }
                return response;
        }

        public void kill() {
                doQueue = false;
        }

        private ResponseFrame responseTo(RequestFrame f) {
                synchronized (this.input) {
                        if (lastResponseFrame != null && lastResponseFrame.isResponseTo(f)) {
                                ResponseFrame result = lastResponseFrame;
                                lastResponseFrame = null;
                                return result;
                        }
                        return null;
                }
        }
}