001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (c) 2004-2007 Paul Ferraro
004 * 
005 * This library is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Lesser General Public License as published by the 
007 * Free Software Foundation; either version 2.1 of the License, or (at your 
008 * option) any later version.
009 * 
010 * This library is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this library; if not, write to the Free Software Foundation, 
017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018 * 
019 * Contact: ferraro@users.sourceforge.net
020 */
021package net.sf.hajdbc.sql;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.FileOutputStream;
026import java.io.FileWriter;
027import java.io.IOException;
028import java.io.InputStream;
029import java.net.URL;
030import java.nio.channels.Channels;
031import java.nio.channels.FileChannel;
032import java.nio.channels.WritableByteChannel;
033import java.sql.Connection;
034import java.sql.SQLException;
035import java.sql.Statement;
036import java.util.ArrayList;
037import java.util.Collection;
038import java.util.Comparator;
039import java.util.HashMap;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Map;
043import java.util.NoSuchElementException;
044import java.util.Set;
045import java.util.TreeMap;
046import java.util.TreeSet;
047import java.util.concurrent.Callable;
048import java.util.concurrent.CopyOnWriteArrayList;
049import java.util.concurrent.ExecutionException;
050import java.util.concurrent.ExecutorService;
051import java.util.concurrent.Future;
052import java.util.concurrent.SynchronousQueue;
053import java.util.concurrent.ThreadPoolExecutor;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.locks.Lock;
056
057import javax.management.DynamicMBean;
058import javax.management.JMException;
059import javax.management.MBeanRegistration;
060import javax.management.MBeanServer;
061import javax.management.ObjectName;
062
063import net.sf.hajdbc.Balancer;
064import net.sf.hajdbc.Database;
065import net.sf.hajdbc.DatabaseActivationListener;
066import net.sf.hajdbc.DatabaseCluster;
067import net.sf.hajdbc.DatabaseClusterDecorator;
068import net.sf.hajdbc.DatabaseClusterFactory;
069import net.sf.hajdbc.DatabaseClusterMBean;
070import net.sf.hajdbc.DatabaseDeactivationListener;
071import net.sf.hajdbc.DatabaseEvent;
072import net.sf.hajdbc.DatabaseMetaDataCache;
073import net.sf.hajdbc.DatabaseMetaDataCacheFactory;
074import net.sf.hajdbc.Dialect;
075import net.sf.hajdbc.LockManager;
076import net.sf.hajdbc.Messages;
077import net.sf.hajdbc.StateManager;
078import net.sf.hajdbc.SynchronizationContext;
079import net.sf.hajdbc.SynchronizationListener;
080import net.sf.hajdbc.SynchronizationStrategy;
081import net.sf.hajdbc.local.LocalLockManager;
082import net.sf.hajdbc.local.LocalStateManager;
083import net.sf.hajdbc.sync.SynchronizationContextImpl;
084import net.sf.hajdbc.sync.SynchronizationStrategyBuilder;
085import net.sf.hajdbc.util.concurrent.CronThreadPoolExecutor;
086
087import org.jibx.runtime.BindingDirectory;
088import org.jibx.runtime.IMarshallingContext;
089import org.jibx.runtime.IUnmarshallingContext;
090import org.jibx.runtime.JiBXException;
091import org.quartz.CronExpression;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
095/**
096 * @author  Paul Ferraro
097 * @param <D> either java.sql.Driver or javax.sql.DataSource
098 * @since   1.0
099 */
100public abstract class AbstractDatabaseCluster<D> implements DatabaseCluster<D>, DatabaseClusterMBean, MBeanRegistration
101{
102        /** This is a work-around for Java 1.4, where Boolean does not implement Comparable */
103        private static final Comparator<Boolean> booleanComparator = new Comparator<Boolean>()
104        {
105                @Override
106                public int compare(Boolean value1, Boolean value2)
107                {
108                        return this.valueOf(value1) - this.valueOf(value2);
109                }
110                
111                private int valueOf(Boolean value)
112                {
113                        return value.booleanValue() ? 1 : 0;
114                }
115        };
116
117        static Logger logger = LoggerFactory.getLogger(AbstractDatabaseCluster.class);
118
119//      private static final Method isValidMethod = Methods.findMethod(Connection.class, "isValid", Integer.TYPE);
120        
121        private String id;
122        private Balancer<D> balancer;
123        private Dialect dialect;
124        private DatabaseMetaDataCacheFactory databaseMetaDataCacheFactory;
125        private DatabaseMetaDataCache databaseMetaDataCache;
126        private String defaultSynchronizationStrategyId;
127        private CronExpression failureDetectionExpression;
128        private CronExpression autoActivationExpression;
129        private int minThreads;
130        private int maxThreads;
131        private int maxIdle;
132        private TransactionMode transactionMode;
133        private boolean identityColumnDetectionEnabled;
134        private boolean sequenceDetectionEnabled;
135        private boolean currentDateEvaluationEnabled;
136        private boolean currentTimeEvaluationEnabled;
137        private boolean currentTimestampEvaluationEnabled;
138        private boolean randEvaluationEnabled;
139        
140        private MBeanServer server;
141        private URL url;
142        private Map<String, SynchronizationStrategy> synchronizationStrategyMap = new HashMap<String, SynchronizationStrategy>();
143        private DatabaseClusterDecorator decorator;
144        private Map<String, Database<D>> databaseMap = new HashMap<String, Database<D>>();
145        private ExecutorService executor;
146        private CronThreadPoolExecutor cronExecutor = new CronThreadPoolExecutor(2);
147        private LockManager lockManager = new LocalLockManager();
148        private StateManager stateManager = new LocalStateManager(this);
149        private volatile boolean active = false;
150        private List<DatabaseActivationListener> activationListenerList = new CopyOnWriteArrayList<DatabaseActivationListener>();
151        private List<DatabaseDeactivationListener> deactivationListenerList = new CopyOnWriteArrayList<DatabaseDeactivationListener>();
152        private List<SynchronizationListener> synchronizationListenerList = new CopyOnWriteArrayList<SynchronizationListener>();
153        
154        protected AbstractDatabaseCluster(String id, URL url)
155        {
156                this.id = id;
157                this.url = url;
158        }
159
160        /**
161         * @see net.sf.hajdbc.DatabaseCluster#getId()
162         */
163        @Override
164        public String getId()
165        {
166                return this.id;
167        }
168        
169        /**
170         * @see net.sf.hajdbc.DatabaseClusterMBean#getVersion()
171         */
172        @Override
173        public String getVersion()
174        {
175                return DatabaseClusterFactory.getVersion();
176        }
177        
178        /**
179         * @see net.sf.hajdbc.DatabaseCluster#getAliveMap(java.util.Collection)
180         */
181        @Override
182        public Map<Boolean, List<Database<D>>> getAliveMap(Collection<Database<D>> databases)
183        {
184                Map<Database<D>, Future<Boolean>> futureMap = new TreeMap<Database<D>, Future<Boolean>>();
185
186                for (final Database<D> database: databases)
187                {
188                        Callable<Boolean> task = new Callable<Boolean>()
189                        {
190                                public Boolean call() throws Exception
191                                {
192                                        return AbstractDatabaseCluster.this.isAlive(database);
193                                }
194                        };
195
196                        futureMap.put(database, this.executor.submit(task));
197                }
198
199                Map<Boolean, List<Database<D>>> map = new TreeMap<Boolean, List<Database<D>>>(booleanComparator);
200                
201                int size = databases.size();
202                
203                map.put(false, new ArrayList<Database<D>>(size));
204                map.put(true, new ArrayList<Database<D>>(size));
205                
206                for (Map.Entry<Database<D>, Future<Boolean>> futureMapEntry: futureMap.entrySet())
207                {
208                        try
209                        {
210                                map.get(futureMapEntry.getValue().get()).add(futureMapEntry.getKey());
211                        }
212                        catch (ExecutionException e)
213                        {
214                                // isAlive does not throw an exception
215                                throw new IllegalStateException(e);
216                        }
217                        catch (InterruptedException e)
218                        {
219                                Thread.currentThread().interrupt();
220                        }
221                }
222
223                return map;
224        }
225        
226        boolean isAlive(Database<D> database)
227        {
228                try
229                {
230                        this.test(database);
231                        
232                        return true;
233                }
234                catch (SQLException e)
235                {
236                        logger.warn(Messages.getMessage(Messages.DATABASE_NOT_ALIVE, database, this), e);
237                        
238                        return false;
239                }
240        }
241
242        private void test(Database<D> database) throws SQLException
243        {
244                Connection connection = null;
245                
246                try
247                {
248                        connection = database.connect(database.createConnectionFactory());
249                        
250                        Statement statement = connection.createStatement();
251                        
252                        statement.execute(this.dialect.getSimpleSQL());
253
254                        statement.close();
255                }
256                finally
257                {
258                        if (connection != null)
259                        {
260                                try
261                                {
262                                        connection.close();
263                                }
264                                catch (SQLException e)
265                                {
266                                        logger.warn(e.toString(), e);
267                                }
268                        }
269                }
270        }
271/*      
272        boolean isAliveNew(Database<D> database)
273        {
274                Connection connection = null;
275                
276                try
277                {
278                        connection = database.connect(database.createConnectionFactory());
279                        
280                        return this.isAlive(connection);
281                }
282                catch (SQLException e)
283                {
284                        logger.warn(Messages.getMessage(Messages.DATABASE_NOT_ALIVE, database, this), e);
285                        
286                        return false;
287                }
288                finally
289                {
290                        if (connection != null)
291                        {
292                                try
293                                {
294                                        connection.close();
295                                }
296                                catch (SQLException e)
297                                {
298                                        logger.warn(e.getMessage(), e);
299                                }
300                        }
301                }
302        }
303        
304        private boolean isAlive(Connection connection)
305        {
306                if (isValidMethod != null)
307                {
308                        try
309                        {
310                                return connection.isValid(0);
311                        }
312                        catch (SQLException e)
313                        {
314                                // isValid not yet supported
315                        }
316                }
317
318                try
319                {
320                        Statement statement = connection.createStatement();
321                        
322                        statement.execute(this.dialect.getSimpleSQL());
323
324                        statement.close();
325                        
326                        return true;
327                }
328                catch (SQLException e)
329                {
330                        logger.warn(e.toString(), e);
331                        
332                        return false;
333                }
334        }
335*/      
336        /**
337         * @see net.sf.hajdbc.DatabaseCluster#deactivate(net.sf.hajdbc.Database, net.sf.hajdbc.StateManager)
338         */
339        @Override
340        public boolean deactivate(Database<D> database, StateManager manager)
341        {
342                synchronized (this.balancer)
343                {
344                        this.unregister(database);
345                        // Reregister database mbean using "inactive" interface
346                        this.register(database, database.getInactiveMBean());
347                        
348                        boolean removed = this.balancer.remove(database);
349                        
350                        if (removed)
351                        {
352                                DatabaseEvent event = new DatabaseEvent(database);
353
354                                manager.deactivated(event);
355                                
356                                for (DatabaseDeactivationListener listener: this.deactivationListenerList)
357                                {
358                                        listener.deactivated(event);
359                                }
360                        }
361                        
362                        return removed;
363                }
364        }
365        
366        /**
367         * @see net.sf.hajdbc.DatabaseCluster#activate(net.sf.hajdbc.Database, net.sf.hajdbc.StateManager)
368         */
369        @Override
370        public boolean activate(Database<D> database, StateManager manager)
371        {
372                synchronized (this.balancer)
373                {
374                        this.unregister(database);
375                        // Reregister database mbean using "active" interface
376                        this.register(database, database.getActiveMBean());
377                        
378                        if (database.isDirty())
379                        {
380                                this.export();
381                                
382                                database.clean();
383                        }
384                        
385                        boolean added = this.balancer.add(database);
386                        
387                        if (added)
388                        {
389                                DatabaseEvent event = new DatabaseEvent(database);
390
391                                manager.activated(event);
392                                
393                                for (DatabaseActivationListener listener: this.activationListenerList)
394                                {
395                                        listener.activated(event);
396                                }
397                        }
398                        
399                        return added;
400                }
401        }
402        
403        /**
404         * @see net.sf.hajdbc.DatabaseClusterMBean#getActiveDatabases()
405         */
406        @Override
407        public Set<String> getActiveDatabases()
408        {
409                Set<String> databaseSet = new TreeSet<String>();
410                
411                for (Database<D> database: this.balancer.all())
412                {
413                        databaseSet.add(database.getId());
414                }
415                
416                return databaseSet;
417        }
418
419        /**
420         * @see net.sf.hajdbc.DatabaseClusterMBean#getInactiveDatabases()
421         */
422        @Override
423        public Set<String> getInactiveDatabases()
424        {
425                synchronized (this.databaseMap)
426                {
427                        Set<String> databaseSet = new TreeSet<String>(this.databaseMap.keySet());
428
429                        for (Database<D> database: this.balancer.all())
430                        {
431                                databaseSet.remove(database.getId());
432                        }
433                        
434                        return databaseSet;
435                }
436        }
437
438        /**
439         * @see net.sf.hajdbc.DatabaseCluster#getDatabase(java.lang.String)
440         */
441        @Override
442        public Database<D> getDatabase(String id)
443        {
444                synchronized (this.databaseMap)
445                {
446                        Database<D> database = this.databaseMap.get(id);
447                        
448                        if (database == null)
449                        {
450                                throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_DATABASE, id, this));
451                        }
452                        
453                        return database;
454                }
455        }
456
457        /**
458         * @see net.sf.hajdbc.DatabaseClusterMBean#getDefaultSynchronizationStrategy()
459         */
460        @Override
461        public String getDefaultSynchronizationStrategy()
462        {
463                return this.defaultSynchronizationStrategyId;
464        }
465
466        /**
467         * @see net.sf.hajdbc.DatabaseClusterMBean#getSynchronizationStrategies()
468         */
469        @Override
470        public Set<String> getSynchronizationStrategies()
471        {
472                return new TreeSet<String>(this.synchronizationStrategyMap.keySet());
473        }
474
475        /**
476         * @see net.sf.hajdbc.DatabaseCluster#getBalancer()
477         */
478        @Override
479        public Balancer<D> getBalancer()
480        {
481                return this.balancer;
482        }
483
484        /**
485         * @see net.sf.hajdbc.DatabaseCluster#getTransactionalExecutor()
486         */
487        @Override
488        public ExecutorService getTransactionalExecutor()
489        {
490                return this.transactionMode.getTransactionExecutor(this.executor);
491        }
492
493        /**
494         * @see net.sf.hajdbc.DatabaseCluster#getNonTransactionalExecutor()
495         */
496        @Override
497        public ExecutorService getNonTransactionalExecutor()
498        {
499                return this.executor;
500        }
501        
502        /**
503         * @see net.sf.hajdbc.DatabaseCluster#getDialect()
504         */
505        @Override
506        public Dialect getDialect()
507        {
508                return this.dialect;
509        }
510
511        /**
512         * @see net.sf.hajdbc.DatabaseCluster#getDatabaseMetaDataCache()
513         */
514        @Override
515        public DatabaseMetaDataCache getDatabaseMetaDataCache()
516        {
517                return this.databaseMetaDataCache;
518        }
519
520        /**
521         * @see net.sf.hajdbc.DatabaseCluster#getLockManager()
522         */
523        @Override
524        public LockManager getLockManager()
525        {
526                return this.lockManager;
527        }
528        
529        /**
530         * @see net.sf.hajdbc.DatabaseClusterMBean#isAlive(java.lang.String)
531         */
532        @Override
533        public boolean isAlive(String id)
534        {
535                return this.isAlive(this.getDatabase(id));
536        }
537
538        /**
539         * @see net.sf.hajdbc.DatabaseClusterMBean#deactivate(java.lang.String)
540         */
541        @Override
542        public void deactivate(String databaseId)
543        {
544                if (this.deactivate(this.getDatabase(databaseId), this.stateManager))
545                {
546                        logger.info(Messages.getMessage(Messages.DATABASE_DEACTIVATED, databaseId, this));
547                }
548        }
549
550        /**
551         * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String)
552         */
553        @Override
554        public void activate(String databaseId)
555        {
556                this.activate(databaseId, this.getDefaultSynchronizationStrategy());
557        }
558        
559        /**
560         * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String, java.lang.String)
561         */
562        @Override
563        public void activate(String databaseId, String strategyId)
564        {
565                SynchronizationStrategy strategy = this.synchronizationStrategyMap.get(strategyId);
566                
567                if (strategy == null)
568                {
569                        throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_SYNC_STRATEGY, strategyId));
570                }
571                
572                try
573                {
574                        if (this.activate(this.getDatabase(databaseId), strategy))
575                        {
576                                logger.info(Messages.getMessage(Messages.DATABASE_ACTIVATED, databaseId, this));
577                        }
578                }
579                catch (SQLException e)
580                {
581                        logger.warn(Messages.getMessage(Messages.DATABASE_ACTIVATE_FAILED, databaseId, this), e);
582                        
583                        SQLException exception = e.getNextException();
584                        
585                        while (exception != null)
586                        {
587                                logger.error(exception.getMessage(), exception);
588                                
589                                exception = exception.getNextException();
590                        }
591
592                        throw new IllegalStateException(e.toString());
593                }
594                catch (InterruptedException e)
595                {
596                        logger.warn(e.toString(), e);
597                        
598                        Thread.currentThread().interrupt();
599                }
600        }
601        
602        protected void register(Database<D> database, DynamicMBean mbean)
603        {
604                try
605                {
606                        ObjectName name = DatabaseClusterFactory.getObjectName(this.id, database.getId());
607                        
608                        this.server.registerMBean(mbean, name);
609                }
610                catch (JMException e)
611                {
612                        logger.error(e.toString(), e);
613                        
614                        throw new IllegalStateException(e);
615                }
616        }
617        
618        /**
619         * @see net.sf.hajdbc.DatabaseClusterMBean#remove(java.lang.String)
620         */
621        @Override
622        public void remove(String id)
623        {
624                synchronized (this.databaseMap)
625                {
626                        Database<D> database = this.getDatabase(id);
627                        
628                        if (this.balancer.all().contains(database))
629                        {
630                                throw new IllegalStateException(Messages.getMessage(Messages.DATABASE_STILL_ACTIVE, id, this));
631                        }
632        
633                        this.unregister(database);
634                        
635                        this.databaseMap.remove(id);
636                        
637                        this.export();
638                }
639        }
640
641        private void unregister(Database<D> database)
642        {
643                try
644                {
645                        ObjectName name = DatabaseClusterFactory.getObjectName(this.id, database.getId());
646                        
647                        if (this.server.isRegistered(name))
648                        {
649                                this.server.unregisterMBean(name);
650                        }
651                }
652                catch (JMException e)
653                {
654                        logger.error(e.toString(), e);
655                        
656                        throw new IllegalStateException(e);
657                }
658        }
659
660        /**
661         * @see net.sf.hajdbc.DatabaseCluster#isActive()
662         */
663        @Override
664        public boolean isActive()
665        {
666                return this.active;
667        }
668
669        /**
670         * @see net.sf.hajdbc.Lifecycle#start()
671         */
672        public synchronized void start() throws Exception
673        {
674                if (this.active) return;
675                
676                this.lockManager.start();
677                this.stateManager.start();
678                
679                this.executor = new ThreadPoolExecutor(this.minThreads, this.maxThreads, this.maxIdle, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
680                
681                Set<String> databaseSet = this.stateManager.getInitialState();
682                
683                if (databaseSet != null)
684                {
685                        for (String databaseId: databaseSet)
686                        {
687                                Database<D> database = this.getDatabase(databaseId);
688                                
689                                if (database != null)
690                                {
691                                        this.activate(database, this.stateManager);
692                                }
693                        }
694                }
695                else
696                {
697                        for (Database<D> database: this.getAliveMap(this.databaseMap.values()).get(true))
698                        {
699                                this.activate(database, this.stateManager);
700                        }
701                }
702                
703                this.databaseMetaDataCache = this.databaseMetaDataCacheFactory.createCache(this);
704                
705                try
706                {
707                        this.flushMetaDataCache();
708                }
709                catch (IllegalStateException e)
710                {
711                        // Ignore - cache will initialize lazily.
712                }
713                
714                if (this.failureDetectionExpression != null)
715                {
716                        this.cronExecutor.schedule(new FailureDetectionTask(), this.failureDetectionExpression);
717                }
718                
719                if (this.autoActivationExpression != null)
720                {
721                        this.cronExecutor.schedule(new AutoActivationTask(), this.autoActivationExpression);
722                }
723                
724                this.active = true;
725        }
726
727        /**
728         * @see net.sf.hajdbc.Lifecycle#stop()
729         */
730        public synchronized void stop()
731        {
732                if (!this.active) return;
733
734                this.active = false;
735                
736                this.balancer.clear();
737                
738                this.stateManager.stop();
739                this.lockManager.stop();
740                
741                this.cronExecutor.shutdownNow();
742                
743                if (this.executor != null)
744                {
745                        this.executor.shutdownNow();
746                }
747        }
748        
749        /**
750         * @see net.sf.hajdbc.DatabaseClusterMBean#flushMetaDataCache()
751         */
752        @Override
753        public void flushMetaDataCache()
754        {
755                try
756                {
757                        this.databaseMetaDataCache.flush();
758                }
759                catch (SQLException e)
760                {
761                        throw new IllegalStateException(e.toString(), e);
762                }
763        }
764
765        /**
766         * @see net.sf.hajdbc.DatabaseCluster#isIdentityColumnDetectionEnabled()
767         */
768        @Override
769        public boolean isIdentityColumnDetectionEnabled()
770        {
771                return this.identityColumnDetectionEnabled;
772        }
773
774        /**
775         * @see net.sf.hajdbc.DatabaseCluster#isSequenceDetectionEnabled()
776         */
777        @Override
778        public boolean isSequenceDetectionEnabled()
779        {
780                return this.sequenceDetectionEnabled;
781        }
782
783        /**
784         * @see net.sf.hajdbc.DatabaseCluster#isCurrentDateEvaluationEnabled()
785         */
786        @Override
787        public boolean isCurrentDateEvaluationEnabled()
788        {
789                return this.currentDateEvaluationEnabled;
790        }
791
792        /**
793         * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimeEvaluationEnabled()
794         */
795        @Override
796        public boolean isCurrentTimeEvaluationEnabled()
797        {
798                return this.currentTimeEvaluationEnabled;
799        }
800
801        /**
802         * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimestampEvaluationEnabled()
803         */
804        @Override
805        public boolean isCurrentTimestampEvaluationEnabled()
806        {
807                return this.currentTimestampEvaluationEnabled;
808        }
809
810        /**
811         * @see net.sf.hajdbc.DatabaseCluster#isRandEvaluationEnabled()
812         */
813        @Override
814        public boolean isRandEvaluationEnabled()
815        {
816                return this.randEvaluationEnabled;
817        }
818
819        /**
820         * @see java.lang.Object#toString()
821         */
822        @Override
823        public String toString()
824        {
825                return this.getId();
826        }
827        
828        /**
829         * @see java.lang.Object#equals(java.lang.Object)
830         */
831        @SuppressWarnings("unchecked")
832        @Override
833        public boolean equals(Object object)
834        {
835                if ((object == null) || !(object instanceof DatabaseCluster)) return false;
836                
837                String id = ((DatabaseCluster) object).getId();
838                
839                return (id != null) && id.equals(this.id);
840        }
841        
842        /**
843         * @see java.lang.Object#hashCode()
844         */
845        @Override
846        public int hashCode()
847        {
848                return this.id.hashCode();
849        }
850        
851        protected DatabaseClusterDecorator getDecorator()
852        {
853                return this.decorator;
854        }
855        
856        protected void setDecorator(DatabaseClusterDecorator decorator)
857        {
858                this.decorator = decorator;
859        }
860        
861        protected void add(Database<D> database)
862        {
863                String id = database.getId();
864                
865                synchronized (this.databaseMap)
866                {
867                        if (this.databaseMap.containsKey(id))
868                        {
869                                throw new IllegalArgumentException(Messages.getMessage(Messages.DATABASE_ALREADY_EXISTS, id, this));
870                        }
871                        
872                        this.register(database, database.getInactiveMBean());
873                        
874                        this.databaseMap.put(id, database);
875                }
876        }
877        
878        protected Iterator<Database<D>> getDatabases()
879        {
880                synchronized (this.databaseMap)
881                {
882                        return this.databaseMap.values().iterator();
883                }
884        }
885        
886        /**
887         * @see net.sf.hajdbc.DatabaseCluster#getStateManager()
888         */
889        @Override
890        public StateManager getStateManager()
891        {
892                return this.stateManager;
893        }
894
895        /**
896         * @see net.sf.hajdbc.DatabaseCluster#setStateManager(net.sf.hajdbc.StateManager)
897         */
898        @Override
899        public void setStateManager(StateManager stateManager)
900        {
901                this.stateManager = stateManager;
902        }
903
904        /**
905         * @see net.sf.hajdbc.DatabaseCluster#setLockManager(net.sf.hajdbc.LockManager)
906         */
907        @Override
908        public void setLockManager(LockManager lockManager)
909        {
910                this.lockManager = lockManager;
911        }
912
913        /**
914         * @see net.sf.hajdbc.DatabaseClusterMBean#getUrl()
915         */
916        @Override
917        public URL getUrl()
918        {
919                return this.url;
920        }
921
922        private boolean activate(Database<D> database, SynchronizationStrategy strategy) throws SQLException, InterruptedException
923        {
924                Lock lock = this.lockManager.writeLock(LockManager.GLOBAL);
925                
926                lock.lockInterruptibly();
927                
928                try
929                {
930                        SynchronizationContext<D> context = new SynchronizationContextImpl<D>(this, database);
931                        
932                        if (context.getActiveDatabaseSet().contains(database))
933                        {
934                                return false;
935                        }
936                        
937                        this.test(database);
938                        
939                        try
940                        {
941                                DatabaseEvent event = new DatabaseEvent(database);
942                                
943                                logger.info(Messages.getMessage(Messages.DATABASE_SYNC_START, database, this));
944                                
945                                for (SynchronizationListener listener: this.synchronizationListenerList)
946                                {
947                                        listener.beforeSynchronization(event);
948                                }
949                                
950                                strategy.synchronize(context);
951
952                                logger.info(Messages.getMessage(Messages.DATABASE_SYNC_END, database, this));
953                                
954                                for (SynchronizationListener listener: this.synchronizationListenerList)
955                                {
956                                        listener.afterSynchronization(event);
957                                }
958                                
959                                return this.activate(database, this.stateManager);
960                        }
961                        finally
962                        {
963                                context.close();
964                        }
965                }
966                catch (NoSuchElementException e)
967                {
968                        return this.activate(database, this.stateManager);
969                }
970                finally
971                {
972                        lock.unlock();
973                }
974        }
975        
976        /**
977         * @see javax.management.MBeanRegistration#postDeregister()
978         */
979        @Override
980        public void postDeregister()
981        {
982                this.stop();
983                
984                this.unregisterDatabases();
985        }
986
987        private void unregisterDatabases()
988        {
989                synchronized (this.databaseMap)
990                {
991                        Iterator<Database<D>> databases = this.databaseMap.values().iterator();
992                        
993                        while (databases.hasNext())
994                        {
995                                this.unregister(databases.next());
996                                
997                                databases.remove();
998                        }
999                }
1000        }
1001        
1002        /**
1003         * @see javax.management.MBeanRegistration#postRegister(java.lang.Boolean)
1004         */
1005        @Override
1006        public void postRegister(Boolean registered)
1007        {
1008                if (!registered)
1009                {
1010                        this.postDeregister();
1011                }
1012        }
1013
1014        /**
1015         * @see javax.management.MBeanRegistration#preDeregister()
1016         */
1017        @Override
1018        public void preDeregister() throws Exception
1019        {
1020                // Nothing to do
1021        }
1022
1023        /**
1024         * @see javax.management.MBeanRegistration#preRegister(javax.management.MBeanServer, javax.management.ObjectName)
1025         */
1026        @Override
1027        public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception
1028        {
1029                this.server = server;
1030                
1031                InputStream inputStream = null;
1032                
1033                logger.info(Messages.getMessage(Messages.HA_JDBC_INIT, this.getVersion(), this.url));
1034                
1035                try
1036                {
1037                        inputStream = this.url.openStream();
1038                        
1039                        IUnmarshallingContext context = BindingDirectory.getFactory(this.getClass()).createUnmarshallingContext();
1040        
1041                        context.setDocument(inputStream, null);
1042                        
1043                        context.setUserContext(this);
1044                        
1045                        context.unmarshalElement();
1046                        
1047                        if (this.decorator != null)
1048                        {
1049                                this.decorator.decorate(this);
1050                        }
1051                        
1052                        this.start();
1053                        
1054                        return name;
1055                }
1056                catch (IOException e)
1057                {
1058                        logger.error(Messages.getMessage(Messages.CONFIG_NOT_FOUND, this.url), e);
1059                        
1060                        throw e;
1061                }
1062                catch (JiBXException e)
1063                {
1064                        logger.error(Messages.getMessage(Messages.CONFIG_LOAD_FAILED, this.url), e);
1065                        
1066                        this.unregisterDatabases();
1067                        
1068                        throw e;
1069                }
1070                catch (Exception e)
1071                {
1072                        logger.error(Messages.getMessage(Messages.CLUSTER_START_FAILED, this), e);
1073                        
1074                        this.postDeregister();
1075                        
1076                        throw e;
1077                }
1078                finally
1079                {
1080                        if (inputStream != null)
1081                        {
1082                                try
1083                                {
1084                                        inputStream.close();
1085                                }
1086                                catch (IOException e)
1087                                {
1088                                        logger.warn(e.toString(), e);
1089                                }
1090                        }
1091                }
1092        }
1093        
1094        private void export()
1095        {
1096                File file = null;
1097                WritableByteChannel outputChannel = null;
1098                FileChannel fileChannel = null;
1099                
1100                try
1101                {
1102                        file = File.createTempFile("ha-jdbc", ".xml"); //$NON-NLS-1$ //$NON-NLS-2$
1103                        
1104                        IMarshallingContext context = BindingDirectory.getFactory(this.getClass()).createMarshallingContext();
1105                
1106                        context.setIndent(1, System.getProperty("line.separator"), '\t'); //$NON-NLS-1$
1107                        
1108                        // This method closes the writer
1109                        context.marshalDocument(this, null, null, new FileWriter(file));
1110                        
1111                        fileChannel = new FileInputStream(file).getChannel();
1112
1113                        outputChannel = this.getOutputChannel(this.url);
1114
1115                        fileChannel.transferTo(0, file.length(), outputChannel);
1116                }
1117                catch (Exception e)
1118                {
1119                        logger.warn(Messages.getMessage(Messages.CONFIG_STORE_FAILED, this.url), e);
1120                }
1121                finally
1122                {
1123                        if (outputChannel != null)
1124                        {
1125                                try
1126                                {
1127                                        outputChannel.close();
1128                                }
1129                                catch (IOException e)
1130                                {
1131                                        logger.warn(e.getMessage(), e);
1132                                }
1133                        }
1134                        
1135                        if (fileChannel != null)
1136                        {
1137                                try
1138                                {
1139                                        fileChannel.close();
1140                                }
1141                                catch (IOException e)
1142                                {
1143                                        logger.warn(e.getMessage(), e);
1144                                }
1145                        }
1146                        
1147                        if (file != null)
1148                        {
1149                                file.delete();
1150                        }
1151                }
1152        }
1153        
1154        /**
1155         * We cannot use URLConnection for files because Sun's implementation does not support output.
1156         */
1157        private WritableByteChannel getOutputChannel(URL url) throws IOException
1158        {
1159                return this.isFile(url) ? new FileOutputStream(this.toFile(url)).getChannel() : Channels.newChannel(url.openConnection().getOutputStream());
1160        }
1161        
1162        private boolean isFile(URL url)
1163        {
1164                return url.getProtocol().equals("file"); //$NON-NLS-1$
1165        }
1166        
1167        private File toFile(URL url)
1168        {
1169                return new File(url.getPath());
1170        }
1171        
1172        protected void addSynchronizationStrategyBuilder(SynchronizationStrategyBuilder builder) throws Exception
1173        {
1174                this.synchronizationStrategyMap.put(builder.getId(), builder.buildStrategy());
1175        }
1176        
1177        protected Iterator<SynchronizationStrategyBuilder> getSynchronizationStrategyBuilders() throws Exception
1178        {
1179                List<SynchronizationStrategyBuilder> builderList = new ArrayList<SynchronizationStrategyBuilder>(this.synchronizationStrategyMap.size());
1180                
1181                for (Map.Entry<String, SynchronizationStrategy> mapEntry: this.synchronizationStrategyMap.entrySet())
1182                {
1183                        builderList.add(SynchronizationStrategyBuilder.getBuilder(mapEntry.getKey(), mapEntry.getValue()));
1184                }
1185                
1186                return builderList.iterator();
1187        }
1188
1189        /**
1190         * @see net.sf.hajdbc.DatabaseClusterMBean#addActivationListener(net.sf.hajdbc.DatabaseActivationListener)
1191         */
1192        @Override
1193        public void addActivationListener(DatabaseActivationListener listener)
1194        {
1195                this.activationListenerList.add(listener);
1196        }
1197
1198        /**
1199         * @see net.sf.hajdbc.DatabaseClusterMBean#addDeactivationListener(net.sf.hajdbc.DatabaseDeactivationListener)
1200         */
1201        @Override
1202        public void addDeactivationListener(DatabaseDeactivationListener listener)
1203        {
1204                this.deactivationListenerList.add(listener);
1205        }
1206
1207        /**
1208         * @see net.sf.hajdbc.DatabaseClusterMBean#addSynchronizationListener(net.sf.hajdbc.SynchronizationListener)
1209         */
1210        @Override
1211        public void addSynchronizationListener(SynchronizationListener listener)
1212        {
1213                this.synchronizationListenerList.add(listener);
1214        }
1215
1216        /**
1217         * @see net.sf.hajdbc.DatabaseClusterMBean#removeActivationListener(net.sf.hajdbc.DatabaseActivationListener)
1218         */
1219        @Override
1220        public void removeActivationListener(DatabaseActivationListener listener)
1221        {
1222                this.activationListenerList.remove(listener);
1223        }
1224
1225        /**
1226         * @see net.sf.hajdbc.DatabaseClusterMBean#removeDeactivationListener(net.sf.hajdbc.DatabaseDeactivationListener)
1227         */
1228        @Override
1229        public void removeDeactivationListener(DatabaseDeactivationListener listener)
1230        {
1231                this.deactivationListenerList.remove(listener);
1232        }
1233
1234        /**
1235         * @see net.sf.hajdbc.DatabaseClusterMBean#removeSynchronizationListener(net.sf.hajdbc.SynchronizationListener)
1236         */
1237        @Override
1238        public void removeSynchronizationListener(SynchronizationListener listener)
1239        {
1240                this.synchronizationListenerList.remove(listener);
1241        }
1242
1243        class FailureDetectionTask implements Runnable
1244        {
1245                /**
1246                 * @see java.lang.Runnable#run()
1247                 */
1248                @Override
1249                public void run()
1250                {
1251                        Set<Database<D>> databaseSet = AbstractDatabaseCluster.this.getBalancer().all();
1252                        
1253                        if (databaseSet.size() > 1)
1254                        {
1255                                Map<Boolean, List<Database<D>>> aliveMap = AbstractDatabaseCluster.this.getAliveMap(databaseSet);
1256                                
1257                                // Deactivate the dead databases, so long as at least one is alive
1258                                // Skip deactivation if membership is empty in case of cluster panic
1259                                if (!aliveMap.get(true).isEmpty() && !AbstractDatabaseCluster.this.getStateManager().isMembershipEmpty())
1260                                {
1261                                        for (Database<D> database: aliveMap.get(false))
1262                                        {
1263                                                if (AbstractDatabaseCluster.this.deactivate(database, AbstractDatabaseCluster.this.getStateManager()))
1264                                                {
1265                                                        logger.error(Messages.getMessage(Messages.DATABASE_DEACTIVATED, database, this));
1266                                                }
1267                                        }
1268                                }
1269                        }
1270                }
1271        }       
1272        
1273        class AutoActivationTask implements Runnable
1274        {
1275                /**
1276                 * @see java.lang.Runnable#run()
1277                 */
1278                @Override
1279                public void run()
1280                {
1281                        for (String databaseId: AbstractDatabaseCluster.this.getInactiveDatabases())
1282                        {
1283                                AbstractDatabaseCluster.this.activate(databaseId);
1284                        }
1285                }
1286        }
1287}