/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;

import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.NoSuchElementException;

import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DeadlockException;

/**
 * This class is used for managing the replicationServer database for each
 * server in the topology.
 * It is responsible for efficiently saving the updates that are received from
 * each master server into stable storage.
 * This class is also able to generate a ReplicationIterator that can be
 * used to read all changes from a given ChangeNumber.
 *
 * This class publishes some monitoring information below cn=monitor.
 *
 */
public class DbHandler implements Runnable
{
  // The msgQueue holds all the updates not yet saved to stable storage.
  // This list is only used as a temporary placeholder so that the writes
  // to stable storage can be grouped for efficiency reasons.
  // Adding an update synchronously adds the update to this list.
  // A dedicated thread loops on flush() and trim().
  // flush() : gets changes from the in-memory list in blocks
  //           and writes them to the db.
  // trim()  : deletes from the DB the changes that are older than a
  //           certain date.
  //
  // Changes in this list are not read back by the replicationServer threads
  // that push changes to other replication servers or to LDAP servers:
  // iterators are built on the db (see generateIterator(), which flushes
  // this list first when the requested position is close to its head).
  //
  private final LinkedList<UpdateMessage> msgQueue =
    new LinkedList<UpdateMessage>();
  private ReplicationDB db;
  // Volatile: written by add()/trim()/clear() and read by other threads
  // (monitoring, getters) without holding a common lock.
  private volatile ChangeNumber firstChange = null;
  private volatile ChangeNumber lastChange = null;
  private short serverId;
  private DN baseDn;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  // Volatile: set by shutdown() (caller thread) and by trim() (db thread),
  // polled by run().
  private volatile boolean shutdown = false;
  // Guarded by "this": set by run() when it exits, awaited by shutdown().
  private boolean done = false;
  private DirectoryThread thread = null;
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;


  // The high and low water marks for the max size of the msgQueue.
  // Threads calling the add() method are blocked while the size of the
  // msgQueue is larger than MSG_QUEUE_HIMARK. clearQueue() wakes them up
  // when the size goes below MSG_QUEUE_LOWMARK; they also re-check the
  // size periodically (timed wait in add()).
  final static int MSG_QUEUE_HIMARK = 5000;
  final static int MSG_QUEUE_LOWMARK = 4000;

  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;

  // Maximum number of updates written to the db by one flush() iteration.
  private static final int FLUSH_BATCH_SIZE = 500;

  // Maximum number of records deleted by one trim() transaction.
  private static final int TRIM_BATCH_SIZE = 5000;

  /**
   *
   * The trim age in milliseconds. Changes records in the change DB that
   * are older than this age are removed. A value of 0 disables trimming.
   * Volatile because setPurgeDelay() may be called from another thread.
   *
   */
  private volatile long trimage;

  /**
   * Creates a new dbHandler associated to a given LDAP server.
   *
   * @param id Identifier of the DB.
   * @param baseDn the baseDn for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param dbenv the Database Env to use to create the ReplicationServer DB
   *              for this domain.
   * @throws DatabaseException If a database problem happened
   */
  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
         throws DatabaseException
  {
    this.replicationServer = replicationServer;
    this.serverId = id;
    this.baseDn = baseDn;
    this.trimage = replicationServer.getTrimage();
    db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
    thread = new DirectoryThread(this,
      "Replication Server db " + id + " " + baseDn);
    thread.start();

    DirectoryServer.deregisterMonitorProvider(
                      dbMonitor.getMonitorInstanceName());
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }

