001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.activeio;
020    import java.io.DataInputStream;
021    import java.io.DataOutputStream;
022    import java.io.EOFException;
023    import java.io.IOException;
024    import java.net.SocketException;
025    
026    import javax.jms.JMSException;
027    
028    import org.activeio.AsyncChannel;
029    import org.activeio.AsyncChannelListener;
030    import org.activeio.adapter.PacketByteArrayOutputStream;
031    import org.activeio.adapter.PacketInputStream;
032    import org.activeio.net.SocketMetadata;
033    import org.apache.commons.logging.Log;
034    import org.apache.commons.logging.LogFactory;
035    import org.activemq.io.WireFormat;
036    import org.activemq.message.Packet;
037    import org.activemq.transport.TransportChannelSupport;
038    import org.activemq.transport.TransportStatusEvent;
039    import org.activemq.util.JMSExceptionHelper;
040    
041    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
042    
043    /**
044     * A tcp implementation of a TransportChannel
045     * 
046     * @version $Revision: 1.1.1.1 $
047     */
048    public class ActiveIOTransportChannel extends TransportChannelSupport implements AsyncChannelListener {
049    
050        private static final Log log = LogFactory.getLog(ActiveIOTransportChannel.class);
051        private final Object writeLock = new Object();
052        private final AsyncChannel asynchChannel;
053        private final SynchronizedBoolean closed = new SynchronizedBoolean(false);
054        private final PacketByteArrayOutputStream outputBuffer = new PacketByteArrayOutputStream();
055        private final DataOutputStream dataOut = new DataOutputStream(outputBuffer);
056        
057        private final PacketAggregator aggregator = new PacketAggregator() {
058            protected void packetAssembled(org.activeio.Packet packet) {
059                try {
060                    Packet p = getWireFormat().readPacket(new DataInputStream(new PacketInputStream(packet)));
061                    if( p!=null ) {
062                        doConsumePacket(p);
063                    }
064                } catch (IOException e) {
065                    onPacketError(e);
066                }
067            }
068        };
069    
070        public ActiveIOTransportChannel(WireFormat wireFormat, AsyncChannel asynchChannel) {
071            super(wireFormat);
072            this.asynchChannel = asynchChannel;
073            asynchChannel.setAsyncChannelListener(this);
074            
075            // Enable TcpNoDelay if possible
076            SocketMetadata socket = (SocketMetadata) asynchChannel.narrow(SocketMetadata.class);
077            if(socket!=null) {
078                            try {
079                                            socket.setTcpNoDelay(true);
080                            } catch (SocketException e) {
081                            }
082            }
083        }
084    
085        public void start() throws JMSException {
086            try {
087                asynchChannel.start();
088            } catch (IOException e) {
089                throw JMSExceptionHelper.newJMSException(e.getMessage(),e);
090            }
091        }
092        
093        public void stop() {
094            if (closed.commit(false, true)) {
095                    super.stop();
096                    asynchChannel.dispose();
097            }
098        }
099    
100        public void forceDisconnect() {
101            log.debug("Forcing disconnect");
102            asynchChannel.dispose();
103        }
104    
105            
106        public void asyncSend(Packet packet) throws JMSException {
107            doAsyncSend(packet);
108        }
109    
110        protected Packet doAsyncSend(Packet packet) throws JMSException {
111            Packet response = null;
112            try {
113                synchronized (writeLock) {
114                    response = getWireFormat().writePacket(packet, dataOut);
115                    dataOut.flush();
116                    asynchChannel.write( outputBuffer.getPacket() );
117                    asynchChannel.flush();
118                    outputBuffer.reset();
119                }
120            }
121            catch (IOException e) {
122                if (closed.get()) {
123                    log.trace("Caught exception while closed: " + e, e);
124                }
125                else {
126                    throw JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e);
127                }
128            }
129            catch (JMSException e) {
130                if (closed.get()) {
131                    log.trace("Caught exception while closed: " + e, e);
132                }
133                else {
134                    throw e;
135                }
136            }
137            return response;
138        }
139    
140        public void onPacket(org.activeio.Packet packet) {
141            try {
142                aggregator.addRawPacket(packet);
143            } catch (IOException e) {
144                onPacketError(e);
145            }
146        }
147    
148        public void onPacketError(IOException ex) {
149            if (!closed.get()) {
150                if (!pendingStop){
151                    setPendingStop(true);
152                    setTransportConnected(false);
153                    if (ex instanceof EOFException && isServerSide() == false) {
154                        log.warn("Peer closed connection", ex);
155                    }
156                    else {
157                        onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
158                    }
159                    fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
160                }
161                stop();
162            }
163        }
164    
165        public AsyncChannel getAsyncChannel() {
166            return asynchChannel;
167        }
168        
169        /**
170         * @return the current version of this wire format
171         */
172        public int getCurrentWireFormatVersion() {
173            return getWireFormat().getCurrentWireFormatVersion();
174        }
175        
176    }