001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2008 Sun Microsystems, Inc. 026 */ 027 028 package org.opends.server.backends.jeb.importLDIF; 029 030 import org.opends.server.types.*; 031 import org.opends.server.loggers.debug.DebugTracer; 032 import static org.opends.server.loggers.debug.DebugLogger.getTracer; 033 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; 034 import static org.opends.server.loggers.ErrorLogger.logError; 035 import org.opends.server.admin.std.server.LocalDBBackendCfg; 036 import org.opends.server.util.LDIFReader; 037 import org.opends.server.util.StaticUtils; 038 import org.opends.server.util.LDIFException; 039 import org.opends.server.util.RuntimeInformation; 040 import static org.opends.server.util.DynamicConstants.BUILD_ID; 041 import static org.opends.server.util.DynamicConstants.REVISION_NUMBER; 042 import org.opends.server.config.ConfigException; 043 import org.opends.server.core.DirectoryServer; 044 import org.opends.server.backends.jeb.*; 045 import org.opends.server.protocols.asn1.ASN1OctetString; 046 import org.opends.messages.Message; 047 import org.opends.messages.JebMessages; 048 import static org.opends.messages.JebMessages.*; 049 import java.util.concurrent.CopyOnWriteArrayList; 050 import java.util.concurrent.LinkedBlockingQueue; 051 import java.util.concurrent.TimeUnit; 052 import java.util.*; 053 import java.io.IOException; 054 055 import com.sleepycat.je.*; 056 057 /** 058 * Performs a LDIF import. 059 */ 060 061 public class Importer implements Thread.UncaughtExceptionHandler { 062 063 064 /** 065 * The tracer object for the debug logger. 066 */ 067 private static final DebugTracer TRACER = getTracer(); 068 069 /** 070 * The JE backend configuration. 071 */ 072 private LocalDBBackendCfg config; 073 074 /** 075 * The root container used for this import job. 076 */ 077 private RootContainer rootContainer; 078 079 /** 080 * The LDIF import configuration. 081 */ 082 private LDIFImportConfig ldifImportConfig; 083 084 /** 085 * The LDIF reader. 086 */ 087 private LDIFReader reader; 088 089 /** 090 * Map of base DNs to their import context. 091 */ 092 private LinkedHashMap<DN, DNContext> importMap = 093 new LinkedHashMap<DN, DNContext>(); 094 095 096 /** 097 * The number of entries migrated. 098 */ 099 private int migratedCount; 100 101 /** 102 * The number of entries imported. 103 */ 104 private int importedCount; 105 106 /** 107 * The number of milliseconds between job progress reports. 108 */ 109 private long progressInterval = 10000; 110 111 /** 112 * The progress report timer. 113 */ 114 private Timer timer; 115 116 //Thread array. 117 private CopyOnWriteArrayList<WorkThread> threads; 118 119 //Progress task. 120 private ProgressTask pTask; 121 122 //Number of entries import before checking if cleaning is needed after 123 //eviction has been detected. 124 private static final int entryCleanInterval = 250000; 125 126 //Minimum buffer amount to give to a buffer manager. 127 private static final long minBuffer = 1024 * 1024; 128 129 //Total available memory for the buffer managers. 130 private long totalAvailBufferMemory = 0; 131 132 //Memory size to be used for the DB cache in string format. 133 private String dbCacheSizeStr; 134 135 //Used to do an initial clean after eviction has been detected. 136 private boolean firstClean=false; 137 138 //A thread threw an Runtime exception stop the import. 139 private boolean unCaughtExceptionThrown = false; 140 141 /** 142 * Create a new import job with the specified ldif import config. 143 * 144 * @param ldifImportConfig The LDIF import config. 145 */ 146 public Importer(LDIFImportConfig ldifImportConfig) 147 { 148 this.ldifImportConfig = ldifImportConfig; 149 this.threads = new CopyOnWriteArrayList<WorkThread>(); 150 calcMemoryLimits(); 151 } 152 153 /** 154 * Start the worker threads. 155 * 156 * @throws DatabaseException If a DB problem occurs. 157 */ 158 private void startWorkerThreads() 159 throws DatabaseException { 160 161 int importThreadCount = config.getImportThreadCount(); 162 //Figure out how much buffer memory to give to each context. 163 int contextCount = importMap.size(); 164 long memoryPerContext = totalAvailBufferMemory / contextCount; 165 //Below min, use the min value. 166 if(memoryPerContext < minBuffer) { 167 Message msg = 168 NOTE_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext, 169 minBuffer); 170 logError(msg); 171 memoryPerContext = minBuffer; 172 } 173 // Create one set of worker threads/buffer managers for each base DN. 174 for (DNContext context : importMap.values()) { 175 BufferManager bufferManager = new BufferManager(memoryPerContext, 176 importThreadCount); 177 context.setBufferManager(bufferManager); 178 for (int i = 0; i < importThreadCount; i++) { 179 WorkThread t = new WorkThread(context.getWorkQueue(), i, 180 bufferManager, rootContainer); 181 t.setUncaughtExceptionHandler(this); 182 threads.add(t); 183 t.start(); 184 } 185 } 186 // Start a timer for the progress report. 187 timer = new Timer(); 188 TimerTask progressTask = new ProgressTask(); 189 //Used to get at extra functionality such as eviction detected. 190 pTask = (ProgressTask) progressTask; 191 timer.scheduleAtFixedRate(progressTask, progressInterval, 192 progressInterval); 193 194 } 195 196 197 /** 198 * Import a ldif using the specified root container. 199 * 200 * @param rootContainer The root container. 201 * @return A LDIF result. 202 * @throws DatabaseException If a DB error occurs. 203 * @throws IOException If a IO error occurs. 204 * @throws org.opends.server.backends.jeb.JebException If a JEB error occurs. 205 * @throws DirectoryException If a directory error occurs. 206 * @throws ConfigException If a configuration has an error. 207 */ 208 public LDIFImportResult processImport(RootContainer rootContainer) 209 throws DatabaseException, IOException, JebException, DirectoryException, 210 ConfigException { 211 212 // Create an LDIF reader. Throws an exception if the file does not exist. 213 reader = new LDIFReader(ldifImportConfig); 214 this.rootContainer = rootContainer; 215 this.config = rootContainer.getConfiguration(); 216 217 Message message; 218 long startTime; 219 try { 220 int importThreadCount = config.getImportThreadCount(); 221 message = NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(), 222 BUILD_ID, REVISION_NUMBER); 223 logError(message); 224 message = NOTE_JEB_IMPORT_THREAD_COUNT.get(importThreadCount); 225 logError(message); 226 RuntimeInformation.logInfo(); 227 for (EntryContainer entryContainer : rootContainer.getEntryContainers()) { 228 DNContext DNContext = getImportContext(entryContainer); 229 if(DNContext != null) { 230 importMap.put(entryContainer.getBaseDN(), DNContext); 231 } 232 } 233 // Make a note of the time we started. 234 startTime = System.currentTimeMillis(); 235 startWorkerThreads(); 236 try { 237 importedCount = 0; 238 migratedCount = 0; 239 migrateExistingEntries(); 240 processLDIF(); 241 migrateExcludedEntries(); 242 } finally { 243 if(!unCaughtExceptionThrown) { 244 cleanUp(); 245 switchContainers(); 246 } 247 } 248 } 249 finally { 250 reader.close(); 251 } 252 importProlog(startTime); 253 return new LDIFImportResult(reader.getEntriesRead(), 254 reader.getEntriesRejected(), 255 reader.getEntriesIgnored()); 256 } 257 258 /** 259 * Switch containers if the migrated entries were written to the temporary 260 * container. 261 * 262 * @throws DatabaseException If a DB problem occurs. 263 * @throws JebException If a JEB problem occurs. 264 */ 265 private void switchContainers() throws DatabaseException, JebException { 266 267 for(DNContext importContext : importMap.values()) { 268 DN baseDN = importContext.getBaseDN(); 269 EntryContainer srcEntryContainer = 270 importContext.getSrcEntryContainer(); 271 if(srcEntryContainer != null) { 272 if (debugEnabled()) { 273 TRACER.debugInfo("Deleteing old entry container for base DN " + 274 "%s and renaming temp entry container", baseDN); 275 } 276 EntryContainer unregEC = 277 rootContainer.unregisterEntryContainer(baseDN); 278 //Make sure the unregistered EC for the base DN is the same as 279 //the one in the import context. 280 if(unregEC != srcEntryContainer) { 281 if(debugEnabled()) { 282 TRACER.debugInfo("Current entry container used for base DN " + 283 "%s is not the same as the source entry container used " + 284 "during the migration process.", baseDN); 285 } 286 rootContainer.registerEntryContainer(baseDN, unregEC); 287 continue; 288 } 289 srcEntryContainer.lock(); 290 srcEntryContainer.delete(); 291 srcEntryContainer.unlock(); 292 EntryContainer newEC = importContext.getEntryContainer(); 293 newEC.lock(); 294 newEC.setDatabasePrefix(baseDN.toNormalizedString()); 295 newEC.unlock(); 296 rootContainer.registerEntryContainer(baseDN, newEC); 297 } 298 } 299 } 300 301 /** 302 * Create and log messages at the end of the successful import. 303 * 304 * @param startTime The time the import started. 305 */ 306 private void importProlog(long startTime) { 307 Message message; 308 long finishTime = System.currentTimeMillis(); 309 long importTime = (finishTime - startTime); 310 311 float rate = 0; 312 if (importTime > 0) 313 { 314 rate = 1000f*importedCount / importTime; 315 } 316 317 message = NOTE_JEB_IMPORT_FINAL_STATUS. 318 get(reader.getEntriesRead(), importedCount, 319 reader.getEntriesIgnored(), reader.getEntriesRejected(), 320 migratedCount, importTime/1000, rate); 321 logError(message); 322 323 message = NOTE_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get( 324 getEntryLimitExceededCount()); 325 logError(message); 326 327 } 328 329 330 /** 331 * Run the cleaner if it is needed. 332 * 333 * @param entriesRead The number of entries read so far. 334 * @param evictEntryNumber The number of entries to run the cleaner after 335 * being read. 336 * @throws DatabaseException If a DB problem occurs. 337 */ 338 private void 339 runCleanerIfNeeded(long entriesRead, long evictEntryNumber) 340 throws DatabaseException { 341 if(!firstClean || (entriesRead % evictEntryNumber) == 0) { 342 //Make sure work queue is empty before starting. 343 drainWorkQueue(); 344 Message msg = NOTE_JEB_IMPORT_LDIF_CLEAN.get(); 345 runCleaner(msg); 346 if(!firstClean) { 347 firstClean=true; 348 } 349 } 350 } 351 352 /** 353 * Run the cleaner, pausing the task thread output. 354 * 355 * @param header Message to be printed before cleaning. 356 * @throws DatabaseException If a DB problem occurs. 357 */ 358 private void runCleaner(Message header) throws DatabaseException { 359 Message msg; 360 long startTime = System.currentTimeMillis(); 361 //Need to force a checkpoint. 362 rootContainer.importForceCheckPoint(); 363 logError(header); 364 pTask.setPause(true); 365 //Actually clean the files. 366 int cleaned = rootContainer.cleanedLogFiles(); 367 //This checkpoint removes the files if any were cleaned. 368 if(cleaned > 0) { 369 msg = NOTE_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned); 370 logError(msg); 371 rootContainer.importForceCheckPoint(); 372 } 373 pTask.setPause(false); 374 long finishTime = System.currentTimeMillis(); 375 long cleanTime = (finishTime - startTime) / 1000; 376 msg = NOTE_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned); 377 logError(msg); 378 } 379 380 /** 381 * Process a LDIF reader. 382 * 383 * @throws JebException If a JEB problem occurs. 384 * @throws DatabaseException If a DB problem occurs. 385 * @throws IOException If an IO exception occurs. 386 */ 387 private void 388 processLDIF() throws JebException, DatabaseException, IOException { 389 Message message = NOTE_JEB_IMPORT_LDIF_START.get(); 390 logError(message); 391 do { 392 if (ldifImportConfig.isCancelled()) { 393 break; 394 } 395 if(threads.size() <= 0) { 396 message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get(); 397 throw new JebException(message); 398 } 399 if(unCaughtExceptionThrown) { 400 abortImport(); 401 } 402 try { 403 // Read the next entry. 404 Entry entry = reader.readEntry(); 405 // Check for end of file. 406 if (entry == null) { 407 message = NOTE_JEB_IMPORT_LDIF_END.get(); 408 logError(message); 409 410 break; 411 } 412 // Route it according to base DN. 413 DNContext DNContext = getImportConfig(entry.getDN()); 414 processEntry(DNContext, entry); 415 //If the progress task has noticed eviction proceeding, start running 416 //the cleaner. 417 if(pTask.isEvicting()) { 418 runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval); 419 } 420 } catch (LDIFException e) { 421 if (debugEnabled()) { 422 TRACER.debugCaught(DebugLogLevel.ERROR, e); 423 } 424 } catch (DirectoryException e) { 425 if (debugEnabled()) { 426 TRACER.debugCaught(DebugLogLevel.ERROR, e); 427 } 428 } catch (DatabaseException e) { 429 if (debugEnabled()) { 430 TRACER.debugCaught(DebugLogLevel.ERROR, e); 431 } 432 } 433 } while (true); 434 } 435 436 /** 437 * Process an entry using the specified import context. 438 * 439 * @param DNContext The import context. 440 * @param entry The entry to process. 441 */ 442 private void processEntry(DNContext DNContext, Entry entry) { 443 //Add this DN to the pending map. 444 DNContext.addPending(entry.getDN()); 445 addEntryQueue(DNContext, entry); 446 } 447 448 /** 449 * Add work item to specified import context's queue. 450 * @param context The import context. 451 * @param item The work item to add. 452 * @return <CODE>True</CODE> if the the work item was added to the queue. 453 */ 454 private boolean 455 addQueue(DNContext context, WorkElement item) { 456 try { 457 while(!context.getWorkQueue().offer(item, 1000, 458 TimeUnit.MILLISECONDS)) { 459 if(threads.size() <= 0) { 460 // All worker threads died. We must stop now. 461 return false; 462 } 463 } 464 } catch (InterruptedException e) { 465 if (debugEnabled()) { 466 TRACER.debugCaught(DebugLogLevel.ERROR, e); 467 } 468 } 469 return true; 470 } 471 472 473 /** 474 * Wait until the work queue is empty. 475 */ 476 private void drainWorkQueue() { 477 if(threads.size() > 0) { 478 for (DNContext context : importMap.values()) { 479 while (context.getWorkQueue().size() > 0) { 480 try { 481 Thread.sleep(100); 482 } catch (Exception e) { 483 // No action needed. 484 } 485 } 486 } 487 } 488 } 489 490 private void abortImport() throws JebException { 491 //Stop work threads telling them to skip substring flush. 492 stopWorkThreads(false); 493 timer.cancel(); 494 Message message = ERR_JEB_IMPORT_LDIF_ABORT.get(); 495 throw new JebException(message); 496 } 497 498 /** 499 * Stop work threads. 500 * 501 * @param abort <CODE>True</CODE> if stop work threads was called from an 502 * abort. 503 * @throws JebException if a Jeb error occurs. 504 */ 505 private void 506 stopWorkThreads(boolean abort) throws JebException { 507 for (WorkThread t : threads) { 508 t.stopProcessing(); 509 } 510 // Wait for each thread to stop. 511 for (WorkThread t : threads) { 512 try { 513 if(!abort && unCaughtExceptionThrown) { 514 timer.cancel(); 515 Message message = ERR_JEB_IMPORT_LDIF_ABORT.get(); 516 throw new JebException(message); 517 } 518 t.join(); 519 importedCount += t.getImportedCount(); 520 } catch (InterruptedException ie) { 521 // No action needed? 522 } 523 } 524 } 525 526 /** 527 * Clean up after a successful import. 528 * 529 * @throws DatabaseException If a DB error occurs. 530 * @throws JebException If a Jeb error occurs. 531 */ 532 private void cleanUp() throws DatabaseException, JebException { 533 Message msg; 534 //Drain the work queue. 535 drainWorkQueue(); 536 pTask.setPause(true); 537 long startTime = System.currentTimeMillis(); 538 stopWorkThreads(true); 539 //Flush the buffer managers. 540 for(DNContext context : importMap.values()) { 541 context.getBufferManager().prepareFlush(); 542 context.getBufferManager().flushAll(); 543 } 544 long finishTime = System.currentTimeMillis(); 545 long flushTime = (finishTime - startTime) / 1000; 546 msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime); 547 logError(msg); 548 timer.cancel(); 549 for(DNContext context : importMap.values()) { 550 context.setIndexesTrusted(); 551 } 552 msg = NOTE_JEB_IMPORT_LDIF_FINAL_CLEAN.get(); 553 //Run the cleaner. 554 runCleaner(msg); 555 } 556 557 /** 558 * Uncaught exception handler. 559 * 560 * @param t The thread working when the exception was thrown. 561 * @param e The exception. 562 */ 563 public void uncaughtException(Thread t, Throwable e) { 564 unCaughtExceptionThrown = true; 565 threads.remove(t); 566 Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get( 567 t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause())); 568 logError(msg); 569 } 570 571 /** 572 * Get the entry limit exceeded counts from the indexes. 573 * 574 * @return Count of the index with entry limit exceeded values. 575 */ 576 private int getEntryLimitExceededCount() { 577 int count = 0; 578 for (DNContext ic : importMap.values()) 579 { 580 count += ic.getEntryContainer().getEntryLimitExceededCount(); 581 } 582 return count; 583 } 584 585 /** 586 * Return an import context related to the specified DN. 587 * @param dn The dn. 588 * @return An import context. 589 * @throws DirectoryException If an directory error occurs. 590 */ 591 private DNContext getImportConfig(DN dn) throws DirectoryException { 592 DNContext DNContext = null; 593 DN nodeDN = dn; 594 595 while (DNContext == null && nodeDN != null) { 596 DNContext = importMap.get(nodeDN); 597 if (DNContext == null) 598 { 599 nodeDN = nodeDN.getParentDNInSuffix(); 600 } 601 } 602 603 if (nodeDN == null) { 604 // The entry should not have been given to this backend. 605 Message message = 606 JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn)); 607 throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message); 608 } 609 610 return DNContext; 611 } 612 613 /** 614 * Creates an import context for the specified entry container. 615 * 616 * @param entryContainer The entry container. 617 * @return Import context to use during import. 618 * @throws DatabaseException If a database error occurs. 619 * @throws JebException If a JEB error occurs. 620 * @throws ConfigException If a configuration contains error. 621 */ 622 private DNContext getImportContext(EntryContainer entryContainer) 623 throws DatabaseException, JebException, ConfigException { 624 DN baseDN = entryContainer.getBaseDN(); 625 EntryContainer srcEntryContainer = null; 626 List<DN> includeBranches = new ArrayList<DN>(); 627 List<DN> excludeBranches = new ArrayList<DN>(); 628 629 if(!ldifImportConfig.appendToExistingData() && 630 !ldifImportConfig.clearBackend()) 631 { 632 for(DN dn : ldifImportConfig.getExcludeBranches()) 633 { 634 if(baseDN.equals(dn)) 635 { 636 // This entire base DN was explicitly excluded. Skip. 637 return null; 638 } 639 if(baseDN.isAncestorOf(dn)) 640 { 641 excludeBranches.add(dn); 642 } 643 } 644 645 if(!ldifImportConfig.getIncludeBranches().isEmpty()) 646 { 647 for(DN dn : ldifImportConfig.getIncludeBranches()) 648 { 649 if(baseDN.isAncestorOf(dn)) 650 { 651 includeBranches.add(dn); 652 } 653 } 654 655 if(includeBranches.isEmpty()) 656 { 657 // There are no branches in the explicitly defined include list under 658 // this base DN. Skip this base DN alltogether. 659 660 return null; 661 } 662 663 // Remove any overlapping include branches. 664 Iterator<DN> includeBranchIterator = includeBranches.iterator(); 665 while(includeBranchIterator.hasNext()) 666 { 667 DN includeDN = includeBranchIterator.next(); 668 boolean keep = true; 669 for(DN dn : includeBranches) 670 { 671 if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN)) 672 { 673 keep = false; 674 break; 675 } 676 } 677 if(!keep) 678 { 679 includeBranchIterator.remove(); 680 } 681 } 682 683 // Remvoe any exclude branches that are not are not under a include 684 // branch since they will be migrated as part of the existing entries 685 // outside of the include branches anyways. 686 Iterator<DN> excludeBranchIterator = excludeBranches.iterator(); 687 while(excludeBranchIterator.hasNext()) 688 { 689 DN excludeDN = excludeBranchIterator.next(); 690 boolean keep = false; 691 for(DN includeDN : includeBranches) 692 { 693 if(includeDN.isAncestorOf(excludeDN)) 694 { 695 keep = true; 696 break; 697 } 698 } 699 if(!keep) 700 { 701 excludeBranchIterator.remove(); 702 } 703 } 704 705 if(includeBranches.size() == 1 && excludeBranches.size() == 0 && 706 includeBranches.get(0).equals(baseDN)) 707 { 708 // This entire base DN is explicitly included in the import with 709 // no exclude branches that we need to migrate. Just clear the entry 710 // container. 711 entryContainer.lock(); 712 entryContainer.clear(); 713 entryContainer.unlock(); 714 } 715 else 716 { 717 // Create a temp entry container 718 srcEntryContainer = entryContainer; 719 entryContainer = 720 rootContainer.openEntryContainer(baseDN, 721 baseDN.toNormalizedString() + 722 "_importTmp"); 723 } 724 } 725 } 726 727 // Create an import context. 728 DNContext DNContext = new DNContext(); 729 DNContext.setConfig(config); 730 DNContext.setLDIFImportConfig(this.ldifImportConfig); 731 DNContext.setLDIFReader(reader); 732 733 DNContext.setBaseDN(baseDN); 734 DNContext.setEntryContainer(entryContainer); 735 DNContext.setSrcEntryContainer(srcEntryContainer); 736 737 //Create queue. 738 LinkedBlockingQueue<WorkElement> works = 739 new LinkedBlockingQueue<WorkElement> 740 (config.getImportQueueSize()); 741 DNContext.setWorkQueue(works); 742 743 // Set the include and exclude branches 744 DNContext.setIncludeBranches(includeBranches); 745 DNContext.setExcludeBranches(excludeBranches); 746 747 return DNContext; 748 } 749 750 /** 751 * Add specified context and entry to the work queue. 752 * 753 * @param context The context related to the entry DN. 754 * @param entry The entry to work on. 755 * @return <CODE>True</CODE> if the element was added to the work queue. 756 */ 757 private boolean 758 addEntryQueue(DNContext context, Entry entry) { 759 WorkElement element = 760 WorkElement.decode(entry, context); 761 return addQueue(context, element); 762 } 763 764 /** 765 * Calculate the memory usage for the substring buffer and the DB cache. 766 */ 767 private void calcMemoryLimits() { 768 Message msg; 769 Runtime runtime = Runtime.getRuntime(); 770 long freeMemory = runtime.freeMemory(); 771 long maxMemory = runtime.maxMemory(); 772 long totMemory = runtime.totalMemory(); 773 long totFreeMemory = (freeMemory + (maxMemory - totMemory)); 774 long dbCacheLimit = (totFreeMemory * 40) / 100; 775 dbCacheSizeStr = Long.toString(dbCacheLimit); 776 totalAvailBufferMemory = (totFreeMemory * 10) / 100; 777 if(totalAvailBufferMemory < (10 * minBuffer)) { 778 msg = 779 NOTE_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory, 780 (10 * minBuffer)); 781 logError(msg); 782 totalAvailBufferMemory = (10 * minBuffer); 783 } 784 msg=NOTE_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit, 785 totalAvailBufferMemory); 786 logError(msg); 787 } 788 789 /** 790 * Return the string representation of the DB cache size. 791 * 792 * @return DB cache size string. 793 */ 794 public String getDBCacheSize() { 795 return dbCacheSizeStr; 796 } 797 798 /** 799 * Migrate any existing entries. 800 * 801 * @throws JebException If a JEB error occurs. 802 * @throws DatabaseException If a DB error occurs. 803 * @throws DirectoryException If a directory error occurs. 804 */ 805 private void migrateExistingEntries() 806 throws JebException, DatabaseException, DirectoryException { 807 for(DNContext context : importMap.values()) { 808 EntryContainer srcEntryContainer = context.getSrcEntryContainer(); 809 if(srcEntryContainer != null && 810 !context.getIncludeBranches().isEmpty()) { 811 DatabaseEntry key = new DatabaseEntry(); 812 DatabaseEntry data = new DatabaseEntry(); 813 LockMode lockMode = LockMode.DEFAULT; 814 OperationStatus status; 815 Message message = NOTE_JEB_IMPORT_MIGRATION_START.get( 816 "existing", String.valueOf(context.getBaseDN())); 817 logError(message); 818 Cursor cursor = 819 srcEntryContainer.getDN2ID().openCursor(null, 820 CursorConfig.READ_COMMITTED); 821 try { 822 status = cursor.getFirst(key, data, lockMode); 823 while(status == OperationStatus.SUCCESS && 824 !ldifImportConfig.isCancelled()) { 825 if(threads.size() <= 0) { 826 message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get(); 827 throw new JebException(message); 828 } 829 DN dn = DN.decode(new ASN1OctetString(key.getData())); 830 if(!context.getIncludeBranches().contains(dn)) { 831 EntryID id = new EntryID(data); 832 Entry entry = 833 srcEntryContainer.getID2Entry().get(null, 834 id, LockMode.DEFAULT); 835 processEntry(context, entry); 836 migratedCount++; 837 status = cursor.getNext(key, data, lockMode); 838 } else { 839 // This is the base entry for a branch that will be included 840 // in the import so we don't want to copy the branch to the new 841 // entry container. 842 843 /** 844 * Advance the cursor to next entry at the same level in the DIT 845 * skipping all the entries in this branch. 846 * Set the next starting value to a value of equal length but 847 * slightly greater than the previous DN. Since keys are compared 848 * in reverse order we must set the first byte (the comma). 849 * No possibility of overflow here. 850 */ 851 byte[] begin = 852 StaticUtils.getBytes("," + dn.toNormalizedString()); 853 begin[0] = (byte) (begin[0] + 1); 854 key.setData(begin); 855 status = cursor.getSearchKeyRange(key, data, lockMode); 856 } 857 } 858 } finally { 859 cursor.close(); 860 } 861 } 862 } 863 } 864 865 866 /** 867 * Migrate excluded entries. 868 * 869 * @throws JebException If a JEB error occurs. 870 * @throws DatabaseException If a DB error occurs. 871 * @throws DirectoryException If a directory error occurs. 872 */ 873 private void migrateExcludedEntries() 874 throws JebException, DatabaseException, DirectoryException { 875 for(DNContext importContext : importMap.values()) { 876 EntryContainer srcEntryContainer = importContext.getSrcEntryContainer(); 877 if(srcEntryContainer != null && 878 !importContext.getExcludeBranches().isEmpty()) { 879 DatabaseEntry key = new DatabaseEntry(); 880 DatabaseEntry data = new DatabaseEntry(); 881 LockMode lockMode = LockMode.DEFAULT; 882 OperationStatus status; 883 Message message = NOTE_JEB_IMPORT_MIGRATION_START.get( 884 "excluded", String.valueOf(importContext.getBaseDN())); 885 logError(message); 886 Cursor cursor = 887 srcEntryContainer.getDN2ID().openCursor(null, 888 CursorConfig.READ_COMMITTED); 889 Comparator<byte[]> dn2idComparator = 890 srcEntryContainer.getDN2ID().getComparator(); 891 try { 892 for(DN excludedDN : importContext.getExcludeBranches()) { 893 byte[] suffix = 894 StaticUtils.getBytes(excludedDN.toNormalizedString()); 895 key.setData(suffix); 896 status = cursor.getSearchKeyRange(key, data, lockMode); 897 if(status == OperationStatus.SUCCESS && 898 Arrays.equals(key.getData(), suffix)) { 899 // This is the base entry for a branch that was excluded in the 900 // import so we must migrate all entries in this branch over to 901 // the new entry container. 902 byte[] end = 903 StaticUtils.getBytes("," + excludedDN.toNormalizedString()); 904 end[0] = (byte) (end[0] + 1); 905 906 while(status == OperationStatus.SUCCESS && 907 dn2idComparator.compare(key.getData(), end) < 0 && 908 !ldifImportConfig.isCancelled()) { 909 if(threads.size() <= 0) { 910 message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get(); 911 throw new JebException(message); 912 } 913 EntryID id = new EntryID(data); 914 Entry entry = srcEntryContainer.getID2Entry().get(null, 915 id, LockMode.DEFAULT); 916 processEntry(importContext, entry); 917 migratedCount++; 918 status = cursor.getNext(key, data, lockMode); 919 } 920 } 921 } 922 } 923 finally 924 { 925 cursor.close(); 926 } 927 } 928 } 929 } 930 931 932 /** 933 * This class reports progress of the import job at fixed intervals. 934 */ 935 private final class ProgressTask extends TimerTask 936 { 937 /** 938 * The number of entries that had been read at the time of the 939 * previous progress report. 940 */ 941 private long previousCount = 0; 942 943 /** 944 * The time in milliseconds of the previous progress report. 945 */ 946 private long previousTime; 947 948 /** 949 * The environment statistics at the time of the previous report. 950 */ 951 private EnvironmentStats prevEnvStats; 952 953 /** 954 * The number of bytes in a megabyte. 955 * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB). 956 */ 957 public static final int bytesPerMegabyte = 1024*1024; 958 959 //Determines if the ldif is being read. 960 private boolean ldifRead = false; 961 962 //Determines if eviction has been detected. 963 private boolean evicting = false; 964 965 //Entry count when eviction was detected. 966 private long evictionEntryCount = 0; 967 968 //Suspend output. 969 private boolean pause = false; 970 971 /** 972 * Create a new import progress task. 973 * @throws DatabaseException If an error occurs in the JE database. 974 */ 975 public ProgressTask() throws DatabaseException 976 { 977 previousTime = System.currentTimeMillis(); 978 prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig()); 979 } 980 981 /** 982 * Return if reading the LDIF file. 983 */ 984 public void ldifRead() { 985 ldifRead=true; 986 } 987 988 /** 989 * Return value of evicting flag. 990 * 991 * @return <CODE>True</CODE> if eviction is detected. 992 */ 993 public boolean isEvicting() { 994 return evicting; 995 } 996 997 /** 998 * Return count of entries when eviction was detected. 999 * 1000 * @return The entry count when eviction was detected. 1001 */ 1002 public long getEvictionEntryCount() { 1003 return evictionEntryCount; 1004 } 1005 1006 /** 1007 * Suspend output if true. 1008 * 1009 * @param v The value to set the suspend value to. 1010 */ 1011 public void setPause(boolean v) { 1012 pause=v; 1013 } 1014 1015 /** 1016 * The action to be performed by this timer task. 1017 */ 1018 public void run() { 1019 long latestCount = reader.getEntriesRead() + 0; 1020 long deltaCount = (latestCount - previousCount); 1021 long latestTime = System.currentTimeMillis(); 1022 long deltaTime = latestTime - previousTime; 1023 Message message; 1024 if (deltaTime == 0) { 1025 return; 1026 } 1027 if(pause) { 1028 return; 1029 } 1030 if(!ldifRead) { 1031 long numRead = reader.getEntriesRead(); 1032 long numIgnored = reader.getEntriesIgnored(); 1033 long numRejected = reader.getEntriesRejected(); 1034 float rate = 1000f*deltaCount / deltaTime; 1035 message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get( 1036 numRead, numIgnored, numRejected, 0, rate); 1037 logError(message); 1038 } 1039 try 1040 { 1041 Runtime runtime = Runtime.getRuntime(); 1042 long freeMemory = runtime.freeMemory() / bytesPerMegabyte; 1043 EnvironmentStats envStats = 1044 rootContainer.getEnvironmentStats(new StatsConfig()); 1045 long nCacheMiss = 1046 envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss(); 1047 1048 float cacheMissRate = 0; 1049 if (deltaCount > 0) { 1050 cacheMissRate = nCacheMiss/(float)deltaCount; 1051 } 1052 message = NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get( 1053 freeMemory, cacheMissRate); 1054 logError(message); 1055 long evictPasses = envStats.getNEvictPasses(); 1056 long evictNodes = envStats.getNNodesExplicitlyEvicted(); 1057 long evictBinsStrip = envStats.getNBINsStripped(); 1058 int cleanerRuns = envStats.getNCleanerRuns(); 1059 int cleanerDeletions = envStats.getNCleanerDeletions(); 1060 int cleanerEntriesRead = envStats.getNCleanerEntriesRead(); 1061 int cleanerINCleaned = envStats.getNINsCleaned(); 1062 int checkPoints = envStats.getNCheckpoints(); 1063 if(evictPasses != 0) { 1064 if(!evicting) { 1065 evicting=true; 1066 if(!ldifRead) { 1067 evictionEntryCount=reader.getEntriesRead(); 1068 message = 1069 NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount); 1070 logError(message); 1071 } 1072 } 1073 message = 1074 NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses, 1075 evictNodes, evictBinsStrip); 1076 logError(message); 1077 } 1078 if(cleanerRuns != 0) { 1079 message = NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns, 1080 cleanerDeletions, cleanerEntriesRead, cleanerINCleaned); 1081 logError(message); 1082 } 1083 if(checkPoints > 1) { 1084 message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints); 1085 logError(message); 1086 } 1087 prevEnvStats = envStats; 1088 } catch (DatabaseException e) { 1089 // Unlikely to happen and not critical. 1090 } 1091 previousCount = latestCount; 1092 previousTime = latestTime; 1093 } 1094 } 1095 } 1096