001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.tcp;
020    
021    import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
022    import EDU.oswego.cs.dl.util.concurrent.BoundedChannel;
023    import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
024    import EDU.oswego.cs.dl.util.concurrent.Executor;
025    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
026    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    import org.activemq.io.WireFormat;
030    import org.activemq.io.WireFormatLoader;
031    import org.activemq.message.Packet;
032    import org.activemq.transport.TransportChannelSupport;
033    import org.activemq.transport.TransportStatusEvent;
034    import org.activemq.util.JMSExceptionHelper;
035    
036    import javax.jms.JMSException;
037    import java.io.BufferedInputStream;
038    import java.io.DataInputStream;
039    import java.io.DataOutputStream;
040    import java.io.EOFException;
041    import java.io.IOException;
042    import java.io.InterruptedIOException;
043    import java.net.InetAddress;
044    import java.net.InetSocketAddress;
045    import java.net.Socket;
046    import java.net.SocketAddress;
047    import java.net.SocketException;
048    import java.net.SocketTimeoutException;
049    import java.net.URI;
050    import java.net.UnknownHostException;
051    
052    /**
053     * A tcp implementation of a TransportChannel
054     *
055     * @version $Revision: 1.2 $
056     */
057    public class TcpTransportChannel extends TransportChannelSupport implements Runnable {
058        private static final int DEFAULT_SOCKET_BUFFER_SIZE = 64 * 1024;
059        private static final Log log = LogFactory.getLog(TcpTransportChannel.class);
060        protected Socket socket;
061        protected DataOutputStream dataOut;
062        protected DataInputStream dataIn;
063    
064        private WireFormatLoader wireFormatLoader;
065        private SynchronizedBoolean closed;
066        private SynchronizedBoolean started;
067        private Object outboundLock;
068        private Executor executor;
069        private Thread thread;
070        private boolean useAsyncSend = false;
071        private int soTimeout = 10000;
072        private int socketBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
073        private BoundedChannel exceptionsList;
074        private TcpTransportServerChannel serverChannel;
075    
076        /**
077         * Construct basic helpers
078         *
079         * @param wireFormat
080         */
081        protected TcpTransportChannel(WireFormat wireFormat) {
082            super(wireFormat);
083            this.wireFormatLoader = new WireFormatLoader(wireFormat);
084            closed = new SynchronizedBoolean(false);
085            started = new SynchronizedBoolean(false);
086            // there's not much point logging all exceptions, lets just keep a few around
087            exceptionsList = new BoundedLinkedQueue(10);
088            outboundLock = new Object();
089            setUseAsyncSend(useAsyncSend);
090            super.setCachingEnabled(true);
091        }
092    
093        /**
094         * Connect to a remote Node - e.g. a Broker
095         *
096         * @param wireFormat
097         * @param remoteLocation
098         * @throws JMSException
099         */
100        public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
101            this(wireFormat);
102            try {
103                this.socket = createSocket(remoteLocation);
104                initializeStreams();
105            }
106            catch (Exception ioe) {
107                throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed. " + "URI was: "
108                        + remoteLocation + " Reason: " + ioe, ioe);
109            }
110        }
111    
112        /**
113         * Connect to a remote Node - e.g. a Broker
114         *
115         * @param wireFormat
116         * @param remoteLocation
117         * @param localLocation  - e.g. local InetAddress and local port
118         * @throws JMSException
119         */
120        public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws JMSException {
121            this(wireFormat);
122            try {
123                this.socket = createSocket(remoteLocation, localLocation);
124                initializeStreams();
125            }
126            catch (Exception ioe) {
127                throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
128            }
129        }
130    
131       /**
132        * Initialize from a ServerSocket
133        * @param serverChannel
134        * @param wireFormat
135        * @param socket
136        * @param executor
137        * @throws JMSException
138        */
139        public TcpTransportChannel(TcpTransportServerChannel serverChannel,WireFormat wireFormat, Socket socket, Executor executor) throws JMSException {
140            this(wireFormat);
141            this.socket = socket;
142            this.executor = executor;
143            this.serverChannel = serverChannel;
144            setServerSide(true);
145            try {
146                initialiseSocket(socket);
147                initializeStreams();
148            }
149            catch (IOException ioe) {
150                throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
151            }
152        }
153    
154        public TcpTransportChannel(WireFormat wireFormat, Socket socket, Executor executor) throws JMSException {
155            this(wireFormat);
156            this.socket = socket;
157            this.executor = executor;
158            try {
159                initialiseSocket(socket);
160                initializeStreams();
161            }
162            catch (IOException ioe) {
163                throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
164            }
165        }
166    
167        /**
168         * start listeneing for events
169         *
170         * @throws JMSException if an error occurs
171         */
172        public void start() throws JMSException {
173            if (started.commit(false, true)) {
174                thread = new Thread(this, toString());
175                try {
176                    if (isServerSide()) {
177                        thread.setDaemon(true);
178                        readWireFormat();
179                        getWireFormat().registerTransportStreams(dataOut, dataIn);
180                        getWireFormat().initiateServerSideProtocol();
181                    }
182                    else {
183                        getWireFormat().registerTransportStreams(dataOut, dataIn);
184                        thread.setPriority(Thread.NORM_PRIORITY + 2);
185                    }
186                    //enable caching on the wire format
187                    currentWireFormat.setCachingEnabled(isCachingEnabled());
188                    thread.start();
189                    //send the wire format
190                    if (!isServerSide()) {
191                        getWireFormat().initiateClientSideProtocol();
192                    }
193                                    fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.CONNECTED));
194                }
195                catch (EOFException e) {
196                    doClose(e);
197                }
198                catch (IOException e) {
199                    JMSException jmsEx = new JMSException("start failed: " + e.getMessage());
200                    jmsEx.initCause(e);
201                    jmsEx.setLinkedException(e);
202                    throw jmsEx;
203                }
204            }
205        }
206    
207        protected void readWireFormat() throws JMSException, IOException {
208            WireFormat wf = wireFormatLoader.getWireFormat(dataIn);
209            if (wf != null) {
210                setWireFormat(wf);
211            }
212        }
213    
214        /**
215         * close the channel
216         */
217        public void stop() {
218            if (closed.commit(false, true)) {
219                super.stop();
220                try {
221                    if (executor != null) {
222                        stopExecutor(executor);
223                    }
224                    closeStreams();
225                    socket.close();
226                }
227                catch (Exception e) {
228                    log.warn("Caught while closing: " + e + ". Now Closed", e);
229                }
230            }
231            closed.set(true);
232            if (this.serverChannel != null){
233                this.serverChannel.removeClient(this);
234            }
235        }
236    
237        public void forceDisconnect() {
238            log.debug("Forcing disconnect");
239            if (socket != null && socket.isConnected()) {
240                try {
241                    socket.close();
242                }
243                catch (IOException e) {
244                    // Ignore
245                }
246            }
247        }
248    
249        /**
250         * Asynchronously send a Packet
251         *
252         * @param packet
253         * @throws JMSException
254         */
255        public void asyncSend(final Packet packet) throws JMSException {
256            if (executor != null) {
257                try {
258                    executor.execute(new Runnable() {
259                        public void run() {
260                            try {
261                                if (!isClosed()) {
262                                    doAsyncSend(packet);
263                                }
264                            }
265                            catch (JMSException e) {
266                                try {
267                                    exceptionsList.put(e);
268                                }
269                                catch (InterruptedException e1) {
270                                    log.warn("Failed to add element to exception list: " + e1);
271                                }
272                            }
273                        }
274                    });
275                }
276                catch (InterruptedException e) {
277                    log.info("Caught: " + e, e);
278                }
279                try {
280                    JMSException e = (JMSException) exceptionsList.poll(0);
281                    if (e != null) {
282                        throw e;
283                    }
284                }
285                catch (InterruptedException e1) {
286                    log.warn("Failed to remove element to exception list: " + e1);
287                }
288            }
289            else {
290                doAsyncSend(packet);
291            }
292        }
293    
294        /**
295         * @return false
296         */
297        public boolean isMulticast() {
298            return false;
299        }
300    
301        /**
302         * reads packets from a Socket
303         */
304        public void run() {
305            log.trace("TCP consumer thread starting");
306            int count = 0;
307            while (!isClosed()) {
308                if (isServerSide() && ++count > 500) {
309                    count = 0;
310                    Thread.yield();
311                }
312                try {
313                    Packet packet = getWireFormat().readPacket(dataIn);
314                    if (packet != null) {
315                        doConsumePacket(packet);
316                    }
317                }
318                catch (SocketTimeoutException e) {
319                    //onAsyncException(JMSExceptionHelper.newJMSException(e));
320                }
321                catch (InterruptedIOException e) {
322                    // TODO confirm that this really is a bug in the AS/400 JVM
323                    // Patch for AS/400 JVM
324                    // lets ignore these exceptions
325                    // as they typically just indicate the thread was interupted
326                    // while waiting for input, not that the socket is in error
327                    //onAsyncException(JMSExceptionHelper.newJMSException(e));
328                }
329                catch (IOException e) {
330                    doClose(e);
331                }
332            }
333        }
334    
335        public boolean isClosed() {
336            return closed.get();
337        }
338    
339        /**
340         * pretty print for object
341         *
342         * @return String representation of this object
343         */
344        public String toString() {
345            return "TcpTransportChannel: " + socket;
346        }
347    
348        /**
349         * @return the socket used by the TcpTransportChannel
350         */
351        public Socket getSocket() {
352            return socket;
353        }
354    
355        /**
356         * Can this wireformat process packets of this version
357         *
358         * @param version the version number to test
359         * @return true if can accept the version
360         */
361        public boolean canProcessWireFormatVersion(int version) {
362            return getWireFormat().canProcessWireFormatVersion(version);
363        }
364    
365        /**
366         * @return the current version of this wire format
367         */
368        public int getCurrentWireFormatVersion() {
369            return getWireFormat().getCurrentWireFormatVersion();
370        }
371    
372        // Properties
373        //-------------------------------------------------------------------------
374    
375        /**
376         * @return true if packets are enqueued to a separate queue before dispatching
377         */
378        public boolean isUseAsyncSend() {
379            return useAsyncSend;
380        }
381    
382        /**
383         * set the useAsync flag
384         *
385         * @param useAsyncSend
386         */    
387         public void setUseAsyncSend(boolean useAsyncSend) {
388            this.useAsyncSend = useAsyncSend;
389            try {
390                if (useAsyncSend && executor==null ) {
391                    PooledExecutor pe = new PooledExecutor(new BoundedBuffer(10), 1);
392                    pe.waitWhenBlocked();
393                    pe.setKeepAliveTime(1000);
394                    executor = pe;
395                }
396                else if (!useAsyncSend && executor != null) {
397                    stopExecutor(executor);
398                }
399            }
400            catch (Exception e) {
401                log.warn("problem closing executor", e);
402            }
403        }
404    
405        
406    
407        /**
408         * @return the current so timeout used on the socket
409         */
410        public int getSoTimeout() {
411            return soTimeout;
412        }
413    
414        /**
415         * set the socket so timeout
416         *
417         * @param soTimeout
418         * @throws JMSException
419         */
420        public void setSoTimeout(int soTimeout) throws JMSException {
421            this.soTimeout = soTimeout;
422            if (this.socket != null){
423                try {
424                    socket.setSoTimeout(soTimeout);
425                }
426                catch (SocketException e) {
427                    JMSException jmsEx = new JMSException("Failed to set soTimeout: ", e.getMessage());
428                    jmsEx.setLinkedException(e);
429                    throw jmsEx;
430                }
431            }
432        }
433        
434        /**
435         * @param noDelay The noDelay to set.
436         */
437        public void setNoDelay(boolean noDelay) {
438            super.setNoDelay(noDelay);
439            if (socket != null){
440                try {
441                    socket.setTcpNoDelay(noDelay);
442                }
443                catch (SocketException e) {
444                   log.warn("failed to set noDelay on the socket");//should never happen
445                }
446            }
447        }
448    
449        /**
450         * @return Returns the socketBufferSize.
451         */
452        public int getSocketBufferSize() {
453            return socketBufferSize;
454        }
455        /**
456         * @param socketBufferSize The socketBufferSize to set.
457         */
458        public void setSocketBufferSize(int socketBufferSize) {
459            this.socketBufferSize = socketBufferSize;
460        }
461        // Implementation methods
462        //-------------------------------------------------------------------------
463        /**
464         * Actually performs the async send of a packet
465         *
466         * @param packet
467         * @return a response or null
468         * @throws JMSException
469         */
470        protected Packet doAsyncSend(Packet packet) throws JMSException {
471            Packet response = null;
472            try {
473                synchronized (outboundLock) {
474                    response = getWireFormat().writePacket(packet, dataOut);
475                    dataOut.flush();
476                }
477            }
478            catch (IOException e) {
479    //            if (closed.get()) {
480    //                log.trace("Caught exception while closed: " + e, e);
481    //            }
482    //            else {
483                    JMSException exception = JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e);
484                    onAsyncException(exception);
485                    throw exception;
486    //            }
487            }
488            catch (JMSException e) {
489                if (isClosed()) {
490                    log.trace("Caught exception while closed: " + e, e);
491                }
492                else {
493                    throw e;
494                }
495            }
496            return response;
497        }
498    
499        protected void doClose(Exception ex) {
500            if (!isClosed()) {
501                if (!pendingStop) {
502                    setPendingStop(true);
503                    setTransportConnected(false);
504                    if (ex instanceof EOFException) {
505                        if (!isServerSide() && !isUsedInternally()){
506                            log.warn("Peer closed connection", ex);
507                        }
508                        fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
509                        onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
510                    }
511                    else {
512                        fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
513                        onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
514                    }
515                }
516                stop();
517            }
518        }
519    
520        /**
521         * Configures the socket for use
522         * @param sock
523         * @throws SocketException
524         */
525        protected void initialiseSocket(Socket sock) throws SocketException {
526            try {
527                sock.setReceiveBufferSize(socketBufferSize);
528                sock.setSendBufferSize(socketBufferSize);
529            }
530            catch (SocketException se) {
531                log.debug("Cannot set socket buffer size = " + socketBufferSize, se);
532            }
533            sock.setSoTimeout(soTimeout);
534            sock.setTcpNoDelay(isNoDelay());
535        }
536        
537        protected void initializeStreams() throws IOException{
538            BufferedInputStream buffIn = new BufferedInputStream(socket.getInputStream(),8192);
539            this.dataIn = new DataInputStream(buffIn);
540            TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(),8192);
541            this.dataOut = new DataOutputStream(buffOut);
542        }
543    
544        protected void closeStreams() throws IOException {
545            if (dataOut != null) {
546                dataOut.close();
547            }
548            if (dataIn != null) {
549                dataIn.close();
550            }
551        }
552    
553        /**
554         * Factory method to create a new socket
555         *
556         * @param remoteLocation the URI to connect to
557         * @return the newly created socket
558         * @throws UnknownHostException
559         * @throws IOException
560         */
561        protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException {
562            SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort());
563            Socket sock = new Socket();
564            initialiseSocket(sock);
565            sock.connect(sockAddress);
566            return sock;
567        }
568    
569        /**
570         * Factory method to create a new socket
571         *
572         * @param remoteLocation
573         * @param localLocation
574         * @return @throws IOException
575         * @throws IOException
576         * @throws UnknownHostException
577         */
578        protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
579            SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort());
580            SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
581            Socket sock = new Socket();
582            initialiseSocket(sock);
583            sock.bind(localAddress);
584            sock.connect(sockAddress);
585            return sock;
586        }
587    
588    }