001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.server;
028    import org.opends.messages.MessageBuilder;
029    
030    import static org.opends.server.loggers.ErrorLogger.logError;
031    import static org.opends.messages.ReplicationMessages.*;
032    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
033    
034    import java.util.ArrayList;
035    import java.util.Date;
036    import java.util.List;
037    import java.util.LinkedList;
038    import java.util.NoSuchElementException;
039    
040    import org.opends.server.admin.std.server.MonitorProviderCfg;
041    import org.opends.server.api.DirectoryThread;
042    import org.opends.server.api.MonitorProvider;
043    import org.opends.server.config.ConfigException;
044    import org.opends.server.types.Attribute;
045    import org.opends.server.types.DN;
046    import org.opends.server.types.InitializationException;
047    import org.opends.server.util.TimeThread;
048    import org.opends.server.core.DirectoryServer;
049    import org.opends.server.replication.common.ChangeNumber;
050    import org.opends.server.replication.protocol.UpdateMessage;
051    import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
052    
053    import com.sleepycat.je.DatabaseException;
054    import com.sleepycat.je.DeadlockException;
055    
/**
 * This class is used for managing the replicationServer database for each
 * server in the topology.
 * It is responsible for efficiently saving the updates that are received from
 * each master server into stable storage.
 * This class is also able to generate a ReplicationIterator that can be
 * used to read all changes from a given ChangeNumber.
 *
 * This class publishes some monitoring information below cn=monitor.
 *
 */
