/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
package org.activemq.service.impl;

import java.util.Iterator;
import java.util.Map;

import javax.jms.Destination;
import javax.jms.JMSException;

import org.activemq.broker.BrokerClient;
import org.activemq.filter.DestinationFilter;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;
import org.activemq.service.MessageContainerManager;

/**
 * Implements an initial image service: on subscription the client receives
 * the last image (message) that was previously cached for the destination.
 * This is very useful for financial market data and for rapidly changing
 * transient event models where you don't want to persist messages while a
 * consumer is away, but do want to keep the last image per destination so
 * that a newly subscribing consumer receives the latest value it may have
 * missed.
 * <p>
 * This is especially true in finance with slow moving markets where you may
 * have to wait a while for an update (or when you subscribe after market
 * close etc).
 * <p>
 * All access to the cache map performed by this class is guarded by the
 * monitor of the map instance itself.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class InitialImageMessageContainerManager extends ProxyMessageContainerManager {

    /** Last message seen per destination; also used as the lock guarding its own access. */
    private final Map cache;

    /** Whether caching applies to topics (<code>true</code>) or to non-topic destinations (<code>false</code>). */
    private final boolean topic;

    /** Optional filter restricting which destinations are cached; <code>null</code> means no restriction. */
    private final DestinationFilter destinationFilter;

    /**
     * Creates a topic based initial image message container manager using the given destination filter.
     *
     * @param delegate          the manager handed to the superclass constructor
     * @param cache             the map used to hold the last message per destination
     * @param destinationFilter optional filter selecting the destinations to cache, may be <code>null</code>
     */
    public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, DestinationFilter destinationFilter) {
        this(delegate, cache, true, destinationFilter);
    }

    /**
     * Creates an initial image message container manager.
     *
     * @param delegate          the manager handed to the superclass constructor
     * @param cache             the map used to hold the last message per destination
     * @param topic             the value {@link ActiveMQDestination#isTopic()} must return for a
     *                          destination to be eligible for caching
     * @param destinationFilter optional filter selecting the destinations to cache, may be <code>null</code>
     */
    public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, boolean topic, DestinationFilter destinationFilter) {
        super(delegate);
        this.cache = cache;
        this.topic = topic;
        this.destinationFilter = destinationFilter;
    }

    /**
     * Registers the consumer with the superclass and then replays the cached
     * image(s) for the consumer's destination. A wildcard destination replays
     * every cached image whose destination matches the wildcard; otherwise
     * only the image cached for exactly that destination (if any) is replayed.
     *
     * @param client the client owning the consumer
     * @param info   the consumer being added
     * @throws JMSException if registration or replay fails
     */
    public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        super.addMessageConsumer(client, info);

        // lookup message for destination
        ActiveMQDestination destination = info.getDestination();
        if (!isValid(destination)) {
            return;
        }
        if (destination.isWildcard()) {
            DestinationFilter filter = DestinationFilter.parseFilter(destination);
            sendMatchingInitialImages(client, info, filter);
        }
        else {
            ActiveMQMessage message;
            synchronized (cache) {
                message = (ActiveMQMessage) cache.get(destination);
            }
            if (message != null) {
                // NOTE(review): the replay goes through sendMessage(), which re-caches the
                // message and forwards it to the superclass - confirm this reaches only
                // the newly added consumer.
                sendMessage(client, message);
            }
        }
    }

    /**
     * Records the message as the latest image for its destination (when the
     * destination is eligible for caching) and then forwards it to the superclass.
     *
     * @param client  the client associated with the message
     * @param message the message to cache and forward
     * @throws JMSException if the superclass fails to send the message
     */
    public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
        ActiveMQDestination destination = message.getJMSActiveMQDestination();
        if (isValid(destination)) {
            // Writes take the same monitor as the reads so that lookups and
            // iteration never observe the map in the middle of an update.
            synchronized (cache) {
                cache.put(destination, message);
            }
        }
        super.sendMessage(client, message);
    }


    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Sends every cached image whose destination matches the given filter.
     * The cache monitor is held for the whole iteration; the nested
     * {@link #sendMessage(BrokerClient, ActiveMQMessage)} call re-acquires
     * the same monitor, which is permitted because Java monitors are reentrant.
     *
     * @param client the client to send the images for
     * @param info   the consumer being added (not used by this implementation)
     * @param filter the filter created from the consumer's wildcard destination
     * @throws JMSException if sending one of the images fails
     */
    protected void sendMatchingInitialImages(BrokerClient client, ConsumerInfo info, DestinationFilter filter) throws JMSException {
        synchronized (cache) {
            for (Iterator iter = cache.entrySet().iterator(); iter.hasNext();) {
                Map.Entry entry = (Map.Entry) iter.next();
                Destination destination = (Destination) entry.getKey();
                if (filter.matches(destination)) {
                    ActiveMQMessage message = (ActiveMQMessage) entry.getValue();
                    sendMessage(client, message);
                }
            }
        }
    }


    /**
     * Does this destination match the destinations on which initial image caching should be used?
     *
     * @param destination the destination to test, may be <code>null</code>
     * @return true if the given destination should use initial image caching,
     *         i.e. it is non-null, its topic flag equals the configured one and
     *         it matches the optional DestinationFilter
     */
    protected boolean isValid(ActiveMQDestination destination) {
        if (destination == null) {
            // Nothing can be cached or looked up without a destination.
            return false;
        }
        return destination.isTopic() == topic
                && (destinationFilter == null || destinationFilter.matches(destination));
    }
}