001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * Copyright 2004 Protique Ltd
005     * 
006     * Licensed under the Apache License, Version 2.0 (the "License"); 
007     * you may not use this file except in compliance with the License. 
008     * You may obtain a copy of the License at 
009     * 
010     * http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS, 
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
015     * See the License for the specific language governing permissions and 
016     * limitations under the License. 
017     * 
018     **/
019    package org.activemq.store.journal;
020    
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.File;
024    import java.io.IOException;
025    import java.util.ArrayList;
026    import java.util.Iterator;
027    import java.util.Map;
028    import java.sql.SQLException;
029    
030    import javax.jms.JMSException;
031    import javax.transaction.xa.XAException;
032    
033    import org.activeio.adapter.PacketByteArrayOutputStream;
034    import org.activeio.adapter.PacketInputStream;
035    import org.activeio.journal.InvalidRecordLocationException;
036    import org.activeio.journal.Journal;
037    import org.activeio.journal.JournalEventListener;
038    import org.activeio.journal.RecordLocation;
039    import org.activeio.journal.active.JournalImpl;
040    import org.activeio.journal.howl.HowlJournal;
041    import org.activemq.io.WireFormat;
042    import org.activemq.io.impl.StatelessDefaultWireFormat;
043    import org.activemq.message.ActiveMQMessage;
044    import org.activemq.message.ActiveMQXid;
045    import org.activemq.message.MessageAck;
046    import org.activemq.message.Packet;
047    import org.activemq.service.MessageIdentity;
048    import org.activemq.store.MessageStore;
049    import org.activemq.store.PersistenceAdapter;
050    import org.activemq.store.TopicMessageStore;
051    import org.activemq.store.TransactionStore;
052    import org.activemq.store.jdbc.JDBCPersistenceAdapter;
053    import org.activemq.store.journal.JournalTransactionStore.Tx;
054    import org.activemq.store.journal.JournalTransactionStore.TxOperation;
055    import org.activemq.util.JMSExceptionHelper;
056    import org.apache.commons.logging.Log;
057    import org.apache.commons.logging.LogFactory;
058    import org.objectweb.howl.log.Configuration;
059    
060    import EDU.oswego.cs.dl.util.concurrent.Channel;
061    import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
062    import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
063    import EDU.oswego.cs.dl.util.concurrent.Latch;
064    import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
065    import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
066    import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
067    
068    /**
069     * An implementation of {@link PersistenceAdapter} designed for
070     * use with a {@link Journal} and then checkpointing asynchronously
071     * on a timeout with some other long term persistent storage.
072     *
073     * @version $Revision: 1.1 $
074     */
075    public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener {
076    
077        private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class);
078        public static final String DEFAULT_JOURNAL_TYPE = "default";
079        public static final String HOWL_JOURNAL_TYPE = "howl";
080        
081        private Journal journal;
082        private String journalType = DEFAULT_JOURNAL_TYPE;
083        private PersistenceAdapter longTermPersistence;
084        private File directory = new File("logs");
085        private final StatelessDefaultWireFormat wireFormat = new StatelessDefaultWireFormat();
086        private final ConcurrentHashMap messageStores = new ConcurrentHashMap();
087        private final ConcurrentHashMap topicMessageStores = new ConcurrentHashMap();
088        
089        private static final int PACKET_RECORD_TYPE = 0;
090        private static final int COMMAND_RECORD_TYPE = 1;
091        private static final int TX_COMMAND_RECORD_TYPE = 2;
092        private static final int ACK_RECORD_TYPE = 3;
093    
094        private Channel checkpointRequests = new LinkedQueue();
095        private QueuedExecutor checkpointExecutor;
096        ClockDaemon clockDaemon;
097        private Object clockTicket;
098        private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
099        private int logFileSize=1024*1024*20;
100        private int logFileCount=2;
101        private long checkpointInterval = 1000 * 60 * 5;
102    
103        public JournalPersistenceAdapter() {
104            checkpointExecutor = new QueuedExecutor(new LinkedQueue());
105            checkpointExecutor.setThreadFactory(new ThreadFactory() {
106                public Thread newThread(Runnable runnable) {
107                    Thread answer = new Thread(runnable, "Checkpoint Worker");
108                    answer.setDaemon(true);
109                    answer.setPriority(Thread.MAX_PRIORITY);
110                    return answer;
111                }
112            });
113        }
114    
115        public JournalPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) throws IOException {
116            this();
117            this.directory = directory;
118            this.longTermPersistence = longTermPersistence;
119        }
120    
121        public Map getInitialDestinations() {
122            return longTermPersistence.getInitialDestinations();
123        }
124        
125        private MessageStore createMessageStore(String destination, boolean isQueue) throws JMSException {
126            if(isQueue) {
127                return createQueueMessageStore(destination);
128            } else {
129                return createTopicMessageStore(destination);
130            }
131        }
132    
133        public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
134            JournalMessageStore store = (JournalMessageStore) messageStores.get(destinationName);
135            if( store == null ) {
136                    MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
137                    store = new JournalMessageStore(this, checkpointStore, destinationName);
138                    messageStores.put(destinationName, store);
139            }
140            return store;
141        }
142    
143        public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
144            JournalTopicMessageStore store = (JournalTopicMessageStore) topicMessageStores.get(destinationName);
145            if( store == null ) {
146                TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
147                    store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
148                    topicMessageStores.put(destinationName, store);
149            }
150            return store;
151        }
152    
153        public TransactionStore createTransactionStore() throws JMSException {
154            return transactionStore;
155        }
156    
157        public void beginTransaction() throws JMSException {
158            longTermPersistence.beginTransaction();
159        }
160    
161        public void commitTransaction() throws JMSException {
162            longTermPersistence.commitTransaction();
163        }
164    
165        public void rollbackTransaction() {
166            longTermPersistence.rollbackTransaction();
167        }
168    
169        public synchronized void start() throws JMSException {
170            
171            if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
172                // Disabled periodic clean up as it deadlocks with the checkpoint operations.
173                ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
174            }
175            
176            longTermPersistence.start();
177            createTransactionStore();
178            if (journal == null) {
179                try {
180                    log.info("Opening journal.");
181                    journal = createJournal();
182                    log.info("Opened journal: " + journal);
183                    journal.setJournalEventListener(this);
184                }
185                catch (Exception e) {
186                    throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
187                }
188                try {
189                    recover();
190                }
191                catch (Exception e) {
192                    throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
193                }
194            }
195    
196            // Do a checkpoint periodically.
197            clockTicket = getClockDaemon().executePeriodically(checkpointInterval, new Runnable() {
198                public void run() {
199                    checkpoint(false);
200                }
201            }, false);
202    
203        }
204    
205        public synchronized void stop() throws JMSException {
206    
207            if (clockTicket != null) {
208                // Stop the periodical checkpoint.
209                ClockDaemon.cancel(clockTicket);
210                clockTicket=null;
211                clockDaemon.shutDown();
212            }
213    
214            // Take one final checkpoint and stop checkpoint processing.
215            checkpoint(true);
216            checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
217    
218            JMSException firstException = null;
219            if (journal != null) {
220                try {
221                    journal.close();
222                    journal = null;
223                }
224                catch (Exception e) {
225                    firstException = JMSExceptionHelper.newJMSException("Failed to close journals: " + e, e);
226                }
227            }
228            longTermPersistence.stop();
229    
230            if (firstException != null) {
231                throw firstException;
232            }
233        }
234    
235        // Properties
236        //-------------------------------------------------------------------------
237        public PersistenceAdapter getLongTermPersistence() {
238            return longTermPersistence;
239        }
240    
241        public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
242            this.longTermPersistence = longTermPersistence;
243        }
244    
245        /**
246         * @return Returns the directory.
247         */
248        public File getDirectory() {
249            return directory;
250        }
251    
252        /**
253         * @param directory The directory to set.
254         */
255        public void setDirectory(File directory) {
256            this.directory = directory;
257        }
258    
259        /**
260         * @return Returns the wireFormat.
261         */
262        public WireFormat getWireFormat() {
263            return wireFormat;
264        }
265    
266        public String getJournalType() {
267            return journalType;
268        }
269    
270        public void setJournalType(String journalType) {
271            this.journalType = journalType;
272        }
273    
274        protected Journal createJournal() throws IOException {
275            if( DEFAULT_JOURNAL_TYPE.equals(journalType) ) {
276                return new JournalImpl(directory,logFileCount,logFileSize);
277            }
278            
279            if( HOWL_JOURNAL_TYPE.equals(journalType) ) {
280                try {
281                    Configuration config = new Configuration();
282                    config.setLogFileDir(directory.getCanonicalPath());
283                    return new HowlJournal(config);
284                } catch (IOException e) {
285                    throw e;
286                } catch (Exception e) {
287                    throw (IOException)new IOException("Could not open HOWL journal: "+e.getMessage()).initCause(e);
288                }
289            }
290            
291            throw new IllegalStateException("Unsupported valued for journalType attribute: "+journalType);
292        }
293    
294        // Implementation methods
295        //-------------------------------------------------------------------------
296    
297        /**
298         * The Journal give us a call back so that we can move old data out of the journal.
299         * Taking a checkpoint does this for us.
300         *
301         * @see org.activemq.journal.JournalEventListener#overflowNotification(org.activemq.journal.RecordLocation)
302         */
303        public void overflowNotification(RecordLocation safeLocation) {
304            checkpoint(false);
305        }
306    
307        /**
308         * When we checkpoint we move all the journaled data to long term storage.
309         * @param b 
310         */
311        public void checkpoint(boolean sync) {
312            try {
313                
314                if( journal == null )
315                    throw new IllegalStateException("Journal is closed.");
316                
317                // Do the checkpoint asynchronously?
318                Latch latch=null;
319                if( sync ) {
320                    latch = new Latch();
321                    checkpointRequests.put(latch);
322                } else {
323                    checkpointRequests.put(Boolean.TRUE);
324                }
325                
326                checkpointExecutor.execute(new Runnable() {
327                    public void run() {
328    
329                        ArrayList listners = new ArrayList();
330                        
331                        try { 
332                            // Avoid running a checkpoint too many times in a row.
333                            // Consume any queued up checkpoint requests.
334                            try {
335                                boolean requested = false;
336                                Object t;
337                                while ((t=checkpointRequests.poll(0)) != null) {
338                                    if( t.getClass()==Latch.class )
339                                        listners.add(t);
340                                    requested = true;
341                                }
342                                if (!requested) {
343                                    return;
344                                }
345                            }
346                            catch (InterruptedException e1) {
347                                return;
348                            }
349        
350                            log.debug("Checkpoint started.");
351                            RecordLocation newMark = null;
352        
353                            Iterator iterator = messageStores.values().iterator();
354                            while (iterator.hasNext()) {
355                                try {
356                                    JournalMessageStore ms = (JournalMessageStore) iterator.next();
357                                    RecordLocation mark = ms.checkpoint();
358                                    if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
359                                        newMark = mark;
360                                    }
361                                }
362                                catch (Exception e) {
363                                    log.error("Failed to checkpoint a message store: " + e, e);
364                                }
365                            }
366                            
367                            iterator = topicMessageStores.values().iterator();
368                            while (iterator.hasNext()) {
369                                try {
370                                    JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next();
371                                    RecordLocation mark = ms.checkpoint();
372                                    if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
373                                        newMark = mark;
374                                    }
375                                }
376                                catch (Exception e) {
377                                    log.error("Failed to checkpoint a message store: " + e, e);
378                                }
379                            }
380                            
381                            try {
382                                if (newMark != null) {
383                                    if( log.isDebugEnabled() )
384                                        log.debug("Marking journal: "+newMark);
385                                    journal.setMark(newMark, true);
386                                }
387                            }
388                            catch (Exception e) {
389                                log.error("Failed to mark the Journal: " + e, e);
390                            }
391                            
392                            // Clean up the DB if it's a JDBC store.
393                            if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
394                                // Disabled periodic clean up as it deadlocks with the checkpoint operations.
395                                    try {
396                                            ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
397                                    } catch (SQLException sqle)  {
398                                            log.error("Cleanup failed due to: " + sqle, sqle);
399                                    }
400                            }
401    
402                            log.debug("Checkpoint done.");
403                        } finally {
404                            for (Iterator iter = listners.iterator(); iter.hasNext();) {
405                                Latch latch = (Latch) iter.next();
406                                latch.release();
407                            }
408                        }
409                    }
410                });
411    
412                if( sync ) {
413                    latch.acquire();
414                }
415            }
416            catch (InterruptedException e) {
417                log.warn("Request to start checkpoint failed: " + e, e);
418            }
419        }
420    
421        /**
422         * @param destinationName
423         * @param message
424         * @param sync
425         * @throws JMSException
426         */
427        public RecordLocation writePacket(String destination, Packet packet, boolean sync) throws JMSException {
428            try {
429    
430                PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
431                DataOutputStream os = new DataOutputStream(pos);
432                os.writeByte(PACKET_RECORD_TYPE);
433                os.writeUTF(destination);
434                os.close();
435                org.activeio.Packet p = wireFormat.writePacket(packet, pos);
436                return journal.write(p, sync);
437            }
438            catch (IOException e) {
439                throw createWriteException(packet, e);
440            }
441        }
442    
443        /**
444         * @param destinationName
445         * @param message
446         * @param sync
447         * @throws JMSException
448         */
449        public RecordLocation writeCommand(String command, boolean sync) throws JMSException {
450            try {
451    
452                PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
453                DataOutputStream os = new DataOutputStream(pos);
454                os.writeByte(COMMAND_RECORD_TYPE);
455                os.writeUTF(command);
456                os.close();
457                return journal.write(pos.getPacket(), sync);
458    
459            }
460            catch (IOException e) {
461                throw createWriteException(command, e);
462            }
463        }
464    
465        /**
466         * @param location
467         * @return
468         * @throws JMSException
469         */
470        public Packet readPacket(RecordLocation location) throws JMSException {
471            try {
472                org.activeio.Packet data = journal.read(location);
473                DataInputStream is = new DataInputStream(new PacketInputStream(data));
474                byte type = is.readByte();
475                if (type != PACKET_RECORD_TYPE) {
476                    throw new IOException("Record is not a packet type.");
477                }
478                String destination = is.readUTF();
479                Packet packet = wireFormat.readPacket(data);
480                is.close();
481                return packet;
482    
483            }
484            catch (InvalidRecordLocationException e) {
485                throw createReadException(location, e);
486            }
487            catch (IOException e) {
488                throw createReadException(location, e);
489            }
490        }
491    
492    
493        /**
494         * Move all the messages that were in the journal into long term storeage.  We just replay and do a checkpoint.
495         *
496         * @throws JMSException
497         * @throws IOException
498         * @throws InvalidRecordLocationException
499         * @throws IllegalStateException
500         */
501        private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
502    
503            RecordLocation pos = null;
504            int transactionCounter = 0;
505    
506            log.info("Journal Recovery Started.");
507    
508            // While we have records in the journal.
509            while ((pos = journal.getNextRecordLocation(pos)) != null) {
510                org.activeio.Packet data = journal.read(pos);
511                DataInputStream is = new DataInputStream(new PacketInputStream(data));
512    
513                // Read the destination and packate from the record.
514                String destination = null;
515                Packet packet = null;
516                try {
517                    byte type = is.readByte();
518                    switch (type) {
519                        case PACKET_RECORD_TYPE:
520    
521                            // Is the current packet part of the destination?
522                            destination = is.readUTF();
523                            packet = wireFormat.readPacket(data);
524    
525                            // Try to replay the packet.
526                            if (packet instanceof ActiveMQMessage) {
527                                ActiveMQMessage msg = (ActiveMQMessage) packet;
528                                
529                                JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, msg.getJMSActiveMQDestination().isQueue());
530                                if( msg.getTransactionId()!=null ) {
531                                    transactionStore.addMessage(store, msg, pos);
532                                } else {
533                                    store.replayAddMessage(msg);
534                                    transactionCounter++;
535                                }
536                            }
537                            else if (packet instanceof MessageAck) {
538                                MessageAck ack = (MessageAck) packet;
539                                JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, ack.getDestination().isQueue());
540                                if( ack.getTransactionId()!=null ) {
541                                    transactionStore.removeMessage(store, ack, pos);
542                                } else {
543                                    store.replayRemoveMessage(ack);
544                                    transactionCounter++;
545                                }
546                            }
547                            else {
548                                log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
549                            }
550    
551                            break;
552                        case TX_COMMAND_RECORD_TYPE:
553                            
554                            TxCommand command = new TxCommand();
555                            command.setType(is.readByte());
556                            command.setWasPrepared(is.readBoolean());
557                            switch(command.getType()) {
558                                    case TxCommand.LOCAL_COMMIT:
559                                    case TxCommand.LOCAL_ROLLBACK:
560                                        command.setTransactionId(is.readUTF());
561                                        break;
562                                    default:
563                                        command.setTransactionId(ActiveMQXid.read(is));
564                                            break;
565                            }
566                            
567                            // Try to replay the packet.
568                            switch(command.getType()) {
569                                    case TxCommand.XA_PREPARE:
570                                  transactionStore.replayPrepare(command.getTransactionId());
571                                        break;
572                                    case TxCommand.XA_COMMIT:
573                                    case TxCommand.LOCAL_COMMIT:
574                                  Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
575                                  // Replay the committed operations.
576                                  if( tx!=null) {
577                                      tx.getOperations();
578                                      for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
579                                          TxOperation op = (TxOperation) iter.next();
580                                          if( op.operationType == TxOperation.ADD_OPERATION_TYPE ) {
581                                              op.store.replayAddMessage((ActiveMQMessage) op.data);
582                                          }
583                                          if( op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
584                                              op.store.replayRemoveMessage((MessageAck) op.data);
585                                          }
586                                          if( op.operationType == TxOperation.ACK_OPERATION_TYPE) {
587                                              JournalAck ack = (JournalAck) op.data;
588                                              ((JournalTopicMessageStore)op.store).replayAcknowledge(ack.getSubscription(), new MessageIdentity(ack.getMessageId()));
589                                          }
590                                      }
591                                      transactionCounter++;
592                                  }
593                                        break;
594                                    case TxCommand.LOCAL_ROLLBACK:
595                                    case TxCommand.XA_ROLLBACK:
596                                  transactionStore.replayRollback(command.getTransactionId());
597                                        break;
598                            }
599                            
600                            break;
601                            
602                        case ACK_RECORD_TYPE:
603                            
604                            destination = is.readUTF();
605                            String subscription = is.readUTF();
606                            String messageId = is.readUTF();
607                            Object transactionId=null;
608                            
609                            JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(destination, false);
610                            if( transactionId!=null ) {
611                                JournalAck ack = new JournalAck(destination, subscription, messageId, transactionId);
612                                transactionStore.acknowledge(store, ack, pos);
613                            } else {
614                                store.replayAcknowledge(subscription, new MessageIdentity(messageId));
615                                transactionCounter++;
616                            }
617                            
618                        case COMMAND_RECORD_TYPE:
619    
620                            break;
621                        default:
622                            log.error("Unknown type of record in transaction log which will be discarded: " + type);
623                            break;
624                    }
625                }
626                finally {
627                    is.close();
628                }
629            }
630    
631            RecordLocation location = writeCommand("RECOVERED", true);
632            journal.setMark(location, true);
633    
634            log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
635        }
636    
637        private JMSException createReadException(RecordLocation location, Exception e) {
638            return JMSExceptionHelper.newJMSException("Failed to read to journal for: " + location + ". Reason: " + e, e);
639        }
640    
641        protected JMSException createWriteException(Packet packet, Exception e) {
642            return JMSExceptionHelper.newJMSException("Failed to write to journal for: " + packet + ". Reason: " + e, e);
643        }
644        
645        private XAException createWriteException(TxCommand command, Exception e) {
646            return (XAException)new XAException("Failed to write to journal for: " + command + ". Reason: " + e).initCause(e);
647        }
648    
649    
650        protected JMSException createWriteException(String command, Exception e) {
651            return JMSExceptionHelper.newJMSException("Failed to write to journal for command: " + command + ". Reason: " + e, e);
652        }
653    
654        protected JMSException createRecoveryFailedException(Exception e) {
655            return JMSExceptionHelper.newJMSException("Failed to recover from journal. Reason: " + e, e);
656        }
657    
658        public ClockDaemon getClockDaemon() {
659            if (clockDaemon == null) {
660                clockDaemon = new ClockDaemon();
661                clockDaemon.setThreadFactory(new ThreadFactory() {
662                    public Thread newThread(Runnable runnable) {
663                        Thread thread = new Thread(runnable, "Checkpoint Timer");
664                        thread.setDaemon(true);
665                        return thread;
666                    }
667                });
668            }
669            return clockDaemon;
670        }
671    
672        public void setClockDaemon(ClockDaemon clockDaemon) {
673            this.clockDaemon = clockDaemon;
674        }
675    
676        /**
677         * @param xid
678         * @return
679         */
680        public RecordLocation writeTxCommand(TxCommand command, boolean sync) throws XAException {
681            try {
682    
683                PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
684                DataOutputStream os = new DataOutputStream(pos);
685                os.writeByte(TX_COMMAND_RECORD_TYPE);
686                os.writeByte(command.getType());
687                os.writeBoolean(command.getWasPrepared());
688                switch(command.getType()) {
689                    case TxCommand.LOCAL_COMMIT:
690                    case TxCommand.LOCAL_ROLLBACK:
691                        os.writeUTF( (String) command.getTransactionId() );
692                        break;
693                    default:
694                        ActiveMQXid xid = (ActiveMQXid) command.getTransactionId();
695                            xid.write(os);
696                            break;
697                }
698                os.close();
699                return journal.write(pos.getPacket(), sync);
700            }
701            catch (IOException e) {
702                throw createWriteException(command, e);
703            }
704        }
705    
706        /**
707         * @param destinationName
708         * @param persistentKey
709         * @param messageIdentity
710         * @param b
711         * @return
712         */
713        public RecordLocation writePacket(String destinationName, String subscription, MessageIdentity messageIdentity, boolean sync) throws JMSException{
714            try {
715    
716                PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
717                DataOutputStream os = new DataOutputStream(pos);
718                os.writeByte(ACK_RECORD_TYPE);
719                os.writeUTF(destinationName);
720                os.writeUTF(subscription);
721                os.writeUTF(messageIdentity.getMessageID());
722                os.close();
723                return journal.write(pos.getPacket(), sync);
724    
725            }
726            catch (IOException e) {
727                throw createWriteException("Ack for message: "+messageIdentity, e);
728            }
729        }
730    
731        public JournalTransactionStore getTransactionStore() {
732            return transactionStore;
733        }
734    
735        public int getLogFileCount() {
736            return logFileCount;
737        }
738    
739        public void setLogFileCount(int logFileCount) {
740            this.logFileCount = logFileCount;
741        }
742    
743        public int getLogFileSize() {
744            return logFileSize;
745        }
746    
747        public void setLogFileSize(int logFileSize) {
748            this.logFileSize = logFileSize;
749        }
750    
751        /**
752         * Verifies if a dead letter has already been sent for a message  
753         * @param seq
754         * @param useLocking to prevent concurrency/dups
755         * @return
756         */
757        public boolean deadLetterAlreadySent(long seq, boolean useLocking) {
758            return longTermPersistence.deadLetterAlreadySent(seq, useLocking);
759        }
760    
761        public long getCheckpointInterval() {
762            return checkpointInterval;
763        }
764        public void setCheckpointInterval(long checkpointInterval) {
765            this.checkpointInterval = checkpointInterval;
766        }
767    }