067    public class DbHandler implements Runnable
068    {
069      // The msgQueue holds all the updates not yet saved to stable storage.
070      // This list is only used as a temporary placeholder so that the write
071      // in the stable storage can be grouped for efficiency reason.
072      // Adding an update synchronously add the update to this list.
073      // A dedicated thread loops on flush() and trim().
074      // flush() : get a number of changes from the in memory list by block
075      //           and write them to the db.
076      // trim()  : deletes from the DB a number of changes that are older than a
077      //           certain date.
078      //
079      // Changes are not read back by replicationServer threads that are responsible
080      // for pushing the changes to other replication server or to LDAP server
081      //
082      private final LinkedList<UpdateMessage> msgQueue =
083        new LinkedList<UpdateMessage>();
084      private ReplicationDB db;
085      private ChangeNumber firstChange = null;
086      private ChangeNumber lastChange = null;
087      private short serverId;
088      private DN baseDn;
089      private DbMonitorProvider dbMonitor = new DbMonitorProvider();
090      private boolean shutdown = false;
091      private boolean done = false;
092      private DirectoryThread thread = null;
093      private final Object flushLock = new Object();
094      private ReplicationServer replicationServer;
095    
096    
097      // The High and low water mark for the max size of the msgQueue.
098      // the threads calling add() method will be blocked if the size of
099      // msgQueue becomes larger than the  MSG_QUEUE_HIMARK and will resume
100      // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK.
101      final static int MSG_QUEUE_HIMARK = 5000;
102      final static int MSG_QUEUE_LOWMARK = 4000;
103    
104      // The maximum number of retries in case of DatabaseDeadlock Exception.
105      private static final int DEADLOCK_RETRIES = 10;
106    
107      /**
108       *
109       * The trim age in milliseconds. Changes record in the change DB that
110       * are older than this age are removed.
111       *
112       */
113      private long trimage;
114    
115      /**
116       * Creates a new dbHandler associated to a given LDAP server.
117       *
118       * @param id Identifier of the DB.
119       * @param baseDn the baseDn for which this DB was created.
120       * @param replicationServer The ReplicationServer that creates this dbHandler.
121       * @param dbenv the Database Env to use to create the ReplicationServer DB.
122       * server for this domain.
123       * @throws DatabaseException If a database problem happened
124       */
125      public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
126          ReplicationDbEnv dbenv)
127             throws DatabaseException
128      {
129        this.replicationServer = replicationServer;
130        this.serverId = id;
131        this.baseDn = baseDn;
132        this.trimage = replicationServer.getTrimage();
133        db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
134        firstChange = db.readFirstChange();
135        lastChange = db.readLastChange();
136        thread = new DirectoryThread(this,
137                                     "Replication Server db " + id + " " +  baseDn);
138        thread.start();
139    
140        DirectoryServer.deregisterMonitorProvider(
141                          dbMonitor.getMonitorInstanceName());
142        DirectoryServer.registerMonitorProvider(dbMonitor);
143      }
144    
145      /**
146       * Add an update to the list of messages that must be saved to the db
147       * managed by this db handler.
148       * This method is blocking if the size of the list of message is larger
149       * than its maximum.
150       *
151       * @param update The update that must be saved to the db managed by this db
152       *               handler.
153       */
154      public void add(UpdateMessage update)
155      {
156        synchronized (msgQueue)
157        {
158          int size = msgQueue.size();
159          while (size > MSG_QUEUE_HIMARK)
160          {
161            try
162            {
163              msgQueue.wait(500);
164            } catch (InterruptedException e)
165            {
166              // simply loop to try again.
167            }
168            size = msgQueue.size();
169          }
170    
171          msgQueue.add(update);
172          if (lastChange == null || lastChange.older(update.getChangeNumber()))
173          {
174            lastChange = update.getChangeNumber();
175          }
176          if (firstChange == null)
177            firstChange = update.getChangeNumber();
178        }
179      }
180    
181      /**
182       * Get some changes out of the message queue of the LDAP server.
183       *
184       * @param number the number of messages to extract.
185       * @return a List containing number changes extracted from the queue.
186       */
187      private List<UpdateMessage> getChanges(int number)
188      {
189        int current = 0;
190        LinkedList<UpdateMessage> changes = new LinkedList<UpdateMessage>();
191    
192        synchronized (msgQueue)
193        {
194          int size = msgQueue.size();
195          while ((current < number) && (current < size))
196          {
197            UpdateMessage msg = msgQueue.get(current);
198            current++;
199            changes.add(msg);
200          }
201        }
202        return changes;
203      }
204    
205      /**
206       * Get the firstChange.
207       * @return Returns the firstChange.
208       */
209      public ChangeNumber getFirstChange()
210      {
211        return firstChange;
212      }
213    
214      /**
215       * Get the lastChange.
216       * @return Returns the lastChange.
217       */
218      public ChangeNumber getLastChange()
219      {
220        return lastChange;
221      }
222    
223      /**
224       * Get the number of changes.
225       *
226       * @return Returns the number of changes.
227       */
228      public long getChangesCount()
229      {
230        try
231        {
232          return lastChange.getSeqnum() - firstChange.getSeqnum() + 1;
233        }
234        catch (Exception e)
235        {
236          return 0;
237        }
238      }
239    
240      /**
241       * Generate a new ReplicationIterator that allows to browse the db
242       * managed by this dbHandler and starting at the position defined
243       * by a given changeNumber.
244       *
245       * @param changeNumber The position where the iterator must start.
246       *
247       * @return a new ReplicationIterator that allows to browse the db
248       *         managed by this dbHandler and starting at the position defined
249       *         by a given changeNumber.
250       *
251       * @throws DatabaseException if a database problem happened.
252       * @throws Exception  If there is no other change to push after change
253       *         with changeNumber number.
254       */
255      public ReplicationIterator generateIterator(ChangeNumber changeNumber)
256                               throws DatabaseException, Exception
257      {
258        /*
259         * When we create an iterator we need to make sure that we
260         * don't miss some changes because the iterator is created
261         * close to the limit of the changed that have not yet been
262         * flushed to the database.
263         * We detect this by comparing the date of the changeNumber where
264         * we want to start with the date of the first ChangeNumber
265         * of the msgQueue.
266         * If this is the case we flush the queue to the database.
267         */
268        ChangeNumber recentChangeNumber = null;
269    
270        if (changeNumber == null)
271          flush();
272    
273        synchronized (msgQueue)
274        {
275          try
276          {
277            UpdateMessage msg = msgQueue.getFirst();
278            recentChangeNumber = msg.getChangeNumber();
279          }
280          catch (NoSuchElementException e)
281          {}
282        }
283    
284        if ( (recentChangeNumber != null) && (changeNumber != null))
285        {
286          if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
287             ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
288          {
289            flush();
290          }
291        }
292    
293        ReplicationIterator it =
294          new ReplicationIterator(serverId, db, changeNumber);
295    
296        return it;
297      }
298    
299      /**
300       * Removes message in a subList of the msgQueue from the msgQueue.
301       *
302       * @param number the number of changes to be removed.
303       */
304      private void clearQueue(int number)
305      {
306        synchronized (msgQueue)
307        {
308          int current = 0;
309          while ((current < number) && (!msgQueue.isEmpty()))
310          {
311            msgQueue.remove();
312            current++;
313          }
314          if (msgQueue.size() < MSG_QUEUE_LOWMARK)
315            msgQueue.notify();
316        }
317      }
318    
319      /**
320       * Shutdown this dbHandler.
321       */
322      public void shutdown()
323      {
324        if (shutdown == true)
325        {
326          return;
327        }
328    
329        shutdown  = true;
330        synchronized (this)
331        {
332          this.notifyAll();
333        }
334    
335        synchronized (this)
336        {
337          while (done  == false)
338          {
339            try
340            {
341              this.wait();
342            } catch (Exception e)
343            {}
344          }
345        }
346    
347        while (msgQueue.size() != 0)
348          flush();
349    
350        db.shutdown();
351        DirectoryServer.deregisterMonitorProvider(
352            dbMonitor.getMonitorInstanceName());
353      }
354    
355      /**
356       * Run method for this class.
357       * Periodically Flushes the ReplicationServerDomain cache from memory to the
358       * stable storage and trims the old updates.
359       */
360      public void run()
361      {
362        while (shutdown == false)
363        {
364          try {
365            flush();
366            trim();
367    
368            synchronized (this)
369            {
370              try
371              {
372                this.wait(1000);
373              } catch (InterruptedException e)
374              { }
375            }
376          } catch (Exception end)
377          {
378            MessageBuilder mb = new MessageBuilder();
379            mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
380            mb.append(stackTraceToSingleLineString(end));
381            logError(mb.toMessage());
382            if (replicationServer != null)
383              replicationServer.shutdown();
384            break;
385          }
386        }
387        // call flush a last time before exiting to make sure that
388        // no change was forgotten in the msgQueue
389        flush();
390    
391        synchronized (this)
392        {
393          done = true;
394          this.notifyAll();
395        }
396      }
397    
398      /**
399       * Trim old changes from this replicationServer database.
400       * @throws DatabaseException In case of database problem.
401       */
402      private void trim() throws DatabaseException, Exception
403      {
404        if (trimage == 0)
405          return;
406        int size = 0;
407        boolean finished = false;
408        boolean done = false;
409        ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
410            (short) 0, (short)0);
411    
412        // In case of deadlock detection by the Database, this thread can
413        // by aborted by a DeadlockException. This is a transient error and
414        // the transaction should be attempted again.
415        // We will try DEADLOCK_RETRIES times before failing.
416        int tries = 0;
417        while ((tries++ < DEADLOCK_RETRIES) && (!done))
418        {
419          /* the trim is done by group in order to save some CPU and IO bandwidth
420           * start the transaction then do a bunch of remove then commit
421           */
422          ReplServerDBCursor cursor;
423          cursor = db.openDeleteCursor();
424    
425          try
426          {
427            while ((size < 5000 ) &&  (!finished))
428            {
429              ChangeNumber changeNumber = cursor.nextChangeNumber();
430              if (changeNumber != null)
431              {
432                if ((!changeNumber.equals(lastChange))
433                    && (changeNumber.older(trimDate)))
434                {
435                  size++;
436                  cursor.delete();
437                }
438                else
439                {
440                  firstChange = changeNumber;
441                  finished = true;
442                }
443              }
444              else
445                finished = true;
446            }
447            cursor.close();
448            done = true;
449          }
450          catch (DeadlockException e)
451          {
452            cursor.abort();
453            if (tries == DEADLOCK_RETRIES)
454            {
455              // could not handle the Deadlock after DEADLOCK_RETRIES tries.
456              // shutdown the ReplicationServer.
457              shutdown = true;
458              throw (e);
459            }
460          }
461          catch (DatabaseException e)
462          {
463            // mark shutdown for this db so that we don't try again to
464            // stop it from cursor.close() or methods called by cursor.close()
465            shutdown = true;
466            cursor.abort();
467            throw (e);
468          }
469        }
470      }
471    
472      /**
473       * Flush a number of updates from the memory list to the stable storage.
474       */
475      private void flush()
476      {
477        int size;
478    
479        do
480        {
481          synchronized(flushLock)
482          {
483            // get N messages to save in the DB
484            List<UpdateMessage> changes = getChanges(500);
485    
486            // if no more changes to save exit immediately.
487            if ((changes == null) || ((size = changes.size()) == 0))
488              return;
489    
490            // save the change to the stable storage.
491            db.addEntries(changes);
492    
493            // remove the changes from the list of changes to be saved.
494            clearQueue(changes.size());
495          }
496        } while (size >=500);
497      }
498    
499      /**
500       * This internal class is used to implement the Monitoring capabilities
501       * of the dbHandler.
502       */
503      private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
504      {
505        private DbMonitorProvider()
506        {
507          super("ReplicationServer Database");
508        }
509    
510        /**
511         * {@inheritDoc}
512         */
513        @Override
514        public ArrayList<Attribute> getMonitorData()
515        {
516          ArrayList<Attribute> attributes = new ArrayList<Attribute>();
517          attributes.add(new Attribute("replicationServer-database",
518                                       String.valueOf(serverId)));
519          attributes.add(new Attribute("base-dn", baseDn.toString()));
520          if (firstChange != null)
521          {
522            Date firstTime = new Date(firstChange.getTime());
523            attributes.add(new Attribute("first-change",
524                firstChange.toString() + " " + firstTime.toString()));
525          }
526          if (lastChange != null)
527          {
528            Date lastTime = new Date(lastChange.getTime());
529            attributes.add(new Attribute("last-change",
530                lastChange.toString() + " " + lastTime.toString()));
531          }
532    
533          return attributes;
534        }
535    
536        /**
537         * {@inheritDoc}
538         */
539        @Override
540        public String getMonitorInstanceName()
541        {
542          return "ReplicationServer database " + baseDn.toString() +
543                 " " + String.valueOf(serverId);
544        }
545    
546        /**
547         * {@inheritDoc}
548         */
549        @Override
550        public long getUpdateInterval()
551        {
552          /* we don't wont to do polling on this monitor */
553          return 0;
554        }
555    
556        /**
557         * {@inheritDoc}
558         */
559        @Override
560        public void initializeMonitorProvider(MonitorProviderCfg configuration)
561                                throws ConfigException,InitializationException
562        {
563          // Nothing to do for now
564        }
565    
566        /**
567         * {@inheritDoc}
568         */
569        @Override
570        public void updateMonitorData()
571        {
572          // As long as getUpdateInterval() returns 0, this will never get called
573        }
574      }
575    
576      /**
577       * {@inheritDoc}
578       */
579      @Override
580      public String toString()
581      {
582        return(baseDn + " " + serverId + " " + firstChange + " " + lastChange);
583      }
584    
585      /**
586       * Set the Purge delay for this db Handler.
587       * @param delay The purge delay in Milliseconds.
588       */
589      public void setPurgeDelay(long delay)
590      {
591        trimage = delay;
592      }
593    
594      /**
595       * Clear the changes from this DB (from both memory cache and DB storage).
596       * @throws DatabaseException When an exception occurs while removing the
597       * changes from the DB.
598       * @throws Exception When an exception occurs while accessing a resource
599       * from the DB.
600       *
601       */
602      public void clear() throws DatabaseException, Exception
603      {
604        synchronized(flushLock)
605        {
606          msgQueue.clear();
607        }
608        db.clear();
609        firstChange = db.readFirstChange();
610        lastChange = db.readLastChange();
611      }
612    }