001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 
026 */ 027 package org.opends.server.replication.plugin; 028 import static org.opends.messages.ReplicationMessages.*; 029 import static org.opends.messages.ToolMessages.*; 030 import static org.opends.server.loggers.ErrorLogger.logError; 031 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; 032 import static org.opends.server.loggers.debug.DebugLogger.getTracer; 033 import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME; 034 import static org.opends.server.replication.protocol.OperationContext.*; 035 import static org.opends.server.util.ServerConstants.*; 036 import static org.opends.server.util.StaticUtils.createEntry; 037 import static org.opends.server.util.StaticUtils.getFileForPath; 038 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 039 040 import java.io.File; 041 import java.io.IOException; 042 import java.io.OutputStream; 043 import java.net.SocketTimeoutException; 044 import java.util.ArrayList; 045 import java.util.Collection; 046 import java.util.HashSet; 047 import java.util.LinkedHashMap; 048 import java.util.LinkedHashSet; 049 import java.util.LinkedList; 050 import java.util.List; 051 import java.util.NoSuchElementException; 052 import java.util.SortedMap; 053 import java.util.TreeMap; 054 import java.util.concurrent.LinkedBlockingQueue; 055 import java.util.concurrent.atomic.AtomicInteger; 056 import java.util.zip.Adler32; 057 import java.util.zip.CheckedOutputStream; 058 import java.util.zip.DataFormatException; 059 060 import org.opends.messages.Message; 061 import org.opends.messages.MessageBuilder; 062 import org.opends.server.admin.server.ConfigurationChangeListener; 063 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*; 064 import org.opends.server.admin.std.server.ReplicationDomainCfg; 065 import org.opends.server.api.AlertGenerator; 066 import org.opends.server.api.Backend; 067 import org.opends.server.api.DirectoryThread; 068 import 
org.opends.server.api.SynchronizationProvider; 069 import org.opends.server.backends.jeb.BackendImpl; 070 import org.opends.server.backends.task.Task; 071 import org.opends.server.config.ConfigException; 072 import org.opends.server.core.AddOperation; 073 import org.opends.server.core.DeleteOperation; 074 import org.opends.server.core.DirectoryServer; 075 import org.opends.server.core.LockFileManager; 076 import org.opends.server.core.ModifyDNOperation; 077 import org.opends.server.core.ModifyDNOperationBasis; 078 import org.opends.server.core.ModifyOperation; 079 import org.opends.server.core.ModifyOperationBasis; 080 import org.opends.server.loggers.debug.DebugTracer; 081 import org.opends.server.protocols.asn1.ASN1Exception; 082 import org.opends.server.protocols.asn1.ASN1OctetString; 083 import org.opends.server.protocols.internal.InternalClientConnection; 084 import org.opends.server.protocols.internal.InternalSearchOperation; 085 import org.opends.server.protocols.ldap.LDAPAttribute; 086 import org.opends.server.protocols.ldap.LDAPFilter; 087 import org.opends.server.protocols.ldap.LDAPModification; 088 import org.opends.server.replication.common.ChangeNumber; 089 import org.opends.server.replication.common.ChangeNumberGenerator; 090 import org.opends.server.replication.common.ServerState; 091 import org.opends.server.replication.protocol.AckMessage; 092 import org.opends.server.replication.protocol.AddContext; 093 import org.opends.server.replication.protocol.AddMsg; 094 import org.opends.server.replication.protocol.DeleteContext; 095 import org.opends.server.replication.protocol.DoneMessage; 096 import org.opends.server.replication.protocol.EntryMessage; 097 import org.opends.server.replication.protocol.ErrorMessage; 098 import org.opends.server.replication.protocol.HeartbeatMessage; 099 import org.opends.server.replication.protocol.InitializeRequestMessage; 100 import org.opends.server.replication.protocol.InitializeTargetMessage; 101 import 
org.opends.server.replication.protocol.ModifyContext; 102 import org.opends.server.replication.protocol.ModifyDNMsg; 103 import org.opends.server.replication.protocol.ModifyDnContext; 104 import org.opends.server.replication.protocol.OperationContext; 105 import org.opends.server.replication.protocol.ReplSessionSecurity; 106 import org.opends.server.replication.protocol.ReplicationMessage; 107 import org.opends.server.replication.protocol.ResetGenerationId; 108 import org.opends.server.replication.protocol.RoutableMessage; 109 import org.opends.server.replication.protocol.UpdateMessage; 110 import org.opends.server.tasks.InitializeTargetTask; 111 import org.opends.server.tasks.InitializeTask; 112 import org.opends.server.tasks.TaskUtils; 113 import org.opends.server.types.ExistingFileBehavior; 114 import org.opends.server.types.AbstractOperation; 115 import org.opends.server.types.Attribute; 116 import org.opends.server.types.AttributeType; 117 import org.opends.server.types.AttributeValue; 118 import org.opends.server.types.ConfigChangeResult; 119 import org.opends.server.types.Control; 120 import org.opends.server.types.DN; 121 import org.opends.server.types.DereferencePolicy; 122 import org.opends.server.types.DirectoryException; 123 import org.opends.server.types.Entry; 124 import org.opends.server.types.LDAPException; 125 import org.opends.server.types.LDIFExportConfig; 126 import org.opends.server.types.LDIFImportConfig; 127 import org.opends.server.types.Modification; 128 import org.opends.server.types.ModificationType; 129 import org.opends.server.types.Operation; 130 import org.opends.server.types.RDN; 131 import org.opends.server.types.RawModification; 132 import org.opends.server.types.ResultCode; 133 import org.opends.server.types.SearchFilter; 134 import org.opends.server.types.SearchResultEntry; 135 import org.opends.server.types.SearchScope; 136 import org.opends.server.types.SynchronizationProviderResult; 137 import 
org.opends.server.types.operation.PluginOperation; 138 import org.opends.server.types.operation.PostOperationOperation; 139 import org.opends.server.types.operation.PreOperationAddOperation; 140 import org.opends.server.types.operation.PreOperationDeleteOperation; 141 import org.opends.server.types.operation.PreOperationModifyDNOperation; 142 import org.opends.server.types.operation.PreOperationModifyOperation; 143 import org.opends.server.types.operation.PreOperationOperation; 144 import org.opends.server.workflowelement.localbackend.*; 145 146 /** 147 * This class implements the bulk part of the.of the Directory Server side 148 * of the replication code. 149 * It contains the root method for publishing a change, 150 * processing a change received from the replicationServer service, 151 * handle conflict resolution, 152 * handle protocol messages from the replicationServer. 153 */ 154 public class ReplicationDomain extends DirectoryThread 155 implements ConfigurationChangeListener<ReplicationDomainCfg>, 156 AlertGenerator 157 { 158 /** 159 * The fully-qualified name of this class. 160 */ 161 private static final String CLASS_NAME = 162 "org.opends.server.replication.plugin.ReplicationDomain"; 163 164 /** 165 * The attribute used to mark conflicting entries. 166 * The value of this attribute should be the dn that this entry was 167 * supposed to have when it was marked as conflicting. 168 */ 169 public static final String DS_SYNC_CONFLICT = "ds-sync-conflict"; 170 171 /** 172 * The tracer object for the debug logger. 173 */ 174 private static final DebugTracer TRACER = getTracer(); 175 176 private ReplicationMonitor monitor; 177 178 private ReplicationBroker broker; 179 // Thread waiting for incoming update messages for this domain and pushing 180 // them to the global incoming update message queue for later processing by 181 // replay threads. 
182 private ListenerThread listenerThread; 183 // The update to replay message queue where the listener thread is going to 184 // push incoming update messages. 185 private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue; 186 private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs = 187 new TreeMap<ChangeNumber, UpdateMessage>(); 188 private AtomicInteger numRcvdUpdates = new AtomicInteger(0); 189 private AtomicInteger numSentUpdates = new AtomicInteger(0); 190 private AtomicInteger numProcessedUpdates = new AtomicInteger(); 191 private AtomicInteger numResolvedNamingConflicts = new AtomicInteger(); 192 private AtomicInteger numResolvedModifyConflicts = new AtomicInteger(); 193 private AtomicInteger numUnresolvedNamingConflicts = new AtomicInteger(); 194 private int debugCount = 0; 195 private PersistentServerState state; 196 private int numReplayedPostOpCalled = 0; 197 198 private int maxReceiveQueue = 0; 199 private int maxSendQueue = 0; 200 private int maxReceiveDelay = 0; 201 private int maxSendDelay = 0; 202 203 private long generationId = -1; 204 private boolean generationIdSavedStatus = false; 205 206 ChangeNumberGenerator generator; 207 208 /** 209 * This object is used to store the list of update currently being 210 * done on the local database. 211 * Is is usefull to make sure that the local operations are sent in a 212 * correct order to the replication server and that the ServerState 213 * is not updated too early. 214 */ 215 private PendingChanges pendingChanges; 216 217 /** 218 * It contain the updates that were done on other servers, transmitted 219 * by the replication server and that are currently replayed. 220 * It is usefull to make sure that dependencies between operations 221 * are correctly fullfilled and to to make sure that the ServerState is 222 * not updated too early. 223 */ 224 private RemotePendingChanges remotePendingChanges; 225 226 /** 227 * The time in milliseconds between heartbeats from the replication 228 * server. 
Zero means heartbeats are off. 229 */ 230 private long heartbeatInterval = 0; 231 short serverId; 232 233 // The context related to an import or export being processed 234 // Null when none is being processed. 235 private IEContext ieContext = null; 236 237 private Collection<String> replicationServers; 238 239 private DN baseDN; 240 241 private boolean shutdown = false; 242 243 private InternalClientConnection conn = 244 InternalClientConnection.getRootConnection(); 245 246 private boolean solveConflictFlag = true; 247 248 private boolean disabled = false; 249 private boolean stateSavingDisabled = false; 250 251 private int window = 100; 252 253 /** 254 * The isolation policy that this domain is going to use. 255 * This field describes the behavior of the domain when an update is 256 * attempted and the domain could not connect to any Replication Server. 257 * Possible values are accept-updates or deny-updates, but other values 258 * may be added in the futur. 259 */ 260 private IsolationPolicy isolationpolicy; 261 262 /** 263 * The DN of the configuration entry of this domain. 264 */ 265 private DN configDn; 266 267 /** 268 * A boolean indicating if the thread used to save the persistentServerState 269 * is terminated. 270 */ 271 private boolean done = true; 272 273 /** 274 * This class contain the context related to an import or export 275 * launched on the domain. 276 */ 277 private class IEContext 278 { 279 // The task that initiated the operation. 
280 Task initializeTask; 281 // The input stream for the import 282 ReplLDIFInputStream ldifImportInputStream = null; 283 // The target in the case of an export 284 short exportTarget = RoutableMessage.UNKNOWN_SERVER; 285 // The source in the case of an import 286 short importSource = RoutableMessage.UNKNOWN_SERVER; 287 288 // The total entry count expected to be processed 289 long entryCount = 0; 290 // The count for the entry not yet processed 291 long entryLeftCount = 0; 292 293 boolean checksumOutput = false; 294 295 // The exception raised when any 296 DirectoryException exception = null; 297 long checksumOutputValue = (long)0; 298 299 /** 300 * Initializes the import/export counters with the provider value. 301 * @param count The value with which to initialize the counters. 302 */ 303 public void setCounters(long total, long left) 304 throws DirectoryException 305 { 306 entryCount = total; 307 entryLeftCount = left; 308 309 if (initializeTask != null) 310 { 311 if (initializeTask instanceof InitializeTask) 312 { 313 ((InitializeTask)initializeTask).setTotal(entryCount); 314 ((InitializeTask)initializeTask).setLeft(entryCount); 315 } 316 else if (initializeTask instanceof InitializeTargetTask) 317 { 318 ((InitializeTargetTask)initializeTask).setTotal(entryCount); 319 ((InitializeTargetTask)initializeTask).setLeft(entryCount); 320 } 321 } 322 } 323 324 /** 325 * Update the counters of the task for each entry processed during 326 * an import or export. 
327 */ 328 public void updateCounters() 329 throws DirectoryException 330 { 331 entryLeftCount--; 332 333 if (initializeTask != null) 334 { 335 if (initializeTask instanceof InitializeTask) 336 { 337 ((InitializeTask)initializeTask).setLeft(entryLeftCount); 338 } 339 else if (initializeTask instanceof InitializeTargetTask) 340 { 341 ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount); 342 } 343 } 344 } 345 346 /** 347 * {@inheritDoc} 348 */ 349 public String toString() 350 { 351 return new String("[ Entry count=" + this.entryCount + 352 ", Entry left count=" + this.entryLeftCount + "]"); 353 } 354 } 355 356 /** 357 * This thread is launched when we want to export data to another server that 358 * has requested to be initialized with the data of our backend. 359 */ 360 private class ExportThread extends DirectoryThread 361 { 362 // Id of server that will receive updates 363 private short target; 364 365 /** 366 * Constructor for the ExportThread. 367 * 368 * @param target Id of server that will receive updates 369 */ 370 public ExportThread(short target) 371 { 372 super("Export thread"); 373 this.target = target; 374 } 375 376 /** 377 * Run method for this class. 378 */ 379 public void run() 380 { 381 if (debugEnabled()) 382 { 383 TRACER.debugInfo("Export thread starting."); 384 } 385 386 try 387 { 388 initializeRemote(target, target, null); 389 } catch (DirectoryException de) 390 { 391 // An error message has been sent to the peer 392 // Nothing more to do locally 393 } 394 if (debugEnabled()) 395 { 396 TRACER.debugInfo("Export thread stopping."); 397 } 398 } 399 } 400 401 /** 402 * Creates a new ReplicationDomain using configuration from configEntry. 403 * 404 * @param configuration The configuration of this ReplicationDomain. 405 * @param updateToReplayQueue The queue for update messages to replay. 406 * @throws ConfigException In case of invalid configuration. 
407 */ 408 public ReplicationDomain(ReplicationDomainCfg configuration, 409 LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue) 410 throws ConfigException 411 { 412 super("replicationDomain_" + configuration.getBaseDN()); 413 414 // Read the configuration parameters. 415 replicationServers = configuration.getReplicationServer(); 416 serverId = (short) configuration.getServerId(); 417 baseDN = configuration.getBaseDN(); 418 window = configuration.getWindowSize(); 419 heartbeatInterval = configuration.getHeartbeatInterval(); 420 isolationpolicy = configuration.getIsolationPolicy(); 421 configDn = configuration.dn(); 422 this.updateToReplayQueue = updateToReplayQueue; 423 424 /* 425 * Modify conflicts are solved for all suffixes but the schema suffix 426 * because we don't want to store extra information in the schema 427 * ldif files. 428 * This has no negative impact because the changes on schema should 429 * not produce conflicts. 430 */ 431 if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0) 432 { 433 solveConflictFlag = false; 434 } 435 else 436 { 437 solveConflictFlag = true; 438 } 439 440 /* 441 * Create a new Persistent Server State that will be used to store 442 * the last ChangeNmber seen from all LDAP servers in the topology. 443 */ 444 state = new PersistentServerState(baseDN, serverId); 445 446 /* 447 * Create a replication monitor object responsible for publishing 448 * monitoring information below cn=monitor. 
449 */ 450 monitor = new ReplicationMonitor(this); 451 DirectoryServer.registerMonitorProvider(monitor); 452 453 Backend backend = retrievesBackend(baseDN); 454 if (backend == null) 455 { 456 throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get( 457 baseDN.toNormalizedString())); 458 } 459 460 try 461 { 462 generationId = loadGenerationId(); 463 } 464 catch (DirectoryException e) 465 { 466 logError(ERR_LOADING_GENERATION_ID.get( 467 baseDN.toNormalizedString(), e.getLocalizedMessage())); 468 } 469 470 /* 471 * create the broker object used to publish and receive changes 472 */ 473 broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue, 474 maxReceiveDelay, maxSendQueue, maxSendDelay, window, 475 heartbeatInterval, generationId, 476 new ReplSessionSecurity(configuration)); 477 478 broker.start(replicationServers); 479 480 /* 481 * ChangeNumberGenerator is used to create new unique ChangeNumbers 482 * for each operation done on this replication domain. 483 * 484 * The generator time is adjusted to the time of the last CN received from 485 * remote other servers. 486 */ 487 generator = 488 new ChangeNumberGenerator(serverId, state); 489 490 pendingChanges = 491 new PendingChanges(generator, 492 broker, state); 493 494 remotePendingChanges = new RemotePendingChanges(generator, state); 495 496 // listen for changes on the configuration 497 configuration.addChangeListener(this); 498 499 // register as an AltertGenerator 500 DirectoryServer.registerAlertGenerator(this); 501 } 502 503 504 /** 505 * Returns the base DN of this ReplicationDomain. 506 * 507 * @return The base DN of this ReplicationDomain 508 */ 509 public DN getBaseDN() 510 { 511 return baseDN; 512 } 513 514 /** 515 * Implement the handleConflictResolution phase of the deleteOperation. 516 * 517 * @param deleteOperation The deleteOperation. 518 * @return A SynchronizationProviderResult indicating if the operation 519 * can continue. 
520 */ 521 public SynchronizationProviderResult handleConflictResolution( 522 PreOperationDeleteOperation deleteOperation) 523 { 524 if ((!deleteOperation.isSynchronizationOperation()) 525 && (!brokerIsConnected(deleteOperation))) 526 { 527 Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString()); 528 return new SynchronizationProviderResult.StopProcessing( 529 ResultCode.UNWILLING_TO_PERFORM, msg); 530 } 531 532 DeleteContext ctx = 533 (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT); 534 Entry deletedEntry = deleteOperation.getEntryToDelete(); 535 536 if (ctx != null) 537 { 538 /* 539 * This is a replication operation 540 * Check that the modified entry has the same entryuuid 541 * has was in the original message. 542 */ 543 String operationEntryUUID = ctx.getEntryUid(); 544 String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry); 545 if (!operationEntryUUID.equals(modifiedEntryUUID)) 546 { 547 /* 548 * The changes entry is not the same entry as the one on 549 * the original change was performed. 550 * Probably the original entry was renamed and replaced with 551 * another entry. 552 * We must not let the change proceed, return a negative 553 * result and set the result code to NO_SUCH_OBJET. 554 * When the operation will return, the thread that started the 555 * operation will try to find the correct entry and restart a new 556 * operation. 557 */ 558 return new SynchronizationProviderResult.StopProcessing( 559 ResultCode.NO_SUCH_OBJECT, null); 560 } 561 } 562 else 563 { 564 // There is no replication context attached to the operation 565 // so this is not a replication operation. 
566 ChangeNumber changeNumber = generateChangeNumber(deleteOperation); 567 String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry); 568 ctx = new DeleteContext(changeNumber, modifiedEntryUUID); 569 deleteOperation.setAttachment(SYNCHROCONTEXT, ctx); 570 } 571 return new SynchronizationProviderResult.ContinueProcessing(); 572 } 573 574 /** 575 * Implement the handleConflictResolution phase of the addOperation. 576 * 577 * @param addOperation The AddOperation. 578 * @return A SynchronizationProviderResult indicating if the operation 579 * can continue. 580 */ 581 public SynchronizationProviderResult handleConflictResolution( 582 PreOperationAddOperation addOperation) 583 { 584 if ((!addOperation.isSynchronizationOperation()) 585 && (!brokerIsConnected(addOperation))) 586 { 587 Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString()); 588 return new SynchronizationProviderResult.StopProcessing( 589 ResultCode.UNWILLING_TO_PERFORM, msg); 590 } 591 592 if (addOperation.isSynchronizationOperation()) 593 { 594 AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT); 595 /* 596 * If an entry with the same entry uniqueID already exist then 597 * this operation has already been replayed in the past. 598 */ 599 String uuid = ctx.getEntryUid(); 600 if (findEntryDN(uuid) != null) 601 { 602 return new SynchronizationProviderResult.StopProcessing( 603 ResultCode.CANCELED, null); 604 } 605 606 /* The parent entry may have been renamed here since the change was done 607 * on the first server, and another entry have taken the former dn 608 * of the parent entry 609 */ 610 611 String parentUid = ctx.getParentUid(); 612 // root entry have no parent, 613 // there is no need to check for it. 
614 if (parentUid != null) 615 { 616 // There is a potential of perfs improvement here 617 // if we could avoid the following parent entry retrieval 618 DN parentDnFromCtx = findEntryDN(ctx.getParentUid()); 619 620 if (parentDnFromCtx == null) 621 { 622 // The parent does not exist with the specified unique id 623 // stop the operation with NO_SUCH_OBJECT and let the 624 // conflict resolution or the dependency resolution solve this. 625 return new SynchronizationProviderResult.StopProcessing( 626 ResultCode.NO_SUCH_OBJECT, null); 627 } 628 else 629 { 630 DN entryDN = addOperation.getEntryDN(); 631 DN parentDnFromEntryDn = entryDN.getParentDNInSuffix(); 632 if ((parentDnFromEntryDn != null) 633 && (!parentDnFromCtx.equals(parentDnFromEntryDn))) 634 { 635 // parentEntry has been renamed 636 // replication name conflict resolution is expected to fix that 637 // later in the flow 638 return new SynchronizationProviderResult.StopProcessing( 639 ResultCode.NO_SUCH_OBJECT, null); 640 } 641 } 642 } 643 } 644 return new SynchronizationProviderResult.ContinueProcessing(); 645 } 646 647 /** 648 * Check that the broker associated to this ReplicationDomain has found 649 * a Replication Server and that this LDAP server is therefore able to 650 * process operations. 651 * If not set the ResultCode and the response message, 652 * interrupt the operation, and return false 653 * 654 * @param op The Operation that needs to be checked. 655 * 656 * @return true when it OK to process the Operation, false otherwise. 657 * When false is returned the resultCode and the reponse message 658 * is also set in the Operation. 659 */ 660 private boolean brokerIsConnected(PreOperationOperation op) 661 { 662 if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) 663 { 664 // this policy imply that we always accept updates. 
665 return true; 666 } 667 if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)) 668 { 669 // this isolation policy specifies that the updates are denied 670 // when the broker is not connected. 671 return broker.isConnected(); 672 } 673 // we should never get there as the only possible policies are 674 // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES 675 return true; 676 } 677 678 679 /** 680 * Implement the handleConflictResolution phase of the ModifyDNOperation. 681 * 682 * @param modifyDNOperation The ModifyDNOperation. 683 * @return A SynchronizationProviderResult indicating if the operation 684 * can continue. 685 */ 686 public SynchronizationProviderResult handleConflictResolution( 687 PreOperationModifyDNOperation modifyDNOperation) 688 { 689 if ((!modifyDNOperation.isSynchronizationOperation()) 690 && (!brokerIsConnected(modifyDNOperation))) 691 { 692 Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString()); 693 return new SynchronizationProviderResult.StopProcessing( 694 ResultCode.UNWILLING_TO_PERFORM, msg); 695 } 696 697 ModifyDnContext ctx = 698 (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT); 699 if (ctx != null) 700 { 701 /* 702 * This is a replication operation 703 * Check that the modified entry has the same entryuuid 704 * as was in the original message. 705 */ 706 String modifiedEntryUUID = 707 Historical.getEntryUuid(modifyDNOperation.getOriginalEntry()); 708 if (!modifiedEntryUUID.equals(ctx.getEntryUid())) 709 { 710 /* 711 * The modified entry is not the same entry as the one on 712 * the original change was performed. 713 * Probably the original entry was renamed and replaced with 714 * another entry. 715 * We must not let the change proceed, return a negative 716 * result and set the result code to NO_SUCH_OBJET. 717 * When the operation will return, the thread that started the 718 * operation will try to find the correct entry and restart a new 719 * operation. 
720 */ 721 return new SynchronizationProviderResult.StopProcessing( 722 ResultCode.NO_SUCH_OBJECT, null); 723 } 724 if (modifyDNOperation.getNewSuperior() != null) 725 { 726 /* 727 * Also check that the current id of the 728 * parent is the same as when the operation was performed. 729 */ 730 String newParentId = findEntryId(modifyDNOperation.getNewSuperior()); 731 if ((newParentId != null) && 732 (!newParentId.equals(ctx.getNewParentId()))) 733 { 734 return new SynchronizationProviderResult.StopProcessing( 735 ResultCode.NO_SUCH_OBJECT, null); 736 } 737 } 738 } 739 else 740 { 741 // There is no replication context attached to the operation 742 // so this is not a replication operation. 743 ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation); 744 String newParentId = null; 745 if (modifyDNOperation.getNewSuperior() != null) 746 { 747 newParentId = findEntryId(modifyDNOperation.getNewSuperior()); 748 } 749 750 Entry modifiedEntry = modifyDNOperation.getOriginalEntry(); 751 String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry); 752 ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId); 753 modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx); 754 } 755 return new SynchronizationProviderResult.ContinueProcessing(); 756 } 757 758 /** 759 * Handle the conflict resolution. 760 * Called by the core server after locking the entry and before 761 * starting the actual modification. 
762 * @param modifyOperation the operation 763 * @return code indicating is operation must proceed 764 */ 765 public SynchronizationProviderResult handleConflictResolution( 766 PreOperationModifyOperation modifyOperation) 767 { 768 if ((!modifyOperation.isSynchronizationOperation()) 769 && (!brokerIsConnected(modifyOperation))) 770 { 771 Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString()); 772 return new SynchronizationProviderResult.StopProcessing( 773 ResultCode.UNWILLING_TO_PERFORM, msg); 774 } 775 776 ModifyContext ctx = 777 (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT); 778 779 Entry modifiedEntry = modifyOperation.getModifiedEntry(); 780 if (ctx == null) 781 { 782 // There is no replication context attached to the operation 783 // so this is not a replication operation. 784 ChangeNumber changeNumber = generateChangeNumber(modifyOperation); 785 String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry); 786 if (modifiedEntryUUID == null) 787 modifiedEntryUUID = modifyOperation.getEntryDN().toString(); 788 ctx = new ModifyContext(changeNumber, modifiedEntryUUID); 789 modifyOperation.setAttachment(SYNCHROCONTEXT, ctx); 790 } 791 else 792 { 793 // This is a replayed operation, it is necessary to 794 // - check if the entry has been renamed 795 // - check for conflicts 796 String modifiedEntryUUID = ctx.getEntryUid(); 797 String currentEntryUUID = Historical.getEntryUuid(modifiedEntry); 798 if ((currentEntryUUID != null) && 799 (!currentEntryUUID.equals(modifiedEntryUUID))) 800 { 801 /* 802 * The current modified entry is not the same entry as the one on 803 * the original modification was performed. 804 * Probably the original entry was renamed and replaced with 805 * another entry. 806 * We must not let the modification proceed, return a negative 807 * result and set the result code to NO_SUCH_OBJET. 
808 * When the operation will return, the thread that started the 809 * operation will try to find the correct entry and restart a new 810 * operation. 811 */ 812 return new SynchronizationProviderResult.StopProcessing( 813 ResultCode.NO_SUCH_OBJECT, null); 814 } 815 816 /* 817 * Solve the conflicts between modify operations 818 */ 819 Historical historicalInformation = Historical.load(modifiedEntry); 820 modifyOperation.setAttachment(Historical.HISTORICAL, 821 historicalInformation); 822 823 if (historicalInformation.replayOperation(modifyOperation, modifiedEntry)) 824 { 825 numResolvedModifyConflicts.incrementAndGet(); 826 } 827 828 if (modifyOperation.getModifications().isEmpty()) 829 { 830 /* 831 * This operation becomes a no-op due to conflict resolution 832 * stop the processing and send an OK result 833 */ 834 return new SynchronizationProviderResult.StopProcessing( 835 ResultCode.SUCCESS, null); 836 } 837 } 838 return new SynchronizationProviderResult.ContinueProcessing(); 839 } 840 841 /** 842 * The preOperation phase for the add Operation. 843 * Its job is to generate the replication context associated to the 844 * operation. It is necessary to do it in this phase because contrary to 845 * the other operations, the entry uid is not set when the handleConflict 846 * phase is called. 847 * 848 * @param addOperation The Add Operation. 849 */ 850 public void doPreOperation(PreOperationAddOperation addOperation) 851 { 852 AddContext ctx = new AddContext(generateChangeNumber(addOperation), 853 Historical.getEntryUuid(addOperation), 854 findEntryId(addOperation.getEntryDN().getParentDNInSuffix())); 855 856 addOperation.setAttachment(SYNCHROCONTEXT, ctx); 857 } 858 859 /** 860 * Receives an update message from the replicationServer. 
861 * also responsible for updating the list of pending changes 862 * @return the received message - null if none 863 */ 864 public UpdateMessage receive() 865 { 866 UpdateMessage update = null; 867 868 while (update == null) 869 { 870 InitializeRequestMessage initMsg = null; 871 ReplicationMessage msg; 872 try 873 { 874 msg = broker.receive(); 875 if (msg == null) 876 { 877 // The server is in the shutdown process 878 return null; 879 } 880 881 if (debugEnabled()) 882 if (!(msg instanceof HeartbeatMessage)) 883 TRACER.debugVerbose("Message received <" + msg + ">"); 884 885 if (msg instanceof AckMessage) 886 { 887 AckMessage ack = (AckMessage) msg; 888 receiveAck(ack); 889 } 890 else if (msg instanceof InitializeRequestMessage) 891 { 892 // Another server requests us to provide entries 893 // for a total update 894 initMsg = (InitializeRequestMessage)msg; 895 } 896 else if (msg instanceof InitializeTargetMessage) 897 { 898 // Another server is exporting its entries to us 899 InitializeTargetMessage importMsg = (InitializeTargetMessage) msg; 900 901 try 902 { 903 // This must be done while we are still holding the 904 // broker lock because we are now going to receive a 905 // bunch of entries from the remote server and we 906 // want the import thread to catch them and 907 // not the ListenerThread. 
908 initialize(importMsg); 909 } 910 catch(DirectoryException de) 911 { 912 // Returns an error message to notify the sender 913 ErrorMessage errorMsg = 914 new ErrorMessage(importMsg.getsenderID(), 915 de.getMessageObject()); 916 MessageBuilder mb = new MessageBuilder(); 917 mb.append(de.getMessageObject()); 918 TRACER.debugInfo(Message.toString(mb.toMessage())); 919 broker.publish(errorMsg); 920 } 921 } 922 else if (msg instanceof ErrorMessage) 923 { 924 if (ieContext != null) 925 { 926 // This is an error termination for the 2 following cases : 927 // - either during an export 928 // - or before an import really started 929 // For example, when we publish a request and the 930 // replicationServer did not find any import source. 931 abandonImportExport((ErrorMessage)msg); 932 } 933 else 934 { 935 /* We can receive an error message from the replication server 936 * in the following cases : 937 * - we connected with an incorrect generation id 938 */ 939 ErrorMessage errorMsg = (ErrorMessage)msg; 940 logError(ERR_ERROR_MSG_RECEIVED.get( 941 errorMsg.getDetails())); 942 } 943 } 944 else if (msg instanceof UpdateMessage) 945 { 946 update = (UpdateMessage) msg; 947 receiveUpdate(update); 948 } 949 } 950 catch (SocketTimeoutException e) 951 { 952 // just retry 953 } 954 // Test if we have received and export request message and 955 // if that's the case handle it now. 956 // This must be done outside of the portion of code protected 957 // by the broker lock so that we keep receiveing update 958 // when we are doing and export and so that a possible 959 // closure of the socket happening when we are publishing the 960 // entries to the remote can be handled by the other 961 // replay thread when they call this method and therefore the 962 // broker.receive() method. 
963 if (initMsg != null) 964 { 965 // Do this work in a thread to allow replay thread continue working 966 ExportThread exportThread = new ExportThread(initMsg.getsenderID()); 967 exportThread.start(); 968 } 969 } 970 return update; 971 } 972 973 /** 974 * Do the necessary processing when an UpdateMessage was received. 975 * 976 * @param update The received UpdateMessage. 977 */ 978 public void receiveUpdate(UpdateMessage update) 979 { 980 remotePendingChanges.putRemoteUpdate(update); 981 numRcvdUpdates.incrementAndGet(); 982 } 983 984 /** 985 * Do the necessary processing when an AckMessage is received. 986 * 987 * @param ack The AckMessage that was received. 988 */ 989 public void receiveAck(AckMessage ack) 990 { 991 UpdateMessage update; 992 ChangeNumber changeNumber = ack.getChangeNumber(); 993 994 synchronized (waitingAckMsgs) 995 { 996 update = waitingAckMsgs.remove(changeNumber); 997 } 998 if (update != null) 999 { 1000 synchronized (update) 1001 { 1002 update.notify(); 1003 } 1004 } 1005 } 1006 1007 /** 1008 * Check if an operation must be synchronized. 1009 * Also update the list of pending changes and the server RUV 1010 * @param op the operation 1011 */ 1012 public void synchronize(PostOperationOperation op) 1013 { 1014 ResultCode result = op.getResultCode(); 1015 if ((result == ResultCode.SUCCESS) && op.isSynchronizationOperation()) 1016 { 1017 numReplayedPostOpCalled++; 1018 } 1019 UpdateMessage msg = null; 1020 1021 // Note that a failed non-replication operation might not have a change 1022 // number. 1023 ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op); 1024 1025 boolean isAssured = isAssured(op); 1026 1027 if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation())) 1028 { 1029 // Generate a replication message for a successful non-replication 1030 // operation. 
1031 msg = UpdateMessage.generateMsg(op, isAssured); 1032 1033 if (msg == null) 1034 { 1035 /* 1036 * This is an operation type that we do not know about 1037 * It should never happen. 1038 */ 1039 pendingChanges.remove(curChangeNumber); 1040 Message message = 1041 ERR_UNKNOWN_TYPE.get(op.getOperationType().toString()); 1042 logError(message); 1043 return; 1044 } 1045 } 1046 1047 if (result == ResultCode.SUCCESS) 1048 { 1049 try 1050 { 1051 if (op.isSynchronizationOperation()) 1052 { 1053 remotePendingChanges.commit(curChangeNumber); 1054 } 1055 else 1056 { 1057 pendingChanges.commit(curChangeNumber, msg); 1058 } 1059 } 1060 catch (NoSuchElementException e) 1061 { 1062 Message message = ERR_OPERATION_NOT_FOUND_IN_PENDING.get( 1063 curChangeNumber.toString(), op.toString()); 1064 logError(message); 1065 return; 1066 } 1067 1068 if (msg != null && isAssured) 1069 { 1070 synchronized (waitingAckMsgs) 1071 { 1072 // Add the assured message to the list of update that are 1073 // waiting acknowledgements 1074 waitingAckMsgs.put(curChangeNumber, msg); 1075 } 1076 } 1077 1078 if (generationIdSavedStatus != true) 1079 { 1080 this.saveGenerationId(generationId); 1081 } 1082 } 1083 else if (!op.isSynchronizationOperation()) 1084 { 1085 // Remove an unsuccessful non-replication operation from the pending 1086 // changes list. 1087 if (curChangeNumber != null) 1088 { 1089 pendingChanges.remove(curChangeNumber); 1090 } 1091 } 1092 1093 if (!op.isSynchronizationOperation()) 1094 { 1095 int pushedChanges = pendingChanges.pushCommittedChanges(); 1096 numSentUpdates.addAndGet(pushedChanges); 1097 } 1098 1099 // Wait for acknowledgement of an assured message. 
1100 if (msg != null && isAssured) 1101 { 1102 synchronized (msg) 1103 { 1104 while (waitingAckMsgs.containsKey(msg.getChangeNumber())) 1105 { 1106 // TODO : should have a configurable timeout to get 1107 // out of this loop 1108 try 1109 { 1110 msg.wait(1000); 1111 } catch (InterruptedException e) 1112 { } 1113 } 1114 } 1115 } 1116 } 1117 1118 /** 1119 * get the number of updates received by the replication plugin. 1120 * 1121 * @return the number of updates received 1122 */ 1123 public int getNumRcvdUpdates() 1124 { 1125 if (numRcvdUpdates != null) 1126 return numRcvdUpdates.get(); 1127 else 1128 return 0; 1129 } 1130 1131 /** 1132 * Get the number of updates sent by the replication plugin. 1133 * 1134 * @return the number of updates sent 1135 */ 1136 public int getNumSentUpdates() 1137 { 1138 if (numSentUpdates != null) 1139 return numSentUpdates.get(); 1140 else 1141 return 0; 1142 } 1143 1144 /** 1145 * Get the number of updates in the pending list. 1146 * 1147 * @return The number of updates in the pending list 1148 */ 1149 public int getPendingUpdatesCount() 1150 { 1151 if (pendingChanges != null) 1152 return pendingChanges.size(); 1153 else 1154 return 0; 1155 } 1156 1157 /** 1158 * Increment the number of processed updates. 1159 */ 1160 public void incProcessedUpdates() 1161 { 1162 numProcessedUpdates.incrementAndGet(); 1163 } 1164 1165 /** 1166 * get the number of updates replayed by the replication. 1167 * 1168 * @return The number of updates replayed by the replication 1169 */ 1170 public int getNumProcessedUpdates() 1171 { 1172 if (numProcessedUpdates != null) 1173 return numProcessedUpdates.get(); 1174 else 1175 return 0; 1176 } 1177 1178 /** 1179 * get the number of updates replayed successfully by the replication. 1180 * 1181 * @return The number of updates replayed successfully 1182 */ 1183 public int getNumReplayedPostOpCalled() 1184 { 1185 return numReplayedPostOpCalled; 1186 } 1187 1188 /** 1189 * get the ServerState. 
1190 * 1191 * @return the ServerState 1192 */ 1193 public ServerState getServerState() 1194 { 1195 return state; 1196 } 1197 1198 /** 1199 * Get the debugCount. 1200 * 1201 * @return Returns the debugCount. 1202 */ 1203 public int getDebugCount() 1204 { 1205 return debugCount; 1206 } 1207 1208 /** 1209 * Send an Ack message. 1210 * 1211 * @param changeNumber The ChangeNumber for which the ack must be sent. 1212 */ 1213 public void ack(ChangeNumber changeNumber) 1214 { 1215 broker.publish(new AckMessage(changeNumber)); 1216 } 1217 1218 /** 1219 * {@inheritDoc} 1220 */ 1221 @Override 1222 public void run() 1223 { 1224 done = false; 1225 1226 // Create the listener thread 1227 listenerThread = new ListenerThread(this, updateToReplayQueue); 1228 listenerThread.start(); 1229 1230 while (shutdown == false) 1231 { 1232 try 1233 { 1234 synchronized (this) 1235 { 1236 this.wait(1000); 1237 if (!disabled && !stateSavingDisabled ) 1238 { 1239 // save the RUV 1240 state.save(); 1241 } 1242 } 1243 } catch (InterruptedException e) 1244 { } 1245 } 1246 state.save(); 1247 1248 done = true; 1249 } 1250 1251 /** 1252 * Shutdown this ReplicationDomain. 1253 */ 1254 public void shutdown() 1255 { 1256 // stop the flush thread 1257 shutdown = true; 1258 1259 // Stop the listener thread 1260 if (listenerThread != null) 1261 { 1262 listenerThread.shutdown(); 1263 } 1264 1265 synchronized (this) 1266 { 1267 this.notify(); 1268 } 1269 1270 DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName()); 1271 1272 DirectoryServer.deregisterAlertGenerator(this); 1273 1274 // stop the ReplicationBroker 1275 broker.stop(); 1276 1277 // Wait for the listener thread to stop 1278 if (listenerThread != null) 1279 listenerThread.waitForShutdown(); 1280 1281 // wait for completion of the persistentServerState thread. 1282 try 1283 { 1284 while (!done) 1285 { 1286 Thread.sleep(50); 1287 } 1288 } catch (InterruptedException e) 1289 { 1290 // stop waiting when interrupted. 
1291 } 1292 } 1293 1294 /** 1295 * Get the name of the replicationServer to which this domain is currently 1296 * connected. 1297 * 1298 * @return the name of the replicationServer to which this domain 1299 * is currently connected. 1300 */ 1301 public String getReplicationServer() 1302 { 1303 if (broker != null) 1304 return broker.getReplicationServer(); 1305 else 1306 return "Not connected"; 1307 } 1308 1309 /** 1310 * Create and replay a synchronized Operation from an UpdateMessage. 1311 * 1312 * @param msg The UpdateMessage to be replayed. 1313 */ 1314 public void replay(UpdateMessage msg) 1315 { 1316 Operation op = null; 1317 boolean done = false; 1318 boolean dependency = false; 1319 ChangeNumber changeNumber = null; 1320 int retryCount = 10; 1321 boolean firstTry = true; 1322 1323 // Try replay the operation, then flush (replaying) any pending operation 1324 // whose dependency has been replayed until no more left. 1325 do 1326 { 1327 try 1328 { 1329 while ((!dependency) && (!done) && (retryCount-- > 0)) 1330 { 1331 op = msg.createOperation(conn); 1332 1333 op.setInternalOperation(true); 1334 op.setSynchronizationOperation(true); 1335 changeNumber = OperationContext.getChangeNumber(op); 1336 ((AbstractOperation) op).run(); 1337 1338 // Try replay the operation 1339 ResultCode result = op.getResultCode(); 1340 1341 if (result != ResultCode.SUCCESS) 1342 { 1343 if (op instanceof ModifyOperation) 1344 { 1345 ModifyOperation newOp = (ModifyOperation) op; 1346 dependency = remotePendingChanges.checkDependencies(newOp); 1347 if ((!dependency) && (!firstTry)) 1348 { 1349 done = solveNamingConflict(newOp, msg); 1350 } 1351 } else if (op instanceof DeleteOperation) 1352 { 1353 DeleteOperation newOp = (DeleteOperation) op; 1354 dependency = remotePendingChanges.checkDependencies(newOp); 1355 if ((!dependency) && (!firstTry)) 1356 { 1357 done = solveNamingConflict(newOp, msg); 1358 } 1359 } else if (op instanceof AddOperation) 1360 { 1361 AddOperation newOp = 
(AddOperation) op; 1362 AddMsg addMsg = (AddMsg) msg; 1363 dependency = remotePendingChanges.checkDependencies(newOp); 1364 if ((!dependency) && (!firstTry)) 1365 { 1366 done = solveNamingConflict(newOp, addMsg); 1367 } 1368 } else if (op instanceof ModifyDNOperationBasis) 1369 { 1370 ModifyDNMsg newMsg = (ModifyDNMsg) msg; 1371 dependency = remotePendingChanges.checkDependencies(newMsg); 1372 if ((!dependency) && (!firstTry)) 1373 { 1374 ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op; 1375 done = solveNamingConflict(newOp, msg); 1376 } 1377 } else 1378 { 1379 done = true; // unknown type of operation ?! 1380 } 1381 if (done) 1382 { 1383 // the update became a dummy update and the result 1384 // of the conflict resolution phase is to do nothing. 1385 // however we still need to push this change to the serverState 1386 updateError(changeNumber); 1387 } 1388 } else 1389 { 1390 done = true; 1391 } 1392 firstTry = false; 1393 } 1394 1395 if (!done && !dependency) 1396 { 1397 // Continue with the next change but the servers could now become 1398 // inconsistent. 1399 // Let the repair tool know about this. 
1400 Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(), 1401 op.getErrorMessage().toString()); 1402 logError(message); 1403 numUnresolvedNamingConflicts.incrementAndGet(); 1404 1405 updateError(changeNumber); 1406 } 1407 } catch (ASN1Exception e) 1408 { 1409 Message message = ERR_EXCEPTION_DECODING_OPERATION.get( 1410 String.valueOf(msg) + stackTraceToSingleLineString(e)); 1411 logError(message); 1412 } catch (LDAPException e) 1413 { 1414 Message message = ERR_EXCEPTION_DECODING_OPERATION.get( 1415 String.valueOf(msg) + stackTraceToSingleLineString(e)); 1416 logError(message); 1417 } catch (DataFormatException e) 1418 { 1419 Message message = ERR_EXCEPTION_DECODING_OPERATION.get( 1420 String.valueOf(msg) + stackTraceToSingleLineString(e)); 1421 logError(message); 1422 } catch (Exception e) 1423 { 1424 if (changeNumber != null) 1425 { 1426 /* 1427 * An Exception happened during the replay process. 1428 * Continue with the next change but the servers will now start 1429 * to be inconsistent. 1430 * Let the repair tool know about this. 1431 */ 1432 Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get( 1433 stackTraceToSingleLineString(e), op.toString()); 1434 logError(message); 1435 updateError(changeNumber); 1436 } else 1437 { 1438 Message message = ERR_EXCEPTION_DECODING_OPERATION.get( 1439 String.valueOf(msg) + stackTraceToSingleLineString(e)); 1440 logError(message); 1441 } 1442 } finally 1443 { 1444 if (!dependency) 1445 { 1446 broker.updateWindowAfterReplay(); 1447 if (msg.isAssured()) 1448 ack(msg.getChangeNumber()); 1449 incProcessedUpdates(); 1450 } 1451 } 1452 1453 // Now replay any pending update that had a dependency and whose 1454 // dependency has been replayed, do that until no more updates of that 1455 // type left... 
1456 msg = remotePendingChanges.getNextUpdate(); 1457 1458 // Prepare restart of loop 1459 done = false; 1460 dependency = false; 1461 changeNumber = null; 1462 retryCount = 10; 1463 firstTry = true; 1464 1465 } while (msg != null); 1466 } 1467 1468 /** 1469 * This method is called when an error happens while replaying 1470 * an operation. 1471 * It is necessary because the postOperation does not always get 1472 * called when error or Exceptions happen during the operation replay. 1473 * 1474 * @param changeNumber the ChangeNumber of the operation with error. 1475 */ 1476 public void updateError(ChangeNumber changeNumber) 1477 { 1478 remotePendingChanges.commit(changeNumber); 1479 } 1480 1481 /** 1482 * Generate a new change number and insert it in the pending list. 1483 * 1484 * @param operation The operation for which the change number must be 1485 * generated. 1486 * @return The new change number. 1487 */ 1488 private ChangeNumber generateChangeNumber(PluginOperation operation) 1489 { 1490 return pendingChanges.putLocalOperation(operation); 1491 } 1492 1493 1494 /** 1495 * Find the Unique Id of the entry with the provided DN by doing a 1496 * search of the entry and extracting its uniqueID from its attributes. 1497 * 1498 * @param dn The dn of the entry for which the unique Id is searched. 1499 * 1500 * @return The unique Id of the entry whith the provided DN. 
1501 */ 1502 private String findEntryId(DN dn) 1503 { 1504 if (dn == null) 1505 return null; 1506 try 1507 { 1508 LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); 1509 attrs.add(ENTRYUIDNAME); 1510 InternalSearchOperation search = conn.processSearch(dn, 1511 SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 1512 0, 0, false, 1513 SearchFilter.createFilterFromString("objectclass=*"), 1514 attrs); 1515 1516 if (search.getResultCode() == ResultCode.SUCCESS) 1517 { 1518 LinkedList<SearchResultEntry> result = search.getSearchEntries(); 1519 if (!result.isEmpty()) 1520 { 1521 SearchResultEntry resultEntry = result.getFirst(); 1522 if (resultEntry != null) 1523 { 1524 return Historical.getEntryUuid(resultEntry); 1525 } 1526 } 1527 } 1528 } catch (DirectoryException e) 1529 { 1530 // never happens because the filter is always valid. 1531 } 1532 return null; 1533 } 1534 1535 /** 1536 * find the current dn of an entry from its entry uuid. 1537 * 1538 * @param uuid the Entry Unique ID. 1539 * @return The curernt dn of the entry or null if there is no entry with 1540 * the specified uuid. 1541 */ 1542 private DN findEntryDN(String uuid) 1543 { 1544 try 1545 { 1546 InternalSearchOperation search = conn.processSearch(baseDN, 1547 SearchScope.WHOLE_SUBTREE, 1548 SearchFilter.createFilterFromString("entryuuid="+uuid)); 1549 if (search.getResultCode() == ResultCode.SUCCESS) 1550 { 1551 LinkedList<SearchResultEntry> result = search.getSearchEntries(); 1552 if (!result.isEmpty()) 1553 { 1554 SearchResultEntry resultEntry = result.getFirst(); 1555 if (resultEntry != null) 1556 { 1557 return resultEntry.getDN(); 1558 } 1559 } 1560 } 1561 } catch (DirectoryException e) 1562 { 1563 // never happens because the filter is always valid. 1564 } 1565 return null; 1566 } 1567 1568 /** 1569 * Solve a conflict detected when replaying a modify operation. 1570 * 1571 * @param op The operation that triggered the conflict detection. 
1572 * @param msg The operation that triggered the conflict detection. 1573 * @return true if the process is completed, false if it must continue.. 1574 */ 1575 private boolean solveNamingConflict(ModifyOperation op, 1576 UpdateMessage msg) 1577 { 1578 ResultCode result = op.getResultCode(); 1579 ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT); 1580 String entryUid = ctx.getEntryUid(); 1581 1582 if (result == ResultCode.NO_SUCH_OBJECT) 1583 { 1584 /* 1585 * The operation is a modification but 1586 * the entry has been renamed on a different master in the same time. 1587 * search if the entry has been renamed, and return the new dn 1588 * of the entry. 1589 */ 1590 DN newdn = findEntryDN(entryUid); 1591 if (newdn != null) 1592 { 1593 // There is an entry with the same unique id as this modify operation 1594 // replay the modify using the current dn of this entry. 1595 msg.setDn(newdn.toString()); 1596 numResolvedNamingConflicts.incrementAndGet(); 1597 return false; 1598 } 1599 else 1600 { 1601 // This entry does not exist anymore. 1602 // It has probably been deleted, stop the processing of this operation 1603 numResolvedNamingConflicts.incrementAndGet(); 1604 return true; 1605 } 1606 } 1607 else 1608 { 1609 // The other type of errors can not be caused by naming conflicts. 1610 // Log a message for the repair tool. 1611 Message message = ERR_ERROR_REPLAYING_OPERATION.get( 1612 op.toString(), ctx.getChangeNumber().toString(), 1613 result.toString(), op.getErrorMessage().toString()); 1614 logError(message); 1615 return true; 1616 } 1617 } 1618 1619 /** 1620 * Solve a conflict detected when replaying a delete operation. 1621 * 1622 * @param op The operation that triggered the conflict detection. 1623 * @param msg The operation that triggered the conflict detection. 1624 * @return true if the process is completed, false if it must continue.. 
/**
 * Solve a conflict detected when replaying a delete operation.
 * <p>
 * NO_SUCH_OBJECT: the entry is looked up by unique id; if it is gone the
 * delete is considered done, otherwise the message is retargeted to the
 * entry's current DN. NOT_ALLOWED_ON_NONLEAF: the children are marked as
 * conflicting and renamed, and the delete is tried again. Any other result
 * is logged for the repair tool.
 *
 * @param op  The operation that triggered the conflict detection.
 * @param msg The update message from which the operation was created.
 * @return true if the process is completed, false if it must continue.
 */
private boolean solveNamingConflict(DeleteOperation op,
    UpdateMessage msg)
{
  ResultCode result = op.getResultCode();
  DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
  String entryUid = ctx.getEntryUid();

  if (result == ResultCode.NO_SUCH_OBJECT)
  {
    /*
     * Find if the entry is still in the database.
     */
    DN currentDn = findEntryDN(entryUid);
    if (currentDn == null)
    {
      /*
       * The entry has already been deleted, either because this delete
       * has already been replayed or because another concurrent delete
       * has already done the job.
       * In any case, there is nothing more to do.
       */
      numResolvedNamingConflicts.incrementAndGet();
      return true;
    }
    else
    {
      /*
       * This entry has been renamed, replay the delete using its new DN.
       */
      msg.setDn(currentDn.toString());
      numResolvedNamingConflicts.incrementAndGet();
      return false;
    }
  }
  else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
  {
    /*
     * This may happen when we replay a DELETE done on a master
     * but children of this entry have been added on another master.
     *
     * Rename all the children by adding entryuuid in dn and delete this entry.
     *
     * The action taken here must be consistent with the actions
     * done in the solveNamingConflict(AddOperation) method
     * when we are adding an entry whose parent entry has already been deleted.
     */
    findAndRenameChild(entryUid, op.getEntryDN(), op);
    numUnresolvedNamingConflicts.incrementAndGet();
    return false;
  }
  else
  {
    // The other type of errors can not be caused by naming conflicts.
    // Log a message for the repair tool.
    Message message = ERR_ERROR_REPLAYING_OPERATION.get(
        op.toString(), ctx.getChangeNumber().toString(),
        result.toString(), op.getErrorMessage().toString());
    logError(message);
    return true;
  }
}

/**
 * Solve a conflict detected when replaying a Modify DN operation.
 *
 * @param op  The operation that triggered the conflict detection.
 * @param msg The update message from which the operation was created; it
 *            is cast to ModifyDNMsg when it has to be rewritten.
 * @return true if the process is completed, false if it must continue.
 * @throws Exception When the operation is not valid (no usable parent DN
 *                   can be determined for the renamed entry).
 */
private boolean solveNamingConflict(ModifyDNOperation op,
    UpdateMessage msg) throws Exception
{
  ResultCode result = op.getResultCode();
  ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
  String entryUid = ctx.getEntryUid();
  String newSuperiorID = ctx.getNewParentId();

  /*
   * four possible cases :
   * - the modified entry has been renamed
   * - the new parent has been renamed
   * - the operation is replayed for the second time.
   * - the entry has been deleted
   * action :
   *  - change the target dn and the new parent dn and
   *        restart the operation,
   *  - don't do anything if the operation is replayed.
   */

  // Construct the new DN to use for the entry.
  DN entryDN = op.getEntryDN();
  DN newSuperior = findEntryDN(newSuperiorID);
  RDN newRDN = op.getNewRDN();
  DN parentDN;

  // When the new superior cannot be found, fall back to the parent of the
  // DN targeted by the operation.
  if (newSuperior == null)
  {
    parentDN = entryDN.getParent();
  }
  else
  {
    parentDN = newSuperior;
  }

  if ((parentDN == null) || parentDN.isNullDN())
  {
    /* this should never happen
     * can't solve any conflict in this case.
     */
    throw new Exception("operation parameters are invalid");
  }

  DN newDN = parentDN.concat(newRDN);

  // get the current DN of this entry in the database.
  DN currentDN = findEntryDN(entryUid);

  if (currentDN == null)
  {
    // The entry targeted by the Modify DN is not in the database
    // anymore.
    // This is a conflict between a delete and this modify DN.
    // The entry has been deleted so we can safely assume
    // that the operation is completed.
    numResolvedNamingConflicts.incrementAndGet();
    return true;
  }

  // if the newDN and the current DN match then the operation
  // is a no-op (this was probably a second replay)
  // don't do anything.
  if (newDN.equals(currentDN))
  {
    numResolvedNamingConflicts.incrementAndGet();
    return true;
  }

  // If we could not find the new parent entry, we missed this entry
  // earlier or it has disappeared from the database
  // Log this information for the repair tool and mark the entry
  // as conflicting.
  // stop the processing.
  if (newSuperior == null)
  {
    markConflictEntry(op, currentDN, newDN);
    numUnresolvedNamingConflicts.incrementAndGet();
    return true;
  }

  if ((result == ResultCode.NO_SUCH_OBJECT) ||
      (result == ResultCode.OBJECTCLASS_VIOLATION))
  {
    /*
     * The entry or its new parent has not been found
     * reconstruct the operation with the DN we just built
     */
    ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
    msg.setDn(currentDN.toString());
    modifyDnMsg.setNewSuperior(newSuperior.toString());
    numResolvedNamingConflicts.incrementAndGet();
    return false;
  }
  else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
  {
    /*
     * This may happen when two modifyDn operation
     * are done on different servers but with the same target DN
     * add the conflict object class to the entry
     * and rename it using its entryuuid.
     */
    ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
    // NOTE(review): the entry is marked at op.getEntryDN(), not at the
    // currentDN found above - confirm this is intended.
    markConflictEntry(op, op.getEntryDN(), newDN);
    modifyDnMsg.setNewRDN(generateConflictRDN(entryUid,
                          modifyDnMsg.getNewRDN()));
    modifyDnMsg.setNewSuperior(newSuperior.toString());
    numUnresolvedNamingConflicts.incrementAndGet();
    return false;
  }
  else
  {
    // The other type of errors can not be caused by naming conflicts.
    // Log a message for the repair tool.
    Message message = ERR_ERROR_REPLAYING_OPERATION.get(
        op.toString(), ctx.getChangeNumber().toString(),
        result.toString(), op.getErrorMessage().toString());
    logError(message);
    return true;
  }
}


/**
 * Solve a conflict detected when replaying an ADD operation.
 *
 * @param op  The operation that triggered the conflict detection.
 * @param msg The message that triggered the conflict detection; its DN,
 *            parent uid and attributes may be rewritten.
 * @return true if the process is completed, false if it must continue.
 * @throws Exception When the operation is not valid.
 */
private boolean solveNamingConflict(AddOperation op,
    AddMsg msg) throws Exception
{
  ResultCode result = op.getResultCode();
  AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
  String entryUid = ctx.getEntryUid();
  String parentUniqueId = ctx.getParentUid();

  if (result == ResultCode.NO_SUCH_OBJECT)
  {
    /*
     * This can happen if the parent has been renamed or deleted
     * find the parent dn and calculate a new dn for the entry
     */
    if (parentUniqueId == null)
    {
      /*
       * This entry is the base dn of the backend.
       * It is quite surprising that the operation result be NO_SUCH_OBJECT.
       * There is nothing more we can do except TODO log a
       * message for the repair tool to look at this problem.
       */
      return true;
    }
    DN parentDn = findEntryDN(parentUniqueId);
    if (parentDn == null)
    {
      /*
       * The parent has been deleted
       * rename the entry as a conflicting entry.
       * The action taken here must be consistent with the actions
       * done in the solveNamingConflict(DeleteOperation) method
       * when we are deleting an entry that has some child entries.
       */
      addConflict(msg);

      // The conflicting entry is placed directly below the baseDN.
      msg.setDn(generateConflictRDN(entryUid,
                  op.getEntryDN().getRDN().toString()) + ","
                  + baseDN);
      // reset the parent uid so that the check done in the handleConflict
      // phase does not fail.
      msg.setParentUid(null);
      numUnresolvedNamingConflicts.incrementAndGet();
      return false;
    }
    else
    {
      // The parent still exists under another DN: keep the entry RDN and
      // replay the add below the parent's current DN.
      RDN entryRdn = DN.decode(msg.getDn()).getRDN();
      msg.setDn(entryRdn + "," + parentDn);
      numResolvedNamingConflicts.incrementAndGet();
      return false;
    }
  }
  else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
  {
    /*
     * This can happen if
     *  - two adds are done on different servers but with the
     *    same target DN.
     *  - the same ADD is being replayed for the second time on this server.
     * if the nsunique ID already exist, assume this is a replay and
     *        don't do anything
     * if the entry unique id do not exist, generate conflict.
     */
    if (findEntryDN(entryUid) != null)
    {
      // entry already exist : this is a replay
      return true;
    }
    else
    {
      addConflict(msg);
      // The full DN is passed here, so the uuid is prepended to its
      // first RDN.
      msg.setDn(generateConflictRDN(entryUid, msg.getDn()));
      numUnresolvedNamingConflicts.incrementAndGet();
      return false;
    }
  }
  else
  {
    // The other type of errors can not be caused by naming conflicts.
    // log a message for the repair tool.
    Message message = ERR_ERROR_REPLAYING_OPERATION.get(
        op.toString(), ctx.getChangeNumber().toString(),
        result.toString(), op.getErrorMessage().toString());
    logError(message);
    return true;
  }
}

/**
 * Find all the entries directly below the provided DN, mark them as
 * conflicting and rename them so that they stay below the baseDn of this
 * replicationDomain and use the conflicting name and attribute.
 *
 * @param entryUid   The unique ID of the entry whose child must be renamed
 *                   (not used by this method).
 * @param entryDN    The DN of the entry whose child must be renamed.
 * @param conflictOp The Operation that generated the conflict.
 */
private void findAndRenameChild(
    String entryUid, DN entryDN, Operation conflictOp)
{
  // Find and rename child entries.
  // This local connection shadows the conn field of the class.
  InternalClientConnection conn =
    InternalClientConnection.getRootConnection();

  try
  {
    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
    attrs.add(ENTRYUIDNAME);

    SearchFilter ALLMATCH;
    ALLMATCH = SearchFilter.createFilterFromString("(objectClass=*)");
    InternalSearchOperation op =
        conn.processSearch(entryDN, SearchScope.SINGLE_LEVEL,
            DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, ALLMATCH,
            attrs);

    if (op.getResultCode() == ResultCode.SUCCESS)
    {
      LinkedList<SearchResultEntry> entries = op.getSearchEntries();
      if (entries != null)
      {
        for (SearchResultEntry entry : entries)
        {
          markConflictEntry(conflictOp, entry.getDN(), entryDN);
          renameConflictEntry(conflictOp, entry.getDN(),
                              Historical.getEntryUuid(entry));
        }
      }
    }
    else
    {
      // log error and information for the REPAIR tool.
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CANNOT_RENAME_CONFLICT_ENTRY.get());
      mb.append(String.valueOf(entryDN));
      mb.append(" ");
      mb.append(String.valueOf(conflictOp));
      mb.append(" ");
      mb.append(String.valueOf(op.getResultCode()));
      logError(mb.toMessage());
    }
  } catch (DirectoryException e)
  {
    // log error and information for the REPAIR tool.
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_EXCEPTION_RENAME_CONFLICT_ENTRY.get());
    mb.append(String.valueOf(entryDN));
    mb.append(" ");
    mb.append(String.valueOf(conflictOp));
    mb.append(" ");
    mb.append(e.getLocalizedMessage());
    logError(mb.toMessage());
  }
}


/**
 * Rename an entry that was conflicting so that it stays below the
 * baseDN of the replicationDomain.
 * A failure of the rename is only logged.
 *
 * @param conflictOp The Operation that caused the conflict.
 * @param dn         The DN of the entry to be renamed.
 * @param uid        The uniqueID of the entry to be renamed.
 */
private void renameConflictEntry(Operation conflictOp, DN dn, String uid)
{
  InternalClientConnection conn =
    InternalClientConnection.getRootConnection();

  // Move the entry directly below baseDN under its conflict RDN.
  ModifyDNOperation newOp = conn.processModifyDN(
      dn, generateDeleteConflictDn(uid, dn),false, baseDN);

  if (newOp.getResultCode() != ResultCode.SUCCESS)
  {
    // log information for the repair tool.
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_CANNOT_RENAME_CONFLICT_ENTRY.get());
    mb.append(String.valueOf(dn));
    mb.append(" ");
    mb.append(String.valueOf(conflictOp));
    mb.append(" ");
    mb.append(String.valueOf(newOp.getResultCode()));
    logError(mb.toMessage());
  }
}
2019 * @param currentDN The current DN of the operation to mark as conflicting. 2020 * @param conflictDN The newDn on which the conflict happened. 2021 */ 2022 private void markConflictEntry(Operation op, DN currentDN, DN conflictDN) 2023 { 2024 // create new internal modify operation and run it. 2025 InternalClientConnection conn = 2026 InternalClientConnection.getRootConnection(); 2027 2028 AttributeType attrType = 2029 DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, true); 2030 LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); 2031 values.add(new AttributeValue(attrType, conflictDN.toString())); 2032 Attribute attr = new Attribute(attrType, DS_SYNC_CONFLICT, values); 2033 List<Modification> mods = new ArrayList<Modification>(); 2034 Modification mod = new Modification(ModificationType.REPLACE, attr); 2035 mods.add(mod); 2036 ModifyOperation newOp = conn.processModify(currentDN, mods); 2037 if (newOp.getResultCode() != ResultCode.SUCCESS) 2038 { 2039 // Log information for the repair tool. 2040 MessageBuilder mb = new MessageBuilder(); 2041 mb.append(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE.get()); 2042 mb.append(String.valueOf(op)); 2043 mb.append(" "); 2044 mb.append(String.valueOf(newOp.getResultCode())); 2045 logError(mb.toMessage()); 2046 } 2047 2048 // Generate an alert to let the administratot know that some 2049 // conflict could not be solved. 2050 Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN.toString()); 2051 DirectoryServer.sendAlertNotification(this, 2052 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 2053 } 2054 2055 /** 2056 * Add the conflict attribute to an entry that could 2057 * not be added because it is conflicting with another entry. 2058 * 2059 * @param msg The conflicting Add Operation. 2060 * 2061 * @throws ASN1Exception When an encoding error happenned manipulating the 2062 * msg. 
2063 */ 2064 private void addConflict(AddMsg msg) throws ASN1Exception 2065 { 2066 // Generate an alert to let the administratot know that some 2067 // conflict could not be solved. 2068 Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(msg.getDn()); 2069 DirectoryServer.sendAlertNotification(this, 2070 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 2071 2072 // Add the conflict attribute 2073 msg.addAttribute(DS_SYNC_CONFLICT, msg.getDn()); 2074 } 2075 2076 /** 2077 * Generate the Dn to use for a conflicting entry. 2078 * 2079 * @param entryUid The unique identifier of the entry involved in the 2080 * conflict. 2081 * @param rdn Original rdn. 2082 * @return The generated RDN for a conflicting entry. 2083 */ 2084 private String generateConflictRDN(String entryUid, String rdn) 2085 { 2086 return "entryuuid=" + entryUid + "+" + rdn; 2087 } 2088 2089 /** 2090 * Generate the RDN to use for a conflicting entry whose father was deleted. 2091 * 2092 * @param entryUid The unique identifier of the entry involved in the 2093 * conflict. 2094 * @param dn The original DN of the entry. 2095 * 2096 * @return The generated RDN for a conflicting entry. 2097 * @throws DirectoryException 2098 */ 2099 private RDN generateDeleteConflictDn(String entryUid, DN dn) 2100 { 2101 String newRDN = "entryuuid=" + entryUid + "+" + dn.getRDN(); 2102 RDN rdn = null; 2103 try 2104 { 2105 rdn = RDN.decode(newRDN); 2106 } catch (DirectoryException e) 2107 { 2108 // cannot happen 2109 } 2110 return rdn; 2111 } 2112 2113 /** 2114 * Check if an operation must be processed as an assured operation. 2115 * 2116 * @param op the operation to be checked. 2117 * @return true if the operations must be processed as an assured operation. 2118 */ 2119 private boolean isAssured(PostOperationOperation op) 2120 { 2121 // TODO : should have a filtering mechanism for checking 2122 // operation that are assured and operations that are not. 
2123 return false; 2124 } 2125 2126 /** 2127 * Get the maximum receive window size. 2128 * 2129 * @return The maximum receive window size. 2130 */ 2131 public int getMaxRcvWindow() 2132 { 2133 if (broker != null) 2134 return broker.getMaxRcvWindow(); 2135 else 2136 return 0; 2137 } 2138 2139 /** 2140 * Get the current receive window size. 2141 * 2142 * @return The current receive window size. 2143 */ 2144 public int getCurrentRcvWindow() 2145 { 2146 if (broker != null) 2147 return broker.getCurrentRcvWindow(); 2148 else 2149 return 0; 2150 } 2151 2152 /** 2153 * Get the maximum send window size. 2154 * 2155 * @return The maximum send window size. 2156 */ 2157 public int getMaxSendWindow() 2158 { 2159 if (broker != null) 2160 return broker.getMaxSendWindow(); 2161 else 2162 return 0; 2163 } 2164 2165 /** 2166 * Get the current send window size. 2167 * 2168 * @return The current send window size. 2169 */ 2170 public int getCurrentSendWindow() 2171 { 2172 if (broker != null) 2173 return broker.getCurrentSendWindow(); 2174 else 2175 return 0; 2176 } 2177 2178 /** 2179 * Get the number of times the replication connection was lost. 2180 * @return The number of times the replication connection was lost. 2181 */ 2182 public int getNumLostConnections() 2183 { 2184 if (broker != null) 2185 return broker.getNumLostConnections(); 2186 else 2187 return 0; 2188 } 2189 2190 /** 2191 * Get the number of modify conflicts successfully resolved. 2192 * @return The number of modify conflicts successfully resolved. 2193 */ 2194 public int getNumResolvedModifyConflicts() 2195 { 2196 return numResolvedModifyConflicts.get(); 2197 } 2198 2199 /** 2200 * Get the number of namign conflicts successfully resolved. 2201 * @return The number of naming conflicts successfully resolved. 2202 */ 2203 public int getNumResolvedNamingConflicts() 2204 { 2205 return numResolvedNamingConflicts.get(); 2206 } 2207 2208 /** 2209 * Get the number of unresolved conflicts. 
* @return The number of unresolved conflicts.
   */
  public int getNumUnresolvedNamingConflicts()
  {
    return numUnresolvedNamingConflicts.get();
  }

  /**
   * Get the server ID.
   * @return The server ID.
   */
  public int getServerId()
  {
    return serverId;
  }

  /**
   * Check if the domain solve conflicts.
   *
   * @return a boolean indicating if the domain should solve conflicts.
   */
  public boolean solveConflict()
  {
    return solveConflictFlag;
  }

  /**
   * Disable the replication on this domain.
   * The session to the replication server will be stopped.
   * The domain will not be destroyed but call to the pre-operation
   * methods will result in failure.
   * The listener thread will be destroyed.
   * The monitor informations will still be accessible.
   */
  public void disable()
  {
    // Persist the server state before dropping the in-memory copy.
    state.save();
    state.clearInMemory();
    disabled = true;

    // Stop the listener thread
    listenerThread.shutdown();

    broker.stop(); // This will cut the session and wake up the listener

    // Wait for the listener thread to stop
    listenerThread.waitForShutdown();
  }

  /**
   * Do what necessary when the data have changed : load state, load
   * generation Id.
   * The in-memory state is cleared and reloaded, the change number
   * generator is adjusted with the maximum change number the state holds
   * for this server ID, and the generation ID is reloaded.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void loadDataState()
  throws DirectoryException
  {
    state.clearInMemory();
    state.loadState();
    generator.adjust(state.getMaxChangeNumber(serverId));
    // Retrieves the generation ID associated with the data imported
    generationId = loadGenerationId();
  }

  /**
   * Enable back the domain after a previous disable.
   * The domain will connect back to a replication Server and
   * will recreate threads to listen for messages from the Sycnhronization
   * server.
2279 * The generationId will be retrieved or computed if necessary. 2280 * The ServerState will also be read again from the local database. 2281 */ 2282 public void enable() 2283 { 2284 try 2285 { 2286 loadDataState(); 2287 } 2288 catch (Exception e) 2289 { 2290 /* TODO should mark that replicationServer service is 2291 * not available, log an error and retry upon timeout 2292 * should we stop the modifications ? 2293 */ 2294 logError(ERR_LOADING_GENERATION_ID.get( 2295 baseDN.toNormalizedString(), e.getLocalizedMessage())); 2296 return; 2297 } 2298 2299 // After an on-line import, the value of the generationId is new 2300 // and it is necessary for the broker to send this new value as part 2301 // of the serverStart message. 2302 broker.setGenerationId(generationId); 2303 2304 broker.start(replicationServers); 2305 2306 // Create the listener thread 2307 listenerThread = new ListenerThread(this, updateToReplayQueue); 2308 listenerThread.start(); 2309 2310 disabled = false; 2311 } 2312 2313 /** 2314 * Compute the data generationId associated with the current data present 2315 * in the backend for this domain. 2316 * @return The computed generationId. 2317 * @throws DirectoryException When an error occurs. 2318 */ 2319 public long computeGenerationId() throws DirectoryException 2320 { 2321 Backend backend = retrievesBackend(baseDN); 2322 long bec = backend.numSubordinates(baseDN, true) + 1; 2323 this.acquireIEContext(); 2324 ieContext.checksumOutput = true; 2325 ieContext.entryCount = (bec<1000?bec:1000); 2326 ieContext.entryLeftCount = ieContext.entryCount; 2327 exportBackend(); 2328 long genId = ieContext.checksumOutputValue; 2329 2330 if (debugEnabled()) 2331 TRACER.debugInfo("Computed generationId: #entries=" + bec + 2332 " generationId=" + ieContext.checksumOutputValue); 2333 ieContext.checksumOutput = false; 2334 this.releaseIEContext(); 2335 return genId; 2336 } 2337 2338 /** 2339 * Returns the generationId set for this domain. 
2340 * 2341 * @return The generationId. 2342 */ 2343 public long getGenerationId() 2344 { 2345 return generationId; 2346 } 2347 2348 /** 2349 * The attribute name used to store the state in the backend. 2350 */ 2351 protected static final String REPLICATION_GENERATION_ID = 2352 "ds-sync-generation-id"; 2353 2354 /** 2355 * Stores the value of the generationId. 2356 * @param generationId The value of the generationId. 2357 * @return a ResultCode indicating if the method was successfull. 2358 */ 2359 public ResultCode saveGenerationId(long generationId) 2360 { 2361 // The generationId is stored in the root entry of the domain. 2362 ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString()); 2363 2364 ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>(); 2365 ASN1OctetString value = new ASN1OctetString(Long.toString(generationId)); 2366 values.add(value); 2367 2368 LDAPAttribute attr = 2369 new LDAPAttribute(REPLICATION_GENERATION_ID, values); 2370 LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); 2371 ArrayList<RawModification> mods = new ArrayList<RawModification>(1); 2372 mods.add(mod); 2373 2374 ModifyOperationBasis op = 2375 new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(), 2376 InternalClientConnection.nextMessageID(), 2377 new ArrayList<Control>(0), asn1BaseDn, 2378 mods); 2379 op.setInternalOperation(true); 2380 op.setSynchronizationOperation(true); 2381 op.setDontSynchronize(true); 2382 2383 op.run(); 2384 2385 ResultCode result = op.getResultCode(); 2386 if (result != ResultCode.SUCCESS) 2387 { 2388 generationIdSavedStatus = false; 2389 if (result != ResultCode.NO_SUCH_OBJECT) 2390 { 2391 // The case where the backend is empty (NO_SUCH_OBJECT) 2392 // is not an error case. 
2393 Message message = ERR_UPDATING_GENERATION_ID.get( 2394 op.getResultCode().getResultCodeName() + " " + 2395 op.getErrorMessage(), 2396 baseDN.toString()); 2397 logError(message); 2398 } 2399 } 2400 else 2401 { 2402 generationIdSavedStatus = true; 2403 } 2404 return result; 2405 } 2406 2407 2408 /** 2409 * Load the GenerationId from the root entry of the domain 2410 * from the REPLICATION_GENERATION_ID attribute in database 2411 * to memory, or compute it if not found. 2412 * 2413 * @return generationId The retrieved value of generationId 2414 * @throws DirectoryException When an error occurs. 2415 */ 2416 public long loadGenerationId() 2417 throws DirectoryException 2418 { 2419 long generationId=-1; 2420 2421 if (debugEnabled()) 2422 TRACER.debugInfo( 2423 "Attempt to read generation ID from DB " + baseDN.toString()); 2424 2425 ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString()); 2426 boolean found = false; 2427 LDAPFilter filter; 2428 try 2429 { 2430 filter = LDAPFilter.decode("objectclass=*"); 2431 } 2432 catch (LDAPException e) 2433 { 2434 // can not happen 2435 return -1; 2436 } 2437 2438 /* 2439 * Search the database entry that is used to periodically 2440 * save the ServerState 2441 */ 2442 InternalSearchOperation search = null; 2443 LinkedHashSet<String> attributes = new LinkedHashSet<String>(1); 2444 attributes.add(REPLICATION_GENERATION_ID); 2445 search = conn.processSearch(asn1BaseDn, 2446 SearchScope.BASE_OBJECT, 2447 DereferencePolicy.DEREF_ALWAYS, 0, 0, false, 2448 filter,attributes); 2449 if (((search.getResultCode() != ResultCode.SUCCESS)) && 2450 ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT))) 2451 { 2452 Message message = ERR_SEARCHING_GENERATION_ID.get( 2453 search.getResultCode().getResultCodeName() + " " + 2454 search.getErrorMessage(), 2455 baseDN.toString()); 2456 logError(message); 2457 } 2458 2459 SearchResultEntry resultEntry = null; 2460 if (search.getResultCode() == ResultCode.SUCCESS) 2461 { 2462 
LinkedList<SearchResultEntry> result = search.getSearchEntries(); 2463 resultEntry = result.getFirst(); 2464 if (resultEntry != null) 2465 { 2466 AttributeType synchronizationGenIDType = 2467 DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID); 2468 List<Attribute> attrs = 2469 resultEntry.getAttribute(synchronizationGenIDType); 2470 if (attrs != null) 2471 { 2472 Attribute attr = attrs.get(0); 2473 LinkedHashSet<AttributeValue> values = attr.getValues(); 2474 if (values.size()>1) 2475 { 2476 Message message = ERR_LOADING_GENERATION_ID.get( 2477 baseDN.toString(), "#Values=" + values.size() + 2478 " Must be exactly 1 in entry " + 2479 resultEntry.toLDIFString()); 2480 logError(message); 2481 } 2482 else if (values.size() == 1) 2483 { 2484 found=true; 2485 try 2486 { 2487 generationId = Long.decode(values.iterator().next(). 2488 getStringValue()); 2489 } 2490 catch(Exception e) 2491 { 2492 Message message = ERR_LOADING_GENERATION_ID.get( 2493 baseDN.toString(), e.getLocalizedMessage()); 2494 logError(message); 2495 } 2496 } 2497 } 2498 } 2499 } 2500 2501 if (!found) 2502 { 2503 generationId = computeGenerationId(); 2504 saveGenerationId(generationId); 2505 2506 if (debugEnabled()) 2507 TRACER.debugInfo("Generation ID created for domain base DN=" + 2508 baseDN.toString() + 2509 " generationId=" + generationId); 2510 } 2511 else 2512 { 2513 generationIdSavedStatus = true; 2514 if (debugEnabled()) 2515 TRACER.debugInfo( 2516 "Generation ID successfully read from domain base DN=" + baseDN + 2517 " generationId=" + generationId); 2518 } 2519 return generationId; 2520 } 2521 2522 /** 2523 * Reset the generationId of this domain in the whole topology. 2524 * A message is sent to the Replication Servers for them to reset 2525 * their change dbs. 2526 * 2527 * @param generationIdNewValue The new value of the generation Id. 
2528 */ 2529 public void resetGenerationId(Long generationIdNewValue) 2530 { 2531 if (debugEnabled()) 2532 TRACER.debugInfo( 2533 this.getName() + "resetGenerationId" + generationIdNewValue); 2534 2535 ResetGenerationId genIdMessage = null; 2536 if (generationIdNewValue == null) 2537 { 2538 genIdMessage = new ResetGenerationId(this.generationId); 2539 } 2540 else 2541 { 2542 genIdMessage = new ResetGenerationId(generationIdNewValue); 2543 } 2544 broker.publish(genIdMessage); 2545 } 2546 2547 /** 2548 * Do whatever is needed when a backup is started. 2549 * We need to make sure that the serverState is correclty save. 2550 */ 2551 public void backupStart() 2552 { 2553 state.save(); 2554 } 2555 2556 /** 2557 * Do whatever is needed when a backup is finished. 2558 */ 2559 public void backupEnd() 2560 { 2561 // Nothing is needed at the moment 2562 } 2563 2564 /* 2565 * Total Update >> 2566 */ 2567 2568 /** 2569 * Receives bytes related to an entry in the context of an import to 2570 * initialize the domain (called by ReplLDIFInputStream). 2571 * 2572 * @return The bytes. 
Null when the Done or Err message has been received 2573 */ 2574 public byte[] receiveEntryBytes() 2575 { 2576 ReplicationMessage msg; 2577 while (true) 2578 { 2579 try 2580 { 2581 msg = broker.receive(); 2582 2583 if (debugEnabled()) 2584 TRACER.debugVerbose( 2585 " sid:" + this.serverId + 2586 " base DN:" + this.baseDN + 2587 " Import EntryBytes received " + msg); 2588 if (msg == null) 2589 { 2590 // The server is in the shutdown process 2591 return null; 2592 } 2593 2594 if (msg instanceof EntryMessage) 2595 { 2596 EntryMessage entryMsg = (EntryMessage)msg; 2597 byte[] entryBytes = entryMsg.getEntryBytes(); 2598 ieContext.updateCounters(); 2599 return entryBytes; 2600 } 2601 else if (msg instanceof DoneMessage) 2602 { 2603 // This is the normal termination of the import 2604 // No error is stored and the import is ended 2605 // by returning null 2606 return null; 2607 } 2608 else if (msg instanceof ErrorMessage) 2609 { 2610 // This is an error termination during the import 2611 // The error is stored and the import is ended 2612 // by returning null 2613 ErrorMessage errorMsg = (ErrorMessage)msg; 2614 ieContext.exception = new DirectoryException( 2615 ResultCode.OTHER, 2616 errorMsg.getDetails()); 2617 return null; 2618 } 2619 else 2620 { 2621 // Other messages received during an import are trashed 2622 } 2623 } 2624 catch(Exception e) 2625 { 2626 // TODO: i18n 2627 ieContext.exception = new DirectoryException(ResultCode.OTHER, 2628 Message.raw("received an unexpected message type" + 2629 e.getLocalizedMessage())); 2630 } 2631 } 2632 } 2633 2634 /** 2635 * Processes an error message received while an import/export is 2636 * on going. 2637 * @param errorMsg The error message received. 
2638 */ 2639 protected void abandonImportExport(ErrorMessage errorMsg) 2640 { 2641 // FIXME TBD Treat the case where the error happens while entries 2642 // are being exported 2643 2644 if (debugEnabled()) 2645 TRACER.debugVerbose( 2646 " abandonImportExport:" + this.serverId + 2647 " base DN:" + this.baseDN + 2648 " Error Msg received " + errorMsg); 2649 2650 if (ieContext != null) 2651 { 2652 ieContext.exception = new DirectoryException(ResultCode.OTHER, 2653 errorMsg.getDetails()); 2654 2655 if (ieContext.initializeTask instanceof InitializeTask) 2656 { 2657 // Update the task that initiated the import 2658 ((InitializeTask)ieContext.initializeTask). 2659 updateTaskCompletionState(ieContext.exception); 2660 2661 releaseIEContext(); 2662 } 2663 } 2664 } 2665 2666 /** 2667 * Clears all the entries from the JE backend determined by the 2668 * be id passed into the method. 2669 * 2670 * @param createBaseEntry Indicate whether to automatically create the base 2671 * entry and add it to the backend. 2672 * @param beID The be id to clear. 2673 * @param dn The suffix of the backend to create if the the createBaseEntry 2674 * boolean is true. 2675 * @throws Exception If an unexpected problem occurs. 2676 */ 2677 public static void clearJEBackend(boolean createBaseEntry, String beID, 2678 String dn) throws Exception 2679 { 2680 BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID); 2681 2682 // FIXME Should setBackendEnabled be part of TaskUtils ? 
2683 TaskUtils.disableBackend(beID); 2684 2685 try 2686 { 2687 String lockFile = LockFileManager.getBackendLockFileName(backend); 2688 StringBuilder failureReason = new StringBuilder(); 2689 2690 if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason)) 2691 { 2692 throw new RuntimeException(failureReason.toString()); 2693 } 2694 2695 try 2696 { 2697 backend.clearBackend(); 2698 } 2699 finally 2700 { 2701 LockFileManager.releaseLock(lockFile, failureReason); 2702 } 2703 } 2704 finally 2705 { 2706 TaskUtils.enableBackend(beID); 2707 } 2708 2709 if (createBaseEntry) 2710 { 2711 DN baseDN = DN.decode(dn); 2712 Entry e = createEntry(baseDN); 2713 backend = (BackendImpl)DirectoryServer.getBackend(beID); 2714 backend.addEntry(e, null); 2715 } 2716 } 2717 2718 /** 2719 * Export the entries from the backend. 2720 * The ieContext must have been set before calling. 2721 * 2722 * @throws DirectoryException when an error occurred 2723 */ 2724 protected void exportBackend() 2725 throws DirectoryException 2726 { 2727 Backend backend = retrievesBackend(this.baseDN); 2728 2729 // Acquire a shared lock for the backend. 2730 try 2731 { 2732 String lockFile = LockFileManager.getBackendLockFileName(backend); 2733 StringBuilder failureReason = new StringBuilder(); 2734 if (! 
LockFileManager.acquireSharedLock(lockFile, failureReason)) 2735 { 2736 Message message = ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get( 2737 backend.getBackendID(), String.valueOf(failureReason)); 2738 logError(message); 2739 throw new DirectoryException( 2740 ResultCode.OTHER, message, null); 2741 } 2742 } 2743 catch (Exception e) 2744 { 2745 Message message = 2746 ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get( 2747 backend.getBackendID(), e.getLocalizedMessage()); 2748 logError(message); 2749 throw new DirectoryException( 2750 ResultCode.OTHER, message, null); 2751 } 2752 2753 OutputStream os; 2754 ReplLDIFOutputStream ros; 2755 2756 if (ieContext.checksumOutput) 2757 { 2758 ros = new ReplLDIFOutputStream(this, ieContext.entryCount); 2759 os = new CheckedOutputStream(ros, new Adler32()); 2760 try 2761 { 2762 os.write((Long.toString(backend.numSubordinates(baseDN, true) + 1)). 2763 getBytes()); 2764 } 2765 catch(Exception e) 2766 { 2767 // Should never happen 2768 } 2769 } 2770 else 2771 { 2772 ros = new ReplLDIFOutputStream(this, (short)-1); 2773 os = ros; 2774 } 2775 LDIFExportConfig exportConfig = new LDIFExportConfig(os); 2776 2777 // baseDN branch is the only one included in the export 2778 List<DN> includeBranches = new ArrayList<DN>(1); 2779 includeBranches.add(this.baseDN); 2780 exportConfig.setIncludeBranches(includeBranches); 2781 2782 // For the checksum computing mode, only consider the 'stable' attributes 2783 if (ieContext.checksumOutput) 2784 { 2785 String includeAttributeStrings[] = 2786 {"objectclass", "sn", "cn", "entryuuid"}; 2787 HashSet<AttributeType> includeAttributes; 2788 includeAttributes = new HashSet<AttributeType>(); 2789 for (String attrName : includeAttributeStrings) 2790 { 2791 AttributeType attrType = DirectoryServer.getAttributeType(attrName); 2792 if (attrType == null) 2793 { 2794 attrType = DirectoryServer.getDefaultAttributeType(attrName); 2795 } 2796 includeAttributes.add(attrType); 2797 } 2798 
exportConfig.setIncludeAttributes(includeAttributes); 2799 } 2800 2801 // Launch the export. 2802 try 2803 { 2804 backend.exportLDIF(exportConfig); 2805 } 2806 catch (DirectoryException de) 2807 { 2808 if ((ieContext != null) && (ieContext.checksumOutput) && 2809 (ros.getNumExportedEntries() >= ieContext.entryCount)) 2810 { 2811 // This is the normal end when computing the generationId 2812 // We can interrupt the export only by an IOException 2813 } 2814 else 2815 { 2816 Message message = 2817 ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject()); 2818 logError(message); 2819 throw new DirectoryException( 2820 ResultCode.OTHER, message, null); 2821 } 2822 } 2823 catch (Exception e) 2824 { 2825 Message message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get( 2826 stackTraceToSingleLineString(e)); 2827 logError(message); 2828 throw new DirectoryException( 2829 ResultCode.OTHER, message, null); 2830 } 2831 finally 2832 { 2833 2834 if ((ieContext != null) && (ieContext.checksumOutput)) 2835 { 2836 ieContext.checksumOutputValue = 2837 ((CheckedOutputStream)os).getChecksum().getValue(); 2838 } 2839 else 2840 { 2841 // Clean up after the export by closing the export config. 2842 // Will also flush the export and export the remaining entries. 2843 // This is a real export where writer has been initialized. 2844 exportConfig.close(); 2845 } 2846 2847 // Release the shared lock on the backend. 2848 try 2849 { 2850 String lockFile = LockFileManager.getBackendLockFileName(backend); 2851 StringBuilder failureReason = new StringBuilder(); 2852 if (! 
LockFileManager.releaseLock(lockFile, failureReason)) 2853 { 2854 Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get( 2855 backend.getBackendID(), String.valueOf(failureReason)); 2856 logError(message); 2857 throw new DirectoryException( 2858 ResultCode.OTHER, message, null); 2859 } 2860 } 2861 catch (Exception e) 2862 { 2863 Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get( 2864 backend.getBackendID(), stackTraceToSingleLineString(e)); 2865 logError(message); 2866 throw new DirectoryException( 2867 ResultCode.OTHER, message, null); 2868 } 2869 } 2870 } 2871 2872 /** 2873 * Retrieves the backend related to the domain. 2874 * 2875 * @return The backend of that domain. 2876 * @param baseDN The baseDN to retrieve the backend 2877 */ 2878 protected static Backend retrievesBackend(DN baseDN) 2879 { 2880 // Retrieves the backend related to this domain 2881 return DirectoryServer.getBackend(baseDN); 2882 } 2883 2884 /** 2885 * Get the internal broker to perform some operations on it. 2886 * 2887 * @return The broker for this domain. 2888 */ 2889 ReplicationBroker getBroker() 2890 { 2891 return broker; 2892 } 2893 2894 /** 2895 * Exports an entry in LDIF format. 2896 * 2897 * @param lDIFEntry The entry to be exported.. 2898 * 2899 * @throws IOException when an error occurred. 2900 */ 2901 public void exportLDIFEntry(String lDIFEntry) throws IOException 2902 { 2903 // If an error was raised - like receiving an ErrorMessage 2904 // we just let down the export. 
2905 if (ieContext.exception != null) 2906 { 2907 IOException ioe = new IOException(ieContext.exception.getMessage()); 2908 ieContext = null; 2909 throw ioe; 2910 } 2911 2912 if (ieContext.checksumOutput == false) 2913 { 2914 EntryMessage entryMessage = new EntryMessage( 2915 serverId, ieContext.exportTarget, lDIFEntry.getBytes()); 2916 broker.publish(entryMessage); 2917 } 2918 try 2919 { 2920 ieContext.updateCounters(); 2921 } 2922 catch (DirectoryException de) 2923 { 2924 throw new IOException(de.getMessage()); 2925 } 2926 } 2927 2928 /** 2929 * Initializes this domain from another source server. 2930 * 2931 * @param source The source from which to initialize 2932 * @param initTask The task that launched the initialization 2933 * and should be updated of its progress. 2934 * @throws DirectoryException when an error occurs 2935 */ 2936 public void initializeFromRemote(short source, Task initTask) 2937 throws DirectoryException 2938 { 2939 if (debugEnabled()) 2940 TRACER.debugInfo("Entering initializeFromRemote"); 2941 2942 acquireIEContext(); 2943 ieContext.initializeTask = initTask; 2944 2945 InitializeRequestMessage initializeMsg = new InitializeRequestMessage( 2946 baseDN, serverId, source); 2947 2948 // Publish Init request msg 2949 broker.publish(initializeMsg); 2950 2951 // .. we expect to receive entries or err after that 2952 } 2953 2954 /** 2955 * Verifies that the given string represents a valid source 2956 * from which this server can be initialized. 
2957 * @param sourceString The string representing the source 2958 * @return The source as a short value 2959 * @throws DirectoryException if the string is not valid 2960 */ 2961 public short decodeSource(String sourceString) 2962 throws DirectoryException 2963 { 2964 short source = 0; 2965 Throwable cause = null; 2966 try 2967 { 2968 source = Integer.decode(sourceString).shortValue(); 2969 if ((source >= -1) && (source != serverId)) 2970 { 2971 // TODO Verifies serverID is in the domain 2972 // We shold check here that this is a server implied 2973 // in the current domain. 2974 return source; 2975 } 2976 } 2977 catch(Exception e) 2978 { 2979 cause = e; 2980 } 2981 2982 ResultCode resultCode = ResultCode.OTHER; 2983 Message message = ERR_INVALID_IMPORT_SOURCE.get(); 2984 if (cause != null) 2985 { 2986 throw new DirectoryException( 2987 resultCode, message, cause); 2988 } 2989 else 2990 { 2991 throw new DirectoryException( 2992 resultCode, message); 2993 } 2994 } 2995 2996 /** 2997 * Verifies that the given string represents a valid source 2998 * from which this server can be initialized. 2999 * @param targetString The string representing the source 3000 * @return The source as a short value 3001 * @throws DirectoryException if the string is not valid 3002 */ 3003 public short decodeTarget(String targetString) 3004 throws DirectoryException 3005 { 3006 short target = 0; 3007 Throwable cause; 3008 if (targetString.equalsIgnoreCase("all")) 3009 { 3010 return RoutableMessage.ALL_SERVERS; 3011 } 3012 3013 // So should be a serverID 3014 try 3015 { 3016 target = Integer.decode(targetString).shortValue(); 3017 if (target >= 0) 3018 { 3019 // FIXME Could we check now that it is a know server in the domain ? 
3020 } 3021 return target; 3022 } 3023 catch(Exception e) 3024 { 3025 cause = e; 3026 } 3027 ResultCode resultCode = ResultCode.OTHER; 3028 Message message = ERR_INVALID_EXPORT_TARGET.get(); 3029 3030 if (cause != null) 3031 throw new DirectoryException( 3032 resultCode, message, cause); 3033 else 3034 throw new DirectoryException( 3035 resultCode, message); 3036 3037 } 3038 3039 private synchronized void acquireIEContext() 3040 throws DirectoryException 3041 { 3042 if (ieContext != null) 3043 { 3044 // Rejects 2 simultaneous exports 3045 Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get(); 3046 throw new DirectoryException(ResultCode.OTHER, 3047 message); 3048 } 3049 3050 ieContext = new IEContext(); 3051 } 3052 3053 private synchronized void releaseIEContext() 3054 { 3055 ieContext = null; 3056 } 3057 3058 /** 3059 * Process the initialization of some other server or servers in the topology 3060 * specified by the target argument. 3061 * @param target The target that should be initialized 3062 * @param initTask The task that triggers this initialization and that should 3063 * be updated with its progress. 3064 * 3065 * @exception DirectoryException When an error occurs. 3066 */ 3067 public void initializeRemote(short target, Task initTask) 3068 throws DirectoryException 3069 { 3070 initializeRemote(target, serverId, initTask); 3071 } 3072 3073 /** 3074 * Process the initialization of some other server or servers in the topology 3075 * specified by the target argument when this initialization specifying the 3076 * server that requests the initialization. 3077 * 3078 * @param target The target that should be initialized. 3079 * @param requestorID The server that initiated the export. 3080 * @param initTask The task that triggers this initialization and that should 3081 * be updated with its progress. 3082 * 3083 * @exception DirectoryException When an error occurs. 
3084 */ 3085 public void initializeRemote(short target, short requestorID, Task initTask) 3086 throws DirectoryException 3087 { 3088 try 3089 { 3090 Backend backend = retrievesBackend(this.baseDN); 3091 3092 if (!backend.supportsLDIFExport()) 3093 { 3094 Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get( 3095 backend.getBackendID().toString()); 3096 logError(message); 3097 throw new DirectoryException(ResultCode.OTHER, message); 3098 } 3099 3100 acquireIEContext(); 3101 3102 // The number of entries to be exported is the number of entries under 3103 // the base DN entry and the base entry itself. 3104 long entryCount = backend.numSubordinates(baseDN, true) + 1; 3105 ieContext.exportTarget = target; 3106 if (initTask != null) 3107 { 3108 ieContext.initializeTask = initTask; 3109 } 3110 ieContext.setCounters(entryCount, entryCount); 3111 3112 // Send start message to the peer 3113 InitializeTargetMessage initializeMessage = new InitializeTargetMessage( 3114 baseDN, serverId, ieContext.exportTarget, requestorID, entryCount); 3115 3116 broker.publish(initializeMessage); 3117 3118 exportBackend(); 3119 3120 // Notify the peer of the success 3121 DoneMessage doneMsg = new DoneMessage(serverId, 3122 initializeMessage.getDestination()); 3123 broker.publish(doneMsg); 3124 3125 releaseIEContext(); 3126 } 3127 catch(DirectoryException de) 3128 { 3129 // Notify the peer of the failure 3130 ErrorMessage errorMsg = 3131 new ErrorMessage(target, 3132 de.getMessageObject()); 3133 broker.publish(errorMsg); 3134 3135 releaseIEContext(); 3136 3137 throw(de); 3138 } 3139 } 3140 3141 /** 3142 * Process backend before import. 3143 * @param backend The backend. 3144 * @throws Exception 3145 */ 3146 private void preBackendImport(Backend backend) 3147 throws Exception 3148 { 3149 // Stop saving state 3150 stateSavingDisabled = true; 3151 3152 // FIXME setBackendEnabled should be part of TaskUtils ? 
3153 TaskUtils.disableBackend(backend.getBackendID()); 3154 3155 // Acquire an exclusive lock for the backend. 3156 String lockFile = LockFileManager.getBackendLockFileName(backend); 3157 StringBuilder failureReason = new StringBuilder(); 3158 if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason)) 3159 { 3160 Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get( 3161 backend.getBackendID(), 3162 String.valueOf(failureReason)); 3163 logError(message); 3164 throw new DirectoryException(ResultCode.OTHER, message); 3165 } 3166 } 3167 3168 /** 3169 * Initializes the domain's backend with received entries. 3170 * @param initializeMessage The message that initiated the import. 3171 * @exception DirectoryException Thrown when an error occurs. 3172 */ 3173 protected void initialize(InitializeTargetMessage initializeMessage) 3174 throws DirectoryException 3175 { 3176 LDIFImportConfig importConfig = null; 3177 DirectoryException de = null; 3178 3179 Backend backend = retrievesBackend(baseDN); 3180 3181 try 3182 { 3183 if (!backend.supportsLDIFImport()) 3184 { 3185 Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get( 3186 backend.getBackendID().toString()); 3187 logError(message); 3188 de = new DirectoryException(ResultCode.OTHER, message); 3189 } 3190 else 3191 { 3192 if (initializeMessage.getRequestorID() == serverId) 3193 { 3194 // The import responds to a request we did so the IEContext 3195 // is already acquired 3196 } 3197 else 3198 { 3199 acquireIEContext(); 3200 } 3201 3202 ieContext.importSource = initializeMessage.getsenderID(); 3203 ieContext.entryLeftCount = initializeMessage.getEntryCount(); 3204 ieContext.setCounters(initializeMessage.getEntryCount(), 3205 initializeMessage.getEntryCount()); 3206 3207 preBackendImport(backend); 3208 3209 ieContext.ldifImportInputStream = new ReplLDIFInputStream(this); 3210 importConfig = 3211 new LDIFImportConfig(ieContext.ldifImportInputStream); 3212 List<DN> includeBranches = new ArrayList<DN>(); 3213 
includeBranches.add(this.baseDN); 3214 importConfig.setIncludeBranches(includeBranches); 3215 importConfig.setAppendToExistingData(false); 3216 3217 // TODO How to deal with rejected entries during the import 3218 importConfig.writeRejectedEntries( 3219 getFileForPath("logs" + File.separator + 3220 "replInitRejectedEntries").getAbsolutePath(), 3221 ExistingFileBehavior.OVERWRITE); 3222 3223 // Process import 3224 backend.importLDIF(importConfig); 3225 3226 stateSavingDisabled = false; 3227 } 3228 } 3229 catch(Exception e) 3230 { 3231 de = new DirectoryException(ResultCode.OTHER, 3232 Message.raw(e.getLocalizedMessage())); 3233 } 3234 finally 3235 { 3236 if ((ieContext != null) && (ieContext.exception != null)) 3237 de = ieContext.exception; 3238 3239 // Cleanup 3240 if (importConfig != null) 3241 { 3242 importConfig.close(); 3243 3244 // Re-enable backend 3245 closeBackendImport(backend); 3246 3247 backend = retrievesBackend(baseDN); 3248 } 3249 3250 // Update the task that initiated the import 3251 if ((ieContext != null ) && (ieContext.initializeTask != null)) 3252 { 3253 ((InitializeTask)ieContext.initializeTask). 3254 updateTaskCompletionState(de); 3255 } 3256 releaseIEContext(); 3257 } 3258 // Sends up the root error. 3259 if (de != null) 3260 { 3261 throw de; 3262 } 3263 else 3264 { 3265 loadDataState(); 3266 3267 if (debugEnabled()) 3268 TRACER.debugInfo( 3269 "After import, the replication plugin restarts connections" + 3270 " to all RSs to provide new generation ID=" + generationId); 3271 broker.setGenerationId(generationId); 3272 3273 // Re-exchange generationID and state with RS 3274 broker.reStart(); 3275 } 3276 } 3277 3278 /** 3279 * Make post import operations. 3280 * @param backend The backend implied in the import. 3281 * @exception DirectoryException Thrown when an error occurs. 
3282 */ 3283 protected void closeBackendImport(Backend backend) 3284 throws DirectoryException 3285 { 3286 String lockFile = LockFileManager.getBackendLockFileName(backend); 3287 StringBuilder failureReason = new StringBuilder(); 3288 3289 // Release lock 3290 if (!LockFileManager.releaseLock(lockFile, failureReason)) 3291 { 3292 Message message = WARN_LDIFIMPORT_CANNOT_UNLOCK_BACKEND.get( 3293 backend.getBackendID(), String.valueOf(failureReason)); 3294 logError(message); 3295 throw new DirectoryException(ResultCode.OTHER, message); 3296 } 3297 3298 TaskUtils.enableBackend(backend.getBackendID()); 3299 } 3300 3301 /** 3302 * Retrieves a replication domain based on the baseDN. 3303 * 3304 * @param baseDN The baseDN of the domain to retrieve 3305 * @return The domain retrieved 3306 * @throws DirectoryException When an error occurred or no domain 3307 * match the provided baseDN. 3308 */ 3309 public static ReplicationDomain retrievesReplicationDomain(DN baseDN) 3310 throws DirectoryException 3311 { 3312 ReplicationDomain replicationDomain = null; 3313 3314 // Retrieves the domain 3315 DirectoryServer.getSynchronizationProviders(); 3316 for (SynchronizationProvider provider : 3317 DirectoryServer.getSynchronizationProviders()) 3318 { 3319 if (!( provider instanceof MultimasterReplication)) 3320 { 3321 Message message = ERR_INVALID_PROVIDER.get(); 3322 throw new DirectoryException(ResultCode.OTHER, 3323 message); 3324 } 3325 3326 // From the domainDN retrieves the replication domain 3327 ReplicationDomain sdomain = 3328 MultimasterReplication.findDomain(baseDN, null); 3329 if (sdomain == null) 3330 { 3331 break; 3332 } 3333 if (replicationDomain != null) 3334 { 3335 // Should never happen 3336 Message message = ERR_MULTIPLE_MATCHING_DOMAIN.get(); 3337 throw new DirectoryException(ResultCode.OTHER, 3338 message); 3339 } 3340 replicationDomain = sdomain; 3341 } 3342 3343 if (replicationDomain == null) 3344 { 3345 MessageBuilder mb = new 
MessageBuilder(ERR_NO_MATCHING_DOMAIN.get()); 3346 mb.append(" "); 3347 mb.append(String.valueOf(baseDN)); 3348 throw new DirectoryException(ResultCode.OTHER, 3349 mb.toMessage()); 3350 } 3351 return replicationDomain; 3352 } 3353 3354 /** 3355 * Returns the backend associated to this domain. 3356 * @return The associated backend. 3357 */ 3358 public Backend getBackend() 3359 { 3360 return retrievesBackend(baseDN); 3361 } 3362 3363 /** 3364 * Returns a boolean indiciating if an import or export is currently 3365 * processed. 3366 * @return The status 3367 */ 3368 public boolean ieRunning() 3369 { 3370 return (ieContext != null); 3371 } 3372 /* 3373 * <<Total Update 3374 */ 3375 3376 3377 /** 3378 * Push the modifications contain the in given parameter has 3379 * a modification that would happen on a local server. 3380 * The modifications are not applied to the local database, 3381 * historical information is not updated but a ChangeNumber 3382 * is generated and the ServerState associated to this domain is 3383 * updated. 3384 * @param modifications The modification to push 3385 */ 3386 public void synchronizeModifications(List<Modification> modifications) 3387 { 3388 ModifyOperation opBasis = 3389 new ModifyOperationBasis(InternalClientConnection.getRootConnection(), 3390 InternalClientConnection.nextOperationID(), 3391 InternalClientConnection.nextMessageID(), 3392 null, DirectoryServer.getSchemaDN(), 3393 modifications); 3394 LocalBackendModifyOperation op = new LocalBackendModifyOperation(opBasis); 3395 3396 ChangeNumber cn = generateChangeNumber(op); 3397 OperationContext ctx = new ModifyContext(cn, "schema"); 3398 op.setAttachment(SYNCHROCONTEXT, ctx); 3399 op.setResultCode(ResultCode.SUCCESS); 3400 synchronize(op); 3401 } 3402 3403 /** 3404 * Check if the provided configuration is acceptable for add. 3405 * 3406 * @param configuration The configuration to check. 
* @param unacceptableReasons When the configuration is not acceptable, this
   *                            table is used to return the reasons why this
   *                            configuration is not acceptable.
   *
   * @return true if the configuration is acceptable, false otherwise.
   */
  public static boolean isConfigurationAcceptable(
      ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
  {
    final DN dn = configuration.getBaseDN();

    // Check that there is not already a domain with the same DN
    final boolean domainExists =
      (MultimasterReplication.findDomain(dn,null) != null);
    if (domainExists)
    {
      unacceptableReasons.add(ERR_SYNC_INVALID_DN.get());
      return false;
    }

    // Check that the base DN is configured as a base-dn of the directory server
    final boolean backendMissing = (retrievesBackend(dn) == null);
    if (backendMissing)
    {
      unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn.toString()));
      return false;
    }

    return true;
  }

  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationChange(
         ReplicationDomainCfg configuration)
  {
    // The server id and the base dn are read-only, so they are not reloaded.
    // The replication servers, the window size and the heartbeat interval
    // need to be renegotiated with the ReplicationServer, which requires
    // restarting the session with it: they are handed over to the broker.
    replicationServers = configuration.getReplicationServer();
    window = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay,
                        maxSendQueue, maxSendDelay, window, heartbeatInterval);

    // The isolation policy can be set immediately and will apply to the
    // next updates.
    isolationpolicy = configuration.getIsolationPolicy();

    final ConfigChangeResult result =
      new ConfigChangeResult(ResultCode.SUCCESS, false);
    return result;
  }

  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
         ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
  {
    // Every configuration change is accepted.
    return true;
  }

  /**
   * {@inheritDoc}
   */
  public LinkedHashMap<String, String> getAlerts()
  {
    // Single alert type: the unresolved replication conflict.
    final LinkedHashMap<String,String> alertTypes =
      new LinkedHashMap<String,String>();
    alertTypes.put(ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT,
                   ALERT_DESCRIPTION_REPLICATION_UNRESOLVED_CONFLICT);
    return alertTypes;
  }

  /**
   * {@inheritDoc}
   */
  public String getClassName()
  {
    return CLASS_NAME;
  }

  /**
   * {@inheritDoc}
   */
  public DN getComponentEntryDN()
  {
    return configDn;
  }

  /**
   * Check if the domain is connected to a ReplicationServer.
   * A domain without a broker is reported as not connected.
   *
   * @return true if the server is connected, false if not.
   */
  public boolean isConnected()
  {
    return (broker != null) && broker.isConnected();
  }

  /**
   * Determine whether the connection to the replication server is encrypted.
   * A domain without a broker is reported as not encrypted.
   * @return true if the connection is encrypted, false otherwise.
   */
  public boolean isSessionEncrypted()
  {
    return (broker != null) && broker.isSessionEncrypted();
  }
}