001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq; 019 020 import javax.jms.DeliveryMode; 021 import javax.jms.Destination; 022 import javax.jms.IllegalStateException; 023 import javax.jms.InvalidDestinationException; 024 import javax.jms.JMSException; 025 import javax.jms.Message; 026 import javax.jms.MessageFormatException; 027 import javax.jms.MessageProducer; 028 029 import org.activemq.management.JMSProducerStatsImpl; 030 import org.activemq.management.StatsCapable; 031 import org.activemq.management.StatsImpl; 032 import org.activemq.message.ActiveMQDestination; 033 import org.activemq.util.IdGenerator; 034 035 /** 036 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a 037 * destination. A <CODE>MessageProducer</CODE> object is created by passing a 038 * <CODE>Destination</CODE> object to a message-producer creation method 039 * supplied by a session. 040 * <P> 041 * <CODE>MessageProducer</CODE> is the parent interface for all message 042 * producers. 043 * <P> 044 * A client also has the option of creating a message producer without 045 * supplying a destination. In this case, a destination must be provided with 046 * every send operation. A typical use for this kind of message producer is to 047 * send replies to requests using the request's <CODE>JMSReplyTo</CODE> 048 * destination. 049 * <P> 050 * A client can specify a default delivery mode, priority, and time to live for 051 * messages sent by a message producer. It can also specify the delivery mode, 052 * priority, and time to live for an individual message. 053 * <P> 054 * A client can specify a time-to-live value in milliseconds for each message 055 * it sends. This value defines a message expiration time that is the sum of 056 * the message's time-to-live and the GMT when it is sent (for transacted 057 * sends, this is the time the client sends the message, not the time the 058 * transaction is committed). 059 * <P> 060 * A JMS provider should do its best to expire messages accurately; however, 061 * the JMS API does not define the accuracy provided. 062 * 063 * @version $Revision: 1.1.1.1 $ 064 * @see javax.jms.TopicPublisher 065 * @see javax.jms.QueueSender 066 * @see javax.jms.Session#createProducer 067 */ 068 public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, Closeable { 069 protected ActiveMQSession session; 070 private IdGenerator idGenerator; 071 protected boolean closed; 072 private boolean disableMessageID; 073 private boolean disableMessageTimestamp; 074 private int defaultDeliveryMode; 075 private int defaultPriority; 076 private long defaultTimeToLive; 077 protected ActiveMQDestination defaultDestination; 078 private long startTime; 079 private JMSProducerStatsImpl stats; 080 private short producerId; 081 082 private boolean reuseMessageId; //hint to reuse the same messageId - if already set 083 084 protected ActiveMQMessageProducer(ActiveMQSession theSession, ActiveMQDestination destination) throws JMSException { 085 this.session = theSession; 086 this.defaultDestination = destination; 087 this.idGenerator = new IdGenerator(); 088 this.disableMessageID = false; 089 this.disableMessageTimestamp = theSession.connection.isDisableTimeStampsByDefault(); 090 this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; 091 this.defaultPriority = Message.DEFAULT_PRIORITY; 092 this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE; 093 this.startTime = System.currentTimeMillis(); 094 this.session.addProducer(this); 095 this.stats = new JMSProducerStatsImpl(theSession.getSessionStats(), destination); 096 } 097 098 public StatsImpl getStats() { 099 return stats; 100 } 101 102 public JMSProducerStatsImpl getProducerStats() { 103 return stats; 104 } 105 106 /** 107 * Sets whether message IDs are disabled. 108 * <P> 109 * Since message IDs take some effort to create and increase a message's 110 * size, some JMS providers may be able to optimize message overhead if 111 * they are given a hint that the message ID is not used by an application. 112 * By calling the <CODE>setDisableMessageID</CODE> method on this message 113 * producer, a JMS client enables this potential optimization for all 114 * messages sent by this message producer. If the JMS provider accepts this 115 * hint, these messages must have the message ID set to null; if the 116 * provider ignores the hint, the message ID must be set to its normal 117 * unique value. 118 * <P> 119 * Message IDs are enabled by default. 120 * 121 * @param value indicates if message IDs are disabled 122 * @throws JMSException if the JMS provider fails to close the producer due to 123 * some internal error. 124 */ 125 public void setDisableMessageID(boolean value) throws JMSException { 126 checkClosed(); 127 this.disableMessageID = value; 128 } 129 130 /** 131 * Gets an indication of whether message IDs are disabled. 132 * 133 * @return an indication of whether message IDs are disabled 134 * @throws JMSException if the JMS provider fails to determine if message IDs are 135 * disabled due to some internal error. 136 */ 137 public boolean getDisableMessageID() throws JMSException { 138 checkClosed(); 139 return this.disableMessageID; 140 } 141 142 /** 143 * Sets whether message timestamps are disabled. 144 * <P> 145 * Since timestamps take some effort to create and increase a message's 146 * size, some JMS providers may be able to optimize message overhead if 147 * they are given a hint that the timestamp is not used by an application. 148 * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this 149 * message producer, a JMS client enables this potential optimization for 150 * all messages sent by this message producer. If the JMS provider accepts 151 * this hint, these messages must have the timestamp set to zero; if the 152 * provider ignores the hint, the timestamp must be set to its normal 153 * value. 154 * <P> 155 * Message timestamps are enabled by default. 156 * 157 * @param value indicates if message timestamps are disabled 158 * @throws JMSException if the JMS provider fails to close the producer due to 159 * some internal error. 160 */ 161 public void setDisableMessageTimestamp(boolean value) throws JMSException { 162 checkClosed(); 163 this.disableMessageTimestamp = value; 164 } 165 166 /** 167 * Gets an indication of whether message timestamps are disabled. 168 * 169 * @return an indication of whether message timestamps are disabled 170 * @throws JMSException if the JMS provider fails to close the producer due to 171 * some internal error. 172 */ 173 public boolean getDisableMessageTimestamp() throws JMSException { 174 checkClosed(); 175 return this.disableMessageTimestamp; 176 } 177 178 /** 179 * Sets the producer's default delivery mode. 180 * <P> 181 * Delivery mode is set to <CODE>PERSISTENT</CODE> by default. 182 * 183 * @param newDeliveryMode the message delivery mode for this message producer; legal 184 * values are <code>DeliveryMode.NON_PERSISTENT</code> and 185 * <code>DeliveryMode.PERSISTENT</code> 186 * @throws JMSException if the JMS provider fails to set the delivery mode due to 187 * some internal error. 188 * @see javax.jms.MessageProducer#getDeliveryMode 189 * @see javax.jms.DeliveryMode#NON_PERSISTENT 190 * @see javax.jms.DeliveryMode#PERSISTENT 191 * @see javax.jms.Message#DEFAULT_DELIVERY_MODE 192 */ 193 public void setDeliveryMode(int newDeliveryMode) throws JMSException { 194 if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) { 195 throw new IllegalStateException("unkown delivery mode: " + newDeliveryMode); 196 } 197 checkClosed(); 198 this.defaultDeliveryMode = newDeliveryMode; 199 } 200 201 /** 202 * Gets the producer's default delivery mode. 203 * 204 * @return the message delivery mode for this message producer 205 * @throws JMSException if the JMS provider fails to close the producer due to 206 * some internal error. 207 */ 208 public int getDeliveryMode() throws JMSException { 209 checkClosed(); 210 return this.defaultDeliveryMode; 211 } 212 213 /** 214 * Sets the producer's default priority. 215 * <P> 216 * The JMS API defines ten levels of priority value, with 0 as the lowest 217 * priority and 9 as the highest. Clients should consider priorities 0-4 as 218 * gradations of normal priority and priorities 5-9 as gradations of 219 * expedited priority. Priority is set to 4 by default. 220 * 221 * @param newDefaultPriority the message priority for this message producer; must be a 222 * value between 0 and 9 223 * @throws JMSException if the JMS provider fails to set the delivery mode due to 224 * some internal error. 225 * @see javax.jms.MessageProducer#getPriority 226 * @see javax.jms.Message#DEFAULT_PRIORITY 227 */ 228 public void setPriority(int newDefaultPriority) throws JMSException { 229 if (newDefaultPriority < 0 || newDefaultPriority > 9) { 230 throw new IllegalStateException("default priority must be a value between 0 and 9"); 231 } 232 checkClosed(); 233 this.defaultPriority = newDefaultPriority; 234 } 235 236 /** 237 * Gets the producer's default priority. 238 * 239 * @return the message priority for this message producer 240 * @throws JMSException if the JMS provider fails to close the producer due to 241 * some internal error. 242 * @see javax.jms.MessageProducer#setPriority 243 */ 244 public int getPriority() throws JMSException { 245 checkClosed(); 246 return this.defaultPriority; 247 } 248 249 /** 250 * Sets the default length of time in milliseconds from its dispatch time 251 * that a produced message should be retained by the message system. 252 * <P> 253 * Time to live is set to zero by default. 254 * 255 * @param timeToLive the message time to live in milliseconds; zero is unlimited 256 * @throws JMSException if the JMS provider fails to set the time to live due to 257 * some internal error. 258 * @see javax.jms.MessageProducer#getTimeToLive 259 * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE 260 */ 261 public void setTimeToLive(long timeToLive) throws JMSException { 262 if (timeToLive < 0l) { 263 throw new IllegalStateException("cannot set a negative timeToLive"); 264 } 265 checkClosed(); 266 this.defaultTimeToLive = timeToLive; 267 } 268 269 /** 270 * Gets the default length of time in milliseconds from its dispatch time 271 * that a produced message should be retained by the message system. 272 * 273 * @return the message time to live in milliseconds; zero is unlimited 274 * @throws JMSException if the JMS provider fails to get the time to live due to 275 * some internal error. 276 * @see javax.jms.MessageProducer#setTimeToLive 277 */ 278 public long getTimeToLive() throws JMSException { 279 checkClosed(); 280 return this.defaultTimeToLive; 281 } 282 283 /** 284 * Gets the destination associated with this <CODE>MessageProducer</CODE>. 285 * 286 * @return this producer's <CODE>Destination/ <CODE> 287 * @throws JMSException if the JMS provider fails to close the producer due to 288 * some internal error. 289 * @since 1.1 290 */ 291 public Destination getDestination() throws JMSException { 292 checkClosed(); 293 return this.defaultDestination; 294 } 295 296 /** 297 * Closes the message producer. 298 * <P> 299 * Since a provider may allocate some resources on behalf of a <CODE> 300 * MessageProducer</CODE> outside the Java virtual machine, clients should 301 * close them when they are not needed. Relying on garbage collection to 302 * eventually reclaim these resources may not be timely enough. 303 * 304 * @throws JMSException if the JMS provider fails to close the producer due to 305 * some internal error. 306 */ 307 public void close() throws JMSException { 308 this.session.removeProducer(this); 309 closed = true; 310 } 311 312 protected void checkClosed() throws IllegalStateException { 313 if (closed) { 314 throw new IllegalStateException("The producer is closed"); 315 } 316 } 317 318 /** 319 * Sends a message using the <CODE>MessageProducer</CODE>'s default 320 * delivery mode, priority, and time to live. 321 * 322 * @param message the message to send 323 * @throws JMSException if the JMS provider fails to send the message due to some 324 * internal error. 325 * @throws MessageFormatException if an invalid message is specified. 326 * @throws InvalidDestinationException if a client uses this method with a <CODE> 327 * MessageProducer</CODE> with an invalid destination. 328 * @throws java.lang.UnsupportedOperationException 329 * if a client uses this method with a <CODE> 330 * MessageProducer</CODE> that did not specify a 331 * destination at creation time. 332 * @see javax.jms.Session#createProducer 333 * @see javax.jms.MessageProducer 334 * @since 1.1 335 */ 336 public void send(Message message) throws JMSException { 337 this.send(this.defaultDestination, message, this.defaultDeliveryMode, this.defaultPriority, 338 this.defaultTimeToLive); 339 } 340 341 /** 342 * Sends a message to the destination, specifying delivery mode, priority, 343 * and time to live. 344 * 345 * @param message the message to send 346 * @param deliveryMode the delivery mode to use 347 * @param priority the priority for this message 348 * @param timeToLive the message's lifetime (in milliseconds) 349 * @throws JMSException if the JMS provider fails to send the message due to some 350 * internal error. 351 * @throws MessageFormatException if an invalid message is specified. 352 * @throws InvalidDestinationException if a client uses this method with a <CODE> 353 * MessageProducer</CODE> with an invalid destination. 354 * @throws java.lang.UnsupportedOperationException 355 * if a client uses this method with a <CODE> 356 * MessageProducer</CODE> that did not specify a 357 * destination at creation time. 358 * @see javax.jms.Session#createProducer 359 * @since 1.1 360 */ 361 public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { 362 this.send(this.defaultDestination, message, deliveryMode, priority, timeToLive); 363 } 364 365 /** 366 * Sends a message to a destination for an unidentified message producer. 367 * Uses the <CODE>MessageProducer</CODE>'s default delivery mode, 368 * priority, and time to live. 369 * <P> 370 * Typically, a message producer is assigned a destination at creation 371 * time; however, the JMS API also supports unidentified message producers, 372 * which require that the destination be supplied every time a message is 373 * sent. 374 * 375 * @param destination the destination to send this message to 376 * @param message the message to send 377 * @throws JMSException if the JMS provider fails to send the message due to some 378 * internal error. 379 * @throws MessageFormatException if an invalid message is specified. 380 * @throws InvalidDestinationException if a client uses this method with an invalid destination. 381 * @throws java.lang.UnsupportedOperationException 382 * if a client uses this method with a <CODE> 383 * MessageProducer</CODE> that specified a destination at 384 * creation time. 385 * @see javax.jms.Session#createProducer 386 * @see javax.jms.MessageProducer 387 * @since 1.1 388 */ 389 public void send(Destination destination, Message message) throws JMSException { 390 this.send(destination, message, this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive); 391 } 392 393 /** 394 * Sends a message to a destination for an unidentified message producer, 395 * specifying delivery mode, priority and time to live. 396 * <P> 397 * Typically, a message producer is assigned a destination at creation 398 * time; however, the JMS API also supports unidentified message producers, 399 * which require that the destination be supplied every time a message is 400 * sent. 401 * 402 * @param destination the destination to send this message to 403 * @param message the message to send 404 * @param deliveryMode the delivery mode to use 405 * @param priority the priority for this message 406 * @param timeToLive the message's lifetime (in milliseconds) 407 * @throws JMSException if the JMS provider fails to send the message due to some 408 * internal error. 409 * @throws MessageFormatException if an invalid message is specified. 410 * @throws InvalidDestinationException if a client uses this method with an invalid destination. 411 * @see javax.jms.Session#createProducer 412 * @since 1.1 413 */ 414 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) 415 throws JMSException { 416 checkClosed(); 417 if (destination == null) { 418 throw new UnsupportedOperationException("Dont understand null destinations"); 419 } 420 if( this.defaultDestination!=null && destination!=this.defaultDestination && session.connection.isJ2EEcompliant()) { 421 throw new UnsupportedOperationException("This producer can only send messages to: "+defaultDestination.getPhysicalName()); 422 } 423 if (this.defaultDestination==null){ 424 this.session.connection.startAdvisoryForTempDestination(destination); 425 } 426 this.session.send(this, destination, message, deliveryMode, priority, timeToLive,reuseMessageId); 427 stats.onMessage(message); 428 } 429 430 /** 431 * @return Returns the reuseMessageId. 432 */ 433 public boolean isResuseMessageId() { 434 return reuseMessageId; 435 } 436 /** 437 * @param reuseMessageId The resuseMessageId to set. 438 */ 439 public void setReuseMessageId(boolean reuseMessageId) { 440 this.reuseMessageId = reuseMessageId; 441 } 442 443 444 /** 445 * @return Returns the producerId. 446 */ 447 protected short getProducerId() { 448 return producerId; 449 } 450 451 /** 452 * @param producerId The producerId to set. 453 */ 454 public void setProducerId(short producerId) { 455 this.producerId = producerId; 456 } 457 458 459 protected long getStartTime() { 460 return this.startTime; 461 } 462 463 protected IdGenerator getIdGenerator() { 464 return this.idGenerator; 465 } 466 467 protected long getNextSequenceNumber(){ 468 return idGenerator.getNextSequence(); 469 } 470 471 protected String getProducerMessageKey(){ 472 return idGenerator.getSeed(); 473 } 474 }