001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.ra; 019 020 import javax.jms.ConnectionConsumer; 021 import javax.jms.ExceptionListener; 022 import javax.jms.JMSException; 023 import javax.jms.Session; 024 import javax.jms.Topic; 025 import javax.resource.ResourceException; 026 import javax.resource.spi.work.Work; 027 import javax.resource.spi.work.WorkException; 028 import javax.resource.spi.work.WorkManager; 029 030 import org.activemq.ActiveMQConnection; 031 import org.activemq.message.ActiveMQDestination; 032 import org.activemq.message.ActiveMQQueue; 033 import org.activemq.message.ActiveMQTopic; 034 import org.apache.commons.logging.Log; 035 import org.apache.commons.logging.LogFactory; 036 037 /** 038 * @version $Revision: 1.1.1.1 $ $Date: 2005/03/11 21:15:09 $ 039 */ 040 public class ActiveMQAsfEndpointWorker extends ActiveMQBaseEndpointWorker { 041 042 private static final Log log = LogFactory.getLog(ActiveMQAsfEndpointWorker.class); 043 044 private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second. 045 private static final long MAX_RECONNECT_DELAY = 1000*30; // 30 seconds. 046 private static final ThreadLocal threadLocal = new ThreadLocal(); 047 048 private ConnectionConsumer consumer; 049 private ServerSessionPoolImpl serverSessionPool; 050 private ActiveMQDestination dest; 051 private boolean running; 052 private Work connectWork; 053 protected ActiveMQConnection connection; 054 055 private long reconnectDelay=INITIAL_RECONNECT_DELAY; 056 057 /** 058 * @param adapter 059 * @param key 060 * @throws ResourceException 061 */ 062 public ActiveMQAsfEndpointWorker(final ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) 063 throws ResourceException { 064 super(adapter, key); 065 066 connectWork = new Work() { 067 068 public void release() { 069 } 070 071 synchronized public void run() { 072 if( !isRunning() ) 073 return; 074 if( connection!=null ) 075 return; 076 077 ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); 078 try { 079 connection = adapter.makeConnection(activationSpec); 080 connection.start(); 081 connection.setExceptionListener(new ExceptionListener() { 082 public void onException(JMSException error) { 083 reconnect(error); 084 } 085 }); 086 087 if (activationSpec.isDurableSubscription()) { 088 consumer = connection.createDurableConnectionConsumer( 089 (Topic) dest, 090 activationSpec.getSubscriptionName(), 091 emptyToNull(activationSpec.getMessageSelector()), 092 serverSessionPool, 093 activationSpec.getMaxMessagesPerSessionsIntValue(), 094 activationSpec.getNoLocalBooleanValue()); 095 } else { 096 consumer = connection.createConnectionConsumer( 097 dest, 098 emptyToNull(activationSpec.getMessageSelector()), 099 serverSessionPool, 100 activationSpec.getMaxMessagesPerSessionsIntValue(), 101 activationSpec.getNoLocalBooleanValue()); 102 } 103 104 } catch (JMSException error) { 105 reconnect(error); 106 } 107 } 108 }; 109 110 ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec(); 111 if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) { 112 dest = new ActiveMQQueue(activationSpec.getDestination()); 113 } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) { 114 dest = new ActiveMQTopic(activationSpec.getDestination()); 115 } else { 116 throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType()); 117 } 118 } 119 120 synchronized public void start() throws WorkException, ResourceException { 121 if (running) 122 return; 123 running = true; 124 125 log.debug("Starting"); 126 serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue()); 127 connect(); 128 log.debug("Started"); 129 } 130 131 /** 132 * 133 */ 134 synchronized public void stop() throws InterruptedException { 135 if (!running) 136 return; 137 running = false; 138 serverSessionPool.close(); 139 disconnect(); 140 } 141 142 private boolean isRunning() { 143 return running; 144 } 145 146 synchronized private void connect() { 147 if (!running) 148 return; 149 150 try { 151 workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null); 152 } catch (WorkException e) { 153 running = false; 154 log.error("Work Manager did not accept work: ",e); 155 } 156 } 157 158 /** 159 * 160 */ 161 synchronized private void disconnect() { 162 safeClose(consumer); 163 consumer=null; 164 safeClose(connection); 165 connection=null; 166 } 167 168 private void reconnect(JMSException error){ 169 log.debug("Reconnect cause: ",error); 170 long reconnectDelay; 171 synchronized(this) { 172 reconnectDelay = this.reconnectDelay; 173 // Only log errors if the server is really down.. And not a temp failure. 174 if (reconnectDelay == MAX_RECONNECT_DELAY) { 175 log.info("Endpoint connection to JMS broker failed: " + error.getMessage()); 176 log.info("Endpoint will try to reconnect to the JMS broker in "+(MAX_RECONNECT_DELAY/1000)+" seconds"); 177 } 178 } 179 try { 180 disconnect(); 181 Thread.sleep(reconnectDelay); 182 183 synchronized(this) { 184 // Use exponential rollback. 185 this.reconnectDelay*=2; 186 if (this.reconnectDelay > MAX_RECONNECT_DELAY) 187 this.reconnectDelay=MAX_RECONNECT_DELAY; 188 } 189 connect(); 190 } catch(InterruptedException e) {} 191 } 192 193 protected void registerThreadSession(Session session) { 194 threadLocal.set(session); 195 } 196 197 protected void unregisterThreadSession(Session session) { 198 threadLocal.set(null); 199 } 200 201 private String emptyToNull(String value) { 202 if (value == null || value.length() == 0) { 203 return null; 204 } 205 return value; 206 } 207 208 }