001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.transport.jrms; 019 020 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 021 import com.sun.multicast.reliable.RMException; 022 import com.sun.multicast.reliable.transport.RMPacketSocket; 023 import com.sun.multicast.reliable.transport.SessionDoneException; 024 import com.sun.multicast.reliable.transport.TransportProfile; 025 import com.sun.multicast.reliable.transport.lrmp.LRMPTransportProfile; 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.activemq.io.WireFormat; 029 import org.activemq.message.Packet; 030 import org.activemq.transport.TransportChannelSupport; 031 import org.activemq.util.IdGenerator; 032 033 import javax.jms.JMSException; 034 import java.io.IOException; 035 import java.net.DatagramPacket; 036 import java.net.InetAddress; 037 import java.net.URI; 038 039 /** 040 * A JRMS implementation of a TransportChannel 041 * 042 * @version $Revision$ 043 */ 044 public class JRMSTransportChannel extends TransportChannelSupport implements Runnable { 045 046 private static final int SOCKET_BUFFER_SIZE = 32 * 1024; 047 private static final Log log = LogFactory.getLog(JRMSTransportChannel.class); 048 049 private WireFormat wireFormat; 050 private SynchronizedBoolean closed; 051 private SynchronizedBoolean started; 052 private Thread thread; //need to change this - and use a thread pool 053 // need to see our own messages 054 private RMPacketSocket socket; 055 private IdGenerator idGenerator; 056 private String channelId; 057 private int port; 058 private InetAddress inetAddress; 059 private Object lock; 060 061 /** 062 * Construct basic helpers 063 */ 064 protected JRMSTransportChannel(WireFormat wireFormat) { 065 this.wireFormat = wireFormat; 066 idGenerator = new IdGenerator(); 067 channelId = idGenerator.generateId(); 068 closed = new SynchronizedBoolean(false); 069 started = new SynchronizedBoolean(false); 070 lock = new Object(); 071 } 072 073 /** 074 * Connect to a remote Node - e.g. a Broker 075 * 076 * @param remoteLocation 077 * @throws JMSException 078 */ 079 public JRMSTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException { 080 this(wireFormat); 081 try { 082 this.port = remoteLocation.getPort(); 083 this.inetAddress = InetAddress.getByName(remoteLocation.getHost()); 084 LRMPTransportProfile profile = new LRMPTransportProfile(inetAddress, port); 085 profile.setTTL((byte) 1); 086 profile.setOrdered(true); 087 this.socket = profile.createRMPacketSocket(TransportProfile.SEND_RECEIVE); 088 } 089 catch (Exception ioe) { 090 ioe.printStackTrace(); 091 JMSException jmsEx = new JMSException("Initialization of JRMSTransportChannel failed: " + ioe); 092 jmsEx.setLinkedException(ioe); 093 throw jmsEx; 094 } 095 } 096 097 /** 098 * close the channel 099 */ 100 public void stop() { 101 if (closed.commit(false, true)) { 102 super.stop(); 103 try { 104 socket.close(); 105 } 106 catch (Exception e) { 107 log.trace(toString() + " now closed"); 108 } 109 } 110 } 111 112 /** 113 * start listeneing for events 114 * 115 * @throws JMSException if an error occurs 116 */ 117 public void start() throws JMSException { 118 if (started.commit(false, true)) { 119 thread = new Thread(this, toString()); 120 if (isServerSide()) { 121 thread.setDaemon(true); 122 } 123 thread.start(); 124 } 125 } 126 127 /** 128 * Asynchronously send a Packet 129 * 130 * @param packet 131 * @throws JMSException 132 */ 133 public void asyncSend(Packet packet) throws JMSException { 134 try { 135 DatagramPacket dpacket = createDatagramPacket(packet); 136 137 // lets sync to avoid concurrent writes 138 //synchronized (lock) { 139 socket.send(dpacket); 140 //} 141 } 142 catch (RMException rme) { 143 JMSException jmsEx = new JMSException("syncSend failed " + rme.getMessage()); 144 jmsEx.setLinkedException(rme); 145 throw jmsEx; 146 } 147 catch (IOException e) { 148 JMSException jmsEx = new JMSException("asyncSend failed " + e.getMessage()); 149 jmsEx.setLinkedException(e); 150 throw jmsEx; 151 } 152 } 153 154 155 public boolean isMulticast() { 156 return true; 157 } 158 159 /** 160 * reads packets from a Socket 161 */ 162 public void run() { 163 try { 164 while (!closed.get()) { 165 DatagramPacket dpacket = socket.receive(); 166 Packet packet = wireFormat.readPacket(channelId, dpacket); 167 if (packet != null) { 168 doConsumePacket(packet); 169 } 170 } 171 log.trace("The socket peer is now closed"); 172 //doClose(new IOException("Socket peer is now closed")); 173 stop(); 174 } 175 catch (SessionDoneException e) { 176 // this isn't really an exception, it just indicates 177 // that the socket has closed normally 178 log.trace("Session completed", e); 179 stop(); 180 } 181 catch (RMException ste) { 182 doClose(ste); 183 } 184 catch (IOException e) { 185 doClose(e); 186 } 187 } 188 189 /** 190 * Can this wireformat process packets of this version 191 * @param version the version number to test 192 * @return true if can accept the version 193 */ 194 public boolean canProcessWireFormatVersion(int version){ 195 return wireFormat.canProcessWireFormatVersion(version); 196 } 197 198 /** 199 * @return the current version of this wire format 200 */ 201 public int getCurrentWireFormatVersion(){ 202 return wireFormat.getCurrentWireFormatVersion(); 203 } 204 205 protected DatagramPacket createDatagramPacket() { 206 DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE); 207 answer.setPort(port); 208 answer.setAddress(inetAddress); 209 return answer; 210 } 211 212 protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException { 213 DatagramPacket answer = wireFormat.writePacket(channelId, packet); 214 answer.setPort(port); 215 answer.setAddress(inetAddress); 216 return answer; 217 } 218 219 private void doClose(Exception ex) { 220 if (!closed.get()) { 221 JMSException jmsEx = new JMSException("Error reading socket: " + ex); 222 jmsEx.setLinkedException(ex); 223 onAsyncException(jmsEx); 224 stop(); 225 } 226 } 227 228 /** 229 * pretty print for object 230 * 231 * @return String representation of this object 232 */ 233 public String toString() { 234 return "JRMSTransportChannel: " + socket; 235 } 236 237 public void forceDisconnect() { 238 // TODO: implement me. 239 throw new RuntimeException("Not yet Implemented."); 240 } 241 }