001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.gbean; 019 020 import org.activemq.ActiveMQConnectionFactory; 021 import org.activemq.broker.BrokerConnector; 022 import org.activemq.broker.impl.BrokerConnectorImpl; 023 import org.activemq.io.WireFormat; 024 import org.activemq.io.impl.DefaultWireFormat; 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.apache.geronimo.gbean.GBeanInfo; 028 import org.apache.geronimo.gbean.GBeanInfoBuilder; 029 import org.apache.geronimo.gbean.GBeanLifecycle; 030 import org.apache.geronimo.gbean.GConstructorInfo; 031 032 import javax.jms.JMSException; 033 import java.net.InetSocketAddress; 034 import java.net.URI; 035 import java.net.URISyntaxException; 036 037 /** 038 * Default implementation of the ActiveMQ connector 039 * 040 * @version $Revision: 1.1.1.1 $ 041 */ 042 public class ActiveMQConnectorGBean implements GBeanLifecycle, ActiveMQConnector { 043 private Log log = LogFactory.getLog(getClass().getName()); 044 045 private BrokerConnector brokerConnector; 046 private ActiveMQContainer container; 047 private WireFormat wireFormat = new DefaultWireFormat(); 048 private String protocol; 049 private String host; 050 private int port; 051 private String path; 052 private String query; 053 private String urlAsStarted; 054 055 public ActiveMQConnectorGBean(ActiveMQContainer container, String protocol, String host, int port) { 056 this.container = container; 057 this.protocol = protocol; 058 this.host = host; 059 this.port = port; 060 } 061 062 public String getProtocol() { 063 return protocol; 064 } 065 066 public void setProtocol(String protocol) { 067 this.protocol = protocol; 068 } 069 070 public String getHost() { 071 return host; 072 } 073 074 public void setHost(String host) { 075 this.host = host; 076 } 077 078 public int getPort() { 079 return port; 080 } 081 082 public void setPort(int port) { 083 this.port = port; 084 } 085 086 public String getPath() { 087 return path; 088 } 089 090 public void setPath(String path) { 091 this.path = path; 092 } 093 094 public String getQuery() { 095 return query; 096 } 097 098 public void setQuery(String query) { 099 this.query = query; 100 } 101 102 public String getUrl() { 103 try { 104 return new URI(protocol, null, host, port, path, query, null).toString(); 105 } catch (URISyntaxException e) { 106 throw new IllegalStateException("Attributes don't form a valid URI: "+protocol+"://"+host+":"+port+"/"+path+"?"+query); 107 } 108 } 109 110 public WireFormat getWireFormat() { 111 return wireFormat; 112 } 113 114 public void setWireFormat(WireFormat wireFormat) { 115 this.wireFormat = wireFormat; 116 } 117 118 public InetSocketAddress getListenAddress() { 119 return brokerConnector == null ? null : brokerConnector.getServerChannel().getSocketAddress(); 120 } 121 122 public synchronized void doStart() throws Exception { 123 ClassLoader old = Thread.currentThread().getContextClassLoader(); 124 Thread.currentThread().setContextClassLoader(ActiveMQContainerGBean.class.getClassLoader()); 125 try { 126 if (brokerConnector == null) { 127 urlAsStarted = getUrl(); 128 brokerConnector = createBrokerConnector(urlAsStarted); 129 brokerConnector.start(); 130 ActiveMQConnectionFactory.registerBroker(urlAsStarted, brokerConnector); 131 } 132 } finally { 133 Thread.currentThread().setContextClassLoader(old); 134 } 135 } 136 137 public synchronized void doStop() throws Exception { 138 if (brokerConnector != null) { 139 ActiveMQConnectionFactory.unregisterBroker(urlAsStarted); 140 BrokerConnector temp = brokerConnector; 141 brokerConnector = null; 142 temp.stop(); 143 } 144 } 145 146 public synchronized void doFail() { 147 if (brokerConnector != null) { 148 BrokerConnector temp = brokerConnector; 149 brokerConnector = null; 150 try { 151 temp.stop(); 152 } 153 catch (JMSException e) { 154 log.info("Caught while closing due to failure: " + e, e); 155 } 156 } 157 } 158 159 protected BrokerConnector createBrokerConnector(String url) throws Exception { 160 return new BrokerConnectorImpl(container.getBrokerContainer(), url, wireFormat); 161 } 162 163 public static final GBeanInfo GBEAN_INFO; 164 165 static { 166 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic("ActiveMQ Message Broker Connector", ActiveMQConnectorGBean.class, CONNECTOR_J2EE_TYPE); 167 infoFactory.addAttribute("url", String.class.getName(), false); 168 infoFactory.addAttribute("wireFormat", WireFormat.class.getName(), false); 169 infoFactory.addReference("activeMQContainer", ActiveMQContainer.class); 170 infoFactory.addInterface(ActiveMQConnector.class, new String[]{"host","port","protocol","path","query"}, 171 new String[]{"host","port"}); 172 infoFactory.setConstructor(new GConstructorInfo(new String[]{"activeMQContainer", "protocol", "host", "port"})); 173 GBEAN_INFO = infoFactory.getBeanInfo(); 174 } 175 176 public static GBeanInfo getGBeanInfo() { 177 return GBEAN_INFO; 178 } 179 }