001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.ra; 019 020 import java.util.ArrayList; 021 import java.util.Iterator; 022 023 import javax.jms.Connection; 024 import javax.jms.ConnectionConsumer; 025 import javax.jms.ConnectionMetaData; 026 import javax.jms.Destination; 027 import javax.jms.ExceptionListener; 028 import javax.jms.IllegalStateException; 029 import javax.jms.JMSException; 030 import javax.jms.Queue; 031 import javax.jms.QueueConnection; 032 import javax.jms.QueueSession; 033 import javax.jms.ServerSessionPool; 034 import javax.jms.Session; 035 import javax.jms.Topic; 036 import javax.jms.TopicConnection; 037 import javax.jms.TopicSession; 038 039 import org.activemq.ActiveMQQueueSession; 040 import org.activemq.ActiveMQSession; 041 import org.activemq.ActiveMQTopicSession; 042 043 044 /** 045 * Acts as a pass through proxy for a JMS Connection object. 046 * It intercepts events that are of interest of the ActiveMQManagedConnection. 047 * 048 * @version $Revision: 1.1.1.1 $ 049 */ 050 public class JMSConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener { 051 052 private ActiveMQManagedConnection managedConnection; 053 private ArrayList sessions = new ArrayList(); 054 private ExceptionListener exceptionListener; 055 056 public JMSConnectionProxy(ActiveMQManagedConnection managedConnection) { 057 this.managedConnection = managedConnection; 058 } 059 060 /** 061 * Used to let the ActiveMQManagedConnection that this connection 062 * handel is not needed by the app. 063 * 064 * @throws JMSException 065 */ 066 public void close() throws JMSException { 067 if( managedConnection!=null ) { 068 managedConnection.proxyClosedEvent(this); 069 } 070 } 071 072 /** 073 * Called by the ActiveMQManagedConnection to invalidate this proxy. 074 */ 075 public void cleanup() { 076 exceptionListener=null; 077 managedConnection = null; 078 for (Iterator iter = sessions.iterator(); iter.hasNext();) { 079 JMSSessionProxy p = (JMSSessionProxy) iter.next(); 080 try { 081 p.cleanup(); 082 } catch (JMSException ignore) { 083 } 084 iter.remove(); 085 } 086 } 087 088 /** 089 * 090 */ 091 private Connection getConnection() throws JMSException { 092 if (managedConnection == null) { 093 throw new IllegalStateException("The Connection is closed"); 094 } 095 return managedConnection.getPhysicalConnection(); 096 } 097 098 /** 099 * @param transacted 100 * @param acknowledgeMode 101 * @return 102 * @throws JMSException 103 */ 104 public Session createSession(boolean transacted, int acknowledgeMode) 105 throws JMSException { 106 return createSessionProxy(transacted, acknowledgeMode); 107 } 108 109 /** 110 * @param acknowledgeMode 111 * @param transacted 112 * @return 113 * @throws JMSException 114 */ 115 private JMSSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException { 116 ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode); 117 RATransactionContext txContext = new RATransactionContext(managedConnection.getTransactionContext()); 118 session.setTransactionContext(txContext); 119 JMSSessionProxy p = new JMSSessionProxy(session); 120 p.setUseSharedTxContext(managedConnection.isInManagedTx()); 121 sessions.add(p); 122 return p; 123 } 124 125 public void setUseSharedTxContext(boolean enable) throws JMSException { 126 for (Iterator iter = sessions.iterator(); iter.hasNext();) { 127 JMSSessionProxy p = (JMSSessionProxy) iter.next(); 128 p.setUseSharedTxContext(enable); 129 } 130 } 131 132 /** 133 * @param transacted 134 * @param acknowledgeMode 135 * @return 136 * @throws JMSException 137 */ 138 public QueueSession createQueueSession(boolean transacted, 139 int acknowledgeMode) throws JMSException { 140 return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode)); 141 } 142 143 /** 144 * @param transacted 145 * @param acknowledgeMode 146 * @return 147 * @throws JMSException 148 */ 149 public TopicSession createTopicSession(boolean transacted, 150 int acknowledgeMode) throws JMSException { 151 return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode)); 152 } 153 154 /** 155 * @return 156 * @throws JMSException 157 */ 158 public String getClientID() throws JMSException { 159 return getConnection().getClientID(); 160 } 161 162 /** 163 * @return 164 * @throws JMSException 165 */ 166 public ExceptionListener getExceptionListener() throws JMSException { 167 return getConnection().getExceptionListener(); 168 } 169 170 /** 171 * @return 172 * @throws JMSException 173 */ 174 public ConnectionMetaData getMetaData() throws JMSException { 175 return getConnection().getMetaData(); 176 } 177 178 /** 179 * @param clientID 180 * @throws JMSException 181 */ 182 public void setClientID(String clientID) throws JMSException { 183 getConnection().setClientID(clientID); 184 } 185 186 /** 187 * @param listener 188 * @throws JMSException 189 */ 190 public void setExceptionListener(ExceptionListener listener) 191 throws JMSException { 192 getConnection(); 193 exceptionListener = listener; 194 } 195 196 /** 197 * @throws JMSException 198 */ 199 public void start() throws JMSException { 200 getConnection().start(); 201 } 202 203 /** 204 * @throws JMSException 205 */ 206 public void stop() throws JMSException { 207 getConnection().stop(); 208 } 209 210 211 /** 212 * @param queue 213 * @param messageSelector 214 * @param sessionPool 215 * @param maxMessages 216 * @return 217 * @throws JMSException 218 */ 219 public ConnectionConsumer createConnectionConsumer(Queue queue, 220 String messageSelector, ServerSessionPool sessionPool, 221 int maxMessages) throws JMSException { 222 throw new JMSException("Not Supported."); 223 } 224 225 /** 226 * @param topic 227 * @param messageSelector 228 * @param sessionPool 229 * @param maxMessages 230 * @return 231 * @throws JMSException 232 */ 233 public ConnectionConsumer createConnectionConsumer(Topic topic, 234 String messageSelector, ServerSessionPool sessionPool, 235 int maxMessages) throws JMSException { 236 throw new JMSException("Not Supported."); 237 } 238 239 /** 240 * @param destination 241 * @param messageSelector 242 * @param sessionPool 243 * @param maxMessages 244 * @return 245 * @throws JMSException 246 */ 247 public ConnectionConsumer createConnectionConsumer(Destination destination, 248 String messageSelector, ServerSessionPool sessionPool, 249 int maxMessages) throws JMSException { 250 throw new JMSException("Not Supported."); 251 } 252 253 /** 254 * @param topic 255 * @param subscriptionName 256 * @param messageSelector 257 * @param sessionPool 258 * @param maxMessages 259 * @return 260 * @throws JMSException 261 */ 262 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 263 String subscriptionName, String messageSelector, 264 ServerSessionPool sessionPool, int maxMessages) throws JMSException { 265 throw new JMSException("Not Supported."); 266 } 267 268 /** 269 * @return Returns the managedConnection. 270 */ 271 public ActiveMQManagedConnection getManagedConnection() { 272 return managedConnection; 273 } 274 275 public void onException(JMSException e) { 276 if(exceptionListener!=null && managedConnection!=null) { 277 try { 278 exceptionListener.onException(e); 279 } catch (Throwable ignore) { 280 // We can never trust user code so ignore any exceptions. 281 } 282 } 283 } 284 285 }