001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import java.util.Iterator;
021    import java.util.Map;
022    
023    import javax.jms.Destination;
024    import javax.jms.JMSException;
025    
026    import org.activemq.broker.BrokerClient;
027    import org.activemq.filter.DestinationFilter;
028    import org.activemq.message.ActiveMQDestination;
029    import org.activemq.message.ActiveMQMessage;
030    import org.activemq.message.ConsumerInfo;
031    import org.activemq.service.MessageContainerManager;
032    
033    /**
034     * Implements an initial image service where on subscription
035     * the client will receive the last image that was previously cached.
036     * This is very useful in financial market data and in rapidly changing
037     * transient event models where you don't want to persist messages
038     * when you are away, but wish to cache the last image, per destination
039     * around so that when a new reliable consumer subscribes you receive the
040     * latest value you may have missed.
041     * <p/>
042     * This is especially true in finance with slow moving markets where you may
043     * have to wait a while for an update (or times when you subscribe after
044     * market close etc).
045     *
046     * @version $Revision: 1.1.1.1 $
047     */
048    public class InitialImageMessageContainerManager extends ProxyMessageContainerManager {
049        private Map cache;
050        private boolean topic;
051        private DestinationFilter destinationFilter;
052    
053        /**
054         * Creates a topic based initial image message container manager using the given destination filter
055         *
056         * @param delegate
057         * @param cache
058         * @param destinationFilter
059         */
060        public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, DestinationFilter destinationFilter) {
061            this(delegate, cache, true, destinationFilter);
062        }
063    
064        public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, boolean topic, DestinationFilter destinationFilter) {
065            super(delegate);
066            this.cache = cache;
067            this.topic = topic;
068            this.destinationFilter = destinationFilter;
069        }
070    
071        public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
072            super.addMessageConsumer(client, info);
073    
074            // lookup message for destination
075            ActiveMQDestination destination = info.getDestination();
076            if (isValid(destination)) {
077                if (destination.isWildcard()) {
078                    DestinationFilter filter = DestinationFilter.parseFilter(destination);
079                    sendMatchingInitialImages(client, info, filter);
080                }
081                else {
082                    ActiveMQMessage message = null;
083                    synchronized (cache) {
084                        message = (ActiveMQMessage) cache.get(destination);
085                    }
086                    if (message != null) {
087                        sendMessage(client, message);
088                    }
089                }
090            }
091        }
092    
093        public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
094            ActiveMQDestination destination = message.getJMSActiveMQDestination();
095            if (isValid(destination)) {
096                cache.put(destination, message);
097            }
098            super.sendMessage(client, message);
099        }
100       
101    
102        // Implementation methods
103        //-------------------------------------------------------------------------
104        protected void sendMatchingInitialImages(BrokerClient client, ConsumerInfo info, DestinationFilter filter) throws JMSException {
105            synchronized (cache) {
106                for (Iterator iter = cache.entrySet().iterator(); iter.hasNext();) {
107                    Map.Entry entry = (Map.Entry) iter.next();
108                    Destination destination = (Destination) entry.getKey();
109                    if (filter.matches(destination)) {
110                        ActiveMQMessage message = (ActiveMQMessage) entry.getValue();
111                        sendMessage(client, message);
112                    }
113                }
114            }
115        }
116    
117    
118        /**
119         * Does this message match the destinations on which initial image caching should be used
120         *
121         * @param destination
122         * @return true if the given destination should use initial image caching
123         *         which is typically true if the message is a topic which may match
124         *         an optional DestinationFilter
125         */
126        protected boolean isValid(ActiveMQDestination destination) {
127            return destination.isTopic() == topic && (destinationFilter == null || destinationFilter.matches(destination));
128        }
129    }