001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * Copyright 2004 Protique Ltd 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 * 018 **/ 019 package org.activemq.store.jdbc; 020 021 import java.sql.Connection; 022 import java.sql.SQLException; 023 024 import javax.jms.JMSException; 025 026 import org.activemq.io.WireFormat; 027 import org.activemq.message.ConsumerInfo; 028 import org.activemq.service.MessageIdentity; 029 import org.activemq.service.SubscriberEntry; 030 import org.activemq.store.RecoveryListener; 031 import org.activemq.store.TopicMessageStore; 032 import org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler; 033 import org.activemq.util.JMSExceptionHelper; 034 035 /** 036 * @version $Revision: 1.1 $ 037 */ 038 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { 039 040 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) { 041 super(persistenceAdapter, adapter, wireFormat, destinationName); 042 } 043 044 public void setLastAcknowledgedMessageIdentity(String subscription, MessageIdentity messageIdentity) throws JMSException { 045 long seq = getMessageSequenceId(messageIdentity); 046 // Get a connection and insert the message into the DB. 047 Connection c = null; 048 try { 049 c = persistenceAdapter.getConnection(); 050 adapter.doSetLastAck(c, destinationName, subscription, seq); 051 } 052 catch (SQLException e) { 053 throw JMSExceptionHelper.newJMSException("Failed to store ack for: " + subscription + " on message " + messageIdentity + " in container: " + e, e); 054 } 055 finally { 056 persistenceAdapter.returnConnection(c); 057 } 058 } 059 060 /** 061 * @see org.activemq.store.TopicMessageStore#getLastestMessageIdentity() 062 */ 063 public MessageIdentity getLastestMessageIdentity() throws JMSException { 064 return new MessageIdentity(null, new Long(sequenceGenerator.getLastSequenceId())); 065 } 066 067 /** 068 * 069 */ 070 public void recoverSubscription(String subscriptionId, MessageIdentity lastDispatchedMessage, final RecoveryListener listener) throws JMSException { 071 072 Connection c = null; 073 try { 074 c = persistenceAdapter.getConnection(); 075 adapter.doRecoverSubscription(c, destinationName, subscriptionId, new MessageListResultHandler() { 076 public void onMessage(long seq, String messageID) throws JMSException { 077 MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq)); 078 listener.recoverMessage(messageIdentity); 079 } 080 }); 081 } 082 catch (SQLException e) { 083 throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscriptionId + ". Reason: " + e, e); 084 } 085 finally { 086 persistenceAdapter.returnConnection(c); 087 } 088 } 089 090 /** 091 * @see org.activemq.store.TopicMessageStore#setSubscriberEntry(org.activemq.message.ConsumerInfo, org.activemq.service.SubscriberEntry) 092 */ 093 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException { 094 String key = info.getConsumerKey(); 095 Connection c = null; 096 try { 097 c = persistenceAdapter.getConnection(); 098 adapter.doSetSubscriberEntry(c, destinationName, key, subscriberEntry); 099 } 100 catch (SQLException e) { 101 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e); 102 } 103 finally { 104 persistenceAdapter.returnConnection(c); 105 } 106 } 107 108 /** 109 * @see org.activemq.store.TopicMessageStore#getSubscriberEntry(org.activemq.message.ConsumerInfo) 110 */ 111 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException { 112 String key = info.getConsumerKey(); 113 Connection c = null; 114 try { 115 c = persistenceAdapter.getConnection(); 116 return adapter.doGetSubscriberEntry(c, destinationName, key); 117 } 118 catch (SQLException e) { 119 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e); 120 } 121 finally { 122 persistenceAdapter.returnConnection(c); 123 } 124 } 125 126 public void deleteSubscription(String subscription) throws JMSException { 127 Connection c = null; 128 try { 129 c = persistenceAdapter.getConnection(); 130 adapter.doDeleteSubscription(c, destinationName, subscription); 131 } 132 catch (SQLException e) { 133 throw JMSExceptionHelper.newJMSException("Failed to remove subscription for: " + subscription + ". Reason: " + e, e); 134 } 135 finally { 136 persistenceAdapter.returnConnection(c); 137 } 138 } 139 140 public void incrementMessageCount(MessageIdentity messageId) throws JMSException { 141 } 142 143 public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity) throws JMSException { 144 } 145 146 }