001    /**
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.store.jdbc.adapter;
019    
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.ResultSet;
023    import java.sql.SQLException;
024    import java.sql.Statement;
025    
026    import javax.jms.JMSException;
027    import javax.transaction.xa.XAException;
028    
029    import org.activemq.message.ActiveMQXid;
030    import org.activemq.service.SubscriberEntry;
031    import org.activemq.store.TransactionStore.RecoveryListener;
032    import org.activemq.store.jdbc.JDBCAdapter;
033    import org.activemq.store.jdbc.StatementProvider;
034    import org.activemq.util.LongSequenceGenerator;
035    import org.activemq.service.MessageIdentity;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /**
040     * Implements all the default JDBC operations that are used
041     * by the JDBCPersistenceAdapter.
042     * <p/>
043     * Subclassing is encouraged to override the default
044     * implementation of methods to account for differences
045     * in JDBC Driver implementations.
046     * <p/>
047     * The JDBCAdapter inserts and extracts BLOB data using the
048     * getBytes()/setBytes() operations.
049     * <p/>
050     * The databases/JDBC drivers that use this adapter are:
051     * <ul>
052     * <li></li>
053     * </ul>
054     *
055     * @version $Revision: 1.1 $
056     */
057    public class DefaultJDBCAdapter implements JDBCAdapter {
058    
059        private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
060    
061        final protected StatementProvider statementProvider;
062        protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
063    
064        protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
065            s.setBytes(index, data);
066        }
067    
068        protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
069            return rs.getBytes(index);
070        }
071    
072        /**
073         * @param provider
074         */
075        public DefaultJDBCAdapter(StatementProvider provider) {
076            this.statementProvider = new CachingStatementProvider(provider);
077        }
078    
079        public DefaultJDBCAdapter() {
080            this(new DefaultStatementProvider());
081        }
082    
083        public LongSequenceGenerator getSequenceGenerator() {
084            return sequenceGenerator;
085        }
086    
087        public void doCreateTables(Connection c) throws SQLException {
088            Statement s = null;
089            try {
090                s = c.createStatement();
091                String[] createStatments = statementProvider.getCreateSchemaStatments();
092                for (int i = 0; i < createStatments.length; i++) {
093                    // This will fail usually since the tables will be
094                    // created allready.
095                    try {
096                        boolean rc = s.execute(createStatments[i]);
097                    }
098                    catch (SQLException e) {
099                        log.info("Could not create JDBC tables; they could already exist." +
100                            " Failure was: " + createStatments[i] + " Message: " + e.getMessage() +
101                            " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() );
102                    }
103                }
104                c.commit();
105            }
106            finally {
107                try {
108                    s.close();
109                }
110                catch (Throwable e) {
111                }
112            }
113        }
114    
115        public void doDropTables(Connection c) throws SQLException {
116            Statement s = null;
117            try {
118                s = c.createStatement();
119                String[] dropStatments = statementProvider.getDropSchemaStatments();
120                for (int i = 0; i < dropStatments.length; i++) {
121                    // This will fail usually since the tables will be
122                    // created allready.
123                    try {
124                        boolean rc = s.execute(dropStatments[i]);
125                    }
126                    catch (SQLException e) {
127                        log.warn("Could not drop JDBC tables; they may not exist." +
128                            " Failure was: " + dropStatments[i] + " Message: " + e.getMessage() +
129                            " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() );
130                    }
131                }
132                c.commit();
133            }
134            finally {
135                try {
136                    s.close();
137                }
138                catch (Throwable e) {
139                }
140            }
141        }
142    
143        public void initSequenceGenerator(Connection c) {
144            PreparedStatement s = null;
145            ResultSet rs = null;
146            try {
147                s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs());
148                rs = s.executeQuery();
149                long seq1 = 0;
150                if (rs.next()) {
151                    seq1 = rs.getLong(1);
152                }
153                rs.close();
154                s.close();
155                s = c.prepareStatement(statementProvider.getFindLastSequenceIdInAcks());
156                rs = s.executeQuery();
157                long seq2 = 0;
158                if (rs.next()) {
159                    seq2 = rs.getLong(1);
160                }
161                
162                sequenceGenerator.setLastSequenceId(Math.max(seq1, seq2));
163                log.debug("Last sequence id: "+sequenceGenerator.getLastSequenceId());
164            }
165            catch (SQLException e) {
166                log.warn("Failed to find last sequence number: " + e, e);
167            }
168            finally {
169                try {
170                    rs.close();
171                }
172                catch (Throwable e) {
173                }
174                try {
175                    s.close();
176                }
177                catch (Throwable e) {
178                }
179            }
180        }
181    
182        public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data, long expiration) throws SQLException, JMSException {
183            PreparedStatement s = null;
184            try {
185                s = c.prepareStatement(statementProvider.getAddMessageStatment());
186                s.setLong(1, seq);
187                s.setString(2, destinationName);
188                s.setString(3, messageID);
189                setBinaryData(s, 4, data);
190                s.setLong(5, expiration);
191                if (s.executeUpdate() != 1) {
192                    throw new JMSException("Failed to broker message: " + messageID + " in container.  ");
193                }
194            }
195            finally {
196                try {
197                    s.close();
198                }
199                catch (Throwable e) {
200                }
201            }
202        }
203        
204            public Long getMessageSequenceId(Connection c, String messageID) throws SQLException, JMSException {
205            PreparedStatement s = null;
206            ResultSet rs = null;
207            try {
208    
209                s = c.prepareStatement(statementProvider.getFindMessageSequenceIdStatment());
210                s.setString(1, messageID);
211                rs = s.executeQuery();
212    
213                if (!rs.next()) {
214                    return null;
215                }
216                return new Long( rs.getLong(1) );
217    
218            }
219            finally {
220                try {
221                    rs.close();
222                }
223                catch (Throwable e) {
224                }
225                try {
226                    s.close();
227                }
228                catch (Throwable e) {
229                }
230            }
231            }
232    
233        public byte[] doGetMessage(Connection c, long seq) throws SQLException {
234            PreparedStatement s = null;
235            ResultSet rs = null;
236            try {
237    
238                s = c.prepareStatement(statementProvider.getFindMessageStatment());
239                s.setLong(1, seq);
240                rs = s.executeQuery();
241    
242                if (!rs.next()) {
243                    return null;
244                }
245                return getBinaryData(rs, 1);
246    
247            }
248            finally {
249                try {
250                    rs.close();
251                }
252                catch (Throwable e) {
253                }
254                try {
255                    s.close();
256                }
257                catch (Throwable e) {
258                }
259            }
260        }
261    
262        public void doGetMessageForUpdate(Connection c, long seq, boolean useLocking, ExpiredMessageResultHandler handler) throws SQLException, JMSException {
263            PreparedStatement s = null;
264            ResultSet rs = null;
265            try {
266    
267                    if (useLocking) {
268                            s = c.prepareStatement(statementProvider.getFindMessageAttributesForUpdateStatment());
269                    } else {
270                            s = c.prepareStatement(statementProvider.getFindMessageAttributesStatment());
271                    }
272                s.setLong(1, seq);
273                rs = s.executeQuery();
274    
275                if (rs.next()) {
276                    String container = rs.getString(1);
277                    String msgid = rs.getString(2);
278                    boolean isSentToDeadLetter = rs.getString(3)!=null&&rs.getString(3).equals("Y");
279                    handler.onMessage(seq, container, msgid, isSentToDeadLetter);
280                }
281            }
282            finally {
283                try {
284                    rs.close();
285                }
286                catch (Throwable e) {
287                }
288                try {
289                    s.close();
290                }
291                catch (Throwable e) {
292                }
293            }
294        }
295    
296        public void doSetDeadLetterFlag(Connection c, long seq) throws SQLException, JMSException {
297                    PreparedStatement s = null;
298                    ResultSet rs = null;
299                    try {
300                        // Update the db with the updated blob
301                        s = c.prepareStatement(statementProvider.getSetDeadLetterFlagStatement());
302                        s.setLong(1, seq);
303                        int i = s.executeUpdate();
304                            if (i <= 0)
305                            throw new JMSException("Failed to broker message: " + seq
306                                    + " in container.");
307                    
308                    } finally {
309                        try {
310                            rs.close();
311                        } catch (Throwable e) {
312                        }
313                        try {
314                            s.close();
315                        } catch (Throwable e) {
316                        }
317                    }
318        }
319    
320        public void doRemoveMessage(Connection c, long seq) throws SQLException {
321            PreparedStatement s = null;
322            try {
323                s = c.prepareStatement(statementProvider.getRemoveMessageStatment());
324                s.setLong(1, seq);
325                if (s.executeUpdate() != 1) {
326                    log.error("Could not delete sequenece number for: " + seq);
327                }
328            }
329            finally {
330                try {
331                    s.close();
332                }
333                catch (Throwable e) {
334                }
335            }
336        }
337    
338        public void doRecover(Connection c, String destinationName, MessageListResultHandler listener) throws SQLException, JMSException {
339            PreparedStatement s = null;
340            ResultSet rs = null;
341            try {
342    
343                s = c.prepareStatement(statementProvider.getFindAllMessagesStatment());
344                s.setString(1, destinationName);
345                rs = s.executeQuery();
346    
347                while (rs.next()) {
348                    long seq = rs.getLong(1);
349                    String msgid = rs.getString(2);
350                    listener.onMessage(seq, msgid);
351                }
352    
353            }
354            finally {
355                try {
356                    rs.close();
357                }
358                catch (Throwable e) {
359                }
360                try {
361                    s.close();
362                }
363                catch (Throwable e) {
364                }
365            }
366        }
367    
368        public void doRemoveXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
369            PreparedStatement s = null;
370            try {
371                s = c.prepareStatement(statementProvider.getRemoveXidStatment());
372                s.setString(1, xid.toLocalTransactionId());
373                if (s.executeUpdate() != 1) {
374                    throw new XAException("Failed to remove prepared transaction: " + xid + ".");
375                }
376            }
377            finally {
378                try {
379                    s.close();
380                }
381                catch (Throwable e) {
382                }
383            }
384        }
385    
386    
387        public void doAddXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
388            PreparedStatement s = null;
389            try {
390    
391                s = c.prepareStatement(statementProvider.getAddXidStatment());
392                s.setString(1, xid.toLocalTransactionId());
393                if (s.executeUpdate() != 1) {
394                    throw new XAException("Failed to store prepared transaction: " + xid);
395                }
396    
397            }
398            finally {
399                try {
400                    s.close();
401                }
402                catch (Throwable e) {
403                }
404            }
405        }
406    
407        public void doLoadPreparedTransactions(Connection c, RecoveryListener listener) throws SQLException {
408            PreparedStatement s = null;
409            ResultSet rs = null;
410            try {
411    
412                s = c.prepareStatement(statementProvider.getFindAllXidStatment());
413                rs = s.executeQuery();
414    
415                while (rs.next()) {
416                    String id = rs.getString(1);
417                    
418                    
419                    /*
420                    byte data[] = this.getBinaryData(rs, 2);
421                    try {
422                        ActiveMQXid xid = new ActiveMQXid(id);
423                        Transaction transaction = XATransactionCommand.fromBytes(data);
424                        transactionManager.loadTransaction(xid, transaction);
425                    }
426                    catch (Exception e) {
427                        log.error("Failed to recover prepared transaction due to invalid xid: " + id, e);
428                    }
429                    */
430                }
431            }
432            finally {
433                try {
434                    rs.close();
435                }
436                catch (Throwable e) {
437                }
438                try {
439                    s.close();
440                }
441                catch (Throwable e) {
442                }
443            }
444        }
445        
446        /**
447         * @throws JMSException
448         * @see org.activemq.store.jdbc.JDBCAdapter#doSetLastAck(java.sql.Connection, java.lang.String, java.lang.String, long)
449         */
450        public void doSetLastAck(Connection c, String destinationName, String subscriptionID, long seq) throws SQLException, JMSException {
451            PreparedStatement s = null;
452            try {
453                s = c.prepareStatement(statementProvider.getUpdateLastAckOfDurableSub());
454                s.setLong(1, seq);
455                s.setString(2, subscriptionID);
456                s.setString(3, destinationName);
457    
458                if (s.executeUpdate() != 1) {
459                    throw new JMSException("Failed to acknowlege message with sequence id: " + seq + " for client: " + subscriptionID);
460                }
461            }
462            finally {
463                try {
464                    s.close();
465                }
466                catch (Throwable e) {
467                }
468            }
469        }
470    
471        /**
472         * @throws JMSException
473         * @see org.activemq.store.jdbc.JDBCAdapter#doRecoverSubscription(java.sql.Connection, java.lang.String, java.lang.String, org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler)
474         */
475        public void doRecoverSubscription(Connection c, String destinationName, String subscriptionID, MessageListResultHandler listener) throws SQLException, JMSException {
476    //        dumpTables(c, destinationName, subscriptionID);
477    
478            PreparedStatement s = null;
479            ResultSet rs = null;
480            try {
481    
482    //            System.out.println(statementProvider.getFindAllDurableSubMessagesStatment());
483                s = c.prepareStatement(statementProvider.getFindAllDurableSubMessagesStatment());
484                s.setString(1, destinationName);
485                s.setString(2, subscriptionID);
486                rs = s.executeQuery();
487    
488                while (rs.next()) {
489                    long seq = rs.getLong(1);
490                    String msgid = rs.getString(2);
491                    listener.onMessage(seq, msgid);
492                }
493    
494            }
495            finally {
496                try {
497                    rs.close();
498                }
499                catch (Throwable e) {
500                }
501                try {
502                    s.close();
503                }
504                catch (Throwable e) {
505                }
506            }
507        }
508    
509        /**
510         * @see org.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.activemq.service.SubscriberEntry)
511         */
512        public void doSetSubscriberEntry(Connection c, String destinationName, String sub, SubscriberEntry subscriberEntry) throws SQLException {
513    
514            PreparedStatement s = null;
515            try {
516                s = c.prepareStatement(statementProvider.getUpdateDurableSubStatment());
517                s.setInt(1, subscriberEntry.getSubscriberID());
518                s.setString(2, subscriberEntry.getClientID());
519                s.setString(3, subscriberEntry.getConsumerName());
520                s.setString(4, subscriberEntry.getSelector());
521                s.setString(5, sub);
522                s.setString(6, destinationName);
523                
524                // If the sub was not there then we need to create it.
525                if (s.executeUpdate() != 1) {
526                    s.close();
527                    
528                    long id=0;
529                    ResultSet rs=null;
530                    s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs());
531                    try {
532                        rs = s.executeQuery();
533                        if (rs.next()) {
534                            id = rs.getLong(1);
535                        }
536                    } finally {
537                        try {
538                            rs.close();
539                        } catch (Throwable e) {
540                        }
541                    }
542                    s.close();
543                    
544                    s = c.prepareStatement(statementProvider.getCreateDurableSubStatment());
545                    s.setInt(1, subscriberEntry.getSubscriberID());
546                    s.setString(2, subscriberEntry.getClientID());
547                    s.setString(3, subscriberEntry.getConsumerName());
548                    s.setString(4, subscriberEntry.getSelector());
549                    s.setString(5, sub);
550                    s.setString(6, destinationName);
551                    
552                    s.setLong(7, id);
553    
554                    if (s.executeUpdate() != 1) {
555                        log.error("Failed to store durable subscription for: " + sub);
556                    }
557                }
558            }
559            finally {
560                try {
561                    s.close();
562                }
563                catch (Throwable e) {
564                }
565            }
566        }
567    
568        /**
569         * @see org.activemq.store.jdbc.JDBCAdapter#doGetSubscriberEntry(java.sql.Connection, java.lang.Object)
570         */
571        public SubscriberEntry doGetSubscriberEntry(Connection c, String destinationName, String sub) throws SQLException {
572            PreparedStatement s = null;
573            ResultSet rs = null;
574            try {
575    
576                s = c.prepareStatement(statementProvider.getFindDurableSubStatment());
577                s.setString(1, sub);
578                s.setString(2, destinationName);
579                rs = s.executeQuery();
580    
581                if (!rs.next()) {
582                    return null;
583                }
584    
585                SubscriberEntry answer = new SubscriberEntry();
586                answer.setSubscriberID(rs.getInt(1));
587                answer.setClientID(rs.getString(2));
588                answer.setConsumerName(rs.getString(3));
589                answer.setDestination(rs.getString(4));
590    
591                return answer;
592    
593            }
594            finally {
595                try {
596                    rs.close();
597                }
598                catch (Throwable e) {
599                }
600                try {
601                    s.close();
602                }
603                catch (Throwable e) {
604                }
605            }
606        }
607    
608        public void doRemoveAllMessages(Connection c, String destinationName) throws SQLException, JMSException {
609            PreparedStatement s = null;
610            try {
611                s = c.prepareStatement(statementProvider.getRemoveAllMessagesStatment());
612                s.setString(1, destinationName);
613                s.executeUpdate();
614                s.close();
615                
616                s = c.prepareStatement(statementProvider.getRemoveAllSubscriptionsStatment());
617                s.setString(1, destinationName);
618                s.executeUpdate();
619                
620            }
621            finally {
622                try {
623                    s.close();
624                }
625                catch (Throwable e) {
626                }
627            }
628        }
629    
630        public void doDeleteSubscription(Connection c, String destinationName, String subscription) throws SQLException, JMSException {
631            PreparedStatement s = null;
632            try {
633                s = c.prepareStatement(statementProvider.getDeleteSubscriptionStatment());
634                s.setString(1, subscription);
635                s.setString(2, destinationName);
636    
637                s.executeUpdate();
638            }
639            finally {
640                try {
641                    s.close();
642                }
643                catch (Throwable e) {
644                }
645            }
646        }
647    
648        public void doDeleteOldMessages(Connection c) throws SQLException, JMSException {
649            PreparedStatement s = null;
650            try {
651                s = c.prepareStatement(statementProvider.getDeleteOldMessagesStatment());
652                //s.setLong(1, System.currentTimeMillis());
653                int i = s.executeUpdate();
654                log.debug("Deleted "+i+" old message(s).");
655            }
656            finally {
657                try {
658                    s.close();
659                }
660                catch (Throwable e) {
661                }
662            }
663        }
664    
665        public void doGetExpiredMessages(Connection c, ExpiredMessageResultHandler handler) throws SQLException, JMSException {
666            PreparedStatement s = null;
667            ResultSet rs = null;
668            try {
669                s = c.prepareStatement(statementProvider.getFindExpiredMessagesStatment());
670                s.setLong(1, System.currentTimeMillis());
671                rs = s.executeQuery();
672                while(rs.next()) {
673                    long seq = rs.getLong(1);
674                    String container = rs.getString(2);
675                    String msgid = rs.getString(3);
676                    boolean isSentToDeadLetter = rs.getString(4)!=null&&rs.getString(4).equals("Y");
677                    handler.onMessage(seq, container, msgid, isSentToDeadLetter);
678                }
679            }
680            finally {
681                try {
682                    s.close();
683                }
684                catch (Throwable e) {
685                }
686            }
687        }
688    
689        public void doDeleteExpiredMessage(Connection c, MessageIdentity messageIdentity) throws SQLException, JMSException {
690            PreparedStatement s = null;
691            ResultSet rs = null;
692            try {
693                s = c.prepareStatement(statementProvider.getDeleteMessageStatement());
694                Long seq = (Long)messageIdentity.getSequenceNumber();
695                s.setLong(1, seq.longValue());
696                s.setString(2, messageIdentity.getMessageID());
697                int i = s.executeUpdate();
698                log.debug("Deleted "+i+" old message.");
699            }
700            finally {
701                try {
702                    s.close();
703                }
704                catch (Throwable e) {
705                }
706            }
707        }
708    
709        public StatementProvider getStatementProvider() {
710            return statementProvider;
711        }
712    
713        /*
714         * Usefull for debuging.
715         *
716        public void dumpTables(Connection c, String destinationName, String subscriptionID) throws SQLException {        
717            printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
718            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
719            PreparedStatement s = c.prepareStatement("SELECT M.ID, M.MSGID " +
720                    "FROM ACTIVEMQ_MSGS M, ACTIVEMQ_ACKS D  " +
721                    "WHERE D.CONTAINER=? AND D.SUB=?  " +
722                    "AND M.CONTAINER=D.CONTAINER " +
723                    "AND M.ID > D.LAST_ACKED_ID " +
724                    "ORDER BY M.ID");
725            s.setString(1,destinationName);
726            s.setString(2,subscriptionID);        
727            printQuery(s,System.out);
728        }
729    
730        private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
731            printQuery(c.prepareStatement(query), out);
732        }
733        
734        private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
735            
736            ResultSet set=null;
737            try {
738                set = s.executeQuery();
739                ResultSetMetaData metaData = set.getMetaData();
740                for( int i=1; i<= metaData.getColumnCount(); i++ ) {
741                    if(i==1)
742                        out.print("||");
743                    out.print(metaData.getColumnName(i)+"||");
744                }
745                out.println();
746                while(set.next()) {
747                    for( int i=1; i<= metaData.getColumnCount(); i++ ) {
748                        if(i==1)
749                            out.print("|");
750                        out.print(set.getString(i)+"|");
751                    }
752                    out.println();
753                }
754            } finally {
755                try { set.close(); } catch (Throwable ignore) {}
756                try { s.close(); } catch (Throwable ignore) {}
757            }
758        }
759        */
760    }