001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.activemq.gbean;
019    
020    import org.activemq.ActiveMQConnectionFactory;
021    import org.activemq.broker.BrokerConnector;
022    import org.activemq.broker.impl.BrokerConnectorImpl;
023    import org.activemq.io.WireFormat;
024    import org.activemq.io.impl.DefaultWireFormat;
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.geronimo.gbean.GBeanInfo;
028    import org.apache.geronimo.gbean.GBeanInfoBuilder;
029    import org.apache.geronimo.gbean.GBeanLifecycle;
030    import org.apache.geronimo.gbean.GConstructorInfo;
031    
032    import javax.jms.JMSException;
033    import java.net.InetSocketAddress;
034    import java.net.URI;
035    import java.net.URISyntaxException;
036    
037    /**
038     * Default implementation of the ActiveMQ connector
039     *
040     * @version $Revision: 1.1.1.1 $
041     */
042    public class ActiveMQConnectorGBean implements GBeanLifecycle, ActiveMQConnector {
043        private Log log = LogFactory.getLog(getClass().getName());
044    
045        private BrokerConnector brokerConnector;
046        private ActiveMQContainer container;
047        private WireFormat wireFormat = new DefaultWireFormat();
048        private String protocol;
049        private String host;
050        private int port;
051        private String path;
052        private String query;
053        private String urlAsStarted;
054    
055        public ActiveMQConnectorGBean(ActiveMQContainer container, String protocol, String host, int port) {
056            this.container = container;
057            this.protocol = protocol;
058            this.host = host;
059            this.port = port;
060        }
061    
062        public String getProtocol() {
063            return protocol;
064        }
065    
066        public void setProtocol(String protocol) {
067            this.protocol = protocol;
068        }
069    
070        public String getHost() {
071            return host;
072        }
073    
074        public void setHost(String host) {
075            this.host = host;
076        }
077    
078        public int getPort() {
079            return port;
080        }
081    
082        public void setPort(int port) {
083            this.port = port;
084        }
085    
086        public String getPath() {
087            return path;
088        }
089    
090        public void setPath(String path) {
091            this.path = path;
092        }
093    
094        public String getQuery() {
095            return query;
096        }
097    
098        public void setQuery(String query) {
099            this.query = query;
100        }
101    
102        public String getUrl() {
103            try {
104                return new URI(protocol, null, host, port, path, query, null).toString();
105            } catch (URISyntaxException e) {
106                throw new IllegalStateException("Attributes don't form a valid URI: "+protocol+"://"+host+":"+port+"/"+path+"?"+query);
107            }
108        }
109    
110        public WireFormat getWireFormat() {
111            return wireFormat;
112        }
113    
114        public void setWireFormat(WireFormat wireFormat) {
115            this.wireFormat = wireFormat;
116        }
117    
118        public InetSocketAddress getListenAddress() {
119            return brokerConnector == null ? null : brokerConnector.getServerChannel().getSocketAddress();
120        }
121    
122        public synchronized void doStart() throws Exception {
123            ClassLoader old = Thread.currentThread().getContextClassLoader();
124            Thread.currentThread().setContextClassLoader(ActiveMQContainerGBean.class.getClassLoader());
125            try {
126                    if (brokerConnector == null) {
127                    urlAsStarted = getUrl();
128                        brokerConnector = createBrokerConnector(urlAsStarted);
129                        brokerConnector.start();
130                        ActiveMQConnectionFactory.registerBroker(urlAsStarted, brokerConnector);
131                    }
132            } finally {
133                    Thread.currentThread().setContextClassLoader(old);
134            }
135        }
136    
137        public synchronized void doStop() throws Exception {
138            if (brokerConnector != null) {
139                ActiveMQConnectionFactory.unregisterBroker(urlAsStarted);
140                BrokerConnector temp = brokerConnector;
141                brokerConnector = null;
142                temp.stop();
143            }
144        }
145    
146        public synchronized void doFail() {
147            if (brokerConnector != null) {
148                BrokerConnector temp = brokerConnector;
149                brokerConnector = null;
150                try {
151                    temp.stop();
152                }
153                catch (JMSException e) {
154                    log.info("Caught while closing due to failure: " + e, e);
155                }
156            }
157        }
158    
159        protected BrokerConnector createBrokerConnector(String url) throws Exception {
160            return new BrokerConnectorImpl(container.getBrokerContainer(), url, wireFormat);
161        }
162    
163        public static final GBeanInfo GBEAN_INFO;
164    
165        static {
166            GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic("ActiveMQ Message Broker Connector", ActiveMQConnectorGBean.class, CONNECTOR_J2EE_TYPE);
167            infoFactory.addAttribute("url", String.class.getName(), false);
168            infoFactory.addAttribute("wireFormat", WireFormat.class.getName(), false);
169            infoFactory.addReference("activeMQContainer", ActiveMQContainer.class);
170            infoFactory.addInterface(ActiveMQConnector.class, new String[]{"host","port","protocol","path","query"},
171                     new String[]{"host","port"});
172            infoFactory.setConstructor(new GConstructorInfo(new String[]{"activeMQContainer", "protocol", "host", "port"}));
173            GBEAN_INFO = infoFactory.getBeanInfo();
174        }
175    
176        public static GBeanInfo getGBeanInfo() {
177            return GBEAN_INFO;
178        }
179    }