001    /**
002     *
003     * Copyright 2004 Hiram Chirino
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.activemq.ra;
019    
020    import javax.jms.ConnectionConsumer;
021    import javax.jms.ExceptionListener;
022    import javax.jms.JMSException;
023    import javax.jms.Session;
024    import javax.jms.Topic;
025    import javax.resource.ResourceException;
026    import javax.resource.spi.work.Work;
027    import javax.resource.spi.work.WorkException;
028    import javax.resource.spi.work.WorkManager;
029    
030    import org.activemq.ActiveMQConnection;
031    import org.activemq.message.ActiveMQDestination;
032    import org.activemq.message.ActiveMQQueue;
033    import org.activemq.message.ActiveMQTopic;
034    import org.apache.commons.logging.Log;
035    import org.apache.commons.logging.LogFactory;
036    
037    /**
038     * @version $Revision: 1.1.1.1 $ $Date: 2005/03/11 21:15:09 $
039     */
040    public class ActiveMQAsfEndpointWorker extends ActiveMQBaseEndpointWorker {
041    
042        private static final Log log = LogFactory.getLog(ActiveMQAsfEndpointWorker.class);
043    
044        private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
045        private static final long MAX_RECONNECT_DELAY = 1000*30; // 30 seconds.
046        private static final ThreadLocal threadLocal = new ThreadLocal();
047        
048        private ConnectionConsumer consumer;
049        private ServerSessionPoolImpl serverSessionPool;
050        private ActiveMQDestination dest;
051        private boolean running;
052        private Work connectWork;
053        protected ActiveMQConnection connection;
054        
055        private long reconnectDelay=INITIAL_RECONNECT_DELAY;
056    
057        /**
058         * @param adapter
059         * @param key
060         * @throws ResourceException
061         */
062        public ActiveMQAsfEndpointWorker(final ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key)
063                throws ResourceException {
064            super(adapter, key);
065    
066            connectWork = new Work() {
067    
068                public void release() {
069                }
070    
071                synchronized public void run() {
072                    if( !isRunning() )
073                        return;
074                    if( connection!=null )
075                        return;
076                    
077                    ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
078                    try {
079                        connection = adapter.makeConnection(activationSpec);
080                        connection.start();
081                        connection.setExceptionListener(new ExceptionListener() {
082                            public void onException(JMSException error) {
083                                reconnect(error);
084                            }
085                        });
086    
087                        if (activationSpec.isDurableSubscription()) {
088                            consumer = connection.createDurableConnectionConsumer(
089                                    (Topic) dest,
090                                    activationSpec.getSubscriptionName(), 
091                                    emptyToNull(activationSpec.getMessageSelector()),
092                                    serverSessionPool, 
093                                    activationSpec.getMaxMessagesPerSessionsIntValue(),
094                                    activationSpec.getNoLocalBooleanValue());
095                        } else {
096                            consumer = connection.createConnectionConsumer(
097                                    dest, 
098                                    emptyToNull(activationSpec.getMessageSelector()), 
099                                    serverSessionPool, 
100                                    activationSpec.getMaxMessagesPerSessionsIntValue(),
101                                    activationSpec.getNoLocalBooleanValue());
102                        }
103    
104                    } catch (JMSException error) {
105                        reconnect(error);
106                    }
107                }
108            };
109    
110            ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
111            if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
112                dest = new ActiveMQQueue(activationSpec.getDestination());
113            } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
114                dest = new ActiveMQTopic(activationSpec.getDestination());
115            } else {
116                throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
117            }
118        }
119    
120        synchronized public void start() throws WorkException, ResourceException {
121            if (running)
122                return;
123            running = true;
124    
125            log.debug("Starting");
126            serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
127            connect();
128            log.debug("Started");
129        }
130    
131        /**
132         * 
133         */
134        synchronized public void stop() throws InterruptedException {
135            if (!running)
136                return;
137            running = false;
138            serverSessionPool.close();
139            disconnect();        
140        }
141    
142        private boolean isRunning() {
143            return running;
144        }    
145    
146        synchronized private void connect() {
147            if (!running)
148                return;
149    
150            try {
151                workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
152            } catch (WorkException e) {
153                running = false;
154                log.error("Work Manager did not accept work: ",e);
155            }
156        }
157    
158        /**
159         * 
160         */
161        synchronized private void disconnect() {
162            safeClose(consumer);
163            consumer=null;
164            safeClose(connection);
165            connection=null;
166        }
167    
168        private void reconnect(JMSException error){
169            log.debug("Reconnect cause: ",error);
170            long reconnectDelay;
171            synchronized(this) {
172                reconnectDelay = this.reconnectDelay;
173                // Only log errors if the server is really down.. And not a temp failure.
174                if (reconnectDelay == MAX_RECONNECT_DELAY) {
175                    log.info("Endpoint connection to JMS broker failed: " + error.getMessage());
176                    log.info("Endpoint will try to reconnect to the JMS broker in "+(MAX_RECONNECT_DELAY/1000)+" seconds");
177                }
178            }
179            try {
180                disconnect();
181                Thread.sleep(reconnectDelay);
182                
183                synchronized(this) {
184                    // Use exponential rollback.
185                    this.reconnectDelay*=2;
186                    if (this.reconnectDelay > MAX_RECONNECT_DELAY)
187                        this.reconnectDelay=MAX_RECONNECT_DELAY;
188                }
189                connect();
190            } catch(InterruptedException e) {}
191        }
192    
193        protected void registerThreadSession(Session session) {
194            threadLocal.set(session);
195        }
196    
197        protected void unregisterThreadSession(Session session) {
198            threadLocal.set(null);
199        }
200    
201        private String emptyToNull(String value) {
202            if (value == null || value.length() == 0) {
203                return null;
204            }
205            return value;
206        }
207    
208    }