001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.store.jdbc.adapter; 019 020 import java.io.ByteArrayOutputStream; 021 import java.io.IOException; 022 import java.io.InputStream; 023 import java.io.OutputStream; 024 import java.sql.Blob; 025 import java.sql.Connection; 026 import java.sql.PreparedStatement; 027 import java.sql.ResultSet; 028 import java.sql.SQLException; 029 030 import javax.jms.JMSException; 031 032 import org.activemq.store.jdbc.StatementProvider; 033 034 035 /** 036 * This JDBCAdapter inserts and extracts BLOB data using the 037 * getBlob()/setBlob() operations. This is a little more involved 038 * since to insert a blob you have to: 039 * 040 * 1: insert empty blob. 041 * 2: select the blob 042 * 3: finally update the blob with data value. 043 * 044 * The databases/JDBC drivers that use this adapter are: 045 * <ul> 046 * <li></li> 047 * </ul> 048 * 049 * @version $Revision: 1.1 $ 050 */ 051 public class BlobJDBCAdapter extends DefaultJDBCAdapter { 052 053 public BlobJDBCAdapter() { 054 super(); 055 } 056 057 public BlobJDBCAdapter(StatementProvider provider) { 058 super(provider); 059 } 060 061 public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException, 062 JMSException { 063 PreparedStatement s = null; 064 ResultSet rs = null; 065 try { 066 067 // Add the Blob record. 068 s = c.prepareStatement(statementProvider.getAddMessageStatment()); 069 s.setLong(1, seq); 070 s.setString(2, destinationName); 071 s.setString(3, messageID); 072 s.setString(4, " "); 073 074 if (s.executeUpdate() != 1) 075 throw new JMSException("Failed to broker message: " + messageID 076 + " in container."); 077 s.close(); 078 079 // Select the blob record so that we can update it. 080 s = c.prepareStatement(statementProvider.getFindMessageStatment()); 081 s.setLong(1, seq); 082 rs = s.executeQuery(); 083 if (!rs.next()) 084 throw new JMSException("Failed to broker message: " + messageID 085 + " in container."); 086 087 // Update the blob 088 Blob blob = rs.getBlob(1); 089 OutputStream stream = blob.setBinaryStream(data.length); 090 stream.write(data); 091 stream.close(); 092 s.close(); 093 094 // Update the row with the updated blob 095 s = c.prepareStatement(statementProvider.getUpdateMessageStatment()); 096 s.setBlob(1, blob); 097 s.setLong(2, seq); 098 099 } catch (IOException e) { 100 throw (SQLException) new SQLException("BLOB could not be updated: " 101 + e).initCause(e); 102 } finally { 103 try { 104 rs.close(); 105 } catch (Throwable e) { 106 } 107 try { 108 s.close(); 109 } catch (Throwable e) { 110 } 111 } 112 } 113 114 public byte[] doGetMessage(Connection c, long seq) throws SQLException { 115 PreparedStatement s=null; ResultSet rs=null; 116 try { 117 118 s = c.prepareStatement(statementProvider.getFindMessageStatment()); 119 s.setLong(1, seq); 120 rs = s.executeQuery(); 121 122 if( !rs.next() ) 123 return null; 124 Blob blob = rs.getBlob(1); 125 InputStream is = blob.getBinaryStream(); 126 127 ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length()); 128 int ch; 129 while( (ch=is.read())>= 0 ) { 130 os.write(ch); 131 } 132 is.close(); 133 os.close(); 134 135 return os.toByteArray(); 136 137 } catch (IOException e) { 138 throw (SQLException) new SQLException("BLOB could not be updated: " 139 + e).initCause(e); 140 } finally { 141 try { rs.close(); } catch (Throwable e) {} 142 try { s.close(); } catch (Throwable e) {} 143 } 144 } 145 146 }