001    /**
002     *
003     * Copyright 2004 Hiram Chirino
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.activemq.ra;
019    
020    import java.util.HashMap;
021    
022    import javax.jms.Connection;
023    import javax.jms.JMSException;
024    import javax.jms.XAConnection;
025    import javax.jms.XASession;
026    import javax.resource.NotSupportedException;
027    import javax.resource.ResourceException;
028    import javax.resource.spi.ActivationSpec;
029    import javax.resource.spi.BootstrapContext;
030    import javax.resource.spi.ResourceAdapter;
031    import javax.resource.spi.ResourceAdapterInternalException;
032    import javax.resource.spi.endpoint.MessageEndpointFactory;
033    import javax.transaction.xa.XAResource;
034    
035    import org.apache.commons.logging.Log;
036    import org.apache.commons.logging.LogFactory;
037    import org.activemq.ActiveMQConnection;
038    import org.activemq.ActiveMQConnectionFactory;
039    import org.activemq.XmlConfigHelper;
040    import org.activemq.broker.BrokerContainer;
041    import org.activemq.broker.BrokerContainerFactory;
042    import org.activemq.broker.BrokerContext;
043    import org.activemq.util.IdGenerator;
044    
045    /**
046     * Knows how to connect to one ActiveMQ server. It can then activate endpoints
047     * and deliver messages to those enpoints using the connection configure in the
048     * resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
049     * 
050     * @version $Revision: 1.2 $
051     */
052    public class ActiveMQResourceAdapter implements ResourceAdapter {
053        private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
054    
055        private static final String ASF_ENDPOINT_WORKER_TYPE = "asf";
056    
057        private static final String POLLING_ENDPOINT_WORKER_TYPE = "polling";
058    
059        private BootstrapContext bootstrapContext;
060    
061        private HashMap endpointWorkers = new HashMap();
062    
063        final private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
064    
065        private String endpointWorkerType = ASF_ENDPOINT_WORKER_TYPE;
066    
067        private ActiveMQConnectionFactory connectionFactory;
068    
069        private BrokerContainer container;
070        
071        private Boolean useEmbeddedBroker;
072        private String brokerXmlConfig;
073    
074        private HashMap connectionFactoryMap = new HashMap(1);
075    
076        public ActiveMQResourceAdapter() {
077        }
078    
079        /**
080         * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
081         */
082        public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
083            this.bootstrapContext = bootstrapContext;
084            if (isUseEmbeddedBroker() != null && isUseEmbeddedBroker().booleanValue()) {
085                createBroker();
086            }
087        }
088    
089        private void createBroker() throws ResourceAdapterInternalException {
090            try {
091                BrokerContainerFactory brokerContainerFactory = XmlConfigHelper.createBrokerContainerFactory(getBrokerXmlConfig());
092    
093                IdGenerator idgen = new IdGenerator();
094                container = brokerContainerFactory.createBrokerContainer(idgen.generateId(), BrokerContext.getInstance());
095                container.start();
096                connectionFactory = new ActiveMQConnectionFactory(container, getServerUrl());
097            } catch (JMSException e) {
098                log.error(e.toString(), e);
099                throw new ResourceAdapterInternalException("Failed to startup an embedded broker", e);
100            }
101        }
102    
103        /**
104         * Return a connection using the default connection request info from the RAR
105         * deployment.
106         */
107        public ActiveMQConnection makeConnection() throws JMSException {
108            return makeConnection(info);
109        }
110    
111        /**
112         * Return a connection using a specific connection request info.
113         */
114        public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo crInfo) throws JMSException {
115    
116            ActiveMQConnectionFactory connectionFactory = getConnectionFactory(crInfo);
117    
118            String userName = info.getUserName();
119            String password = info.getPassword();
120            ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
121    
122            String clientId = info.getClientid();
123            if (clientId != null) {
124                physicalConnection.setClientID(clientId);
125            }
126            return physicalConnection;
127        }
128    
129        /**
130         * @param activationSpec
131         */
132        public ActiveMQConnection makeConnection(ActiveMQActivationSpec activationSpec) throws JMSException {
133            //use the default RA connection request for info
134            ActiveMQConnectionFactory connectionFactory = getConnectionFactory(info);
135            String userName = defaultValue(activationSpec.getUserName(), info.getUserName());
136            String password = defaultValue(activationSpec.getPassword(), info.getPassword());
137            ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
138            if (activationSpec.isDurableSubscription()) {
139                physicalConnection.setClientID(activationSpec.getClientId());
140            }
141            return physicalConnection;
142        }
143    
144        /**
145         * Returns a connection factory given a connection configuration.
146         * The implementation of this method treats the factories as singletons
147         * only creating one factory for a given set of configuration data.
148         */
149        private ActiveMQConnectionFactory getConnectionFactory(ActiveMQConnectionRequestInfo crInfo) {
150            //use adapter default if none provided
151            if(crInfo == null) {
152                crInfo = info;
153            }
154    
155            if(!(connectionFactoryMap.containsKey(crInfo))) {
156                //slightly possible the factory can be set twice here
157                //but highly unlikely and no real functional impact
158                //other than an extra reference
159                synchronized(connectionFactoryMap) {
160                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(crInfo.getServerUrl());
161                    connectionFactoryMap.put(crInfo, factory);
162                    return factory;
163                }
164            }
165            return (ActiveMQConnectionFactory)connectionFactoryMap.get(crInfo);
166        }
167    
168        private String defaultValue(String value, String defaultValue) {
169            if (value != null)
170                return value;
171            return defaultValue;
172        }
173    
174        /**
175         * @see javax.resource.spi.ResourceAdapter#stop()
176         */
177        public void stop() {
178            while (endpointWorkers.size() > 0) {
179                    ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next();
180                    endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
181            }
182            stopBroker();
183            this.bootstrapContext = null;
184        }
185    
186        private void stopBroker() {
187            if (container != null) {
188                try {
189                    container.stop();
190                } catch (JMSException e) {
191                    log.warn("Exception while stopping the broker container", e);
192                }
193            }
194        }
195    
196        /**
197         * @return
198         */
199        public BootstrapContext getBootstrapContext() {
200            return bootstrapContext;
201        }
202    
203        /**
204         * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
205         *      javax.resource.spi.ActivationSpec)
206         */
207        public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec)
208                throws ResourceException {
209    
210            // spec section 5.3.3
211            if (activationSpec.getResourceAdapter() != this) {
212                throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance");
213            }
214    
215            if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
216    
217                ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
218                        (ActiveMQActivationSpec) activationSpec);
219                // This is weird.. the same endpoint activated twice.. must be a
220                // container error.
221                if (endpointWorkers.containsKey(key)) {
222                    throw new IllegalStateException("Endpoint previously activated");
223                }
224    
225                ActiveMQBaseEndpointWorker worker;
226                if (POLLING_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
227                    worker = new ActiveMQPollingEndpointWorker(this, key);
228                } else if (ASF_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
229                    worker = new ActiveMQAsfEndpointWorker(this, key);
230                } else {
231                    throw new NotSupportedException("That type of EndpointWorkerType is not supported: "
232                            + getEndpointWorkerType());
233                }
234    
235                endpointWorkers.put(key, worker);
236                worker.start();
237    
238            } else {
239                throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
240            }
241    
242        }
243    
244        /**
245         * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
246         *      javax.resource.spi.ActivationSpec)
247         */
248        public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
249    
250            if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
251                ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
252                        (ActiveMQActivationSpec) activationSpec);
253                ActiveMQBaseEndpointWorker worker = (ActiveMQBaseEndpointWorker) endpointWorkers.remove(key);
254                if (worker == null) {
255                    // This is weird.. that endpoint was not activated.. oh well..
256                    // this method
257                    // does not throw exceptions so just return.
258                    return;
259                }
260                try {
261                    worker.stop();
262                } catch (InterruptedException e) {
263                    // We interrupted.. we won't throw an exception but will stop
264                    // waiting for the worker
265                    // to stop.. we tried our best. Keep trying to interrupt the
266                    // thread.
267                    Thread.currentThread().interrupt();
268                }
269    
270            }
271    
272        }
273    
274        /**
275         * We only connect to one resource manager per ResourceAdapter instance, so
276         * any ActivationSpec will return the same XAResource.
277         * 
278         * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
279         */
280        public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
281            Connection connection = null;
282            try {
283                connection = makeConnection();
284                if (connection instanceof XAConnection) {
285                    XASession session = ((XAConnection) connection).createXASession();
286                    XAResource xaResource = session.getXAResource();
287                    return new XAResource[] { xaResource };
288                } else {
289                    return new XAResource[] {};
290                }
291            } catch (JMSException e) {
292                throw new ResourceException(e);
293            } finally {
294                try {
295                    connection.close();
296                } catch (Throwable ignore) {
297                }
298            }
299        }
300    
301        // ///////////////////////////////////////////////////////////////////////
302        //
303        // Java Bean getters and setters for this ResourceAdapter class.
304        //
305        // ///////////////////////////////////////////////////////////////////////
306    
307        /**
308         * @return
309         */
310        public String getClientid() {
311            return emptyToNull(info.getClientid());
312        }
313    
314        /**
315         * @return
316         */
317        public String getPassword() {
318            return emptyToNull(info.getPassword());
319        }
320    
321        /**
322         * @return
323         */
324        public String getServerUrl() {
325            return info.getServerUrl();
326        }
327    
328        /**
329         * @return
330         */
331        public String getUserName() {
332            return emptyToNull(info.getUserName());
333        }
334    
335        /**
336         * @param clientid
337         */
338        public void setClientid(String clientid) {
339            info.setClientid(clientid);
340        }
341    
342        /**
343         * @param password
344         */
345        public void setPassword(String password) {
346            info.setPassword(password);
347        }
348    
349        /**
350         * @param url
351         */
352        public void setServerUrl(String url) {
353            info.setServerUrl(url);
354        }
355    
356        /**
357         * @param userid
358         */
359        public void setUserName(String userid) {
360            info.setUserName(userid);
361        }
362    
363        /**
364         * @return Returns the endpointWorkerType.
365         */
366        public String getEndpointWorkerType() {
367            return endpointWorkerType;
368        }
369    
370        /**
371         * @param endpointWorkerType
372         *            The endpointWorkerType to set.
373         */
374        public void setEndpointWorkerType(String endpointWorkerType) {
375            this.endpointWorkerType = endpointWorkerType.toLowerCase();
376        }
377    
378        public String getBrokerXmlConfig() {
379            return brokerXmlConfig;
380        }
381    
382        /**
383         * Sets the <a href="http://activemq.org/Xml+Configuration">XML
384         * configuration file </a> used to configure the ActiveMQ broker via Spring
385         * if using embedded mode.
386         * 
387         * @param brokerXmlConfig
388         *            is the filename which is assumed to be on the classpath unless
389         *            a URL is specified. So a value of <code>foo/bar.xml</code>
390         *            would be assumed to be on the classpath whereas
391         *            <code>file:dir/file.xml</code> would use the file system.
392         *            Any valid URL string is supported.
393         * @see #setUseEmbeddedBroker(Boolean)
394         */
395        public void setBrokerXmlConfig(String brokerXmlConfig) {
396            this.brokerXmlConfig=brokerXmlConfig;
397        }
398    
399        public Boolean isUseEmbeddedBroker() {
400            return useEmbeddedBroker;
401        }
402    
403        public void setUseEmbeddedBroker(Boolean useEmbeddedBroker) {
404            this.useEmbeddedBroker = useEmbeddedBroker;
405        }
406    
407        /**
408         * @return Returns the info.
409         */
410        public ActiveMQConnectionRequestInfo getInfo() {
411            return info;
412        }
413    
414        public boolean equals(Object o) {
415            if (this == o) {
416                return true;
417            }
418            if (!(o instanceof ActiveMQResourceAdapter)) {
419                return false;
420            }
421    
422            final ActiveMQResourceAdapter activeMQResourceAdapter = (ActiveMQResourceAdapter) o;
423    
424            if (!endpointWorkerType.equals(activeMQResourceAdapter.endpointWorkerType)) {
425                return false;
426            }
427            if (!info.equals(activeMQResourceAdapter.info)) {
428                return false;
429            }
430            if ( notEqual(useEmbeddedBroker, activeMQResourceAdapter.useEmbeddedBroker) ) {
431                return false;
432            }
433            if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.brokerXmlConfig) ) {
434                return false;
435            }
436    
437            return true;
438        }
439        
440        private boolean notEqual(Object o1, Object o2) {
441            return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2));
442        }
443    
444    
445        public int hashCode() {
446            int result;
447            result = info.hashCode();
448            result = 29 * result + endpointWorkerType.hashCode();
449            if (useEmbeddedBroker != null && useEmbeddedBroker.booleanValue()) {
450                result = result * 29 + 1;
451            }
452            if( brokerXmlConfig !=null ) {
453                result ^= brokerXmlConfig.hashCode();
454            }
455            return result;
456        }
457    
458        private String emptyToNull(String value) {
459            if (value == null || value.length() == 0) {
460                return null;
461            }
462            return value;
463        }
464    
465        public Boolean getUseEmbeddedBroker() {
466            return useEmbeddedBroker;
467        }
468    
469        public Boolean getUseInboundSession() {
470            return info.getUseInboundSession();
471        }
472    
473        public void setUseInboundSession(Boolean useInboundSession) {
474            info.setUseInboundSession(useInboundSession);
475        }
476    
477    }