001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.transport.vm; 019 020 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; 021 import EDU.oswego.cs.dl.util.concurrent.Channel; 022 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 023 import org.apache.commons.logging.Log; 024 import org.apache.commons.logging.LogFactory; 025 import org.activemq.broker.BrokerConnector; 026 import org.activemq.message.Packet; 027 import org.activemq.message.PacketListener; 028 import org.activemq.transport.TransportChannelListener; 029 import org.activemq.transport.TransportChannelSupport; 030 031 import javax.jms.JMSException; 032 033 /** 034 * A VM implementation of a TransportChannel 035 * 036 * @version $Revision: 1.1.1.1 $ 037 */ 038 public class VmTransportChannel extends TransportChannelSupport implements Runnable { 039 040 private static final Log log = LogFactory.getLog(VmTransportChannel.class); 041 private static final Object TERMINATE = new Object(); 042 private static int lastThreadId = 0; // To number the threads. 043 044 // properties 045 private Channel sendChannel; 046 private Channel receiveChannel; 047 private int sendCapacity = 10; 048 private int receiveCapacity = 10; 049 private boolean asyncSend = false; 050 051 // state 052 private SynchronizedBoolean closed; 053 private SynchronizedBoolean started; 054 private Thread thread; //need to change this - and use a thread pool 055 private PacketListener sendListener; 056 private VmTransportChannel clientSide; 057 058 public VmTransportChannel() { 059 closed = new SynchronizedBoolean(false); 060 started = new SynchronizedBoolean(false); 061 } 062 063 public VmTransportChannel(Channel sendChannel, Channel receiveChannel) { 064 this(); 065 this.sendChannel = sendChannel; 066 this.receiveChannel = receiveChannel; 067 } 068 069 public VmTransportChannel(int capacity) { 070 this(new BoundedLinkedQueue(capacity), new BoundedLinkedQueue(capacity)); 071 } 072 073 public void start() throws JMSException { 074 if (started.commit(false, true)) { 075 if (isAsyncSend()) { 076 // lets force the lazy construction 077 // as we sometimes need to create these early when 078 // wiring together with a server side channel 079 getSendChannel(); 080 getReceiveChannel(); 081 082 thread = new Thread(this, "VM Transport: " + getNextThreadId()); 083 if (isServerSide()) { 084 thread.setDaemon(true); 085 } 086 thread.start(); 087 } 088 } 089 } 090 091 public void stop() { 092 if (closed.commit(false, true)) { 093 super.stop(); 094 try { 095 // to close the channel, lets send a null 096 if (sendChannel != null) { 097 sendChannel.put(TERMINATE); 098 } 099 if (receiveChannel != null) { 100 receiveChannel.put(TERMINATE); 101 } 102 103 if (thread != null) { 104 // lets wait for the receive thread to terminate 105 thread.join(); 106 } 107 } 108 catch (Exception e) { 109 log.trace(toString() + " now closed with exception: " + e); 110 } 111 } 112 } 113 114 public void forceDisconnect() { 115 throw new IllegalStateException("Disconnection not applicable for VM transport"); 116 } 117 118 /** 119 * Asynchronously send a Packet 120 * 121 * @param packet 122 * @throws JMSException 123 */ 124 public void asyncSend(Packet packet) throws JMSException { 125 if (sendChannel != null) { 126 while (true) { 127 try { 128 sendChannel.put(packet); 129 break; 130 } 131 catch (InterruptedException e) { 132 // continue 133 } 134 } 135 } 136 else { 137 if (sendListener == null) { 138 if (clientSide != null) { 139 sendListener = clientSide.createPacketListenerSender(); 140 } 141 } 142 if (sendListener != null) { 143 sendListener.consume(packet); 144 } 145 else { 146 throw new JMSException("No sendListener available"); 147 } 148 } 149 } 150 151 152 public boolean isMulticast() { 153 return false; 154 } 155 156 /** 157 * reads packets from a Socket 158 */ 159 public void run() { 160 while (!closed.get()) { 161 try { 162 Object answer = receiveChannel.take(); 163 if (answer == TERMINATE) { 164 log.trace("The socket peer is now closed"); 165 stop(); 166 return; 167 } 168 else if (answer != null) { 169 Packet packet = (Packet) answer; 170 // we might have just got a packet in but we've already shut down 171 if (closed.get()) { 172 break; 173 } 174 doConsumePacket(packet); 175 } 176 } 177 catch (InterruptedException e) { 178 // continue 179 } 180 } 181 } 182 183 /** 184 * pretty print for object 185 * 186 * @return String representation of this object 187 */ 188 public String toString() { 189 return "VmTransportChannel: " + sendChannel; 190 } 191 192 /** 193 * Connects the client side transport channel with the broker 194 */ 195 public void connect(BrokerConnector brokerConnector) throws JMSException { 196 TransportChannelListener listener = (TransportChannelListener) brokerConnector; 197 VmTransportChannel serverSide = createServerSide(); 198 listener.addClient(serverSide); 199 serverSide.start(); 200 } 201 202 /** 203 * Creates the server side version of this client side channel. On the server side 204 * the client's side sendChannel is the receiveChannel and vice versa 205 * 206 * @return 207 */ 208 public VmTransportChannel createServerSide() throws JMSException { 209 VmTransportChannel channel = new VmTransportChannel(getReceiveChannel(), getSendChannel()); 210 channel.clientSide = this; 211 return channel; 212 } 213 214 public void setPacketListener(PacketListener listener) { 215 super.setPacketListener(listener); 216 if (clientSide != null) { 217 clientSide.sendListener = listener; 218 219 } 220 } 221 222 /** 223 * Can this wireformat process packets of this version 224 * @param version the version number to test 225 * @return true if can accept the version 226 */ 227 public boolean canProcessWireFormatVersion(int version){ 228 return true; 229 } 230 231 /** 232 * Does the transport support wire format version info 233 * @return 234 */ 235 public boolean doesSupportWireFormatVersioning(){ 236 return false; 237 } 238 239 /** 240 * @return the current version of this wire format 241 */ 242 public int getCurrentWireFormatVersion(){ 243 return -1; 244 } 245 246 /** 247 * some transports/wire formats will implement their own fragementation 248 * @return true unless a transport/wire format supports it's own fragmentation 249 */ 250 public boolean doesSupportMessageFragmentation(){ 251 return false; 252 } 253 254 255 /** 256 * Some transports/wireformats will not be able to understand compressed messages 257 * @return true unless a transport/wire format cannot understand compression 258 */ 259 public boolean doesSupportMessageCompression(){ 260 return false; 261 } 262 263 // Properties 264 //------------------------------------------------------------------------- 265 public int getReceiveCapacity() { 266 return receiveCapacity; 267 } 268 269 public void setReceiveCapacity(int receiveCapacity) { 270 this.receiveCapacity = receiveCapacity; 271 } 272 273 public int getSendCapacity() { 274 return sendCapacity; 275 } 276 277 public void setSendCapacity(int sendCapacity) { 278 this.sendCapacity = sendCapacity; 279 } 280 281 public boolean isAsyncSend() { 282 return asyncSend; 283 } 284 285 public void setAsyncSend(boolean asyncSend) { 286 this.asyncSend = asyncSend; 287 } 288 289 public Channel getSendChannel() { 290 if (isAsyncSend()) { 291 if (sendChannel == null) { 292 sendChannel = createChannel(getSendCapacity()); 293 } 294 } 295 return sendChannel; 296 } 297 298 public void setSendChannel(Channel sendChannel) { 299 this.sendChannel = sendChannel; 300 } 301 302 public Channel getReceiveChannel() { 303 if (isAsyncSend()) { 304 if (receiveChannel == null) { 305 receiveChannel = createChannel(getReceiveCapacity()); 306 } 307 } 308 return receiveChannel; 309 } 310 311 public void setReceiveChannel(Channel receiveChannel) { 312 this.receiveChannel = receiveChannel; 313 } 314 315 // Implementation methods 316 //------------------------------------------------------------------------- 317 protected static synchronized int getNextThreadId() { 318 return lastThreadId++; 319 } 320 321 protected Channel createChannel(int capacity) { 322 return new BoundedLinkedQueue(capacity); 323 } 324 325 /** 326 * Creates a sender PacketListener which handles any receipts then delegates 327 * to the ultimate PacketListener (typically the JMS client) 328 * 329 * @return 330 */ 331 protected PacketListener createPacketListenerSender() { 332 return new PacketListener() { 333 public void consume(Packet packet) { 334 doConsumePacket(packet, getPacketListener()); 335 } 336 }; 337 } 338 339 protected void doClose(Exception ex) { 340 if (!closed.get()) { 341 JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage()); 342 jmsEx.setLinkedException(ex); 343 onAsyncException(jmsEx); 344 stop(); 345 } 346 } 347 348 public PacketListener getSendListener() { 349 return sendListener; 350 } 351 352 public void setSendListener(PacketListener sendListener) { 353 this.sendListener = sendListener; 354 } 355 }