001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2008 Sun Microsystems, Inc.
026     */
027    
028    package org.opends.server.backends.jeb.importLDIF;
029    
030    import org.opends.server.types.*;
031    import org.opends.server.loggers.debug.DebugTracer;
032    import static org.opends.server.loggers.debug.DebugLogger.getTracer;
033    import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
034    import static org.opends.server.loggers.ErrorLogger.logError;
035    import org.opends.server.admin.std.server.LocalDBBackendCfg;
036    import org.opends.server.util.LDIFReader;
037    import org.opends.server.util.StaticUtils;
038    import org.opends.server.util.LDIFException;
039    import org.opends.server.util.RuntimeInformation;
040    import static org.opends.server.util.DynamicConstants.BUILD_ID;
041    import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
042    import org.opends.server.config.ConfigException;
043    import org.opends.server.core.DirectoryServer;
044    import org.opends.server.backends.jeb.*;
045    import org.opends.server.protocols.asn1.ASN1OctetString;
046    import org.opends.messages.Message;
047    import org.opends.messages.JebMessages;
048    import static org.opends.messages.JebMessages.*;
049    import java.util.concurrent.CopyOnWriteArrayList;
050    import java.util.concurrent.LinkedBlockingQueue;
051    import java.util.concurrent.TimeUnit;
052    import java.util.*;
053    import java.io.IOException;
054    
055    import com.sleepycat.je.*;
056    
057    /**
 * Performs an LDIF import.
059     */
060    
061    public class Importer implements Thread.UncaughtExceptionHandler {
062    
063    
064      /**
065       * The tracer object for the debug logger.
066       */
067      private static final DebugTracer TRACER = getTracer();
068    
069      /**
070       * The JE backend configuration.
071       */
072      private LocalDBBackendCfg config;
073    
074      /**
075       * The root container used for this import job.
076       */
077      private RootContainer rootContainer;
078    
079      /**
080       * The LDIF import configuration.
081       */
082      private LDIFImportConfig ldifImportConfig;
083    
084      /**
085       * The LDIF reader.
086       */
087      private LDIFReader reader;
088    
089      /**
090       * Map of base DNs to their import context.
091       */
092      private LinkedHashMap<DN, DNContext> importMap =
093          new LinkedHashMap<DN, DNContext>();
094    
095    
096      /**
097        * The number of entries migrated.
098        */
099       private int migratedCount;
100    
101      /**
102       * The number of entries imported.
103       */
104      private int importedCount;
105    
106      /**
107       * The number of milliseconds between job progress reports.
108       */
109      private long progressInterval = 10000;
110    
111      /**
112       * The progress report timer.
113       */
114      private Timer timer;
115    
116      //Thread array.
117      private CopyOnWriteArrayList<WorkThread> threads;
118    
119      //Progress task.
120      private ProgressTask pTask;
121    
122      //Number of entries import before checking if cleaning is needed after
123      //eviction has been detected.
124      private static final int entryCleanInterval = 250000;
125    
126      //Minimum buffer amount to give to a buffer manager.
127      private static final long minBuffer = 1024 * 1024;
128    
129      //Total available memory for the buffer managers.
130      private long totalAvailBufferMemory = 0;
131    
132      //Memory size to be used for the DB cache in string format.
133      private String dbCacheSizeStr;
134    
135      //Used to do an initial clean after eviction has been detected.
136      private boolean firstClean=false;
137    
138      //A thread threw an Runtime exception stop the import.
139      private boolean unCaughtExceptionThrown = false;
140    
141      /**
142       * Create a new import job with the specified ldif import config.
143       *
144       * @param ldifImportConfig The LDIF import config.
145       */
146      public Importer(LDIFImportConfig ldifImportConfig)
147      {
148        this.ldifImportConfig = ldifImportConfig;
149        this.threads = new CopyOnWriteArrayList<WorkThread>();
150        calcMemoryLimits();
151      }
152    
153      /**
154       * Start the worker threads.
155       *
156       * @throws DatabaseException If a DB problem occurs.
157       */
158      private void startWorkerThreads()
159              throws DatabaseException {
160    
161        int importThreadCount = config.getImportThreadCount();
162        //Figure out how much buffer memory to give to each context.
163        int contextCount = importMap.size();
164        long memoryPerContext = totalAvailBufferMemory / contextCount;
165        //Below min, use the min value.
166        if(memoryPerContext < minBuffer) {
167          Message msg =
168                NOTE_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext,
169                                                                 minBuffer);
170          logError(msg);
171          memoryPerContext = minBuffer;
172        }
173        // Create one set of worker threads/buffer managers for each base DN.
174        for (DNContext context : importMap.values()) {
175          BufferManager bufferManager = new BufferManager(memoryPerContext,
176                                                          importThreadCount);
177          context.setBufferManager(bufferManager);
178          for (int i = 0; i < importThreadCount; i++) {
179            WorkThread t = new WorkThread(context.getWorkQueue(), i,
180                                          bufferManager, rootContainer);
181            t.setUncaughtExceptionHandler(this);
182            threads.add(t);
183            t.start();
184          }
185        }
186        // Start a timer for the progress report.
187        timer = new Timer();
188        TimerTask progressTask = new ProgressTask();
189        //Used to get at extra functionality such as eviction detected.
190        pTask = (ProgressTask) progressTask;
191        timer.scheduleAtFixedRate(progressTask, progressInterval,
192                                  progressInterval);
193    
194      }
195    
196    
197      /**
198       * Import a ldif using the specified root container.
199       *
200       * @param rootContainer  The root container.
201       * @return A LDIF result.
202       * @throws DatabaseException  If a DB error occurs.
203       * @throws IOException If a IO error occurs.
204       * @throws org.opends.server.backends.jeb.JebException If a JEB error occurs.
205       * @throws DirectoryException If a directory error occurs.
206       * @throws ConfigException If a configuration has an error.
207       */
208      public LDIFImportResult processImport(RootContainer rootContainer)
209          throws DatabaseException, IOException, JebException, DirectoryException,
210                ConfigException {
211    
212        // Create an LDIF reader. Throws an exception if the file does not exist.
213        reader = new LDIFReader(ldifImportConfig);
214        this.rootContainer = rootContainer;
215        this.config = rootContainer.getConfiguration();
216    
217        Message message;
218        long startTime;
219        try {
220          int importThreadCount = config.getImportThreadCount();
221          message = NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
222                                                         BUILD_ID, REVISION_NUMBER);
223          logError(message);
224          message = NOTE_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
225          logError(message);
226          RuntimeInformation.logInfo();
227          for (EntryContainer entryContainer : rootContainer.getEntryContainers()) {
228            DNContext DNContext =  getImportContext(entryContainer);
229            if(DNContext != null) {
230              importMap.put(entryContainer.getBaseDN(), DNContext);
231            }
232          }
233          // Make a note of the time we started.
234          startTime = System.currentTimeMillis();
235          startWorkerThreads();
236          try {
237            importedCount = 0;
238            migratedCount = 0;
239            migrateExistingEntries();
240            processLDIF();
241            migrateExcludedEntries();
242          } finally {
243            if(!unCaughtExceptionThrown) {
244              cleanUp();
245              switchContainers();
246            }
247          }
248        }
249        finally {
250          reader.close();
251        }
252        importProlog(startTime);
253        return new LDIFImportResult(reader.getEntriesRead(),
254                                    reader.getEntriesRejected(),
255                                    reader.getEntriesIgnored());
256      }
257    
258      /**
259       * Switch containers if the migrated entries were written to the temporary
260       * container.
261       *
262       * @throws DatabaseException If a DB problem occurs.
263       * @throws JebException If a JEB problem occurs.
264       */
265      private void switchContainers() throws DatabaseException, JebException {
266    
267        for(DNContext importContext : importMap.values()) {
268          DN baseDN = importContext.getBaseDN();
269          EntryContainer srcEntryContainer =
270                  importContext.getSrcEntryContainer();
271          if(srcEntryContainer != null) {
272            if (debugEnabled()) {
273              TRACER.debugInfo("Deleteing old entry container for base DN " +
274                      "%s and renaming temp entry container", baseDN);
275            }
276            EntryContainer unregEC =
277                    rootContainer.unregisterEntryContainer(baseDN);
278            //Make sure the unregistered EC for the base DN is the same as
279            //the one in the import context.
280            if(unregEC != srcEntryContainer) {
281              if(debugEnabled()) {
282                TRACER.debugInfo("Current entry container used for base DN " +
283                        "%s is not the same as the source entry container used " +
284                        "during the migration process.", baseDN);
285              }
286              rootContainer.registerEntryContainer(baseDN, unregEC);
287              continue;
288            }
289            srcEntryContainer.lock();
290            srcEntryContainer.delete();
291            srcEntryContainer.unlock();
292            EntryContainer newEC = importContext.getEntryContainer();
293            newEC.lock();
294            newEC.setDatabasePrefix(baseDN.toNormalizedString());
295            newEC.unlock();
296            rootContainer.registerEntryContainer(baseDN, newEC);
297          }
298        }
299      }
300    
301      /**
302       * Create and log messages at the end of the successful import.
303       *
304       * @param startTime The time the import started.
305       */
306      private void importProlog(long startTime) {
307        Message message;
308        long finishTime = System.currentTimeMillis();
309        long importTime = (finishTime - startTime);
310    
311        float rate = 0;
312        if (importTime > 0)
313        {
314          rate = 1000f*importedCount / importTime;
315        }
316    
317        message = NOTE_JEB_IMPORT_FINAL_STATUS.
318            get(reader.getEntriesRead(), importedCount,
319                reader.getEntriesIgnored(), reader.getEntriesRejected(),
320                migratedCount, importTime/1000, rate);
321        logError(message);
322    
323        message = NOTE_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get(
324            getEntryLimitExceededCount());
325        logError(message);
326    
327      }
328    
329    
330      /**
331       * Run the cleaner if it is needed.
332       *
333       * @param entriesRead The number of entries read so far.
334       * @param evictEntryNumber The number of entries to run the cleaner after
335       * being read.
336       * @throws DatabaseException If a DB problem occurs.
337       */
338      private void
339      runCleanerIfNeeded(long entriesRead, long evictEntryNumber)
340              throws DatabaseException {
341        if(!firstClean || (entriesRead %  evictEntryNumber) == 0) {
342          //Make sure work queue is empty before starting.
343          drainWorkQueue();
344          Message msg = NOTE_JEB_IMPORT_LDIF_CLEAN.get();
345          runCleaner(msg);
346          if(!firstClean) {
347            firstClean=true;
348          }
349        }
350      }
351    
352      /**
353       * Run the cleaner, pausing the task thread output.
354       *
355       * @param header Message to be printed before cleaning.
356       * @throws DatabaseException If a DB problem occurs.
357       */
358      private void runCleaner(Message header) throws DatabaseException {
359        Message msg;
360        long startTime = System.currentTimeMillis();
361        //Need to force a checkpoint.
362        rootContainer.importForceCheckPoint();
363        logError(header);
364        pTask.setPause(true);
365        //Actually clean the files.
366        int cleaned = rootContainer.cleanedLogFiles();
367        //This checkpoint removes the files if any were cleaned.
368        if(cleaned > 0) {
369          msg = NOTE_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned);
370          logError(msg);
371          rootContainer.importForceCheckPoint();
372        }
373        pTask.setPause(false);
374        long finishTime = System.currentTimeMillis();
375        long cleanTime = (finishTime - startTime) / 1000;
376        msg = NOTE_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned);
377        logError(msg);
378      }
379    
380      /**
381       * Process a LDIF reader.
382       *
383       * @throws JebException If a JEB problem occurs.
384       * @throws DatabaseException If a DB problem occurs.
385       * @throws IOException If an IO exception occurs.
386       */
387      private void
388      processLDIF() throws JebException, DatabaseException, IOException {
389        Message message = NOTE_JEB_IMPORT_LDIF_START.get();
390        logError(message);
391        do {
392          if (ldifImportConfig.isCancelled()) {
393            break;
394          }
395          if(threads.size() <= 0) {
396            message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
397            throw new JebException(message);
398          }
399          if(unCaughtExceptionThrown) {
400            abortImport();
401          }
402          try {
403            // Read the next entry.
404            Entry entry = reader.readEntry();
405            // Check for end of file.
406            if (entry == null) {
407              message = NOTE_JEB_IMPORT_LDIF_END.get();
408              logError(message);
409    
410              break;
411            }
412            // Route it according to base DN.
413            DNContext DNContext = getImportConfig(entry.getDN());
414            processEntry(DNContext, entry);
415            //If the progress task has noticed eviction proceeding, start running
416            //the cleaner.
417            if(pTask.isEvicting()) {
418              runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval);
419            }
420          }  catch (LDIFException e) {
421            if (debugEnabled()) {
422              TRACER.debugCaught(DebugLogLevel.ERROR, e);
423            }
424          } catch (DirectoryException e) {
425            if (debugEnabled()) {
426              TRACER.debugCaught(DebugLogLevel.ERROR, e);
427            }
428          } catch (DatabaseException e)  {
429            if (debugEnabled()) {
430              TRACER.debugCaught(DebugLogLevel.ERROR, e);
431            }
432          }
433        } while (true);
434      }
435    
436      /**
437       * Process an entry using the specified import context.
438       *
439       * @param DNContext The import context.
440       * @param entry The entry to process.
441       */
442      private void processEntry(DNContext DNContext, Entry entry) {
443        //Add this DN to the pending map.
444        DNContext.addPending(entry.getDN());
445        addEntryQueue(DNContext, entry);
446      }
447    
448      /**
449       * Add work item to specified import context's queue.
450       * @param context The import context.
451       * @param item The work item to add.
452       * @return <CODE>True</CODE> if the the work  item was added to the queue.
453       */
454      private boolean
455      addQueue(DNContext context, WorkElement item) {
456        try {
457          while(!context.getWorkQueue().offer(item, 1000,
458                                                TimeUnit.MILLISECONDS)) {
459            if(threads.size() <= 0) {
460              // All worker threads died. We must stop now.
461              return false;
462            }
463          }
464        } catch (InterruptedException e) {
465          if (debugEnabled()) {
466            TRACER.debugCaught(DebugLogLevel.ERROR, e);
467          }
468        }
469        return true;
470      }
471    
472    
473      /**
474       * Wait until the work queue is empty.
475       */
476      private void drainWorkQueue() {
477        if(threads.size() > 0) {
478          for (DNContext context : importMap.values()) {
479            while (context.getWorkQueue().size() > 0) {
480              try {
481                Thread.sleep(100);
482              } catch (Exception e) {
483                // No action needed.
484              }
485            }
486          }
487        }
488      }
489    
490      private void abortImport() throws JebException {
491        //Stop work threads telling them to skip substring flush.
492         stopWorkThreads(false);
493         timer.cancel();
494         Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
495         throw new JebException(message);
496      }
497    
  /**
   * Stop work threads.
   * <p>
   * Every registered work thread is first told to stop processing. Each
   * thread is then joined in turn and its imported count is added to the
   * running total. Before each join, if <CODE>abort</CODE> is false and a
   * worker thread has reported an uncaught exception, the progress timer is
   * cancelled and a JebException is thrown instead of waiting for the
   * remaining threads.
   *
   * @param abort When <CODE>true</CODE> the threads are always joined; when
   *              <CODE>false</CODE> the uncaught exception check described
   *              above applies. NOTE(review): in this file cleanUp passes
   *              true and abortImport passes false, which reads as the
   *              opposite of the parameter name - confirm the intent.
   * @throws JebException If <CODE>abort</CODE> is false and a worker thread
   *                      has died with an uncaught exception.
   */
  private void
  stopWorkThreads(boolean abort) throws JebException {
    for (WorkThread t : threads) {
      t.stopProcessing();
    }
    // Wait for each thread to stop.
    for (WorkThread t : threads) {
      try {
        // Checked again before every join: the flag can be set by a worker
        // thread while earlier threads are being joined.
        if(!abort && unCaughtExceptionThrown) {
          timer.cancel();
          Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
          throw new JebException(message);
        }
        t.join();
        importedCount += t.getImportedCount();
      } catch (InterruptedException ie) {
        // No action needed? The count of the thread being joined is not
        // added when the join is interrupted.
      }
    }
  }
