001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.udp; 020 021 import java.io.IOException; 022 import java.net.DatagramPacket; 023 import java.net.DatagramSocket; 024 import java.net.InetAddress; 025 import java.net.SocketTimeoutException; 026 import java.net.URI; 027 028 import javax.jms.JMSException; 029 030 import org.activemq.io.WireFormat; 031 import org.activemq.message.Packet; 032 import org.activemq.transport.TransportChannelSupport; 033 import org.activemq.transport.TransportStatusEvent; 034 import org.apache.commons.logging.Log; 035 import org.apache.commons.logging.LogFactory; 036 037 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 038 039 /** 040 * A UDP implementation of a TransportChannel 041 * 042 * @version $Revision: 1.1.1.1 $ 043 */ 044 public class UdpTransportChannel extends TransportChannelSupport implements Runnable { 045 private static final int SOCKET_BUFFER_SIZE = 32 * 1024; 046 private static final int SO_TIMEOUT = 5000; 047 private static final Log log = LogFactory.getLog(UdpTransportChannel.class); 048 protected DatagramSocket socket; 049 protected int port; 050 protected InetAddress inetAddress; 051 private WireFormat wireFormat; 052 private SynchronizedBoolean closed; 053 private SynchronizedBoolean started; 054 private Thread thread; //need to change this - and use a thread pool 055 056 /** 057 * Construct basic helpers 058 */ 059 protected UdpTransportChannel(WireFormat wireFormat) { 060 this.wireFormat = wireFormat; 061 closed = new SynchronizedBoolean(false); 062 started = new SynchronizedBoolean(false); 063 } 064 065 public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException { 066 this(wireFormat, remoteLocation, remoteLocation.getPort()); 067 } 068 069 public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation, int port) throws JMSException { 070 this(wireFormat); 071 try { 072 this.port = port; 073 this.inetAddress = InetAddress.getByName(remoteLocation.getHost()); 074 this.socket = createSocket(remoteLocation.getPort()); 075 //log.info("Creating multicast socket on port: " + port + " on 076 // host: " + remoteLocation.getHost()); 077 socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE); 078 socket.setSendBufferSize(SOCKET_BUFFER_SIZE); 079 connect(); 080 // now lets update the port so that sends will go elsewhere 081 } 082 catch (Exception ioe) { 083 JMSException jmsEx = new JMSException("Initialization of TransportChannel failed: " + ioe); 084 jmsEx.setLinkedException(ioe); 085 throw jmsEx; 086 } 087 } 088 089 /** 090 * @param socket 091 * @throws JMSException 092 */ 093 public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket) throws JMSException { 094 this(wireFormat); 095 this.socket = socket; 096 this.port = socket.getPort(); 097 this.inetAddress = socket.getInetAddress(); 098 try { 099 socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE); 100 socket.setSendBufferSize(SOCKET_BUFFER_SIZE); 101 } 102 catch (IOException ioe) { 103 JMSException jmsEx = new JMSException("Initialization of TransportChannel failed"); 104 jmsEx.setLinkedException(ioe); 105 throw jmsEx; 106 } 107 } 108 109 public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket, int port) throws JMSException { 110 this(wireFormat, socket); 111 this.port = port; 112 } 113 114 /** 115 * close the channel 116 */ 117 public void stop() { 118 if (closed.commit(false, true)) { 119 super.stop(); 120 try { 121 socket.close(); 122 } 123 catch (Exception e) { 124 log.trace(toString() + " now closed"); 125 } 126 } 127 } 128 129 public void forceDisconnect() { 130 log.debug("Forcing disconnect"); 131 if (socket != null && socket.isConnected()) { 132 socket.close(); 133 } 134 setTransportConnected(false); 135 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED)); 136 } 137 138 /** 139 * start listeneing for events 140 * 141 * @throws JMSException if an error occurs 142 */ 143 public void start() throws JMSException { 144 if (started.commit(false, true)) { 145 thread = new Thread(this, toString()); 146 if (isServerSide()) { 147 thread.setDaemon(true); 148 } 149 thread.start(); 150 } 151 } 152 153 /** 154 * Asynchronously send a Packet 155 * 156 * @param packet 157 * @throws JMSException 158 */ 159 public void asyncSend(Packet packet) throws JMSException { 160 try { 161 if (log.isDebugEnabled()) { 162 log.debug("Sending packet: " + packet); 163 } 164 DatagramPacket dpacket = createDatagramPacket(packet); 165 // lets sync to avoid concurrent writes 166 //synchronized (lock) { 167 socket.send(dpacket); 168 //} 169 } 170 catch (IOException e) { 171 JMSException jmsEx = new JMSException("asyncSend failed " + e); 172 jmsEx.setLinkedException(e); 173 onAsyncException(jmsEx); 174 throw jmsEx; 175 } 176 } 177 178 public boolean isMulticast() { 179 return false; 180 } 181 182 /** 183 * reads packets from a Socket 184 */ 185 public void run() { 186 DatagramPacket dpacket = createDatagramPacket(); 187 while (!closed.get()) { 188 try { 189 socket.setSoTimeout(SO_TIMEOUT); 190 while (!socket.isClosed()) { 191 socket.setSoTimeout(0); 192 socket.receive(dpacket); 193 if (dpacket.getLength() > 0) { 194 Packet packet = wireFormat.readPacket(getClientID(), dpacket); 195 if (packet != null) { 196 doConsumePacket(packet); 197 } 198 } 199 } 200 log.trace("The socket peer is now closed"); 201 doClose(new IOException("Socket peer is now closed")); 202 } 203 catch (SocketTimeoutException ste) { 204 //continue; 205 } 206 catch (IOException e) { 207 doClose(e); 208 } 209 } 210 } 211 212 /** 213 * Can this wireformat process packets of this version 214 * 215 * @param version the version number to test 216 * @return true if can accept the version 217 */ 218 public boolean canProcessWireFormatVersion(int version) { 219 return wireFormat.canProcessWireFormatVersion(version); 220 } 221 222 /** 223 * @return the current version of this wire format 224 */ 225 public int getCurrentWireFormatVersion() { 226 return wireFormat.getCurrentWireFormatVersion(); 227 } 228 229 /** 230 * @return 231 */ 232 protected DatagramPacket createDatagramPacket() { 233 DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE); 234 if (port >= 0) { 235 answer.setPort(port); 236 } 237 answer.setAddress(inetAddress); 238 return answer; 239 } 240 241 protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException { 242 /* 243 * if (packet instanceof ActiveMQMessage) { ActiveMQMessage message = (ActiveMQMessage) packet; 244 * System.out.println(">>> about to send message with clientID: " + message.getJMSClientID()); } 245 */ 246 DatagramPacket answer = wireFormat.writePacket(getClientID(), packet); 247 if (port >= 0) { 248 answer.setPort(port); 249 } 250 answer.setAddress(inetAddress); 251 return answer; 252 } 253 254 private void doClose(Exception ex) { 255 if (!closed.get()) { 256 JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage()); 257 jmsEx.setLinkedException(ex); 258 onAsyncException(jmsEx); 259 stop(); 260 } 261 } 262 263 protected void connect() throws IOException { 264 //socket.connect(inetAddress, port); 265 } 266 267 protected DatagramSocket createSocket(int port) throws IOException { 268 return new DatagramSocket(port, inetAddress); 269 } 270 271 /** 272 * pretty print for object 273 * 274 * @return String representation of this object 275 */ 276 public String toString() { 277 return "UdpTransportChannel: " + socket; 278 } 279 }