001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.service; 020 import javax.jms.JMSException; 021 import javax.jms.DeliveryMode; 022 import org.apache.commons.logging.*; 023 import org.activemq.broker.BrokerContainer; 024 import org.activemq.broker.Broker; 025 import org.activemq.message.ActiveMQDestination; 026 import org.activemq.message.ActiveMQMessage; 027 import org.activemq.message.ActiveMQQueue; 028 import org.activemq.store.PersistenceAdapter; 029 import org.activemq.util.IdGenerator; 030 031 /** 032 * Determines how messages are stored in a dead letter queue 033 * 034 * @version $Revision: 1.1.1.1 $ 035 */ 036 public class DeadLetterPolicy { 037 /** 038 * Prefix used by dead letter queues 039 */ 040 public static final String DEAD_LETTER_PREFIX = "org.activemq.deadletter."; 041 private static final String DEFAULT_DEAD_LETTER_NAME = "DLQ"; 042 private static final Log log = LogFactory.getLog(DeadLetterPolicy.class); 043 private Broker broker; 044 private String deadLetterPrefix = DEAD_LETTER_PREFIX; 045 private String deadLetterName = DEFAULT_DEAD_LETTER_NAME; 046 private boolean deadLetterEnabled = true; 047 private boolean deadLetterPerDestinationName = true; 048 private boolean storeNonPersistentMessages = true; 049 private boolean noTopicConsumerEnabled = true; 050 private boolean allowDuplicates = false; 051 private boolean useDatabaseLocking = false; 052 private long deadLetterQueueTTL = 0L; 053 private long deadLetterTopicTTL = 0L; 054 private IdGenerator idGenerator = new IdGenerator(); 055 056 /** 057 * Construct a dead letter policy 058 * 059 * @param broker 060 */ 061 public DeadLetterPolicy(Broker broker) { 062 this.broker = broker; 063 } 064 065 public DeadLetterPolicy(BrokerContainer brokerContainer) { 066 this(brokerContainer.getBroker()); 067 } 068 069 /** 070 * Default constructor 071 */ 072 public DeadLetterPolicy() { 073 } 074 075 /** 076 * @return Returns the broker. 077 */ 078 public Broker getBroker() { 079 return broker; 080 } 081 082 /** 083 * @param broker The broker to set. 084 */ 085 public void setBroker(Broker broker) { 086 this.broker = broker; 087 } 088 089 /** 090 * @return Returns the deadLetterEnabled. 091 */ 092 public boolean isDeadLetterEnabled() { 093 return deadLetterEnabled; 094 } 095 096 /** 097 * @param deadLetterEnabled The deadLetterEnabled to set. 098 */ 099 public void setDeadLetterEnabled(boolean deadLetterEnabled) { 100 this.deadLetterEnabled = deadLetterEnabled; 101 } 102 103 /** 104 * @return Returns the deadLetterPerDestinationName. 105 */ 106 public boolean isDeadLetterPerDestinationName() { 107 return deadLetterPerDestinationName; 108 } 109 110 /** 111 * @param deadLetterPerDestinationName The deadLetterPerDestinationName to set. 112 */ 113 public void setDeadLetterPerDestinationName(boolean deadLetterPerDestinationName) { 114 this.deadLetterPerDestinationName = deadLetterPerDestinationName; 115 } 116 117 /** 118 * @return Returns the deadLetterName. 119 */ 120 public String getDeadLetterName() { 121 return deadLetterName; 122 } 123 124 /** 125 * @param deadLetterName The deadLetterName to set. 126 */ 127 public void setDeadLetterName(String deadLetterName) { 128 this.deadLetterName = deadLetterName; 129 } 130 131 /** 132 * @return Returns the deadLetterPrefix. 133 */ 134 public String getDeadLetterPrefix() { 135 return deadLetterPrefix; 136 } 137 138 /** 139 * @param deadLetterPrefix The deadLetterPrefix to set. 140 */ 141 public void setDeadLetterPrefix(String deadLetterPrefix) { 142 this.deadLetterPrefix = deadLetterPrefix; 143 } 144 145 /** 146 * @return Returns the storeNonPersistentMessages. 147 */ 148 public boolean isStoreNonPersistentMessages() { 149 return storeNonPersistentMessages; 150 } 151 152 /** 153 * @param storeNonPersistentMessages The storeNonPersistentMessages to set. 154 */ 155 public void setStoreNonPersistentMessages(boolean storeNonPersistentMessages) { 156 this.storeNonPersistentMessages = storeNonPersistentMessages; 157 } 158 159 /** 160 * @return Returns the noTopicConsumerEnabled. 161 */ 162 public boolean isNoTopicConsumerEnabled() { 163 return noTopicConsumerEnabled; 164 } 165 /** 166 * @param noTopicConsumerEnabled The noTopicConsumerEnabled to set. 167 */ 168 public void setNoTopicConsumerEnabled(boolean noTopicConsumerEnabled) { 169 this.noTopicConsumerEnabled = noTopicConsumerEnabled; 170 } 171 172 /** 173 * @return Returns the allowDuplicates. 174 */ 175 public boolean isAllowDuplicates() { 176 return allowDuplicates; 177 } 178 /** 179 * @param allowDuplicates The allowDuplicates to set. 180 */ 181 public void setAllowDuplicates(boolean allowDuplicates) { 182 this.allowDuplicates = allowDuplicates; 183 } 184 /** 185 * @return Returns the useDatabaseLocking. 186 */ 187 public boolean isUseDatabaseLocking() { 188 return useDatabaseLocking; 189 } 190 /** 191 * @param useDatabaseLocking The useDatabaseLocking to set. 192 */ 193 public void setUseDatabaseLocking(boolean useDatabaseLocking) { 194 this.useDatabaseLocking = useDatabaseLocking; 195 } 196 /** 197 * @param deadLetterQueueTTL The deadLetterQueueTTL to set. 198 */ 199 public void setDeadLetterQueueTTL(long deadLetterQueueTTL) { 200 this.deadLetterQueueTTL = deadLetterQueueTTL; 201 } 202 /** 203 * @param deadLetterTopicTTL The deadLetterTopicTTL to set. 204 */ 205 public void setDeadLetterTopicTTL(long deadLetterTopicTTL) { 206 this.deadLetterTopicTTL = deadLetterTopicTTL; 207 } 208 /** 209 * Get the name of the DLQ from the destination provided 210 * @param destination 211 * @return the name of the DLQ for this Destination 212 */ 213 public String getDeadLetterNameFromDestination(ActiveMQDestination destination){ 214 String answer = this.deadLetterPrefix; 215 if (deadLetterPerDestinationName) { 216 answer += destination.getPhysicalName(); 217 } 218 else { 219 answer += this.deadLetterName; 220 } 221 return answer; 222 } 223 224 /** 225 * Send a message to a dead letter queue 226 * 227 * @param message 228 * @throws JMSException 229 */ 230 public void sendToDeadLetter(ActiveMQMessage message) { 231 if (deadLetterEnabled && message != null && (message.isPersistent() || storeNonPersistentMessages) && !message.isDispatchedFromDLQ()) { 232 if (broker != null) { 233 // process duplicates 234 if (!isAllowDuplicates()) { 235 PersistenceAdapter persistenceAdapter = getBroker().getPersistenceAdapter(); 236 // make sure no previous dead letter was already sent 237 if (persistenceAdapter!=null 238 && message.getJMSMessageIdentity()!=null 239 && message.getJMSMessageIdentity().getSequenceNumber()!=null 240 && persistenceAdapter.deadLetterAlreadySent(((Long)message.getJMSMessageIdentity().getSequenceNumber()).longValue(), isUseDatabaseLocking())) { 241 if (log.isDebugEnabled()) log.debug("Dead letter has been already sent for this message: " + message.getJMSMessageID()); 242 return; 243 } 244 } 245 246 // send a dead letter message 247 String dlqName = getDeadLetterNameFromDestination(message.getJMSActiveMQDestination()); 248 try { 249 ActiveMQMessage deadMessage = createDeadLetterMessage(dlqName, message); 250 broker.sendToDeadLetterQueue(dlqName, deadMessage); 251 if (log.isDebugEnabled()) log.debug("Passed message: " + deadMessage + " to DLQ: " + dlqName); 252 } catch (JMSException e) { 253 log.warn("Failed to send message to dead letter due to: " + e, e); 254 } 255 } 256 else { 257 log.warn("Broker is not initialized - cannot add to DLQ: " + message); 258 } 259 }else if (log.isDebugEnabled()){ 260 log.debug("DLQ not storing message: " + message); 261 } 262 } 263 264 protected ActiveMQMessage createDeadLetterMessage(String dlqName, ActiveMQMessage message) throws JMSException { 265 // make a shallow copy of the orginal message 266 ActiveMQMessage deadMessage = message.shallowCopy(); 267 268 // generate a new producer and message ID 269 String id = idGenerator.generateId(); 270 String producerKey = IdGenerator.getSeedFromId(id); 271 long seq = IdGenerator.getCountFromId(id); 272 deadMessage.setProducerKey(producerKey); 273 deadMessage.setJMSMessageID(id); 274 deadMessage.setSequenceNumber(seq); 275 deadMessage.getJMSMessageIdentity().setMessageID(id); 276 deadMessage.getJMSMessageIdentity().setSequenceNumber(new Long(seq)); 277 278 ActiveMQQueue destination = new ActiveMQQueue(dlqName); 279 deadMessage.setJMSDestination(destination); 280 deadMessage.setDispatchedFromDLQ(true); 281 282 // set the expiration of the dead letter message 283 long expiration = 0L; 284 long timeStamp = System.currentTimeMillis(); 285 if (message.getJMSActiveMQDestination().isTopic()) { 286 if (deadLetterTopicTTL > 0) { 287 expiration = deadLetterTopicTTL + timeStamp; 288 } 289 } else { 290 if (deadLetterQueueTTL > 0) { 291 expiration = deadLetterQueueTTL + timeStamp; 292 } 293 } 294 deadMessage.setJMSExpiration(expiration); 295 deadMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT); 296 297 return deadMessage; 298 } 299 }