001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.tcp;
020    
021    import java.io.EOFException;
022    import java.io.FilterOutputStream;
023    import java.io.IOException;
024    import java.io.OutputStream;
025    
026    /**
027     * An optimized buffered outputstream for Tcp
028     *
029     * @version $Revision: 1.1.1.1 $
030     */
031    
032    public class TcpBufferedOutputStream extends FilterOutputStream {
033        private final static int BUFFER_SIZE = 4096;
034        private byte[] buf;
035        private int count;
036        private boolean closed;
037    
038        /**
039         * Constructor
040         *
041         * @param out
042         */
043        public TcpBufferedOutputStream(OutputStream out) {
044            this(out, BUFFER_SIZE);
045        }
046    
047        /**
048         * Creates a new buffered output stream to write data to the specified underlying output stream with the specified
049         * buffer size.
050         *
051         * @param out  the underlying output stream.
052         * @param size the buffer size.
053         * @throws IllegalArgumentException if size <= 0.
054         */
055        public TcpBufferedOutputStream(OutputStream out, int size) {
056            super(out);
057            if (size <= 0) {
058                throw new IllegalArgumentException("Buffer size <= 0");
059            }
060            buf = new byte[size];
061        }
062    
063        /**
064         * write a byte on to the stream
065         *
066         * @param b - byte to write
067         * @throws IOException
068         */
069        public void write(int b) throws IOException {
070            checkClosed();
071            if (availableBufferToWrite() < 1) {
072                flush();
073            }
074            buf[count++] = (byte) b;
075        }
076    
077    
078        /**
079         * write a byte array to the stream
080         *
081         * @param b   the byte buffer
082         * @param off the offset into the buffer
083         * @param len the length of data to write
084         * @throws IOException
085         */
086        public void write(byte b[], int off, int len) throws IOException {
087            checkClosed();
088            if (availableBufferToWrite() < len) {
089                flush();
090            }
091            if (buf.length >= len) {
092                System.arraycopy(b, off, buf, count, len);
093                count += len;
094            }
095            else {
096                out.write(b, off, len);
097            }
098        }
099    
100        /**
101         * flush the data to the output stream
102         * This doesn't call flush on the underlying outputstream, because
103         * Tcp is particularly efficent at doing this itself ....
104         *
105         * @throws IOException
106         */
107        public void flush() throws IOException {
108            if (count > 0 && out != null) {
109                out.write(buf, 0, count);
110                count = 0;
111            }
112        }
113    
114        /**
115         * close this stream
116         *
117         * @throws IOException
118         */
119        public void close() throws IOException {
120            super.close();
121            closed = true;
122        }
123    
124    
125        /**
126         * Checks that the stream has not been closed
127         *
128         * @throws IOException
129         */
130        protected void checkClosed() throws IOException {
131            if (closed) {
132                throw new EOFException("Cannot write to the stream any more it has already been closed");
133            }
134        }
135    
136        /**
137         * @return the amount free space in the buffer
138         */
139        private int availableBufferToWrite() {
140            return buf.length - count;
141        }
142    }