001    /**
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.activeio;
020    
021    import java.io.IOException;
022    
023    import javax.jms.JMSException;
024    
025    import org.activeio.AcceptListener;
026    import org.activeio.AsyncChannel;
027    import org.activeio.AsyncChannelServer;
028    import org.activeio.Channel;
029    import org.activeio.adapter.SynchToAsynchChannelAdapter;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import org.activemq.io.WireFormat;
033    import org.activemq.transport.TransportServerChannelSupport;
034    import org.activemq.util.JMSExceptionHelper;
035    
036    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
037    
038    /**
039     * Binds to a well known port and listens for Sockets ...
040     * 
041     * @version $Revision: 1.1.1.1 $
042     */
043    public class ActiveIOTransportServerChannel extends TransportServerChannelSupport implements AcceptListener {
044        private static final Log log = LogFactory.getLog(ActiveIOTransportServerChannel.class);
045    
046        private final WireFormat wireFormat;
047    
048        private final AsyncChannelServer server;
049    
050        private final SynchronizedBoolean closed = new SynchronizedBoolean(false);
051    
052        /**
053         * @param wireFormat
054         * @param server
055         */
056        public ActiveIOTransportServerChannel(WireFormat wireFormat, AsyncChannelServer server) {
057            super(server.getBindURI());
058            this.wireFormat = wireFormat;
059            this.server = server;
060            server.setAcceptListener(this);
061        }
062    
063        public void start() throws JMSException {
064            try {
065                super.start();
066                server.start();
067            } catch (IOException e) {
068                throw JMSExceptionHelper.newJMSException(e.getMessage(), e);
069            }
070        }
071    
072        public void stop() throws JMSException {
073            if (closed.commit(false, true)) {
074                super.stop();
075                server.dispose();
076            }
077        }
078    
079        /**
080         * @return pretty print of this
081         */
082        public String toString() {
083            return "ActiveIOTransportServerChannel@" + getUrl();
084        }
085    
086        public void onAccept(Channel c) {
087            if (closed.get()) {
088                c.dispose();
089                return;
090            }
091    
092            AsyncChannel channel = SynchToAsynchChannelAdapter.adapt(c);
093            // If the channel is not allready buffered.. lets buffer it.
094            //if (channel.narrow(WriteBufferedAsynchChannel.class) == null
095            //        && channel.narrow(WriteBufferedSynchChannel.class) == null) {
096            //   channel = new WriteBufferedAsynchChannel(channel);
097            //}
098            addClient(new ActiveIOTransportChannel(wireFormat.copy(), channel));
099        }
100    
101        public void onAcceptError(IOException error) {
102            if (!closed.get()) {
103                log.warn("Caught exception accepting connection: " + error, error);
104                try {
105                    stop();
106                } catch (JMSException e) {
107                }
108            }
109    
110        }
111    
112    }