  /**
   * Add an update to the list of messages that must be saved to the db
   * managed by this db handler.
   * This method is blocking if the size of the list of messages is larger
   * than its maximum. If the calling thread is interrupted while blocked,
   * it keeps waiting and its interrupt status is restored before returning.
   *
   * @param update The update that must be saved to the db managed by this db
   *               handler.
   */
  public void add(UpdateMessage update)
  {
    boolean interrupted = false;
    synchronized (msgQueue)
    {
      // Back-pressure: block the producer while the queue is too large.
      while (msgQueue.size() > MSG_QUEUE_HIMARK)
      {
        try
        {
          msgQueue.wait(500);
        }
        catch (InterruptedException e)
        {
          // Keep waiting, but do not lose the interrupt request.
          interrupted = true;
        }
      }

      msgQueue.add(update);
      ChangeNumber changeNumber = update.getChangeNumber();
      if (lastChange == null || lastChange.older(changeNumber))
      {
        lastChange = changeNumber;
      }
      if (firstChange == null)
        firstChange = changeNumber;
    }
    if (interrupted)
      Thread.currentThread().interrupt();
  }

  /**
   * Get some changes out of the message queue of the LDAP server.
   * The changes are copied, not removed; see clearQueue().
   *
   * @param number the maximum number of messages to extract.
   * @return a List containing at most number changes taken from the head of
   *         the queue (empty if the queue is empty or number is not positive).
   */
  private List<UpdateMessage> getChanges(int number)
  {
    synchronized (msgQueue)
    {
      int count = Math.max(0, Math.min(number, msgQueue.size()));
      // Copy through subList: indexed get() on a LinkedList walks the list
      // each time, which made the previous loop quadratic.
      return new LinkedList<UpdateMessage>(msgQueue.subList(0, count));
    }
  }

  /**
   * Get the firstChange.
   * @return Returns the firstChange.
   */
  public ChangeNumber getFirstChange()
  {
    return firstChange;
  }

  /**
   * Get the lastChange.
   * @return Returns the lastChange.
   */
  public ChangeNumber getLastChange()
  {
    return lastChange;
  }

  /**
   * Get the number of changes, computed from the sequence numbers of the
   * first and last changes.
   *
   * @return Returns the number of changes, or 0 when the first or the last
   *         change is unknown.
   */
  public long getChangesCount()
  {
    // Snapshot the volatile fields so that they cannot become null between
    // the check and the computation.
    ChangeNumber first = firstChange;
    ChangeNumber last = lastChange;
    if ((first == null) || (last == null))
    {
      return 0;
    }
    return last.getSeqnum() - first.getSeqnum() + 1;
  }

  /**
   * Generate a new ReplicationIterator that allows to browse the db
   * managed by this dbHandler and starting at the position defined
   * by a given changeNumber.
   *
   * @param changeNumber The position where the iterator must start.
   *
   * @return a new ReplicationIterator that allows to browse the db
   *         managed by this dbHandler and starting at the position defined
   *         by a given changeNumber.
   *
   * @throws DatabaseException if a database problem happened.
   * @throws Exception If there is no other change to push after change
   *         with changeNumber number.
   */
  public ReplicationIterator generateIterator(ChangeNumber changeNumber)
         throws DatabaseException, Exception
  {
    /*
     * When we create an iterator we need to make sure that we
     * don't miss some changes because the iterator is created
     * close to the limit of the changes that have not yet been
     * flushed to the database.
     * We detect this by comparing the date of the changeNumber where
     * we want to start with the date of the first ChangeNumber
     * of the msgQueue.
     * If this is the case we flush the queue to the database.
     */
    if (changeNumber == null)
      flush();

    ChangeNumber recentChangeNumber = null;
    synchronized (msgQueue)
    {
      try
      {
        recentChangeNumber = msgQueue.getFirst().getChangeNumber();
      }
      catch (NoSuchElementException ignored)
      {
        // Empty queue: nothing is pending, so no flush is needed below.
      }
    }

    if ((recentChangeNumber != null) && (changeNumber != null))
    {
      if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
          ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
      {
        flush();
      }
    }

    return new ReplicationIterator(serverId, db, changeNumber);
  }

