001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.backends.jeb; 028 import org.opends.messages.Message; 029 030 import org.opends.server.types.*; 031 032 import java.util.ArrayList; 033 import java.util.TimerTask; 034 import java.util.Timer; 035 import java.util.concurrent.CopyOnWriteArrayList; 036 import java.util.concurrent.locks.ReentrantLock; 037 038 import com.sleepycat.je.DatabaseException; 039 import com.sleepycat.je.StatsConfig; 040 import com.sleepycat.je.EnvironmentStats; 041 042 import static org.opends.server.loggers.ErrorLogger.logError; 043 import static org.opends.server.loggers.debug.DebugLogger.*; 044 import org.opends.server.loggers.debug.DebugTracer; 045 import org.opends.server.core.DirectoryServer; 046 import static org.opends.messages.JebMessages. 047 ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED; 048 import static org.opends.messages.JebMessages. 049 NOTE_JEB_REBUILD_PROGRESS_REPORT; 050 import static org.opends.messages.JebMessages. 051 NOTE_JEB_REBUILD_FINAL_STATUS; 052 import static org.opends.messages.JebMessages. 053 NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT; 054 import static org.opends.messages.JebMessages. 055 ERR_JEB_REBUILD_INDEX_CONFLICT; 056 import static org.opends.messages.JebMessages. 057 NOTE_JEB_REBUILD_START; 058 import static org.opends.messages.JebMessages. 059 ERR_JEB_VLV_INDEX_NOT_CONFIGURED; 060 /** 061 * Runs a index rebuild process on the backend. Each index selected for rebuild 062 * will be done from scratch by first clearing out the database for that index. 063 * Different threads will be used to rebuild each index. 064 * The rebuild process can run concurrently with the backend online and 065 * performing write and read operations. However, during the rebuild process, 066 * other reader and writer activeThreads might notice inconsistencies in index 067 * databases being rebuilt. They can safely ignore these inconsistencies as long 068 * as a rebuild is in progress. 069 */ 070 public class RebuildJob 071 { 072 /** 073 * The tracer object for the debug logger. 074 */ 075 private static final DebugTracer TRACER = getTracer(); 076 077 /** 078 * The rebuild configuraiton. 079 */ 080 private RebuildConfig rebuildConfig; 081 082 /** 083 * The root container used for the verify job. 084 */ 085 private RootContainer rootContainer; 086 087 /** 088 * The number of milliseconds between job progress reports. 089 */ 090 private long progressInterval = 10000; 091 092 /** 093 * The waiting rebuild threads created to process the rebuild. 094 */ 095 private CopyOnWriteArrayList<IndexRebuildThread> waitingThreads = 096 new CopyOnWriteArrayList<IndexRebuildThread>(); 097 098 /** 099 * The active rebuild threads created to process the rebuild. 100 */ 101 private CopyOnWriteArrayList<IndexRebuildThread> activeThreads = 102 new CopyOnWriteArrayList<IndexRebuildThread>(); 103 104 /** 105 * The completed rebuild threads used to process the rebuild. 106 */ 107 private CopyOnWriteArrayList<IndexRebuildThread> completedThreads = 108 new CopyOnWriteArrayList<IndexRebuildThread>(); 109 110 /** 111 * Rebuild jobs currently running. 112 */ 113 private static CopyOnWriteArrayList<RebuildJob> rebuildJobs = 114 new CopyOnWriteArrayList<RebuildJob>(); 115 116 /** 117 * A mutex that will be used to provide threadsafe access to methods changing 118 * the set of currently running rebuild jobs. 119 */ 120 private static ReentrantLock jobsMutex = new ReentrantLock(); 121 122 /** 123 * This class reports progress of the rebuild job at fixed intervals. 124 */ 125 class ProgressTask extends TimerTask 126 { 127 /** 128 * The number of records that had been processed at the time of the 129 * previous progress report. 130 */ 131 private long previousProcessed = 0; 132 133 /** 134 * The time in milliseconds of the previous progress report. 135 */ 136 private long previousTime; 137 138 /** 139 * The environment statistics at the time of the previous report. 140 */ 141 private EnvironmentStats prevEnvStats; 142 143 /** 144 * The number of bytes in a megabyte. 145 * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB). 146 */ 147 private static final int bytesPerMegabyte = 1024*1024; 148 /** 149 * Create a new verify progress task. 150 * @throws DatabaseException An error occurred while accessing the JE 151 * database. 152 */ 153 public ProgressTask() throws DatabaseException 154 { 155 previousTime = System.currentTimeMillis(); 156 prevEnvStats = 157 rootContainer.getEnvironmentStats(new StatsConfig()); 158 } 159 160 /** 161 * The action to be performed by this timer task. 162 */ 163 public void run() 164 { 165 long latestTime = System.currentTimeMillis(); 166 long deltaTime = latestTime - previousTime; 167 168 if (deltaTime == 0) 169 { 170 return; 171 } 172 173 long totalEntries = 0; 174 long latestProcessed = 0; 175 176 ArrayList<IndexRebuildThread> allThreads = 177 new ArrayList<IndexRebuildThread>(waitingThreads); 178 allThreads.addAll(activeThreads); 179 allThreads.addAll(completedThreads); 180 181 for(IndexRebuildThread thread : allThreads) 182 { 183 try 184 { 185 totalEntries += thread.getTotalEntries(); 186 latestProcessed += thread.getProcessedEntries(); 187 188 if(debugEnabled()) 189 { 190 TRACER.debugVerbose("Rebuild thread %s stats: total %d " + 191 "processed %d rebuilt %d duplicated %d skipped %d", 192 thread.getTotalEntries(), thread.getProcessedEntries(), 193 thread.getRebuiltEntries(), 194 thread.getDuplicatedEntries(), 195 thread.getSkippedEntries()); 196 } 197 } 198 catch(Exception e) 199 { 200 if(debugEnabled()) 201 { 202 TRACER.debugCaught(DebugLogLevel.ERROR, e); 203 } 204 } 205 } 206 207 long deltaCount = (latestProcessed - previousProcessed); 208 float rate = 1000f*deltaCount / deltaTime; 209 float completed = 0; 210 if(totalEntries > 0) 211 { 212 completed = 100f*latestProcessed / totalEntries; 213 } 214 215 Message message = NOTE_JEB_REBUILD_PROGRESS_REPORT.get( 216 completed, latestProcessed, totalEntries, rate); 217 logError(message); 218 219 try 220 { 221 Runtime runtime = Runtime.getRuntime(); 222 long freeMemory = runtime.freeMemory() / bytesPerMegabyte; 223 224 EnvironmentStats envStats = 225 rootContainer.getEnvironmentStats(new StatsConfig()); 226 long nCacheMiss = 227 envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss(); 228 229 float cacheMissRate = 0; 230 if (deltaCount > 0) 231 { 232 cacheMissRate = nCacheMiss/(float)deltaCount; 233 } 234 235 message = NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT.get( 236 freeMemory, cacheMissRate); 237 logError(message); 238 239 prevEnvStats = envStats; 240 } 241 catch (DatabaseException e) 242 { 243 if (debugEnabled()) 244 { 245 TRACER.debugCaught(DebugLogLevel.ERROR, e); 246 } 247 } 248 249 250 previousProcessed = latestProcessed; 251 previousTime = latestTime; 252 } 253 } 254 255 /** 256 * Construct a new rebuild job. 257 * 258 * @param rebuildConfig The configuration to use for this rebuild job. 259 */ 260 public RebuildJob(RebuildConfig rebuildConfig) 261 { 262 this.rebuildConfig = rebuildConfig; 263 } 264 265 private static void addJob(RebuildJob job) 266 throws DatabaseException, JebException 267 { 268 //Make sure there are no running rebuild jobs 269 jobsMutex.lock(); 270 271 try 272 { 273 for(RebuildJob otherJob : rebuildJobs) 274 { 275 String conflictIndex = 276 job.rebuildConfig.checkConflicts(otherJob.rebuildConfig); 277 if(conflictIndex != null) 278 { 279 if(debugEnabled()) 280 { 281 TRACER.debugError("Conflit detected. This job config: %s, " + 282 "That job config: %s.", 283 job.rebuildConfig, otherJob.rebuildConfig); 284 } 285 286 Message msg = ERR_JEB_REBUILD_INDEX_CONFLICT.get(conflictIndex); 287 throw new JebException(msg); 288 } 289 } 290 291 //No conflicts are found. Add the job to the list of currently running 292 // jobs. 293 rebuildJobs.add(job); 294 } 295 finally 296 { 297 jobsMutex.unlock(); 298 } 299 } 300 301 private static void removeJob(RebuildJob job) 302 { 303 jobsMutex.lock(); 304 305 rebuildJobs.remove(job); 306 307 jobsMutex.unlock(); 308 } 309 310 /** 311 * Initiate the rebuild process on a backend. 312 * 313 * @param rootContainer The root container to rebuild in. 314 * @throws DirectoryException If an error occurs during the rebuild process. 315 * @throws DatabaseException If a JE database error occurs during the rebuild 316 * process. 317 * @throws JebException If a JE database error occurs during the rebuild 318 * process. 319 */ 320 public void rebuildBackend(RootContainer rootContainer) 321 throws DirectoryException, DatabaseException, JebException 322 { 323 //TODO: Add check for only performing internal indexType rebuilds when 324 // backend is offline. 325 326 addJob(this); 327 328 try 329 { 330 this.rootContainer = rootContainer; 331 EntryContainer entryContainer = 332 rootContainer.getEntryContainer(rebuildConfig.getBaseDN()); 333 334 ArrayList<String> rebuildList = rebuildConfig.getRebuildList(); 335 336 if(!rebuildList.isEmpty()) 337 { 338 339 for (String index : rebuildList) 340 { 341 IndexRebuildThread rebuildThread; 342 String lowerName = index.toLowerCase(); 343 if (lowerName.equals("dn2id")) 344 { 345 rebuildThread = new IndexRebuildThread(entryContainer, 346 IndexRebuildThread.IndexType.DN2ID); 347 } 348 else if (lowerName.equals("dn2uri")) 349 { 350 rebuildThread = new IndexRebuildThread(entryContainer, 351 IndexRebuildThread.IndexType.DN2URI); 352 } 353 else if (lowerName.equals("id2children")) 354 { 355 rebuildThread = new IndexRebuildThread(entryContainer, 356 IndexRebuildThread.IndexType.ID2CHILDREN); 357 } 358 else if (lowerName.equals("id2subtree")) 359 { 360 rebuildThread = new IndexRebuildThread(entryContainer, 361 IndexRebuildThread.IndexType.ID2SUBTREE); 362 } 363 else if (lowerName.startsWith("vlv.")) 364 { 365 if(lowerName.length() < 5) 366 { 367 Message msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName); 368 throw new JebException(msg); 369 } 370 371 VLVIndex vlvIndex = 372 entryContainer.getVLVIndex(lowerName.substring(4)); 373 if(vlvIndex == null) 374 { 375 Message msg = 376 ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName.substring(4)); 377 throw new JebException(msg); 378 } 379 380 rebuildThread = new IndexRebuildThread(entryContainer, vlvIndex); 381 } 382 else 383 { 384 String[] attrIndexParts = lowerName.split("\\."); 385 if(attrIndexParts.length <= 0) 386 { 387 Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index); 388 throw new JebException(msg); 389 } 390 391 AttributeType attrType = 392 DirectoryServer.getAttributeType(attrIndexParts[0]); 393 394 if (attrType == null) 395 { 396 Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index); 397 throw new JebException(msg); 398 } 399 AttributeIndex attrIndex = 400 entryContainer.getAttributeIndex(attrType); 401 if (attrIndex == null) 402 { 403 Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index); 404 throw new JebException(msg); 405 } 406 407 if(attrIndexParts.length > 1) 408 { 409 Index partialAttrIndex = null; 410 if(attrIndexParts[1].equals("presence")) 411 { 412 partialAttrIndex = attrIndex.presenceIndex; 413 } 414 else if(attrIndexParts[1].equals("equality")) 415 { 416 partialAttrIndex = attrIndex.equalityIndex; 417 } 418 else if(attrIndexParts[1].equals("substring")) 419 { 420 partialAttrIndex = attrIndex.substringIndex; 421 } 422 else if(attrIndexParts[1].equals("ordering")) 423 { 424 partialAttrIndex = attrIndex.orderingIndex; 425 } 426 else if(attrIndexParts[1].equals("approximate")) 427 { 428 partialAttrIndex = attrIndex.approximateIndex; 429 } 430 431 if(partialAttrIndex == null) 432 { 433 Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index); 434 throw new JebException(msg); 435 } 436 437 rebuildThread = 438 new IndexRebuildThread(entryContainer, partialAttrIndex); 439 } 440 else 441 { 442 rebuildThread = new IndexRebuildThread(entryContainer, 443 attrIndex); 444 } 445 } 446 447 waitingThreads.add(rebuildThread); 448 449 if(debugEnabled()) 450 { 451 TRACER.debugInfo("Created rebuild thread %s", 452 rebuildThread.getName()); 453 } 454 } 455 456 //Log a start message. 457 long totalToProcess = 0; 458 459 for(IndexRebuildThread thread : waitingThreads) 460 { 461 totalToProcess += thread.getTotalEntries(); 462 } 463 464 StringBuilder sb = new StringBuilder(); 465 for(String index : rebuildList) 466 { 467 if(sb.length() > 0) 468 { 469 sb.append(", "); 470 } 471 sb.append(index); 472 } 473 Message message = 474 NOTE_JEB_REBUILD_START.get(sb.toString(), totalToProcess); 475 logError(message); 476 477 // Make a note of the time we started. 478 long startTime = System.currentTimeMillis(); 479 480 // Start a timer for the progress report. 481 Timer timer = new Timer(); 482 TimerTask progressTask = new ProgressTask(); 483 timer.scheduleAtFixedRate(progressTask, progressInterval, 484 progressInterval); 485 486 entryContainer.exclusiveLock.lock(); 487 try 488 { 489 for(IndexRebuildThread thread : waitingThreads) 490 { 491 thread.clearDatabase(); 492 } 493 } 494 finally 495 { 496 if(!rebuildConfig.includesSystemIndex()) 497 { 498 entryContainer.exclusiveLock.unlock(); 499 } 500 } 501 502 503 if(!rebuildConfig.includesSystemIndex()) 504 { 505 entryContainer.sharedLock.lock(); 506 } 507 try 508 { 509 while(!waitingThreads.isEmpty()) 510 { 511 dispatchThreads(); 512 joinThreads(); 513 } 514 } 515 finally 516 { 517 timer.cancel(); 518 if(rebuildConfig.includesSystemIndex()) 519 { 520 entryContainer.exclusiveLock.unlock(); 521 } 522 else 523 { 524 entryContainer.sharedLock.unlock(); 525 } 526 } 527 528 long totalProcessed = 0; 529 long totalRebuilt = 0; 530 long totalDuplicated = 0; 531 long totalSkipped = 0; 532 533 for(IndexRebuildThread thread : completedThreads) 534 { 535 totalProcessed += thread.getProcessedEntries(); 536 totalRebuilt += thread.getRebuiltEntries(); 537 totalDuplicated += thread.getDuplicatedEntries(); 538 totalSkipped += thread.getSkippedEntries(); 539 } 540 541 long finishTime = System.currentTimeMillis(); 542 long totalTime = (finishTime - startTime); 543 544 float rate = 0; 545 if (totalTime > 0) 546 { 547 rate = 1000f*totalProcessed / totalTime; 548 } 549 550 message = NOTE_JEB_REBUILD_FINAL_STATUS.get( 551 totalProcessed, totalTime/1000, rate); 552 logError(message); 553 554 if(debugEnabled()) 555 { 556 TRACER.debugInfo("Detailed overall rebuild job stats: rebuilt %d, " + 557 "duplicated %d, skipped %d", 558 totalRebuilt, totalDuplicated, totalSkipped); 559 } 560 } 561 } 562 finally 563 { 564 removeJob(this); 565 } 566 567 } 568 569 /** 570 * Dispatch a set of threads based on their dependency and ordering. 571 */ 572 private void dispatchThreads() throws DatabaseException 573 { 574 for(IndexRebuildThread t : waitingThreads) 575 { 576 boolean start = true; 577 578 //Check to see if we have exceeded the max number of threads to use at 579 //one time. 580 if(rebuildConfig.getMaxRebuildThreads() > 0 && 581 activeThreads.size() > rebuildConfig.getMaxRebuildThreads()) 582 { 583 if(debugEnabled()) 584 { 585 TRACER.debugInfo("Delaying the start of thread %s because " + 586 "the max number of rebuild threads has been reached."); 587 } 588 start = false; 589 } 590 591 /** 592 * We may need to start the threads in stages since the rebuild process 593 * of some index types (id2children, id2subtree) depends on another 594 * index being rebuilt to be completed first. 595 */ 596 if(t.getIndexType() == IndexRebuildThread.IndexType.ID2CHILDREN || 597 t.getIndexType() == IndexRebuildThread.IndexType.ID2SUBTREE) 598 { 599 //Check to see if we have any waiting threads that needs to go 600 //first 601 for(IndexRebuildThread t2 : waitingThreads) 602 { 603 if(t2.getIndexType() == IndexRebuildThread.IndexType.DN2ID || 604 t2.getIndexType() == IndexRebuildThread.IndexType.DN2URI) 605 { 606 //We gotta wait for these to start before running the 607 //rebuild on ID2CHILDREN or ID2SUBTREE 608 609 if(debugEnabled()) 610 { 611 TRACER.debugInfo("Delaying the start of thread %s because " + 612 "it depends on another index rebuilt to " + 613 "go first.", t.getName()); 614 } 615 start = false; 616 break; 617 } 618 } 619 620 //Check to see if we have any active threads that needs to 621 //finish first 622 for(IndexRebuildThread t3 : activeThreads) 623 { 624 if(t3.getIndexType() == IndexRebuildThread.IndexType.DN2ID || 625 t3.getIndexType() == IndexRebuildThread.IndexType.DN2URI) 626 { 627 //We gotta wait for these to start before running the 628 //rebuild on ID2CHILDREN or ID2SUBTREE 629 630 if(debugEnabled()) 631 { 632 TRACER.debugInfo("Delaying the start of thread %s because " + 633 "it depends on another index being rebuilt to " + 634 "finish.", t.getName()); 635 } 636 start = false; 637 break; 638 } 639 } 640 } 641 642 if(start) 643 { 644 if(debugEnabled()) 645 { 646 TRACER.debugInfo("Starting rebuild thread %s.", t.getName()); 647 } 648 waitingThreads.remove(t); 649 activeThreads.add(t); 650 t.start(); 651 } 652 } 653 } 654 655 /** 656 * Wait for all worker activeThreads to exit. 657 */ 658 private void joinThreads() 659 { 660 for (IndexRebuildThread t : activeThreads) 661 { 662 try 663 { 664 t.join(); 665 666 if(debugEnabled()) 667 { 668 TRACER.debugInfo("Rebuild thread %s finished.", t.getName()); 669 } 670 activeThreads.remove(t); 671 completedThreads.add(t); 672 } 673 catch (InterruptedException ie) 674 { 675 // No action needed? 676 } 677 } 678 } 679 }