001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.transport.udp;
020    
021    import java.io.IOException;
022    import java.net.DatagramPacket;
023    import java.net.DatagramSocket;
024    import java.net.InetAddress;
025    import java.net.SocketTimeoutException;
026    import java.net.URI;
027    
028    import javax.jms.JMSException;
029    
030    import org.activemq.io.WireFormat;
031    import org.activemq.message.Packet;
032    import org.activemq.transport.TransportChannelSupport;
033    import org.activemq.transport.TransportStatusEvent;
034    import org.apache.commons.logging.Log;
035    import org.apache.commons.logging.LogFactory;
036    
037    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
038    
039    /**
040     * A UDP implementation of a TransportChannel
041     *
042     * @version $Revision: 1.1.1.1 $
043     */
044    public class UdpTransportChannel extends TransportChannelSupport implements Runnable {
045        private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
046        private static final int SO_TIMEOUT = 5000;
047        private static final Log log = LogFactory.getLog(UdpTransportChannel.class);
048        protected DatagramSocket socket;
049        protected int port;
050        protected InetAddress inetAddress;
051        private WireFormat wireFormat;
052        private SynchronizedBoolean closed;
053        private SynchronizedBoolean started;
054        private Thread thread; //need to change this - and use a thread pool
055    
056        /**
057         * Construct basic helpers
058         */
059        protected UdpTransportChannel(WireFormat wireFormat) {
060            this.wireFormat = wireFormat;
061            closed = new SynchronizedBoolean(false);
062            started = new SynchronizedBoolean(false);
063        }
064    
065        public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
066            this(wireFormat, remoteLocation, remoteLocation.getPort());
067        }
068    
069        public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation, int port) throws JMSException {
070            this(wireFormat);
071            try {
072                this.port = port;
073                this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
074                this.socket = createSocket(remoteLocation.getPort());
075                //log.info("Creating multicast socket on port: " + port + " on
076                // host: " + remoteLocation.getHost());
077                socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
078                socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
079                connect();
080                // now lets update the port so that sends will go elsewhere
081            }
082            catch (Exception ioe) {
083                JMSException jmsEx = new JMSException("Initialization of TransportChannel failed: " + ioe);
084                jmsEx.setLinkedException(ioe);
085                throw jmsEx;
086            }
087        }
088    
089        /**
090         * @param socket
091         * @throws JMSException
092         */
093        public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket) throws JMSException {
094            this(wireFormat);
095            this.socket = socket;
096            this.port = socket.getPort();
097            this.inetAddress = socket.getInetAddress();
098            try {
099                socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
100                socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
101            }
102            catch (IOException ioe) {
103                JMSException jmsEx = new JMSException("Initialization of TransportChannel failed");
104                jmsEx.setLinkedException(ioe);
105                throw jmsEx;
106            }
107        }
108    
109        public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket, int port) throws JMSException {
110            this(wireFormat, socket);
111            this.port = port;
112        }
113    
114        /**
115         * close the channel
116         */
117        public void stop() {
118            if (closed.commit(false, true)) {
119                super.stop();
120                try {
121                    socket.close();
122                }
123                catch (Exception e) {
124                    log.trace(toString() + " now closed");
125                }
126            }
127        }
128    
129        public void forceDisconnect() {
130            log.debug("Forcing disconnect");
131            if (socket != null && socket.isConnected()) {
132                socket.close();
133            }
134            setTransportConnected(false);
135            fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
136        }
137    
138        /**
139         * start listeneing for events
140         *
141         * @throws JMSException if an error occurs
142         */
143        public void start() throws JMSException {
144            if (started.commit(false, true)) {
145                thread = new Thread(this, toString());
146                if (isServerSide()) {
147                    thread.setDaemon(true);
148                }
149                thread.start();
150            }
151        }
152    
153        /**
154         * Asynchronously send a Packet
155         *
156         * @param packet
157         * @throws JMSException
158         */
159        public void asyncSend(Packet packet) throws JMSException {
160            try {
161                if (log.isDebugEnabled()) {
162                    log.debug("Sending packet: " + packet);
163                }
164                DatagramPacket dpacket = createDatagramPacket(packet);
165                // lets sync to avoid concurrent writes
166                //synchronized (lock) {
167                socket.send(dpacket);
168                //}
169            }
170            catch (IOException e) {
171                JMSException jmsEx = new JMSException("asyncSend failed " + e);
172                jmsEx.setLinkedException(e);
173                onAsyncException(jmsEx);
174                throw jmsEx;
175            }
176        }
177    
178        public boolean isMulticast() {
179            return false;
180        }
181    
182        /**
183         * reads packets from a Socket
184         */
185        public void run() {
186            DatagramPacket dpacket = createDatagramPacket();
187            while (!closed.get()) {
188                try {
189                    socket.setSoTimeout(SO_TIMEOUT);
190                    while (!socket.isClosed()) {
191                        socket.setSoTimeout(0);
192                        socket.receive(dpacket);
193                        if (dpacket.getLength() > 0) {
194                            Packet packet = wireFormat.readPacket(getClientID(), dpacket);
195                            if (packet != null) {
196                                doConsumePacket(packet);
197                            }
198                        }
199                    }
200                    log.trace("The socket peer is now closed");
201                    doClose(new IOException("Socket peer is now closed"));
202                }
203                catch (SocketTimeoutException ste) {
204                    //continue;
205                }
206                catch (IOException e) {
207                    doClose(e);
208                }
209            }
210        }
211    
212        /**
213         * Can this wireformat process packets of this version
214         *
215         * @param version the version number to test
216         * @return true if can accept the version
217         */
218        public boolean canProcessWireFormatVersion(int version) {
219            return wireFormat.canProcessWireFormatVersion(version);
220        }
221    
222        /**
223         * @return the current version of this wire format
224         */
225        public int getCurrentWireFormatVersion() {
226            return wireFormat.getCurrentWireFormatVersion();
227        }
228    
229        /**
230         * @return
231         */
232        protected DatagramPacket createDatagramPacket() {
233            DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
234            if (port >= 0) {
235                answer.setPort(port);
236            }
237            answer.setAddress(inetAddress);
238            return answer;
239        }
240    
241        protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
242            /*
243             * if (packet instanceof ActiveMQMessage) { ActiveMQMessage message = (ActiveMQMessage) packet;
244             * System.out.println(">>> about to send message with clientID: " + message.getJMSClientID()); }
245             */
246            DatagramPacket answer = wireFormat.writePacket(getClientID(), packet);
247            if (port >= 0) {
248                answer.setPort(port);
249            }
250            answer.setAddress(inetAddress);
251            return answer;
252        }
253    
254        private void doClose(Exception ex) {
255            if (!closed.get()) {
256                JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
257                jmsEx.setLinkedException(ex);
258                onAsyncException(jmsEx);
259                stop();
260            }
261        }
262    
263        protected void connect() throws IOException {
264            //socket.connect(inetAddress, port);
265        }
266    
267        protected DatagramSocket createSocket(int port) throws IOException {
268            return new DatagramSocket(port, inetAddress);
269        }
270    
271        /**
272         * pretty print for object
273         *
274         * @return String representation of this object
275         */
276        public String toString() {
277            return "UdpTransportChannel: " + socket;
278        }
279    }