  /**
   * Removes messages from the head of the msgQueue, and wakes up all the
   * producers blocked in add() once the queue is below the low water mark.
   *
   * @param number the number of changes to be removed.
   */
  private void clearQueue(int number)
  {
    synchronized (msgQueue)
    {
      int current = 0;
      while ((current < number) && (!msgQueue.isEmpty()))
      {
        msgQueue.remove();
        current++;
      }
      // notifyAll: several producers may be blocked in add() at once.
      if (msgQueue.size() < MSG_QUEUE_LOWMARK)
        msgQueue.notifyAll();
    }
  }

  /**
   * Tells whether the in-memory queue is empty.
   *
   * @return true if no update is waiting to be flushed.
   */
  private boolean isQueueEmpty()
  {
    synchronized (msgQueue)
    {
      return msgQueue.isEmpty();
    }
  }

  /**
   * Shutdown this dbHandler.
   * Stops the flushing/trimming thread, saves the remaining queued updates,
   * shuts the db down and deregisters the monitor provider. Calling this
   * method more than once has no further effect.
   */
  public void shutdown()
  {
    synchronized (this)
    {
      // Check-and-set under the monitor so that only one caller proceeds.
      if (shutdown)
      {
        return;
      }
      shutdown = true;
      this.notifyAll();
    }

    boolean interrupted = false;
    synchronized (this)
    {
      // Wait for run() to exit its loop.
      while (!done)
      {
        try
        {
          this.wait();
        }
        catch (InterruptedException e)
        {
          interrupted = true;
        }
      }
    }

    // Save whatever add() queued after run() performed its last flush.
    while (!isQueueEmpty())
      flush();

    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(
                      dbMonitor.getMonitorInstanceName());

    if (interrupted)
      Thread.currentThread().interrupt();
  }