525    
526      /**
527       * Clean up after a successful import.
528       *
529       * @throws DatabaseException If a DB error occurs.
530       * @throws JebException If a Jeb error occurs.
531       */
532      private void cleanUp() throws DatabaseException, JebException {
533         Message msg;
534        //Drain the work queue.
535        drainWorkQueue();
536        pTask.setPause(true);
537        long startTime = System.currentTimeMillis();
538        stopWorkThreads(true);
539        //Flush the buffer managers.
540        for(DNContext context : importMap.values()) {
541          context.getBufferManager().prepareFlush();
542          context.getBufferManager().flushAll();
543        }
544        long finishTime = System.currentTimeMillis();
545        long flushTime = (finishTime - startTime) / 1000;
546         msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
547        logError(msg);
548        timer.cancel();
549        for(DNContext context : importMap.values()) {
550          context.setIndexesTrusted();
551        }
552        msg = NOTE_JEB_IMPORT_LDIF_FINAL_CLEAN.get();
553        //Run the cleaner.
554        runCleaner(msg);
555      }
556    
557      /**
558       * Uncaught exception handler.
559       *
560       * @param t The thread working when the exception was thrown.
561       * @param e The exception.
562       */
563      public void uncaughtException(Thread t, Throwable e) {
564         unCaughtExceptionThrown = true;
565         threads.remove(t);
566         Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get(
567             t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause()));
568         logError(msg);
569       }
570    
571      /**
572       * Get the entry limit exceeded counts from the indexes.
573       *
574       * @return Count of the index with entry limit exceeded values.
575       */
576      private int getEntryLimitExceededCount() {
577        int count = 0;
578        for (DNContext ic : importMap.values())
579        {
580          count += ic.getEntryContainer().getEntryLimitExceededCount();
581        }
582        return count;
583      }
584    
585      /**
586       * Return an import context related to the specified DN.
587       * @param dn The dn.
588       * @return  An import context.
589       * @throws DirectoryException If an directory error occurs.
590       */
591      private DNContext getImportConfig(DN dn) throws DirectoryException {
592        DNContext DNContext = null;
593        DN nodeDN = dn;
594    
595        while (DNContext == null && nodeDN != null) {
596          DNContext = importMap.get(nodeDN);
597          if (DNContext == null)
598          {
599            nodeDN = nodeDN.getParentDNInSuffix();
600          }
601        }
602    
603        if (nodeDN == null) {
604          // The entry should not have been given to this backend.
605          Message message =
606                  JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn));
607          throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
608        }
609    
610        return DNContext;
611      }
612    
613      /**
614       * Creates an import context for the specified entry container.
615       *
616       * @param entryContainer The entry container.
617       * @return Import context to use during import.
618       * @throws DatabaseException If a database error occurs.
619       * @throws JebException If a JEB error occurs.
620       * @throws ConfigException If a configuration contains error.
621       */
622       private DNContext getImportContext(EntryContainer entryContainer)
623          throws DatabaseException, JebException, ConfigException {
624        DN baseDN = entryContainer.getBaseDN();
625        EntryContainer srcEntryContainer = null;
626        List<DN> includeBranches = new ArrayList<DN>();
627        List<DN> excludeBranches = new ArrayList<DN>();
628    
629        if(!ldifImportConfig.appendToExistingData() &&
630            !ldifImportConfig.clearBackend())
631        {
632          for(DN dn : ldifImportConfig.getExcludeBranches())
633          {
634            if(baseDN.equals(dn))
635            {
636              // This entire base DN was explicitly excluded. Skip.
637              return null;
638            }
639            if(baseDN.isAncestorOf(dn))
640            {
641              excludeBranches.add(dn);
642            }
643          }
644    
645          if(!ldifImportConfig.getIncludeBranches().isEmpty())
646          {
647            for(DN dn : ldifImportConfig.getIncludeBranches())
648            {
649              if(baseDN.isAncestorOf(dn))
650              {
651                includeBranches.add(dn);
652              }
653            }
654    
655            if(includeBranches.isEmpty())
656            {
657              // There are no branches in the explicitly defined include list under
658              // this base DN. Skip this base DN alltogether.
659    
660              return null;
661            }
662    
663            // Remove any overlapping include branches.
664            Iterator<DN> includeBranchIterator = includeBranches.iterator();
665            while(includeBranchIterator.hasNext())
666            {
667              DN includeDN = includeBranchIterator.next();
668              boolean keep = true;
669              for(DN dn : includeBranches)
670              {
671                if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
672                {
673                  keep = false;
674                  break;
675                }
676              }
677              if(!keep)
678              {
679                includeBranchIterator.remove();
680              }
681            }
682    
683            // Remvoe any exclude branches that are not are not under a include
684            // branch since they will be migrated as part of the existing entries
685            // outside of the include branches anyways.
686            Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
687            while(excludeBranchIterator.hasNext())
688            {
689              DN excludeDN = excludeBranchIterator.next();
690              boolean keep = false;
691              for(DN includeDN : includeBranches)
692              {
693                if(includeDN.isAncestorOf(excludeDN))
694                {
695                  keep = true;
696                  break;
697                }
698              }
699              if(!keep)
700              {
701                excludeBranchIterator.remove();
702              }
703            }
704    
705            if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
706                includeBranches.get(0).equals(baseDN))
707            {
708              // This entire base DN is explicitly included in the import with
709              // no exclude branches that we need to migrate. Just clear the entry
710              // container.
711              entryContainer.lock();
712              entryContainer.clear();
713              entryContainer.unlock();
714            }
715            else
716            {
717              // Create a temp entry container
718              srcEntryContainer = entryContainer;
719              entryContainer =
720                  rootContainer.openEntryContainer(baseDN,
721                                                   baseDN.toNormalizedString() +
722                                                       "_importTmp");
723            }
724          }
725        }
726    
727        // Create an import context.
728        DNContext DNContext = new DNContext();
729        DNContext.setConfig(config);
730        DNContext.setLDIFImportConfig(this.ldifImportConfig);
731        DNContext.setLDIFReader(reader);
732    
733        DNContext.setBaseDN(baseDN);
734        DNContext.setEntryContainer(entryContainer);
735        DNContext.setSrcEntryContainer(srcEntryContainer);
736    
737        //Create queue.
738        LinkedBlockingQueue<WorkElement> works =
739            new LinkedBlockingQueue<WorkElement>
740                         (config.getImportQueueSize());
741        DNContext.setWorkQueue(works);
742    
743        // Set the include and exclude branches
744        DNContext.setIncludeBranches(includeBranches);
745        DNContext.setExcludeBranches(excludeBranches);
746    
747        return DNContext;
748      }
749    
750      /**
751       * Add specified context and entry to the work queue.
752       *
753       * @param context The context related to the entry DN.
754       * @param entry The entry to work on.
755       * @return  <CODE>True</CODE> if the element was added to the work queue.
756       */
757      private boolean
758      addEntryQueue(DNContext context,  Entry entry) {
759        WorkElement element =
760                WorkElement.decode(entry, context);
761        return addQueue(context, element);
762      }
763    
764      /**
765       * Calculate the memory usage for the substring buffer and the DB cache.
766       */
767      private void calcMemoryLimits() {
768        Message msg;
769        Runtime runtime = Runtime.getRuntime();
770        long freeMemory = runtime.freeMemory();
771        long maxMemory = runtime.maxMemory();
772        long totMemory = runtime.totalMemory();
773        long totFreeMemory = (freeMemory + (maxMemory - totMemory));
774        long dbCacheLimit = (totFreeMemory * 40) / 100;
775        dbCacheSizeStr = Long.toString(dbCacheLimit);
776        totalAvailBufferMemory = (totFreeMemory * 10) / 100;
777        if(totalAvailBufferMemory < (10 * minBuffer)) {
778           msg =
779              NOTE_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory,
780                                                          (10 * minBuffer));
781          logError(msg);
782          totalAvailBufferMemory = (10 * minBuffer);
783        }
784        msg=NOTE_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit,
785                                                 totalAvailBufferMemory);
786        logError(msg);
787      }
788    
789      /**
790       * Return the string representation of the DB cache size.
791       *
792       * @return DB cache size string.
793       */
794      public String getDBCacheSize() {
795        return dbCacheSizeStr;
796      }
797    
  /**
   * Migrate any existing entries.
   * <p>
   * Only import contexts that have a source entry container and at least
   * one include branch are handled. The DN2ID database of the source
   * container is walked with a cursor: every entry whose DN is not the base
   * of an include branch is read from ID2Entry and handed to the context
   * for processing, and whole include branches are skipped. The walk stops
   * early if the import is cancelled.
   *
   * @throws JebException If a JEB error occurs, including when no worker
   *                      thread is left while migrating.
   * @throws DatabaseException  If a DB error occurs.
   * @throws DirectoryException If a directory error occurs.
   */
  private void migrateExistingEntries()
      throws JebException, DatabaseException, DirectoryException {
    for(DNContext context : importMap.values()) {
      EntryContainer srcEntryContainer = context.getSrcEntryContainer();
      if(srcEntryContainer != null &&
          !context.getIncludeBranches().isEmpty()) {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode lockMode = LockMode.DEFAULT;
        OperationStatus status;
        Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
            "existing", String.valueOf(context.getBaseDN()));
        logError(message);
        Cursor cursor =
            srcEntryContainer.getDN2ID().openCursor(null,
                                                   CursorConfig.READ_COMMITTED);
        try {
          status = cursor.getFirst(key, data, lockMode);
          while(status == OperationStatus.SUCCESS &&
                !ldifImportConfig.isCancelled()) {
            // Without worker threads nothing consumes the queued entries.
            if(threads.size() <= 0) {
              message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
              throw new JebException(message);
            }
            // The cursor key holds the DN and the data holds the entry ID.
            DN dn = DN.decode(new ASN1OctetString(key.getData()));
            if(!context.getIncludeBranches().contains(dn)) {
              EntryID id = new EntryID(data);
              Entry entry =
                  srcEntryContainer.getID2Entry().get(null,
                      id, LockMode.DEFAULT);
              processEntry(context, entry);
              migratedCount++;
              status = cursor.getNext(key, data, lockMode);
            }  else {
              // This is the base entry for a branch that will be included
              // in the import so we don't want to copy the branch to the new
              // entry container.

              /**
               * Advance the cursor to next entry at the same level in the DIT
               * skipping all the entries in this branch.
               * Set the next starting value to a value of equal length but
               * slightly greater than the previous DN. Since keys are compared
               * in reverse order we must set the first byte (the comma).
               * No possibility of overflow here.
               */
              byte[] begin =
                  StaticUtils.getBytes("," + dn.toNormalizedString());
              begin[0] = (byte) (begin[0] + 1);
              key.setData(begin);
              status = cursor.getSearchKeyRange(key, data, lockMode);
            }
          }
        } finally {
          // Always release the cursor, also when the migration fails.
          cursor.close();
        }
      }
    }
  }
