001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.server; 028 import org.opends.messages.MessageBuilder; 029 030 import static org.opends.server.loggers.ErrorLogger.logError; 031 import static org.opends.messages.ReplicationMessages.*; 032 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 033 034 import java.util.List; 035 import java.io.UnsupportedEncodingException; 036 037 import org.opends.server.types.DN; 038 import org.opends.server.replication.common.ChangeNumber; 039 import org.opends.server.replication.protocol.UpdateMessage; 040 import java.util.concurrent.locks.ReentrantReadWriteLock; 041 042 import com.sleepycat.je.Cursor; 043 import com.sleepycat.je.DatabaseEntry; 044 import com.sleepycat.je.DatabaseException; 045 import com.sleepycat.je.Database; 046 import com.sleepycat.je.DeadlockException; 047 import com.sleepycat.je.LockMode; 048 import com.sleepycat.je.OperationStatus; 049 import com.sleepycat.je.Transaction; 050 051 /** 052 * This class implements the interface between the underlying database 053 * and the dbHandler class. 054 * This is the only class that should have code using the BDB interfaces. 055 */ 056 public class ReplicationDB 057 { 058 private Database db = null; 059 private ReplicationDbEnv dbenv = null; 060 private ReplicationServer replicationServer; 061 private Short serverId; 062 private DN baseDn; 063 064 // The maximum number of retries in case of DatabaseDeadlock Exception. 065 private static final int DEADLOCK_RETRIES = 10; 066 067 // The lock used to provide exclusive access to the thread that 068 // close the db (shutdown or clear). 069 private ReentrantReadWriteLock dbCloseLock; 070 071 /** 072 * Creates a new database or open existing database that will be used 073 * to store and retrieve changes from an LDAP server. 074 * @param serverId The identifier of the LDAP server. 075 * @param baseDn The baseDn of the replication domain. 076 * @param replicationServer The ReplicationServer that needs to be shutdown. 077 * @param dbenv The Db environment to use to create the db. 078 * @throws DatabaseException If a database problem happened. 079 */ 080 public ReplicationDB(Short serverId, DN baseDn, 081 ReplicationServer replicationServer, 082 ReplicationDbEnv dbenv) 083 throws DatabaseException 084 { 085 this.serverId = serverId; 086 this.baseDn = baseDn; 087 this.dbenv = dbenv; 088 this.replicationServer = replicationServer; 089 090 // Get or create the associated ReplicationServerDomain and Db. 091 db = dbenv.getOrAddDb(serverId, baseDn, 092 replicationServer.getReplicationServerDomain(baseDn, 093 true).getGenerationId()); 094 095 dbCloseLock = new ReentrantReadWriteLock(true); 096 } 097 098 /** 099 * add a list of changes to the underlying db. 100 * 101 * @param changes The list of changes to add to the underlying db. 102 */ 103 public void addEntries(List<UpdateMessage> changes) 104 { 105 Transaction txn = null; 106 107 try 108 { 109 int tries = 0; 110 boolean done = false; 111 112 // The database can return a Deadlock Exception if several threads are 113 // accessing the database at the same time. This Exception is a 114 // transient state, when it happens the transaction is aborted and 115 // the operation is attempted again up to DEADLOCK_RETRIES times. 116 while ((tries++ < DEADLOCK_RETRIES) && (!done)) 117 { 118 dbCloseLock.readLock().lock(); 119 try 120 { 121 txn = dbenv.beginTransaction(); 122 123 for (UpdateMessage change : changes) 124 { 125 DatabaseEntry key = new ReplicationKey(change.getChangeNumber()); 126 DatabaseEntry data = new ReplicationData(change); 127 db.put(txn, key, data); 128 } 129 130 txn.commitWriteNoSync(); 131 txn = null; 132 done = true; 133 } 134 catch (DeadlockException e) 135 { 136 txn.abort(); 137 txn = null; 138 } 139 finally 140 { 141 dbCloseLock.readLock().unlock(); 142 } 143 } 144 if (!done) 145 { 146 // Could not write to the DB after DEADLOCK_RETRIES tries. 147 // This ReplicationServer is not reliable and will be shutdown. 148 MessageBuilder mb = new MessageBuilder(); 149 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); 150 logError(mb.toMessage()); 151 if (txn != null) 152 { 153 txn.abort(); 154 } 155 replicationServer.shutdown(); 156 } 157 } 158 catch (DatabaseException e) 159 { 160 MessageBuilder mb = new MessageBuilder(); 161 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); 162 mb.append(stackTraceToSingleLineString(e)); 163 logError(mb.toMessage()); 164 if (txn != null) 165 { 166 try 167 { 168 txn.abort(); 169 } catch (DatabaseException e1) 170 { 171 // can't do much more. The ReplicationServer is shuting down. 172 } 173 } 174 replicationServer.shutdown(); 175 } 176 catch (UnsupportedEncodingException e) 177 { 178 MessageBuilder mb = new MessageBuilder(); 179 mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); 180 mb.append(stackTraceToSingleLineString(e)); 181 logError(mb.toMessage()); 182 replicationServer.shutdown(); 183 if (txn != null) 184 { 185 try 186 { 187 txn.abort(); 188 } catch (DatabaseException e1) 189 { 190 // can't do much more. The ReplicationServer is shuting down. 191 } 192 } 193 replicationServer.shutdown(); 194 } 195 } 196 197 198 /** 199 * Shutdown the database. 200 */ 201 public void shutdown() 202 { 203 try 204 { 205 dbCloseLock.writeLock().lock(); 206 try 207 { 208 db.close(); 209 } 210 finally 211 { 212 dbCloseLock.writeLock().unlock(); 213 } 214 } 215 catch (DatabaseException e) 216 { 217 MessageBuilder mb = new MessageBuilder(); 218 mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString())); 219 mb.append(stackTraceToSingleLineString(e)); 220 logError(mb.toMessage()); 221 } 222 } 223 224 /** 225 * Create a cursor that can be used to search or iterate on this 226 * ReplicationServer DB. 227 * 228 * @param changeNumber The ChangeNumber from which the cursor must start. 229 * @throws DatabaseException If a database error prevented the cursor 230 * creation. 231 * @throws Exception if the ReplServerDBCursor creation failed. 232 * @return The ReplServerDBCursor. 233 */ 234 public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber) 235 throws DatabaseException, Exception 236 { 237 return new ReplServerDBCursor(changeNumber); 238 } 239 240 /** 241 * Create a cursor that can be used to delete some record from this 242 * ReplicationServer database. 243 * 244 * @throws DatabaseException If a database error prevented the cursor 245 * creation. 246 * @throws Exception if the ReplServerDBCursor creation failed. 247 * 248 * @return The ReplServerDBCursor. 249 */ 250 public ReplServerDBCursor openDeleteCursor() 251 throws DatabaseException, Exception 252 { 253 return new ReplServerDBCursor(); 254 } 255 256 private void closeLockedCursor(Cursor cursor) 257 throws DatabaseException 258 { 259 try 260 { 261 if (cursor != null) 262 cursor.close(); 263 } 264 finally 265 { 266 dbCloseLock.readLock().unlock(); 267 } 268 } 269 270 /** 271 * Read the first Change from the database. 272 * @return the first ChangeNumber. 273 */ 274 public ChangeNumber readFirstChange() 275 { 276 Cursor cursor = null; 277 String str = null; 278 279 try 280 { 281 dbCloseLock.readLock().lock(); 282 cursor = db.openCursor(null, null); 283 } 284 catch (DatabaseException e1) 285 { 286 dbCloseLock.readLock().unlock(); 287 return null; 288 } 289 try 290 { 291 try 292 { 293 DatabaseEntry key = new DatabaseEntry(); 294 DatabaseEntry data = new DatabaseEntry(); 295 OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); 296 if (status != OperationStatus.SUCCESS) 297 { 298 /* database is empty */ 299 return null; 300 } 301 try 302 { 303 str = new String(key.getData(), "UTF-8"); 304 } catch (UnsupportedEncodingException e) 305 { 306 // never happens 307 } 308 return new ChangeNumber(str); 309 } 310 finally 311 { 312 closeLockedCursor(cursor); 313 } 314 } 315 catch (DatabaseException e) 316 { 317 /* database is faulty */ 318 MessageBuilder mb = new MessageBuilder(); 319 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); 320 mb.append(stackTraceToSingleLineString(e)); 321 logError(mb.toMessage()); 322 replicationServer.shutdown(); 323 return null; 324 } 325 } 326 327 /** 328 * Read the last Change from the database. 329 * @return the last ChangeNumber. 330 */ 331 public ChangeNumber readLastChange() 332 { 333 Cursor cursor = null; 334 String str = null; 335 336 try 337 { 338 dbCloseLock.readLock().lock(); 339 try 340 { 341 cursor = db.openCursor(null, null); 342 DatabaseEntry key = new DatabaseEntry(); 343 DatabaseEntry data = new DatabaseEntry(); 344 OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); 345 if (status != OperationStatus.SUCCESS) 346 { 347 /* database is empty */ 348 return null; 349 } 350 try 351 { 352 str = new String(key.getData(), "UTF-8"); 353 } 354 catch (UnsupportedEncodingException e) 355 { 356 // never happens 357 } 358 return new ChangeNumber(str); 359 } 360 finally 361 { 362 closeLockedCursor(cursor); 363 } 364 } 365 catch (DatabaseException e) 366 { 367 MessageBuilder mb = new MessageBuilder(); 368 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); 369 mb.append(stackTraceToSingleLineString(e)); 370 logError(mb.toMessage()); 371 replicationServer.shutdown(); 372 return null; 373 } 374 } 375 376 /** 377 * {@inheritDoc} 378 */ 379 @Override 380 public String toString() 381 { 382 return serverId.toString() + baseDn.toString(); 383 } 384 385 /** 386 * This Class implements a cursor that can be used to browse a 387 * replicationServer database. 388 */ 389 public class ReplServerDBCursor 390 { 391 private Cursor cursor = null; 392 393 // The transaction that will protect the actions done with the cursor 394 // Will be let null for a read cursor 395 // Will be set non null for a write cursor 396 private Transaction txn = null; 397 DatabaseEntry key = new DatabaseEntry(); 398 DatabaseEntry data = new DatabaseEntry(); 399 400 /** 401 * Creates a ReplServerDBCursor that can be used for browsing a 402 * replicationServer db. 403 * 404 * @param startingChangeNumber The ChangeNumber from which the cursor must 405 * start. 406 * @throws Exception When the startingChangeNumber does not exist. 407 */ 408 private ReplServerDBCursor(ChangeNumber startingChangeNumber) 409 throws Exception 410 { 411 try 412 { 413 // Take the lock. From now on, whatever error that happen in the life 414 // of this cursor should end by unlocking that lock. We must also 415 // unlock it when throwing an exception. 416 dbCloseLock.readLock().lock(); 417 418 cursor = db.openCursor(txn, null); 419 if (startingChangeNumber != null) 420 { 421 key = new ReplicationKey(startingChangeNumber); 422 data = new DatabaseEntry(); 423 424 if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != 425 OperationStatus.SUCCESS) 426 { 427 // We could not move the cursor to the expected startingChangeNumber 428 if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) != 429 OperationStatus.SUCCESS) 430 { 431 // We could not even move the cursor closed to it => failure 432 throw new Exception("ChangeNumber not available"); 433 } 434 else 435 { 436 // We can move close to the startingChangeNumber. 437 // Let's create a cursor from that point. 438 DatabaseEntry key = new DatabaseEntry(); 439 DatabaseEntry data = new DatabaseEntry(); 440 if (cursor.getPrev(key, data, LockMode.DEFAULT) != 441 OperationStatus.SUCCESS) 442 { 443 closeLockedCursor(cursor); 444 dbCloseLock.readLock().lock(); 445 cursor = db.openCursor(txn, null); 446 } 447 } 448 } 449 } 450 } 451 catch (Exception e) 452 { 453 // Unlocking is required before throwing any exception 454 closeLockedCursor(cursor); 455 throw (e); 456 } 457 } 458 459 private ReplServerDBCursor() throws DatabaseException 460 { 461 try 462 { 463 // We'll go on only if no close or no clear is running 464 dbCloseLock.readLock().lock(); 465 466 // Create the transaction that will protect whatever done with this 467 // write cursor. 468 txn = dbenv.beginTransaction(); 469 470 cursor = db.openCursor(txn, null); 471 } 472 catch(DatabaseException e) 473 { 474 if (txn != null) 475 { 476 try 477 { 478 txn.abort(); 479 } 480 catch (DatabaseException dbe) 481 {} 482 } 483 closeLockedCursor(cursor); 484 throw (e); 485 } 486 } 487 488 /** 489 * Close the ReplicationServer Cursor. 490 */ 491 public void close() 492 { 493 try 494 { 495 closeLockedCursor(cursor); 496 cursor = null; 497 } 498 catch (DatabaseException e) 499 { 500 MessageBuilder mb = new MessageBuilder(); 501 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); 502 mb.append(stackTraceToSingleLineString(e)); 503 logError(mb.toMessage()); 504 replicationServer.shutdown(); 505 } 506 if (txn != null) 507 { 508 try 509 { 510 txn.commit(); 511 } catch (DatabaseException e) 512 { 513 MessageBuilder mb = new MessageBuilder(); 514 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); 515 mb.append(stackTraceToSingleLineString(e)); 516 logError(mb.toMessage()); 517 replicationServer.shutdown(); 518 } 519 } 520 } 521 522 /** 523 * Abort the Cursor after a Deadlock Exception. 524 * This method catch and ignore the DeadlockException because 525 * this must be done when aborting a cursor after a DeadlockException 526 * (per the Cursor documentation). 527 * This should not be used in any other case. 528 */ 529 public void abort() 530 { 531 if (cursor == null) 532 return; 533 try 534 { 535 closeLockedCursor(cursor); 536 cursor = null; 537 } 538 catch (DeadlockException e1) 539 { 540 // The DB documentation states that a DeadlockException 541 // on the close method of a cursor that is aborting should 542 // be ignored. 543 } 544 catch (DatabaseException e) 545 { 546 MessageBuilder mb = new MessageBuilder(); 547 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); 548 mb.append(stackTraceToSingleLineString(e)); 549 logError(mb.toMessage()); 550 replicationServer.shutdown(); 551 } 552 if (txn != null) 553 { 554 try 555 { 556 txn.abort(); 557 } catch (DatabaseException e) 558 { 559 MessageBuilder mb = new MessageBuilder(); 560 mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); 561 mb.append(stackTraceToSingleLineString(e)); 562 logError(mb.toMessage()); 563 replicationServer.shutdown(); 564 } 565 } 566 } 567 568 /** 569 * Get the next ChangeNumber in the database from this Cursor. 570 * 571 * @return The next ChangeNumber in the database from this cursor. 572 * @throws DatabaseException In case of underlying database problem. 573 */ 574 public ChangeNumber nextChangeNumber() throws DatabaseException 575 { 576 OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT); 577 578 if (status != OperationStatus.SUCCESS) 579 { 580 return null; 581 } 582 try 583 { 584 String csnString = new String(key.getData(), "UTF-8"); 585 return new ChangeNumber(csnString); 586 } catch (UnsupportedEncodingException e) 587 { 588 // can't happen 589 return null; 590 } 591 } 592 593 /** 594 * Get the next UpdateMessage from this cursor. 595 * 596 * @return the next UpdateMessage. 597 */ 598 public UpdateMessage next() 599 { 600 UpdateMessage currentChange = null; 601 while (currentChange == null) 602 { 603 try 604 { 605 OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT); 606 if (status != OperationStatus.SUCCESS) 607 { 608 return null; 609 } 610 } catch (DatabaseException e) 611 { 612 return null; 613 } 614 try { 615 currentChange = ReplicationData.generateChange(data.getData()); 616 } catch (Exception e) { 617 /* 618 * An error happening trying to convert the data from the 619 * replicationServer database to an Update Message. 620 * This can only happen if the database is corrupted. 621 * There is not much more that we can do at this point except trying 622 * to continue with the next record. 623 * In such case, it is therefore possible that we miss some changes. 624 * TODO. log an error message. 625 * TODO : REPAIR : Such problem should be handled by the 626 * repair functionality. 627 */ 628 } 629 } 630 return currentChange; 631 } 632 633 /** 634 * Delete the record at the current cursor position. 635 * 636 * @throws DatabaseException In case of database problem. 637 */ 638 public void delete() throws DatabaseException 639 { 640 cursor.delete(); 641 } 642 } // ReplServerDBCursor 643 644 /** 645 * Clears this change DB from the changes it contains. 646 * 647 * @throws Exception Throws an exception it occurs. 648 * @throws DatabaseException Throws a DatabaseException when it occurs. 649 */ 650 public void clear() throws Exception, DatabaseException 651 { 652 // The coming users will be blocked until the clear is done 653 dbCloseLock.writeLock().lock(); 654 try 655 { 656 String dbName = db.getDatabaseName(); 657 658 // Clears the reference to this serverID 659 dbenv.clearServerId(baseDn, serverId); 660 661 // Closing is requested by the Berkeley DB before truncate 662 db.close(); 663 664 // Clears the changes 665 dbenv.clearDb(dbName); 666 667 db = null; 668 669 // RE-create the db 670 db = dbenv.getOrAddDb(serverId, baseDn, (long)-1); 671 } 672 catch(Exception e) 673 { 674 MessageBuilder mb = new MessageBuilder(); 675 mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(), 676 e.getMessage() + " " + 677 stackTraceToSingleLineString(e))); 678 logError(mb.toMessage()); 679 } 680 finally 681 { 682 // Relax the waiting users 683 dbCloseLock.writeLock().unlock(); 684 } 685 } 686 }