001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (c) 2004-2007 Paul Ferraro
004 * 
005 * This library is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Lesser General Public License as published by the 
007 * Free Software Foundation; either version 2.1 of the License, or (at your 
008 * option) any later version.
009 * 
010 * This library is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this library; if not, write to the Free Software Foundation, 
017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018 * 
019 * Contact: ferraro@users.sourceforge.net
020 */
021package net.sf.hajdbc.sync;
022
023import java.sql.Connection;
024import java.sql.PreparedStatement;
025import java.sql.ResultSet;
026import java.sql.SQLException;
027import java.sql.Statement;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.concurrent.Callable;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Future;
034
035import net.sf.hajdbc.Dialect;
036import net.sf.hajdbc.Messages;
037import net.sf.hajdbc.SynchronizationContext;
038import net.sf.hajdbc.SynchronizationStrategy;
039import net.sf.hajdbc.TableProperties;
040import net.sf.hajdbc.util.SQLExceptionFactory;
041import net.sf.hajdbc.util.Strings;
042
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Database-independent synchronization strategy that does full record transfer between two databases.
048 * This strategy is best used when there are <em>many</em> differences between the active database and the inactive database (i.e. very much out of sync).
049 * The following algorithm is used:
050 * <ol>
051 *  <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li>
052 *  <li>For each database table:
053 *   <ol>
054 *    <li>Delete all rows in the inactive database table</li>
055 *    <li>Query all rows on the active database table</li>
056 *    <li>For each row in active database table:
057 *     <ol>
058 *      <li>Insert new row into inactive database table</li>
059 *     </ol>
060 *    </li>
061 *   </ol>
062 *  </li>
063 *  <li>Re-create the foreign keys on the inactive database</li>
064 *  <li>Synchronize sequences</li>
065 * </ol>
066 * @author  Paul Ferraro
067 */
068public class FullSynchronizationStrategy implements SynchronizationStrategy
069{
070        private static Logger logger = LoggerFactory.getLogger(FullSynchronizationStrategy.class);
071
072        private int maxBatchSize = 100;
073        private int fetchSize = 0;
074        
075        /**
076         * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(net.sf.hajdbc.SynchronizationContext)
077         */
078        @Override
079        public <D> void synchronize(SynchronizationContext<D> context) throws SQLException
080        {
081                Connection sourceConnection = context.getConnection(context.getSourceDatabase());
082                Connection targetConnection = context.getConnection(context.getTargetDatabase());
083
084                Dialect dialect = context.getDialect();
085                ExecutorService executor = context.getExecutor();
086                
087                boolean autoCommit = targetConnection.getAutoCommit();
088                
089                targetConnection.setAutoCommit(true);
090                
091                SynchronizationSupport.dropForeignKeys(context);
092                
093                targetConnection.setAutoCommit(false);
094                
095                try
096                {
097                        for (TableProperties table: context.getSourceDatabaseProperties().getTables())
098                        {
099                                String tableName = table.getName();
100                                Collection<String> columns = table.getColumns();
101                                
102                                String commaDelimitedColumns = Strings.join(columns, Strings.PADDED_COMMA);
103                                
104                                final String selectSQL = "SELECT " + commaDelimitedColumns + " FROM " + tableName; //$NON-NLS-1$ //$NON-NLS-2$
105                                
106                                final Statement selectStatement = sourceConnection.createStatement();
107                                selectStatement.setFetchSize(this.fetchSize);
108                                
109                                Callable<ResultSet> callable = new Callable<ResultSet>()
110                                {
111                                        public ResultSet call() throws SQLException
112                                        {
113                                                return selectStatement.executeQuery(selectSQL);
114                                        }
115                                };
116        
117                                Future<ResultSet> future = executor.submit(callable);
118                                
119                                String deleteSQL = dialect.getTruncateTableSQL(table);
120        
121                                logger.debug(deleteSQL);
122                                
123                                Statement deleteStatement = targetConnection.createStatement();
124        
125                                int deletedRows = deleteStatement.executeUpdate(deleteSQL);
126                                
127                                logger.info(Messages.getMessage(Messages.DELETE_COUNT, deletedRows, tableName));
128                                
129                                deleteStatement.close();
130                                
131                                ResultSet resultSet = future.get();
132                                
133                                String insertSQL = "INSERT INTO " + tableName + " (" + commaDelimitedColumns + ") VALUES (" + Strings.join(Collections.nCopies(columns.size(), Strings.QUESTION), Strings.PADDED_COMMA) + ")"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
134                                
135                                logger.debug(insertSQL);
136                                
137                                PreparedStatement insertStatement = targetConnection.prepareStatement(insertSQL);
138                                int statementCount = 0;
139                                
140                                while (resultSet.next())
141                                {
142                                        int index = 0;
143                                        
144                                        for (String column: columns)
145                                        {
146                                                index += 1;
147                                                
148                                                int type = dialect.getColumnType(table.getColumnProperties(column));
149                                                
150                                                Object object = SynchronizationSupport.getObject(resultSet, index, type);
151                                                
152                                                if (resultSet.wasNull())
153                                                {
154                                                        insertStatement.setNull(index, type);
155                                                }
156                                                else
157                                                {
158                                                        insertStatement.setObject(index, object, type);
159                                                }
160                                        }
161                                        
162                                        insertStatement.addBatch();
163                                        statementCount += 1;
164                                        
165                                        if ((statementCount % this.maxBatchSize) == 0)
166                                        {
167                                                insertStatement.executeBatch();
168                                                insertStatement.clearBatch();
169                                        }
170                                        
171                                        insertStatement.clearParameters();
172                                }
173        
174                                if ((statementCount % this.maxBatchSize) > 0)
175                                {
176                                        insertStatement.executeBatch();
177                                }
178        
179                                logger.info(Messages.getMessage(Messages.INSERT_COUNT, statementCount, tableName));
180                                
181                                insertStatement.close();
182                                selectStatement.close();
183                                
184                                targetConnection.commit();
185                        }
186                }
187                catch (InterruptedException e)
188                {
189                        SynchronizationSupport.rollback(targetConnection);
190
191                        throw SQLExceptionFactory.createSQLException(e);
192                }
193                catch (ExecutionException e)
194                {
195                        SynchronizationSupport.rollback(targetConnection);
196
197                        throw SQLExceptionFactory.createSQLException(e.getCause());
198                }
199                catch (SQLException e)
200                {
201                        SynchronizationSupport.rollback(targetConnection);
202                        
203                        throw e;
204                }
205                
206                targetConnection.setAutoCommit(true);
207                
208                SynchronizationSupport.restoreForeignKeys(context);
209                
210                SynchronizationSupport.synchronizeIdentityColumns(context);
211                SynchronizationSupport.synchronizeSequences(context);
212                
213                targetConnection.setAutoCommit(autoCommit);
214        }
215
216        /**
217         * @return the fetchSize.
218         */
219        public int getFetchSize()
220        {
221                return this.fetchSize;
222        }
223
224        /**
225         * @param fetchSize the fetchSize to set.
226         */
227        public void setFetchSize(int fetchSize)
228        {
229                this.fetchSize = fetchSize;
230        }
231        
232        /**
233         * @return the maxBatchSize.
234         */
235        public int getMaxBatchSize()
236        {
237                return this.maxBatchSize;
238        }
239
240        /**
241         * @param maxBatchSize the maxBatchSize to set.
242         */
243        public void setMaxBatchSize(int maxBatchSize)
244        {
245                this.maxBatchSize = maxBatchSize;
246        }
247}