864    
865    
866      /**
867       * Migrate excluded entries.
868       *
869       * @throws JebException If a JEB error occurs.
870       * @throws DatabaseException  If a DB error occurs.
871       * @throws DirectoryException If a directory error occurs.
872       */
873      private void migrateExcludedEntries()
874          throws JebException, DatabaseException, DirectoryException {
875        for(DNContext importContext : importMap.values()) {
876          EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
877          if(srcEntryContainer != null &&
878              !importContext.getExcludeBranches().isEmpty()) {
879            DatabaseEntry key = new DatabaseEntry();
880            DatabaseEntry data = new DatabaseEntry();
881            LockMode lockMode = LockMode.DEFAULT;
882            OperationStatus status;
883            Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
884                "excluded", String.valueOf(importContext.getBaseDN()));
885            logError(message);
886            Cursor cursor =
887                srcEntryContainer.getDN2ID().openCursor(null,
888                                                       CursorConfig.READ_COMMITTED);
889            Comparator<byte[]> dn2idComparator =
890                srcEntryContainer.getDN2ID().getComparator();
891            try {
892              for(DN excludedDN : importContext.getExcludeBranches()) {
893                byte[] suffix =
894                    StaticUtils.getBytes(excludedDN.toNormalizedString());
895                key.setData(suffix);
896                status = cursor.getSearchKeyRange(key, data, lockMode);
897                if(status == OperationStatus.SUCCESS &&
898                    Arrays.equals(key.getData(), suffix)) {
899                  // This is the base entry for a branch that was excluded in the
900                  // import so we must migrate all entries in this branch over to
901                  // the new entry container.
902                  byte[] end =
903                      StaticUtils.getBytes("," + excludedDN.toNormalizedString());
904                  end[0] = (byte) (end[0] + 1);
905    
906                  while(status == OperationStatus.SUCCESS &&
907                      dn2idComparator.compare(key.getData(), end) < 0 &&
908                      !ldifImportConfig.isCancelled()) {
909                    if(threads.size() <= 0) {
910                      message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
911                      throw new JebException(message);
912                    }
913                    EntryID id = new EntryID(data);
914                    Entry entry = srcEntryContainer.getID2Entry().get(null,
915                        id, LockMode.DEFAULT);
916                    processEntry(importContext, entry);
917                    migratedCount++;
918                    status = cursor.getNext(key, data, lockMode);
919                  }
920                }
921              }
922            }
923            finally
924            {
925              cursor.close();
926            }
927          }
928        }
929      }
930    
931    
932      /**
933       * This class reports progress of the import job at fixed intervals.
934       */
935      private final class ProgressTask extends TimerTask
936      {
937        /**
938         * The number of entries that had been read at the time of the
939         * previous progress report.
940         */
941        private long previousCount = 0;
942    
943        /**
944         * The time in milliseconds of the previous progress report.
945         */
946        private long previousTime;
947    
948        /**
949         * The environment statistics at the time of the previous report.
950         */
951        private EnvironmentStats prevEnvStats;
952    
953        /**
954         * The number of bytes in a megabyte.
955         * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
956         */
957        public static final int bytesPerMegabyte = 1024*1024;
958    
959        //Determines if the ldif is being read.
960        private boolean ldifRead = false;
961    
962        //Determines if eviction has been detected.
963        private boolean evicting = false;
964    
965        //Entry count when eviction was detected.
966        private long evictionEntryCount = 0;
967    
968        //Suspend output.
969        private boolean pause = false;
970    
971        /**
972         * Create a new import progress task.
973         * @throws DatabaseException If an error occurs in the JE database.
974         */
975        public ProgressTask() throws DatabaseException
976        {
977          previousTime = System.currentTimeMillis();
978          prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
979        }
980    
981        /**
982         * Return if reading the LDIF file.
983         */
984        public void ldifRead() {
985          ldifRead=true;
986        }
987    
988       /**
989        * Return value of evicting flag.
990        *
991        * @return <CODE>True</CODE> if eviction is detected.
992        */
993        public  boolean isEvicting() {
994         return evicting;
995        }
996    
997        /**
998         * Return count of entries when eviction was detected.
999         *
1000         * @return  The entry count when eviction was detected.
1001         */
1002        public long getEvictionEntryCount() {
1003          return evictionEntryCount;
1004        }
1005    
1006        /**
1007         * Suspend output if true.
1008         *
1009         * @param v The value to set the suspend value to.
1010         */
1011        public void setPause(boolean v) {
1012        pause=v;
1013       }
1014    
1015        /**
1016         * The action to be performed by this timer task.
1017         */
1018        public void run() {
1019          long latestCount = reader.getEntriesRead() + 0;
1020          long deltaCount = (latestCount - previousCount);
1021          long latestTime = System.currentTimeMillis();
1022          long deltaTime = latestTime - previousTime;
1023          Message message;
1024          if (deltaTime == 0) {
1025            return;
1026          }
1027          if(pause) {
1028            return;
1029          }
1030          if(!ldifRead) {
1031            long numRead     = reader.getEntriesRead();
1032            long numIgnored  = reader.getEntriesIgnored();
1033            long numRejected = reader.getEntriesRejected();
1034             float rate = 1000f*deltaCount / deltaTime;
1035             message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get(
1036                numRead, numIgnored, numRejected, 0, rate);
1037            logError(message);
1038          }
1039          try
1040          {
1041            Runtime runtime = Runtime.getRuntime();
1042            long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
1043            EnvironmentStats envStats =
1044                rootContainer.getEnvironmentStats(new StatsConfig());
1045            long nCacheMiss =
1046                envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
1047    
1048            float cacheMissRate = 0;
1049            if (deltaCount > 0) {
1050              cacheMissRate = nCacheMiss/(float)deltaCount;
1051            }
1052            message = NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(
1053                freeMemory, cacheMissRate);
1054            logError(message);
1055            long evictPasses = envStats.getNEvictPasses();
1056            long evictNodes = envStats.getNNodesExplicitlyEvicted();
1057            long evictBinsStrip = envStats.getNBINsStripped();
1058            int cleanerRuns = envStats.getNCleanerRuns();
1059            int cleanerDeletions = envStats.getNCleanerDeletions();
1060            int cleanerEntriesRead = envStats.getNCleanerEntriesRead();
1061            int cleanerINCleaned = envStats.getNINsCleaned();
1062            int checkPoints = envStats.getNCheckpoints();
1063            if(evictPasses != 0) {
1064              if(!evicting) {
1065                evicting=true;
1066                if(!ldifRead) {
1067                  evictionEntryCount=reader.getEntriesRead();
1068                  message =
1069                     NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount);
1070                  logError(message);
1071                }
1072              }
1073              message =
1074                      NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses,
1075                              evictNodes, evictBinsStrip);
1076              logError(message);
1077            }
1078            if(cleanerRuns != 0) {
1079              message = NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
1080                      cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
1081              logError(message);
1082            }
1083            if(checkPoints  > 1) {
1084              message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
1085              logError(message);
1086            }
1087            prevEnvStats = envStats;
1088          } catch (DatabaseException e) {
1089            // Unlikely to happen and not critical.
1090          }
1091          previousCount = latestCount;
1092          previousTime = latestTime;
1093        }
1094      }
1095    }
1096