001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.message; 020 021 import java.util.Iterator; 022 import java.util.List; 023 import java.util.ArrayList; 024 import java.util.StringTokenizer; 025 026 import org.activemq.util.BitArray; 027 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet; 028 029 /** 030 * Abstract class for a transportable Packet 031 * 032 * @version $Revision: 1.1.1.1 $ 033 */ 034 public abstract class AbstractPacket implements Packet { 035 036 /** 037 * Message flag indexes (used for writing/reading to/from a Stream 038 */ 039 public static final int RECEIPT_REQUIRED_INDEX = 0; 040 public static final int BROKERS_VISITED_INDEX =1; 041 private short id = 0; 042 protected BitArray bitArray; 043 protected transient int cachedHashCode = -1; 044 private boolean receiptRequired; 045 private transient int memoryUsage = 2048; 046 private transient int memoryUsageReferenceCount; 047 048 private CopyOnWriteArraySet brokersVisited; 049 050 protected AbstractPacket(){ 051 this.bitArray = new BitArray(); 052 } 053 054 /** 055 * @return the unique id for this Packet 056 */ 057 public short getId() { 058 return this.id; 059 } 060 061 /** 062 * Set the unique id for this Packet 063 * 064 * @param newId 065 */ 066 public void setId(short newId) { 067 this.id = newId; 068 } 069 070 /** 071 * @return true if a Recipt is required 072 */ 073 public boolean isReceiptRequired() { 074 return this.receiptRequired; 075 } 076 077 /** 078 * @return false since most packets are not receipt packets 079 */ 080 public boolean isReceipt() { 081 return false; 082 } 083 084 /** 085 * Set if a Recipt if required on receiving this Packet 086 * 087 * @param value 088 */ 089 public void setReceiptRequired(boolean value) { 090 this.receiptRequired = value; 091 } 092 093 /** 094 * Retrieve if a JMS Message type or not 095 * 096 * @return true if it is a JMS Message 097 */ 098 public boolean isJMSMessage() { 099 return false; 100 } 101 102 /** 103 * Tests equality with another instance 104 * 105 * @param obj - the other instance to test equality with 106 * @return Returns true if the objects are equilvant 107 */ 108 public boolean equals(Object obj) { 109 boolean result = this == obj; 110 if (!result && obj != null && obj instanceof AbstractPacket) { 111 AbstractPacket other = (AbstractPacket) obj; 112 result = other.id == this.id; 113 } 114 return result; 115 } 116 117 /** 118 * @return Returns hash code for this instance 119 */ 120 public int hashCode() { 121 return this.id; 122 } 123 124 /** 125 * Get a hint about how much memory this Packet is consuming 126 * 127 * @return an aproximation of the current memory used by this instance 128 */ 129 public int getMemoryUsage() { 130 return memoryUsage; 131 } 132 133 /** 134 * Set a hint about how mujch memory this packet is consuming 135 * 136 * @param newMemoryUsage 137 */ 138 public void setMemoryUsage(int newMemoryUsage) { 139 this.memoryUsage = newMemoryUsage; 140 } 141 142 /** 143 * Increment reference count for bounded memory collections 144 * 145 * @return the incremented reference value 146 * @see org.activemq.io.util.MemoryBoundedQueue 147 */ 148 public synchronized int incrementMemoryReferenceCount() { 149 return ++memoryUsageReferenceCount; 150 } 151 152 /** 153 * Decrement reference count for bounded memory collections 154 * 155 * @return the decremented reference value 156 * @see org.activemq.io.util.MemoryBoundedQueue 157 */ 158 public synchronized int decrementMemoryReferenceCount() { 159 return --memoryUsageReferenceCount; 160 } 161 162 /** 163 * @return the current reference count for bounded memory collections 164 * @see org.activemq.io.util.MemoryBoundedQueue 165 */ 166 public synchronized int getMemoryUsageReferenceCount() { 167 return memoryUsageReferenceCount; 168 } 169 170 /** 171 * As the packet passes through the broker add the broker to the visited list 172 * 173 * @param brokerName the name of the broker 174 */ 175 public void addBrokerVisited(String brokerName) { 176 if (brokerName == null || brokerName.trim().equals("")) { 177 throw new IllegalArgumentException("Broker name cannot be empty or null"); 178 } 179 initializeBrokersVisited(); 180 brokersVisited.add(brokerName); 181 } 182 183 /** 184 * clear list of brokers visited 185 */ 186 public void clearBrokersVisited(){ 187 brokersVisited = null; 188 } 189 190 /** 191 * test to see if the named broker has already seen this packet 192 * 193 * @param brokerName the name of the broker 194 * @return true if the packet has visited the broker 195 */ 196 public boolean hasVisited(String brokerName) { 197 if (brokersVisited == null){ 198 return false; 199 } 200 return brokersVisited.contains(brokerName); 201 } 202 203 /** 204 * @return Returns the brokersVisited. 205 */ 206 public String getBrokersVisitedAsString() { 207 String result = ""; 208 if (brokersVisited != null && !brokersVisited.isEmpty()){ 209 for (Iterator i = brokersVisited.iterator(); i.hasNext();){ 210 result += i.next().toString() + ","; 211 } 212 } 213 return result; 214 } 215 216 public void setBrokersVisitedAsString(String value) { 217 initializeBrokersVisited(); 218 StringTokenizer enm = new StringTokenizer(value, ","); 219 while (enm.hasMoreElements()) { 220 brokersVisited.add(enm.nextToken()); 221 } 222 } 223 224 /** 225 * @return pretty print of this Packet 226 */ 227 public String toString() { 228 return getPacketTypeAsString(getPacketType()) + ": id = " + getId(); 229 } 230 231 232 public static String getPacketTypeAsString(int type) { 233 String packetTypeStr = ""; 234 switch (type) { 235 case ACTIVEMQ_MESSAGE: 236 packetTypeStr = "ACTIVEMQ_MESSAGE"; 237 break; 238 case ACTIVEMQ_TEXT_MESSAGE: 239 packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE"; 240 break; 241 case ACTIVEMQ_OBJECT_MESSAGE: 242 packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE"; 243 break; 244 case ACTIVEMQ_BYTES_MESSAGE: 245 packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE"; 246 break; 247 case ACTIVEMQ_STREAM_MESSAGE: 248 packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE"; 249 break; 250 case ACTIVEMQ_MAP_MESSAGE: 251 packetTypeStr = "ACTIVEMQ_MAP_MESSAGE"; 252 break; 253 case ACTIVEMQ_MSG_ACK: 254 packetTypeStr = "ACTIVEMQ_MSG_ACK"; 255 break; 256 case RECEIPT_INFO: 257 packetTypeStr = "RECEIPT_INFO"; 258 break; 259 case CONSUMER_INFO: 260 packetTypeStr = "CONSUMER_INFO"; 261 break; 262 case PRODUCER_INFO: 263 packetTypeStr = "PRODUCER_INFO"; 264 break; 265 case TRANSACTION_INFO: 266 packetTypeStr = "TRANSACTION_INFO"; 267 break; 268 case XA_TRANSACTION_INFO: 269 packetTypeStr = "XA_TRANSACTION_INFO"; 270 break; 271 case ACTIVEMQ_BROKER_INFO: 272 packetTypeStr = "ACTIVEMQ_BROKER_INFO"; 273 break; 274 case ACTIVEMQ_CONNECTION_INFO: 275 packetTypeStr = "ACTIVEMQ_CONNECTION_INFO"; 276 break; 277 case SESSION_INFO: 278 packetTypeStr = "SESSION_INFO"; 279 break; 280 case DURABLE_UNSUBSCRIBE: 281 packetTypeStr = "DURABLE_UNSUBSCRIBE"; 282 break; 283 case RESPONSE_RECEIPT_INFO: 284 packetTypeStr = "RESPONSE_RECEIPT_INFO"; 285 break; 286 case INT_RESPONSE_RECEIPT_INFO: 287 packetTypeStr = "INT_RESPONSE_RECEIPT_INFO"; 288 break; 289 case CAPACITY_INFO: 290 packetTypeStr = "CAPACITY_INFO"; 291 break; 292 case CAPACITY_INFO_REQUEST: 293 packetTypeStr = "CAPACITY_INFO_REQUEST"; 294 break; 295 case WIRE_FORMAT_INFO: 296 packetTypeStr = "WIRE_FORMAT_INFO"; 297 break; 298 case KEEP_ALIVE: 299 packetTypeStr = "KEEP_ALIVE"; 300 break; 301 case CACHED_VALUE_COMMAND: 302 packetTypeStr = "CachedValue"; 303 break; 304 default : 305 packetTypeStr = "UNKNOWN PACKET TYPE: " + type; 306 } 307 return packetTypeStr; 308 } 309 310 /** 311 * A helper method used when implementing equals() which returns true if the objects are identical or equal handling 312 * nulls properly 313 * @param left 314 * @param right 315 * 316 * @return true if the objects are the same or equal or both null 317 */ 318 protected boolean equals(Object left, Object right) { 319 return left == right || (left != null && left.equals(right)); 320 } 321 322 /** 323 * Initializes another message with current values from this instance 324 * 325 * @param other the other ActiveMQMessage to initialize 326 */ 327 protected void initializeOther(AbstractPacket other) { 328 initializeBrokersVisited(); 329 other.id = this.id; 330 other.receiptRequired = this.receiptRequired; 331 other.memoryUsage = this.memoryUsage; 332 CopyOnWriteArraySet set = this.brokersVisited; 333 if (set != null && !set.isEmpty()){ 334 other.brokersVisited = new CopyOnWriteArraySet(set); 335 } 336 } 337 338 synchronized void initializeBrokersVisited(){ 339 if (this.brokersVisited == null){ 340 this.brokersVisited = new CopyOnWriteArraySet(); 341 } 342 } 343 344 /** 345 * @return Returns the brokersVisited. 346 */ 347 public Object[] getBrokersVisited() { 348 if (brokersVisited == null || brokersVisited.isEmpty()){ 349 return null; 350 } 351 return brokersVisited.toArray(); 352 } 353 354 /** 355 * @return Returns the bitArray. 356 */ 357 public BitArray getBitArray() { 358 return bitArray; 359 } 360 /** 361 * @param bitArray The bitArray to set. 362 */ 363 public void setBitArray(BitArray bitArray) { 364 this.bitArray = bitArray; 365 } 366 }