001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.tcp;
020    
021    import java.io.IOException;
022    import java.net.InetAddress;
023    import java.net.InetSocketAddress;
024    import java.net.ServerSocket;
025    import java.net.Socket;
026    import java.net.SocketTimeoutException;
027    import java.net.URI;
028    import java.net.URISyntaxException;
029    import java.net.UnknownHostException;
030    
031    import javax.jms.JMSException;
032    
033    import org.activemq.io.WireFormat;
034    import org.activemq.transport.TransportServerChannelSupport;
035    import org.activemq.util.JMSExceptionHelper;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
040    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
041    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
042    
043    /**
044     * Binds to a well known port and listens for Sockets ...
045     *
046     * @version $Revision: 1.1.1.1 $
047     */
048    public class TcpTransportServerChannel extends TransportServerChannelSupport implements Runnable {
049        private static final Log log = LogFactory.getLog(TcpTransportServerChannel.class);
050        protected static final int DEFAULT_BACKLOG = 500;
051        private WireFormat wireFormat;
052        private Thread serverSocketThread;
053        private ServerSocket serverSocket;
054        private SynchronizedBoolean closed;
055        private SynchronizedBoolean started;
056        private boolean useAsyncSend = false;
057        private int maxOutstandingMessages = 10;
058        private int backlog = DEFAULT_BACKLOG;
059    
060        /**
061         * Default Constructor
062         *
063         * @param bindAddr
064         * @throws JMSException
065         */
066        public TcpTransportServerChannel(WireFormat wireFormat, URI bindAddr) throws JMSException {
067            super(bindAddr);
068            this.wireFormat = wireFormat;
069            closed = new SynchronizedBoolean(false);
070            started = new SynchronizedBoolean(false);
071            try {
072                serverSocket = createServerSocket(bindAddr);
073                serverSocket.setSoTimeout(2000);
074                updatePhysicalUri(bindAddr);
075            }
076            catch (Exception se) {
077                System.out.println(se);
078                se.printStackTrace();
079                throw JMSExceptionHelper.newJMSException("Bind to " + bindAddr + " failed: " + se.getMessage(), se);
080            }
081        }
082    
083        public TcpTransportServerChannel(WireFormat wireFormat, ServerSocket serverSocket) throws JMSException {
084            super(serverSocket.getInetAddress().toString());
085            this.wireFormat = wireFormat;
086            this.serverSocket = serverSocket;
087            closed = new SynchronizedBoolean(false);
088            started = new SynchronizedBoolean(false);
089            InetAddress address = serverSocket.getInetAddress();
090            try {
091                updatePhysicalUri(new URI("tcp", "", address.getHostName(), 0, "", "", ""));
092            }
093            catch (URISyntaxException e) {
094                throw JMSExceptionHelper.newJMSException("Failed to extract URI: : " + e.getMessage(), e);
095            }
096        }
097    
098        public void start() throws JMSException {
099            super.start();
100            if (started.commit(false, true)) {
101                log.info("Listening for connections at: " + getUrl());
102                serverSocketThread = new Thread(this, toString());
103                serverSocketThread.setDaemon(true);
104                serverSocketThread.start();
105            }
106        }
107    
108        public void stop() throws JMSException {
109            if (closed.commit(false, true)) {
110                super.stop();
111                try {
112                    if (serverSocket != null) {
113                        serverSocket.close();
114                        if (serverSocketThread != null) {
115                            serverSocketThread.join();
116                            serverSocketThread = null;
117                        }
118                    }
119                }
120                catch (Throwable e) {
121                    throw JMSExceptionHelper.newJMSException("Failed to stop: " + e, e);
122                }
123            }
124        }
125    
126        public InetSocketAddress getSocketAddress() {
127            return (InetSocketAddress) serverSocket.getLocalSocketAddress();
128        }
129    
130        /**
131         * @return pretty print of this
132         */
133        public String toString() {
134            return "TcpTransportServerChannel@" + getUrl();
135        }
136    
137        /**
138         * pull Sockets from the ServerSocket
139         */
140        public void run() {
141            while (!closed.get()) {
142                Socket socket = null;
143                try {
144                    socket = serverSocket.accept();
145                    if (socket != null) {
146                        if (closed.get()) {
147                            socket.close();
148                        }
149                        else {
150                            // have thread per channel for sending messages and a thread for receiving them
151                            PooledExecutor executor = null;
152                            if (useAsyncSend) {
153                                executor = new PooledExecutor(new BoundedBuffer(maxOutstandingMessages), 1);
154                                executor.waitWhenBlocked();
155                                executor.setKeepAliveTime(1000);
156                            }
157                            TcpTransportChannel channel = createTransportChannel(socket, executor);
158                            addClient(channel);
159                        }
160                    }
161                }
162                catch (SocketTimeoutException ste) {
163                    //expect this to happen
164                }
165                catch (Throwable e) {
166                    if (!closed.get()) {
167                        log.warn("run()", e);
168                    }
169                }
170            }
171        }
172    
173        protected TcpTransportChannel createTransportChannel(Socket socket, PooledExecutor executor) throws JMSException {
174            TcpTransportChannel channel = new TcpTransportChannel(this,wireFormat.copy(), socket, executor);
175            return channel;
176        }
177    
178        // Properties
179        //-------------------------------------------------------------------------
180        public boolean isUseAsyncSend() {
181            return useAsyncSend;
182        }
183    
184        public void setUseAsyncSend(boolean useAsyncSend) {
185            this.useAsyncSend = useAsyncSend;
186        }
187    
188        public int getMaxOutstandingMessages() {
189            return maxOutstandingMessages;
190        }
191    
192        public void setMaxOutstandingMessages(int maxOutstandingMessages) {
193            this.maxOutstandingMessages = maxOutstandingMessages;
194        }
195    
196        public int getBacklog() {
197            return backlog;
198        }
199    
200        public void setBacklog(int backlog) {
201            this.backlog = backlog;
202        }
203    
204        public WireFormat getWireFormat() {
205            return wireFormat;
206        }
207    
208        public void setWireFormat(WireFormat wireFormat) {
209            this.wireFormat = wireFormat;
210        }
211    
212        // Implementation methods
213        //-------------------------------------------------------------------------
214        /**
215         * In cases where we construct ourselves with a zero port we need to regenerate the URI with the real physical port
216         * so that people can connect to us via discovery
217         */
218        protected void updatePhysicalUri(URI bindAddr) throws URISyntaxException {
219            URI newURI = new URI(bindAddr.getScheme(), bindAddr.getUserInfo(), resolveHostName(bindAddr.getHost()), serverSocket
220                    .getLocalPort(), bindAddr.getPath(), bindAddr.getQuery(), bindAddr.getFragment());
221            setUrl(newURI.toString());
222        }
223    
224        /**
225         * Factory method to create a new ServerSocket
226         *
227         * @throws UnknownHostException
228         * @throws IOException
229         */
230        protected ServerSocket createServerSocket(URI bind) throws UnknownHostException, IOException {
231            ServerSocket answer = null;
232            String host = bind.getHost();
233            host = (host == null || host.length() == 0) ? "localhost" : host;
234            InetAddress addr = InetAddress.getByName(host);
235            if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
236                answer = new ServerSocket(bind.getPort(), backlog);
237            }
238            else {
239                answer = new ServerSocket(bind.getPort(), backlog, addr);
240            }
241            return answer;
242        }
243    }