001    /**
002     *
003     * Copyright 2004 Hiram Chirino
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.activemq.ra;
019    
020    import java.io.PrintWriter;
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    
024    import javax.jms.Connection;
025    import javax.jms.ExceptionListener;
026    import javax.jms.JMSException;
027    import javax.resource.ResourceException;
028    import javax.resource.spi.ConnectionEvent;
029    import javax.resource.spi.ConnectionEventListener;
030    import javax.resource.spi.ConnectionRequestInfo;
031    import javax.resource.spi.LocalTransaction;
032    import javax.resource.spi.ManagedConnection;
033    import javax.resource.spi.ManagedConnectionMetaData;
034    import javax.security.auth.Subject;
035    import javax.transaction.xa.XAResource;
036    
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    import org.activemq.ActiveMQConnection;
040    import org.activemq.LocalTransactionEventListener;
041    import org.activemq.TransactionContext;
042    
043    /**
044     * ActiveMQManagedConnection maps to real physical connection to the
045     * server.  Since a ManagedConnection has to provide a transaction
046     * managment interface to the physical connection, and sessions
047     * are the objects implement transaction managment interfaces in
048     * the JMS API, this object also maps to a singe physical JMS session.
049     * <p/>
050     * The side-effect is that JMS connection the application gets
051     * will allways create the same session object.  This is good if
052     * running in an app server since the sessions are elisted in the
053     * context transaction.  This is bad if used outside of an app
054     * server since the user may be trying to create 2 different
055     * sessions to coordinate 2 different uow.
056     *
057     * @version $Revision: 1.1.1.1 $
058     */
059    public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO: , DissociatableManagedConnection {
060    
061        private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class);
062    
063        private PrintWriter logWriter;
064    
065        private final ActiveMQConnection physicalConnection;
066        private final TransactionContext transactionContext;
067        private final ArrayList proxyConnections = new ArrayList();
068        private final ArrayList listeners = new ArrayList();
069        private final LocalAndXATransaction localAndXATransaction;
070        
071        private Subject subject;
072        private ActiveMQConnectionRequestInfo info;
073        private boolean destoryed;
074    
075        public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException {
076            try {
077                this.subject = subject;
078                this.info = info;
079                this.physicalConnection = physicalConnection;
080                this.transactionContext = new TransactionContext(physicalConnection);
081                
082                this.localAndXATransaction = new LocalAndXATransaction(transactionContext) {
083                    public void setInManagedTx(boolean inManagedTx) throws JMSException {
084                        super.setInManagedTx(inManagedTx);                    
085                        Iterator iterator = proxyConnections.iterator();
086                        while (iterator.hasNext()) {
087                            JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
088                            proxy.setUseSharedTxContext(inManagedTx);
089                        }                    
090                    }
091                };
092                
093                this.transactionContext.setLocalTransactionEventListener( new LocalTransactionEventListener() {
094                    public void beginEvent() {
095                        fireBeginEvent();
096                    }
097                    public void commitEvent() {
098                        fireCommitEvent();
099                    }
100                    public void rollbackEvent() {
101                        fireRollbackEvent();
102                    }
103                });
104                            
105                physicalConnection.setExceptionListener(this);
106                    } catch (JMSException e) {
107                throw new ResourceException("Could not create a new connection: "+e.getMessage(), e);
108            }                       
109        }
110        
111        public boolean isInManagedTx() {
112            return localAndXATransaction.isInManagedTx();
113        }
114        
115        static public boolean matches(Object x, Object y) {
116            if (x == null ^ y == null) {
117                return false;
118            }
119            if (x != null && !x.equals(y)) {
120                return false;
121            }
122            return true;
123        }
124    
125        public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException {
126    
127            // Do we need to change the associated userid/password
128            if( !matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword()) ) {
129                ((ActiveMQConnection)physicalConnection).changeUserInfo(info.getUserName(), info.getPassword());
130            }
131            
132            // Do we need to set the clientId?
133            if( info.getClientid()!=null && info.getClientid().length()>0 ) 
134                physicalConnection.setClientID(info.getClientid());
135    
136            this.subject = subject;
137            this.info = info;
138        }
139    
140        public Connection getPhysicalConnection() {
141            return physicalConnection;
142        }
143        
144        private void fireBeginEvent() {
145            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
146                    ConnectionEvent.LOCAL_TRANSACTION_STARTED);
147            Iterator iterator = listeners.iterator();
148            while (iterator.hasNext()) {
149                ConnectionEventListener l = (ConnectionEventListener) iterator.next();
150                l.localTransactionStarted(event);
151            }
152        }
153    
154        private void fireCommitEvent() {
155            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
156                    ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
157            Iterator iterator = listeners.iterator();
158            while (iterator.hasNext()) {
159                ConnectionEventListener l = (ConnectionEventListener) iterator.next();
160                l.localTransactionCommitted(event);
161            }
162        }
163    
164        private void fireRollbackEvent() {
165            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
166                    ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
167            Iterator iterator = listeners.iterator();
168            while (iterator.hasNext()) {
169                ConnectionEventListener l = (ConnectionEventListener) iterator.next();
170                l.localTransactionRolledback(event);
171            }
172        }
173    
174        private void fireCloseEvent(JMSConnectionProxy proxy) {
175            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
176                    ConnectionEvent.CONNECTION_CLOSED);
177            event.setConnectionHandle(proxy);
178            
179            Iterator iterator = listeners.iterator();
180            while (iterator.hasNext()) {
181                ConnectionEventListener l = (ConnectionEventListener) iterator.next();
182                l.connectionClosed(event);
183            }
184        }
185    
186        private void fireErrorOccurredEvent(Exception error) {
187            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
188                    ConnectionEvent.CONNECTION_ERROR_OCCURRED, error);
189            Iterator iterator = listeners.iterator();
190            while (iterator.hasNext()) {
191                ConnectionEventListener l = (ConnectionEventListener) iterator.next();
192                l.connectionErrorOccurred(event);
193            }
194        }
195    
196        /**
197         * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
198         *      javax.resource.spi.ConnectionRequestInfo)
199         */
200        public Object getConnection(Subject subject, ConnectionRequestInfo info)
201                throws ResourceException {
202            JMSConnectionProxy proxy = new JMSConnectionProxy(this);
203            proxyConnections.add(proxy);
204            return proxy;
205        }
206    
207        private boolean isDestroyed() {
208            return destoryed;
209        }
210        
211        /**
212         * Close down the physical connection to the server.
213         *
214         * @see javax.resource.spi.ManagedConnection#destroy()
215         */
216        public void destroy() throws ResourceException {
217            // Have we allready been destroyed??
218            if (isDestroyed()) {
219                return;
220            }
221    
222            cleanup();
223    
224            try {
225                physicalConnection.close();
226                destoryed = true;
227            } catch (JMSException e) {
228                log.info("Error occured during close of a JMS connection.", e);
229            }
230        }
231    
232        /**
233         * Cleans up all proxy handles attached to this physical connection so that
234         * they cannot be used anymore.
235         * 
236         * @see javax.resource.spi.ManagedConnection#cleanup()
237         */
238        public void cleanup() throws ResourceException {
239            
240            // Have we allready been destroyed??
241            if (isDestroyed()) {
242                return;
243            }
244    
245            Iterator iterator = proxyConnections.iterator();
246            while (iterator.hasNext()) {
247                JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
248                proxy.cleanup();
249            }
250            proxyConnections.clear();
251    
252            try {
253                ((ActiveMQConnection)physicalConnection).cleanup();
254            } catch (JMSException e) {
255                throw new ResourceException("Could cleanup the ActiveMQ connection: "+e, e);
256            }
257                
258        }
259    
260        /**
261         * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
262         */
263        public void associateConnection(Object connection) throws ResourceException {
264            throw new ResourceException("Not supported.");
265        }
266    
267        /**
268         * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
269         */
270        public void addConnectionEventListener(ConnectionEventListener listener) {
271            listeners.add(listener);
272        }
273    
274        /**
275         * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
276         */
277        public void removeConnectionEventListener(ConnectionEventListener listener) {
278            listeners.remove(listener);
279        }
280    
281        /**
282         * @see javax.resource.spi.ManagedConnection#getXAResource()
283         */
284        public XAResource getXAResource() throws ResourceException {
285            return localAndXATransaction;
286        }
287    
288        /**
289         * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
290         */
291        public LocalTransaction getLocalTransaction() throws ResourceException {
292            return localAndXATransaction;
293        }
294    
295        /**
296         * @see javax.resource.spi.ManagedConnection#getMetaData()
297         */
298        public ManagedConnectionMetaData getMetaData() throws ResourceException {
299            return new ManagedConnectionMetaData() {
300    
301                public String getEISProductName() throws ResourceException {
302                    if (physicalConnection == null) {
303                        throw new ResourceException("Not connected.");
304                    }
305                    try {
306                        return physicalConnection.getMetaData().getJMSProviderName();
307                    }
308                    catch (JMSException e) {
309                        throw new ResourceException("Error accessing provider.", e);
310                    }
311                }
312    
313                public String getEISProductVersion() throws ResourceException {
314                    if (physicalConnection == null) {
315                        throw new ResourceException("Not connected.");
316                    }
317                    try {
318                        return physicalConnection.getMetaData().getProviderVersion();
319                    }
320                    catch (JMSException e) {
321                        throw new ResourceException("Error accessing provider.", e);
322                    }
323                }
324    
325                public int getMaxConnections() throws ResourceException {
326                    if (physicalConnection == null) {
327                        throw new ResourceException("Not connected.");
328                    }
329                    return Integer.MAX_VALUE;
330                }
331    
332                public String getUserName() throws ResourceException {
333                    if (physicalConnection == null) {
334                        throw new ResourceException("Not connected.");
335                    }
336                    try {
337                        return physicalConnection.getClientID();
338                    }
339                    catch (JMSException e) {
340                        throw new ResourceException("Error accessing provider.", e);
341                    }
342                }
343            };
344        }
345    
346        /**
347         * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
348         */
349        public void setLogWriter(PrintWriter logWriter) throws ResourceException {
350            this.logWriter = logWriter;
351        }
352    
353        /**
354         * @see javax.resource.spi.ManagedConnection#getLogWriter()
355         */
356        public PrintWriter getLogWriter() throws ResourceException {
357            return logWriter;
358        }
359    
360        /**
361         * @param subject
362         * @param info
363         * @return
364         */
365        public boolean matches(Subject subject, ConnectionRequestInfo info) {
366    
367            // Check to see if it is our info class
368            if (info == null) {
369                return false;
370            }
371            if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
372                return false;
373            }
374    
375            // Do the subjects match?
376            if (subject == null ^ this.subject == null) {
377                return false;
378            }
379            if (subject != null && !subject.equals(this.subject)) {
380                return false;
381            }
382    
383            // Does the info match?
384            return info.equals(this.info);
385        }
386    
387        /**
388         * When a proxy is closed this cleans up the proxy and notifys the
389         * ConnectionEventListeners that a connection closed.
390         *
391         * @param proxy
392         */
393        public void proxyClosedEvent(JMSConnectionProxy proxy) {
394            proxyConnections.remove(proxy);
395            proxy.cleanup();
396            fireCloseEvent(proxy);
397        }
398    
399        public void onException(JMSException e) {
400            log.warn("Connection failed: "+e);
401            log.debug("Cause: ", e);
402            
403            // Let any active proxy connections know that exception occured.
404            for (Iterator iter = proxyConnections.iterator(); iter.hasNext();) {
405                JMSConnectionProxy proxy = (JMSConnectionProxy) iter.next();
406                proxy.onException(e);
407            }
408            // Let the container know that the error occured.
409            fireErrorOccurredEvent(e);
410        }
411    
412        /**
413         * @return Returns the transactionContext.
414         */
415        public TransactionContext getTransactionContext() {
416            return transactionContext;
417        }
418    
419    }