001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * Copyright 2004 Hiram Chirino 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 package org.activemq; 020 021 import java.util.ArrayList; 022 023 import javax.jms.JMSException; 024 import javax.jms.TransactionInProgressException; 025 import javax.jms.TransactionRolledBackException; 026 import javax.transaction.xa.XAException; 027 import javax.transaction.xa.XAResource; 028 import javax.transaction.xa.Xid; 029 030 import org.activemq.message.ActiveMQXid; 031 import org.activemq.message.IntResponseReceipt; 032 import org.activemq.message.ResponseReceipt; 033 import org.activemq.message.TransactionInfo; 034 import org.activemq.message.XATransactionInfo; 035 import org.activemq.util.IdGenerator; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** 040 * A TransactionContext provides the means to control a JMS transaction. It provides 041 * a local transaction interface and also an XAResource interface. 042 * 043 * <p/> 044 * An application server controls the transactional assignment of an XASession 045 * by obtaining its XAResource. It uses the XAResource to assign the session 046 * to a transaction, prepare and commit work on the transaction, and so on. 047 * <p/> 048 * An XAResource provides some fairly sophisticated facilities for 049 * interleaving work on multiple transactions, recovering a list of 050 * transactions in progress, and so on. A JTA aware JMS provider must fully 051 * implement this functionality. This could be done by using the services of a 052 * database that supports XA, or a JMS provider may choose to implement this 053 * functionality from scratch. 054 * <p/> 055 * 056 * @version $Revision: 1.1.1.1 $ 057 * @see javax.jms.Session 058 * @see javax.jms.QueueSession 059 * @see javax.jms.TopicSession 060 * @see javax.jms.XASession 061 */ 062 public class TransactionContext implements XAResource { 063 064 private static final Log log = LogFactory.getLog(TransactionContext.class); 065 066 private final ActiveMQConnection connection; 067 private final ArrayList sessions = new ArrayList(2); 068 private final IdGenerator localTransactionIdGenerator = new IdGenerator(); 069 070 // To track XA transactions. 071 private Xid associatedXid; 072 private ActiveMQXid activeXid; 073 074 // To track local transactions. 075 private String localTransactionId; 076 077 private LocalTransactionEventListener localTransactionEventListener; 078 079 public TransactionContext(ActiveMQConnection connection) { 080 this.connection = connection; 081 } 082 083 public boolean isInXATransaction() { 084 return associatedXid!=null; 085 } 086 087 public boolean isInLocalTransaction() { 088 return localTransactionId!=null; 089 } 090 091 /** 092 * @return Returns the localTransactionEventListener. 093 */ 094 public LocalTransactionEventListener getLocalTransactionEventListener() { 095 return localTransactionEventListener; 096 } 097 098 /** 099 * Used by the resource adapter to listen to transaction events. 100 * 101 * @param localTransactionEventListener The localTransactionEventListener to set. 102 */ 103 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 104 this.localTransactionEventListener = localTransactionEventListener; 105 } 106 107 ///////////////////////////////////////////////////////////// 108 // 109 // Methods that interface with the session 110 // 111 ///////////////////////////////////////////////////////////// 112 public void addSession(ActiveMQSession session) { 113 sessions.add(session); 114 } 115 public void removeSession(ActiveMQSession session) { 116 sessions.remove(session); 117 } 118 119 private void postRollback() { 120 int size = sessions.size(); 121 for(int i=0; i < size; i++ ){ 122 ((ActiveMQSession)sessions.get(i)).redeliverUnacknowledgedMessages(true); 123 } 124 } 125 126 private void postCommit() { 127 int size = sessions.size(); 128 for(int i=0; i < size; i++ ){ 129 ((ActiveMQSession)sessions.get(i)).clearDeliveredMessages(); 130 } 131 } 132 133 public Object getTransactionId() { 134 if( localTransactionId!=null ) 135 return localTransactionId; 136 return activeXid; 137 } 138 139 ///////////////////////////////////////////////////////////// 140 // 141 // Local transaction interface. 142 // 143 ///////////////////////////////////////////////////////////// 144 145 /** 146 * Start a local transaction. 147 */ 148 public void begin() throws JMSException { 149 if( associatedXid!=null ) 150 throw new TransactionInProgressException("Cannot start local transction. XA transaction is allready in progress."); 151 152 if( localTransactionId==null ) { 153 this.localTransactionId = localTransactionIdGenerator.generateId(); 154 TransactionInfo info = new TransactionInfo(); 155 info.setTransactionId((String)localTransactionId); 156 info.setType(TransactionInfo.START); 157 this.connection.asyncSendPacket(info); 158 159 // Notify the listener that the tx was started. 160 if (localTransactionEventListener != null) { 161 localTransactionEventListener.beginEvent(); 162 } 163 if( log.isDebugEnabled() ) 164 log.debug("Started local transaction: "+localTransactionId); 165 } 166 } 167 168 /** 169 * Rolls back any messages done in this transaction and releases any locks currently held. 170 * 171 * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error. 172 * @throws javax.jms.IllegalStateException if the method is not called by a transacted session. 173 */ 174 public void rollback() throws JMSException { 175 if( associatedXid!=null ) 176 throw new TransactionInProgressException("Cannot rollback() if an XA transaction is allready in progress "); 177 178 if( localTransactionId!=null ) { 179 TransactionInfo info = new TransactionInfo(); 180 info.setTransactionId((String)localTransactionId); 181 info.setType(TransactionInfo.ROLLBACK); 182 //before we send, update the current transaction id 183 this.localTransactionId = null; 184 this.connection.asyncSendPacket(info); 185 // Notify the listener that the tx was rolled back 186 if (localTransactionEventListener != null) { 187 localTransactionEventListener.rollbackEvent(); 188 } 189 if( log.isDebugEnabled() ) 190 log.debug("Rolledback local transaction: "+localTransactionId); 191 } 192 postRollback(); 193 } 194 195 /** 196 * Commits all messages done in this transaction and releases any locks currently held. 197 * 198 * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error. 199 * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during 200 * commit. 201 * @throws javax.jms.IllegalStateException if the method is not called by a transacted session. 202 */ 203 public void commit() throws JMSException { 204 if( associatedXid!=null ) 205 throw new TransactionInProgressException("Cannot commit() if an XA transaction is allready in progress "); 206 207 // Only send commit if the transaction was started. 208 if (localTransactionId!=null) { 209 TransactionInfo info = new TransactionInfo(); 210 info.setTransactionId((String)localTransactionId); 211 info.setType(TransactionInfo.COMMIT); 212 //before we send, update the current transaction id 213 this.localTransactionId = null; 214 // Notify the listener that the tx was commited back 215 this.connection.syncSendPacket(info); 216 if (localTransactionEventListener != null) { 217 localTransactionEventListener.commitEvent(); 218 } 219 if( log.isDebugEnabled() ) 220 log.debug("Committed local transaction: "+localTransactionId); 221 } 222 postCommit(); 223 } 224 225 ///////////////////////////////////////////////////////////// 226 // 227 // XAResource Implementation 228 // 229 ///////////////////////////////////////////////////////////// 230 /** 231 * Associates a transaction with the resource. 232 */ 233 public void start(Xid xid, int flags) throws XAException { 234 if( localTransactionId!=null ) 235 throw new XAException(XAException.XAER_PROTO); 236 237 // Are we allready associated? 238 if (associatedXid != null) { 239 throw new XAException(XAException.XAER_PROTO); 240 } 241 242 if ((flags & TMJOIN) == TMJOIN) { 243 // TODO: verify that the server has seen the xid 244 } 245 if ((flags & TMJOIN) == TMRESUME) { 246 // TODO: verify that the xid was suspended. 247 } 248 249 // associate 250 setXid(xid); 251 252 } 253 254 public void end(Xid xid, int flags) throws XAException { 255 if( localTransactionId!=null ) 256 throw new XAException(XAException.XAER_PROTO); 257 258 if ((flags & TMSUSPEND) == TMSUSPEND) { 259 // You can only suspend the associated xid. 260 if (associatedXid == null || !ActiveMQXid.equals(associatedXid,xid)) { 261 throw new XAException(XAException.XAER_PROTO); 262 } 263 264 //TODO: we may want to put the xid in a suspended list. 265 setXid(null); 266 } else if ((flags & TMFAIL) == TMFAIL) { 267 //TODO: We need to rollback the transaction?? 268 setXid(null); 269 } else if ((flags & TMSUCCESS) == TMSUCCESS) { 270 //set to null if this is the current xid. 271 //otherwise this could be an asynchronous success call 272 if (ActiveMQXid.equals(associatedXid,xid)) { 273 setXid(null); 274 } 275 } else { 276 throw new XAException(XAException.XAER_INVAL); 277 } 278 if( log.isDebugEnabled() ) 279 log.debug("Ended XA transaction: "+activeXid); 280 281 } 282 283 public int prepare(Xid xid) throws XAException { 284 285 // We allow interleaving multiple transactions, so 286 // we don't limit prepare to the associated xid. 287 ActiveMQXid x; 288 //THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been called first 289 if (ActiveMQXid.equals(associatedXid,xid)) { 290 throw new XAException(XAException.XAER_PROTO); 291 } else { 292 //TODO cache the known xids so we don't keep recreating this one?? 293 x = new ActiveMQXid(xid); 294 } 295 296 XATransactionInfo info = new XATransactionInfo(); 297 info.setXid(x); 298 info.setType(XATransactionInfo.PRE_COMMIT); 299 300 try { 301 if( log.isDebugEnabled() ) 302 log.debug("Preparing XA transaction: "+x); 303 304 // Find out if the server wants to commit or rollback. 305 IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info); 306 return receipt.getResult(); 307 } catch (JMSException e) { 308 throw toXAException(e); 309 } 310 } 311 312 public void rollback(Xid xid) throws XAException { 313 314 // We allow interleaving multiple transactions, so 315 // we don't limit rollback to the associated xid. 316 ActiveMQXid x; 317 if (ActiveMQXid.equals(associatedXid,xid)) { 318 //I think this can happen even without an end(xid) call. Need to check spec. 319 x = activeXid; 320 } else { 321 x = new ActiveMQXid(xid); 322 } 323 324 XATransactionInfo info = new XATransactionInfo(); 325 info.setXid(x); 326 info.setType(XATransactionInfo.ROLLBACK); 327 328 try { 329 if( log.isDebugEnabled() ) 330 log.debug("Rollingback XA transaction: "+x); 331 332 // Let the server know that the tx is rollback. 333 this.connection.syncSendPacket(info); 334 } catch (JMSException e) { 335 throw toXAException(e); 336 } 337 338 postRollback(); 339 } 340 341 // XAResource interface 342 public void commit(Xid xid, boolean onePhase) throws XAException { 343 344 // We allow interleaving multiple transactions, so 345 // we don't limit commit to the associated xid. 346 ActiveMQXid x; 347 if (ActiveMQXid.equals(associatedXid,xid)) { 348 //should never happen, end(xid,TMSUCCESS) must have been previously called 349 throw new XAException(XAException.XAER_PROTO); 350 } else { 351 x = new ActiveMQXid(xid); 352 } 353 354 XATransactionInfo info = new XATransactionInfo(); 355 info.setXid(x); 356 info.setType(onePhase ? XATransactionInfo.COMMIT_ONE_PHASE : XATransactionInfo.COMMIT); 357 358 try { 359 if( log.isDebugEnabled() ) 360 log.debug("Committing XA transaction: "+x); 361 362 // Notify the server that the tx was commited back 363 this.connection.syncSendPacket(info); 364 } catch (JMSException e) { 365 throw toXAException(e); 366 } 367 368 postCommit(); 369 } 370 371 372 public void forget(Xid xid) throws XAException { 373 374 // We allow interleaving multiple transactions, so 375 // we don't limit forget to the associated xid. 376 ActiveMQXid x; 377 if (ActiveMQXid.equals(associatedXid,xid)) { 378 //TODO determine if this can happen... I think not. 379 x = activeXid; 380 } else { 381 x = new ActiveMQXid(xid); 382 } 383 384 XATransactionInfo info = new XATransactionInfo(); 385 info.setXid(x); 386 info.setType(XATransactionInfo.FORGET); 387 388 try { 389 if( log.isDebugEnabled() ) 390 log.debug("Forgetting XA transaction: "+x); 391 392 // Tell the server to forget the transaction. 393 this.connection.syncSendPacket(info); 394 } catch (JMSException e) { 395 throw toXAException(e); 396 } 397 } 398 399 public boolean isSameRM(XAResource xaResource) throws XAException { 400 if (xaResource == null) { 401 return false; 402 } 403 if (!(xaResource instanceof TransactionContext)) { 404 return false; 405 } 406 TransactionContext xar = (TransactionContext) xaResource; 407 try { 408 return getResourceManagerId().equals(xar.getResourceManagerId()); 409 } catch (Throwable e) { 410 throw (XAException)new XAException("Could not get resource manager id.").initCause(e); 411 } 412 } 413 414 415 public Xid[] recover(int flag) throws XAException { 416 417 XATransactionInfo info = new XATransactionInfo(); 418 info.setType(XATransactionInfo.XA_RECOVER); 419 420 try { 421 ResponseReceipt receipt = (ResponseReceipt) this.connection.syncSendRequest(info); 422 return (ActiveMQXid[]) receipt.getResult(); 423 } catch (JMSException e) { 424 throw toXAException(e); 425 } 426 } 427 428 public int getTransactionTimeout() throws XAException { 429 430 XATransactionInfo info = new XATransactionInfo(); 431 info.setType(XATransactionInfo.GET_TX_TIMEOUT); 432 433 try { 434 // get the tx timeout that was set. 435 IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info); 436 return receipt.getResult(); 437 } catch (JMSException e) { 438 throw toXAException(e); 439 } 440 } 441 442 public boolean setTransactionTimeout(int seconds) throws XAException { 443 444 XATransactionInfo info = new XATransactionInfo(); 445 info.setType(XATransactionInfo.SET_TX_TIMEOUT); 446 info.setTransactionTimeout(seconds); 447 448 try { 449 // Setup the new tx timeout 450 this.connection.asyncSendPacket(info); 451 return true; 452 } catch (JMSException e) { 453 throw toXAException(e); 454 } 455 } 456 457 ///////////////////////////////////////////////////////////// 458 // 459 // Helper methods. 460 // 461 ///////////////////////////////////////////////////////////// 462 private String getResourceManagerId() throws JMSException { 463 return this.connection.getResourceManagerId(); 464 } 465 466 private void setXid(Xid xid) throws XAException { 467 if (xid != null) { 468 // associate 469 associatedXid = xid; 470 activeXid = new ActiveMQXid(xid); 471 472 XATransactionInfo info = new XATransactionInfo(); 473 info.setXid(activeXid); 474 info.setType(XATransactionInfo.START); 475 try { 476 this.connection.asyncSendPacket(info); 477 if( log.isDebugEnabled() ) 478 log.debug("Started XA transaction: "+activeXid); 479 } catch (JMSException e) { 480 throw toXAException(e); 481 } 482 483 } else { 484 485 if( activeXid!=null ) { 486 XATransactionInfo info = new XATransactionInfo(); 487 info.setXid(activeXid); 488 info.setType(XATransactionInfo.END); 489 try { 490 this.connection.syncSendPacket(info); 491 if( log.isDebugEnabled() ) 492 log.debug("Ended XA transaction: "+activeXid); 493 } catch (JMSException e) { 494 throw toXAException(e); 495 } 496 } 497 498 // dis-associate 499 associatedXid = null; 500 activeXid = null; 501 } 502 } 503 504 /** 505 * Converts a JMSException from the server to an XAException. 506 * if the JMSException contained a linked XAException that is 507 * returned instead. 508 * 509 * @param e 510 * @return 511 */ 512 private XAException toXAException(JMSException e) { 513 if (e.getCause() != null && e.getCause() instanceof XAException) { 514 XAException original = (XAException) e.getCause(); 515 XAException xae = new XAException(original.getMessage()); 516 xae.errorCode = original.errorCode; 517 xae.initCause(original); 518 return xae; 519 } 520 521 XAException xae = new XAException(e.getMessage()); 522 xae.errorCode = XAException.XAER_RMFAIL; 523 xae.initCause(e); 524 return xae; 525 } 526 527 public ActiveMQConnection getConnection() { 528 return connection; 529 } 530 531 }