001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.transport;
019    
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    
026    import javax.jms.JMSException;
027    
028    import org.activemq.ActiveMQPrefetchPolicy;
029    import org.activemq.broker.BrokerContainer;
030    import org.activemq.service.Service;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
035    
036    /**
037     * Represents a connector to one or more remote brokers.
038     * This class manages a number of {@link NetworkChannel} instances
039     * which may or may not be connected to a
040     * remote broker at any point in time.
041     * <p/>
042     * The implementation of this class could use a fixed number of
043     * configured {@link NetworkChannel} instances or could use
044     * discovery to find them.
045     *
046     * @version $Revision: 1.1.1.1 $
047     */
048    public class NetworkConnector implements Service {
049        private static final Log log = LogFactory.getLog(NetworkConnector.class);
050    
051        private BrokerContainer brokerContainer;
052        private List networkChannels = new ArrayList();
053        private Map localDetails = new HashMap();
054        private String remoteUserName;
055        private String remotePassword;
056        protected PooledExecutor threadPool;
057        private ActiveMQPrefetchPolicy localPrefetchPolicy = new ActiveMQPrefetchPolicy();
058        private ActiveMQPrefetchPolicy remotePrefetchPolicy = new ActiveMQPrefetchPolicy();
059    
060    
061        public NetworkConnector(BrokerContainer brokerContainer) {
062            this.brokerContainer = brokerContainer;
063            this.threadPool = new PooledExecutor();
064        }
065    
066        public void start() throws JMSException {
067            for (Iterator iter = networkChannels.iterator(); iter.hasNext();) {
068                NetworkChannel networkChannel = (NetworkChannel) iter.next();
069                networkChannel.setBrokerContainer(getBrokerContainer());
070                networkChannel.setThreadPool(threadPool);
071                networkChannel.start();
072            }
073        }
074    
075        public void stop() throws JMSException {
076            for (Iterator iter = networkChannels.iterator(); iter.hasNext();) {
077                NetworkChannel networkChannel = (NetworkChannel) iter.next();
078                try {
079                    networkChannel.stop();
080                }
081                catch (JMSException e) {
082                    log.warn("Failed to stop network channel: " + e, e);
083                }
084            }
085        }
086    
087        public void setTransportChannelListener(TransportChannelListener listener) {
088        }
089        
090        
091    
092    
093        // Properties
094        //-------------------------------------------------------------------------
095        
096        public BrokerContainer getBrokerContainer() {
097            return brokerContainer;
098        }
099        
100    
101        /**
102         * @return Returns the threadPool.
103         */
104        public PooledExecutor getThreadPool() {
105            return threadPool;
106        }
107       
108    
109        public List getNetworkChannels() {
110            return networkChannels;
111        }
112        
113        public Map getLocalDetails() {
114            return localDetails;
115        }
116    
117        public void setLocalDetails(Map localDetails) {
118            this.localDetails = localDetails;
119        }
120    
121        public String getRemotePassword() {
122            return remotePassword;
123        }
124    
125        public void setRemotePassword(String remotePassword) {
126            this.remotePassword = remotePassword;
127        }
128    
129        public String getRemoteUserName() {
130            return remoteUserName;
131        }
132    
133        public void setRemoteUserName(String remoteUserName) {
134            this.remoteUserName = remoteUserName;
135        }
136    
137    
138        /**
139         * Sets a list of {@link NetworkChannel} instances
140         *
141         * @param networkChannels
142         */
143        public void setNetworkChannels(List networkChannels) {
144            this.networkChannels = networkChannels;
145        }
146    
147        /**
148         * Adds a new network channel for the given URI
149         *
150         * @param uri
151         * @return
152         */
153        public NetworkChannel addNetworkChannel(String uri) throws JMSException {
154            NetworkChannel networkChannel = createNetworkChannel(uri);
155            addNetworkChannel(networkChannel);
156            return networkChannel;
157        }
158        
159    
160        /**
161         * Adds a new network channel
162         */
163        public void addNetworkChannel(NetworkChannel networkChannel) throws JMSException {
164            configure(networkChannel);
165            networkChannels.add(networkChannel);
166        }
167    
168        /**
169         * Removes a network channel
170         */
171        public void removeNetworkChannel(NetworkChannel networkChannel) {
172            networkChannels.remove(networkChannel);
173        }
174    
175        public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() {
176            return localPrefetchPolicy;
177        }
178    
179        public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy localPrefetchPolicy) {
180            this.localPrefetchPolicy = localPrefetchPolicy;
181        }
182    
183        public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() {
184            return remotePrefetchPolicy;
185        }
186    
187        public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy remotePrefetchPolicy) {
188            this.remotePrefetchPolicy = remotePrefetchPolicy;
189        }
190    
191    
192        // Expose individual prefetch properties as individual properties
193        //-------------------------------------------------------------------------
194        public int getLocalDurableTopicPrefetch() {
195            return localPrefetchPolicy.getDurableTopicPrefetch();
196        }
197    
198        public void setLocalDurableTopicPrefetch(int durableTopicPrefetch) {
199            localPrefetchPolicy.setDurableTopicPrefetch(durableTopicPrefetch);
200        }
201    
202        public int getLocalQueuePrefetch() {
203            return localPrefetchPolicy.getQueuePrefetch();
204        }
205    
206        public void setLocalQueuePrefetch(int queuePrefetch) {
207            localPrefetchPolicy.setQueuePrefetch(queuePrefetch);
208        }
209    
210        public int getLocalQueueBrowserPrefetch() {
211            return localPrefetchPolicy.getQueueBrowserPrefetch();
212        }
213    
214        public void setLocalQueueBrowserPrefetch(int queueBrowserPrefetch) {
215            localPrefetchPolicy.setQueueBrowserPrefetch(queueBrowserPrefetch);
216        }
217    
218        public int getLocalTopicPrefetch() {
219            return localPrefetchPolicy.getTopicPrefetch();
220        }
221    
222        public void setLocalTopicPrefetch(int topicPrefetch) {
223            localPrefetchPolicy.setTopicPrefetch(topicPrefetch);
224        }
225    
226    
227        public int getRemoteDurableTopicPrefetch() {
228            return remotePrefetchPolicy.getDurableTopicPrefetch();
229        }
230    
231        public void setRemoteDurableTopicPrefetch(int durableTopicPrefetch) {
232            remotePrefetchPolicy.setDurableTopicPrefetch(durableTopicPrefetch);
233        }
234    
235        public int getRemoteQueuePrefetch() {
236            return remotePrefetchPolicy.getQueuePrefetch();
237        }
238    
239        public void setRemoteQueuePrefetch(int queuePrefetch) {
240            remotePrefetchPolicy.setQueuePrefetch(queuePrefetch);
241        }
242    
243        public int getRemoteQueueBrowserPrefetch() {
244            return remotePrefetchPolicy.getQueueBrowserPrefetch();
245        }
246    
247        public void setRemoteQueueBrowserPrefetch(int queueBrowserPrefetch) {
248            remotePrefetchPolicy.setQueueBrowserPrefetch(queueBrowserPrefetch);
249        }
250    
251        public int getRemoteTopicPrefetch() {
252            return remotePrefetchPolicy.getTopicPrefetch();
253        }
254    
255        public void setRemoteTopicPrefetch(int topicPrefetch) {
256            remotePrefetchPolicy.setTopicPrefetch(topicPrefetch);
257        }
258    
259    
260        // Implementation methods
261        //-------------------------------------------------------------------------
262    
263    
264        /**
265         * Create a channel from the url
266         * @param url
267         * @return
268         */
269        protected NetworkChannel createNetworkChannel(String url) {
270            NetworkChannel answer = new NetworkChannel(this,getBrokerContainer(), url);
271            answer.setRemoteUserName(getRemoteUserName());
272            answer.setRemotePassword(getRemotePassword());
273            return answer;
274        }
275    
276        /**
277         * Performs any network connector based configuration; such as setting the dispatch policies
278         *
279         * @param networkChannel
280         */
281        protected void configure(NetworkChannel networkChannel) throws JMSException {
282            networkChannel.setLocalPrefetchPolicy(localPrefetchPolicy);
283            networkChannel.setRemotePrefetchPolicy(remotePrefetchPolicy);
284        }
285    
286    }