  /**
   * Run method for this class.
   * Periodically flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   */
  public void run()
  {
    try
    {
      while (!shutdown)
      {
        try
        {
          flush();
          trim();

          synchronized (this)
          {
            // Re-check under the monitor: shutdown() sets the flag and
            // notifies while holding it, so the wake-up cannot be missed.
            if (!shutdown)
            {
              try
              {
                this.wait(1000);
              }
              catch (InterruptedException e)
              {
                // Ignored: the loop condition re-checks the shutdown flag.
              }
            }
          }
        }
        catch (Exception end)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
          mb.append(stackTraceToSingleLineString(end));
          logError(mb.toMessage());
          if (replicationServer != null)
            replicationServer.shutdown();
          break;
        }
      }
      // call flush a last time before exiting to make sure that
      // no change was forgotten in the msgQueue
      flush();
    }
    finally
    {
      // Always signal termination: if the final flush() throws, a thread
      // blocked in shutdown() must not wait forever.
      synchronized (this)
      {
        done = true;
        this.notifyAll();
      }
    }
  }

  /**
   * Trim old changes from this replicationServer database.
   * Deletes, in one transaction of at most TRIM_BATCH_SIZE records, the
   * changes older than the trim age, never deleting the last change.
   *
   * @throws DatabaseException In case of database problem.
   * @throws Exception In case of other problems while trimming.
   */
  private void trim() throws DatabaseException, Exception
  {
    // Read the purge delay once: setPurgeDelay() may change it concurrently.
    long purgeDelay = trimage;
    if (purgeDelay == 0)
      return;
    ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - purgeDelay,
        (short) 0, (short) 0);

    // In case of deadlock detection by the Database, this thread can
    // be aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
    // We will try DEADLOCK_RETRIES times before failing.
    int tries = 0;
    boolean committed = false;
    while ((tries++ < DEADLOCK_RETRIES) && (!committed))
    {
      // Per-attempt state: an aborted attempt rolls back its deletes, so
      // counting must restart from scratch on every retry.
      int size = 0;
      boolean finished = false;
      ChangeNumber newFirstChange = null;

      /* the trim is done by group in order to save some CPU and IO bandwidth
       * start the transaction then do a bunch of remove then commit
       */
      ReplServerDBCursor cursor = db.openDeleteCursor();

      try
      {
        while ((size < TRIM_BATCH_SIZE) && (!finished))
        {
          ChangeNumber changeNumber = cursor.nextChangeNumber();
          if (changeNumber == null)
          {
            finished = true;
          }
          else if ((!changeNumber.equals(lastChange))
                   && (changeNumber.older(trimDate)))
          {
            size++;
            cursor.delete();
          }
          else
          {
            newFirstChange = changeNumber;
            finished = true;
          }
        }
        cursor.close();
        committed = true;
        // Publish the new first change only once the deletes are committed.
        if (newFirstChange != null)
          firstChange = newFirstChange;
      }
      catch (DeadlockException e)
      {
        cursor.abort();
        if (tries == DEADLOCK_RETRIES)
        {
          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
          // shutdown the ReplicationServer.
          shutdown = true;
          throw (e);
        }
      }
      catch (DatabaseException e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        shutdown = true;
        cursor.abort();
        throw (e);
      }
    }
  }

  /**
   * Flush updates from the memory list to the stable storage, in batches of
   * FLUSH_BATCH_SIZE, until a batch comes back smaller than that size.
   */
  private void flush()
  {
    int size;

    do
    {
      synchronized (flushLock)
      {
        // get N messages to save in the DB
        List<UpdateMessage> changes = getChanges(FLUSH_BATCH_SIZE);
        size = changes.size();

        // if no more changes to save exit immediately.
        if (size == 0)
          return;

        // save the change to the stable storage.
        db.addEntries(changes);

        // remove the changes from the list of changes to be saved.
        clearQueue(size);
      }
    } while (size >= FLUSH_BATCH_SIZE);
  }

  /**
   * This internal class is used to implement the Monitoring capabilities
   * of the dbHandler.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
    private DbMonitorProvider()
    {
      super("ReplicationServer Database");
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ArrayList<Attribute> getMonitorData()
    {
      // Snapshot the volatile fields once per attribute.
      ChangeNumber first = firstChange;
      ChangeNumber last = lastChange;

      ArrayList<Attribute> attributes = new ArrayList<Attribute>();
      attributes.add(new Attribute("replicationServer-database",
                                   String.valueOf(serverId)));
      attributes.add(new Attribute("base-dn", baseDn.toString()));
      if (first != null)
      {
        Date firstTime = new Date(first.getTime());
        attributes.add(new Attribute("first-change",
            first.toString() + " " + firstTime.toString()));
      }
      if (last != null)
      {
        Date lastTime = new Date(last.getTime());
        attributes.add(new Attribute("last-change",
            last.toString() + " " + lastTime.toString()));
      }

      return attributes;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String getMonitorInstanceName()
    {
      return "ReplicationServer database " + baseDn.toString() +
             " " + String.valueOf(serverId);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long getUpdateInterval()
    {
      /* we don't want to do polling on this monitor */
      return 0;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
                            throws ConfigException,InitializationException
    {
      // Nothing to do for now
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void updateMonitorData()
    {
      // As long as getUpdateInterval() returns 0, this will never get called
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    return(baseDn + " " + serverId + " " + firstChange + " " + lastChange);
  }

  /**
   * Set the Purge delay for this db Handler.
   * @param delay The purge delay in Milliseconds (0 disables trimming).
   */
  public void setPurgeDelay(long delay)
  {
    trimage = delay;
  }

  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   * Producers blocked in add() are released since the queue becomes empty.
   *
   * @throws DatabaseException When an exception occurs while removing the
   * changes from the DB.
   * @throws Exception When an exception occurs while accessing a resource
   * from the DB.
   *
   */
  public void clear() throws DatabaseException, Exception
  {
    // Hold flushLock for the whole operation so that no flush() can write
    // freshly queued updates to the db while it is being cleared. Lock order
    // (flushLock, then msgQueue) matches flush().
    synchronized (flushLock)
    {
      synchronized (msgQueue)
      {
        msgQueue.clear();
        msgQueue.notifyAll();
      }
      db.clear();
      firstChange = db.readFirstChange();
      lastChange = db.readLastChange();
    }
  }
}