001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.ra; 019 020 import java.util.HashMap; 021 022 import javax.jms.Connection; 023 import javax.jms.JMSException; 024 import javax.jms.XAConnection; 025 import javax.jms.XASession; 026 import javax.resource.NotSupportedException; 027 import javax.resource.ResourceException; 028 import javax.resource.spi.ActivationSpec; 029 import javax.resource.spi.BootstrapContext; 030 import javax.resource.spi.ResourceAdapter; 031 import javax.resource.spi.ResourceAdapterInternalException; 032 import javax.resource.spi.endpoint.MessageEndpointFactory; 033 import javax.transaction.xa.XAResource; 034 035 import org.apache.commons.logging.Log; 036 import org.apache.commons.logging.LogFactory; 037 import org.activemq.ActiveMQConnection; 038 import org.activemq.ActiveMQConnectionFactory; 039 import org.activemq.XmlConfigHelper; 040 import org.activemq.broker.BrokerContainer; 041 import org.activemq.broker.BrokerContainerFactory; 042 import org.activemq.broker.BrokerContext; 043 import org.activemq.util.IdGenerator; 044 045 /** 046 * Knows how to connect to one ActiveMQ server. It can then activate endpoints 047 * and deliver messages to those enpoints using the connection configure in the 048 * resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4) 049 * 050 * @version $Revision: 1.2 $ 051 */ 052 public class ActiveMQResourceAdapter implements ResourceAdapter { 053 private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class); 054 055 private static final String ASF_ENDPOINT_WORKER_TYPE = "asf"; 056 057 private static final String POLLING_ENDPOINT_WORKER_TYPE = "polling"; 058 059 private BootstrapContext bootstrapContext; 060 061 private HashMap endpointWorkers = new HashMap(); 062 063 final private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo(); 064 065 private String endpointWorkerType = ASF_ENDPOINT_WORKER_TYPE; 066 067 private ActiveMQConnectionFactory connectionFactory; 068 069 private BrokerContainer container; 070 071 private Boolean useEmbeddedBroker; 072 private String brokerXmlConfig; 073 074 private HashMap connectionFactoryMap = new HashMap(1); 075 076 public ActiveMQResourceAdapter() { 077 } 078 079 /** 080 * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext) 081 */ 082 public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { 083 this.bootstrapContext = bootstrapContext; 084 if (isUseEmbeddedBroker() != null && isUseEmbeddedBroker().booleanValue()) { 085 createBroker(); 086 } 087 } 088 089 private void createBroker() throws ResourceAdapterInternalException { 090 try { 091 BrokerContainerFactory brokerContainerFactory = XmlConfigHelper.createBrokerContainerFactory(getBrokerXmlConfig()); 092 093 IdGenerator idgen = new IdGenerator(); 094 container = brokerContainerFactory.createBrokerContainer(idgen.generateId(), BrokerContext.getInstance()); 095 container.start(); 096 connectionFactory = new ActiveMQConnectionFactory(container, getServerUrl()); 097 } catch (JMSException e) { 098 log.error(e.toString(), e); 099 throw new ResourceAdapterInternalException("Failed to startup an embedded broker", e); 100 } 101 } 102 103 /** 104 * Return a connection using the default connection request info from the RAR 105 * deployment. 106 */ 107 public ActiveMQConnection makeConnection() throws JMSException { 108 return makeConnection(info); 109 } 110 111 /** 112 * Return a connection using a specific connection request info. 113 */ 114 public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo crInfo) throws JMSException { 115 116 ActiveMQConnectionFactory connectionFactory = getConnectionFactory(crInfo); 117 118 String userName = info.getUserName(); 119 String password = info.getPassword(); 120 ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password); 121 122 String clientId = info.getClientid(); 123 if (clientId != null) { 124 physicalConnection.setClientID(clientId); 125 } 126 return physicalConnection; 127 } 128 129 /** 130 * @param activationSpec 131 */ 132 public ActiveMQConnection makeConnection(ActiveMQActivationSpec activationSpec) throws JMSException { 133 //use the default RA connection request for info 134 ActiveMQConnectionFactory connectionFactory = getConnectionFactory(info); 135 String userName = defaultValue(activationSpec.getUserName(), info.getUserName()); 136 String password = defaultValue(activationSpec.getPassword(), info.getPassword()); 137 ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password); 138 if (activationSpec.isDurableSubscription()) { 139 physicalConnection.setClientID(activationSpec.getClientId()); 140 } 141 return physicalConnection; 142 } 143 144 /** 145 * Returns a connection factory given a connection configuration. 146 * The implementation of this method treats the factories as singletons 147 * only creating one factory for a given set of configuration data. 148 */ 149 private ActiveMQConnectionFactory getConnectionFactory(ActiveMQConnectionRequestInfo crInfo) { 150 //use adapter default if none provided 151 if(crInfo == null) { 152 crInfo = info; 153 } 154 155 if(!(connectionFactoryMap.containsKey(crInfo))) { 156 //slightly possible the factory can be set twice here 157 //but highly unlikely and no real functional impact 158 //other than an extra reference 159 synchronized(connectionFactoryMap) { 160 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(crInfo.getServerUrl()); 161 connectionFactoryMap.put(crInfo, factory); 162 return factory; 163 } 164 } 165 return (ActiveMQConnectionFactory)connectionFactoryMap.get(crInfo); 166 } 167 168 private String defaultValue(String value, String defaultValue) { 169 if (value != null) 170 return value; 171 return defaultValue; 172 } 173 174 /** 175 * @see javax.resource.spi.ResourceAdapter#stop() 176 */ 177 public void stop() { 178 while (endpointWorkers.size() > 0) { 179 ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next(); 180 endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec()); 181 } 182 stopBroker(); 183 this.bootstrapContext = null; 184 } 185 186 private void stopBroker() { 187 if (container != null) { 188 try { 189 container.stop(); 190 } catch (JMSException e) { 191 log.warn("Exception while stopping the broker container", e); 192 } 193 } 194 } 195 196 /** 197 * @return 198 */ 199 public BootstrapContext getBootstrapContext() { 200 return bootstrapContext; 201 } 202 203 /** 204 * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory, 205 * javax.resource.spi.ActivationSpec) 206 */ 207 public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) 208 throws ResourceException { 209 210 // spec section 5.3.3 211 if (activationSpec.getResourceAdapter() != this) { 212 throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance"); 213 } 214 215 if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) { 216 217 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, 218 (ActiveMQActivationSpec) activationSpec); 219 // This is weird.. the same endpoint activated twice.. must be a 220 // container error. 221 if (endpointWorkers.containsKey(key)) { 222 throw new IllegalStateException("Endpoint previously activated"); 223 } 224 225 ActiveMQBaseEndpointWorker worker; 226 if (POLLING_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) { 227 worker = new ActiveMQPollingEndpointWorker(this, key); 228 } else if (ASF_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) { 229 worker = new ActiveMQAsfEndpointWorker(this, key); 230 } else { 231 throw new NotSupportedException("That type of EndpointWorkerType is not supported: " 232 + getEndpointWorkerType()); 233 } 234 235 endpointWorkers.put(key, worker); 236 worker.start(); 237 238 } else { 239 throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass()); 240 } 241 242 } 243 244 /** 245 * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory, 246 * javax.resource.spi.ActivationSpec) 247 */ 248 public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) { 249 250 if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) { 251 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, 252 (ActiveMQActivationSpec) activationSpec); 253 ActiveMQBaseEndpointWorker worker = (ActiveMQBaseEndpointWorker) endpointWorkers.remove(key); 254 if (worker == null) { 255 // This is weird.. that endpoint was not activated.. oh well.. 256 // this method 257 // does not throw exceptions so just return. 258 return; 259 } 260 try { 261 worker.stop(); 262 } catch (InterruptedException e) { 263 // We interrupted.. we won't throw an exception but will stop 264 // waiting for the worker 265 // to stop.. we tried our best. Keep trying to interrupt the 266 // thread. 267 Thread.currentThread().interrupt(); 268 } 269 270 } 271 272 } 273 274 /** 275 * We only connect to one resource manager per ResourceAdapter instance, so 276 * any ActivationSpec will return the same XAResource. 277 * 278 * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[]) 279 */ 280 public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException { 281 Connection connection = null; 282 try { 283 connection = makeConnection(); 284 if (connection instanceof XAConnection) { 285 XASession session = ((XAConnection) connection).createXASession(); 286 XAResource xaResource = session.getXAResource(); 287 return new XAResource[] { xaResource }; 288 } else { 289 return new XAResource[] {}; 290 } 291 } catch (JMSException e) { 292 throw new ResourceException(e); 293 } finally { 294 try { 295 connection.close(); 296 } catch (Throwable ignore) { 297 } 298 } 299 } 300 301 // /////////////////////////////////////////////////////////////////////// 302 // 303 // Java Bean getters and setters for this ResourceAdapter class. 304 // 305 // /////////////////////////////////////////////////////////////////////// 306 307 /** 308 * @return 309 */ 310 public String getClientid() { 311 return emptyToNull(info.getClientid()); 312 } 313 314 /** 315 * @return 316 */ 317 public String getPassword() { 318 return emptyToNull(info.getPassword()); 319 } 320 321 /** 322 * @return 323 */ 324 public String getServerUrl() { 325 return info.getServerUrl(); 326 } 327 328 /** 329 * @return 330 */ 331 public String getUserName() { 332 return emptyToNull(info.getUserName()); 333 } 334 335 /** 336 * @param clientid 337 */ 338 public void setClientid(String clientid) { 339 info.setClientid(clientid); 340 } 341 342 /** 343 * @param password 344 */ 345 public void setPassword(String password) { 346 info.setPassword(password); 347 } 348 349 /** 350 * @param url 351 */ 352 public void setServerUrl(String url) { 353 info.setServerUrl(url); 354 } 355 356 /** 357 * @param userid 358 */ 359 public void setUserName(String userid) { 360 info.setUserName(userid); 361 } 362 363 /** 364 * @return Returns the endpointWorkerType. 365 */ 366 public String getEndpointWorkerType() { 367 return endpointWorkerType; 368 } 369 370 /** 371 * @param endpointWorkerType 372 * The endpointWorkerType to set. 373 */ 374 public void setEndpointWorkerType(String endpointWorkerType) { 375 this.endpointWorkerType = endpointWorkerType.toLowerCase(); 376 } 377 378 public String getBrokerXmlConfig() { 379 return brokerXmlConfig; 380 } 381 382 /** 383 * Sets the <a href="http://activemq.org/Xml+Configuration">XML 384 * configuration file </a> used to configure the ActiveMQ broker via Spring 385 * if using embedded mode. 386 * 387 * @param brokerXmlConfig 388 * is the filename which is assumed to be on the classpath unless 389 * a URL is specified. So a value of <code>foo/bar.xml</code> 390 * would be assumed to be on the classpath whereas 391 * <code>file:dir/file.xml</code> would use the file system. 392 * Any valid URL string is supported. 393 * @see #setUseEmbeddedBroker(Boolean) 394 */ 395 public void setBrokerXmlConfig(String brokerXmlConfig) { 396 this.brokerXmlConfig=brokerXmlConfig; 397 } 398 399 public Boolean isUseEmbeddedBroker() { 400 return useEmbeddedBroker; 401 } 402 403 public void setUseEmbeddedBroker(Boolean useEmbeddedBroker) { 404 this.useEmbeddedBroker = useEmbeddedBroker; 405 } 406 407 /** 408 * @return Returns the info. 409 */ 410 public ActiveMQConnectionRequestInfo getInfo() { 411 return info; 412 } 413 414 public boolean equals(Object o) { 415 if (this == o) { 416 return true; 417 } 418 if (!(o instanceof ActiveMQResourceAdapter)) { 419 return false; 420 } 421 422 final ActiveMQResourceAdapter activeMQResourceAdapter = (ActiveMQResourceAdapter) o; 423 424 if (!endpointWorkerType.equals(activeMQResourceAdapter.endpointWorkerType)) { 425 return false; 426 } 427 if (!info.equals(activeMQResourceAdapter.info)) { 428 return false; 429 } 430 if ( notEqual(useEmbeddedBroker, activeMQResourceAdapter.useEmbeddedBroker) ) { 431 return false; 432 } 433 if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.brokerXmlConfig) ) { 434 return false; 435 } 436 437 return true; 438 } 439 440 private boolean notEqual(Object o1, Object o2) { 441 return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2)); 442 } 443 444 445 public int hashCode() { 446 int result; 447 result = info.hashCode(); 448 result = 29 * result + endpointWorkerType.hashCode(); 449 if (useEmbeddedBroker != null && useEmbeddedBroker.booleanValue()) { 450 result = result * 29 + 1; 451 } 452 if( brokerXmlConfig !=null ) { 453 result ^= brokerXmlConfig.hashCode(); 454 } 455 return result; 456 } 457 458 private String emptyToNull(String value) { 459 if (value == null || value.length() == 0) { 460 return null; 461 } 462 return value; 463 } 464 465 public Boolean getUseEmbeddedBroker() { 466 return useEmbeddedBroker; 467 } 468 469 public Boolean getUseInboundSession() { 470 return info.getUseInboundSession(); 471 } 472 473 public void setUseInboundSession(Boolean useInboundSession) { 474 info.setUseInboundSession(useInboundSession); 475 } 476 477 }