/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;

import java.util.ArrayList;
import static org.opends.server.replication.plugin.
    ReplicationRepairRequestControl.*;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.api.Backend;
import org.opends.server.api.BackupTaskListener;
import org.opends.server.api.ExportTaskListener;
import org.opends.server.api.ImportTaskListener;
import org.opends.server.api.RestoreTaskListener;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.Operation;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SynchronizationProviderResult;
import org.opends.server.types.operation.PluginOperation;
import org.opends.server.types.operation.PostOperationAddOperation;
import org.opends.server.types.operation.PostOperationDeleteOperation;
import org.opends.server.types.operation.PostOperationModifyDNOperation;
import org.opends.server.types.operation.PostOperationModifyOperation;
import org.opends.server.types.operation.PostOperationOperation;
import org.opends.server.types.operation.PreOperationAddOperation;
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;

/**
 * This class is used to load the Replication code inside the JVM
 * and to trigger initialization of the replication.
 *
 * It also extends the SynchronizationProvider class in order to have some
 * replication code running during the operation process
 * as pre-op, conflictResolution, and post-op.
 *
 * The registry of replication domains is static and shared by every entry
 * point of this class. It is held in a concurrent map so that lookups done
 * by {@link #findDomain(DN, PluginOperation)} remain safe if domains are
 * added or removed at the same time by the configuration listeners.
 */
public class MultimasterReplication
       extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
       implements ConfigurationAddListener<ReplicationDomainCfg>,
                  ConfigurationDeleteListener<ReplicationDomainCfg>,
                  ConfigurationChangeListener
                  <ReplicationSynchronizationProviderCfg>,
                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                  ExportTaskListener
{
  private ReplicationServerListener replicationServerListener = null;

  /**
   * The replication domains, indexed by their base DN.
   * A concurrent map is used because this registry is read by findDomain()
   * and modified by createNewDomain() / deleteDomain() without any
   * common lock.
   */
  private static final Map<DN, ReplicationDomain> domains =
    new ConcurrentHashMap<DN, ReplicationDomain>();

  /**
   * The queue of received update messages, to be treated by the ReplayThread
   * threads.
   */
  private static final LinkedBlockingQueue<UpdateToReplay>
    updateToReplayQueue = new LinkedBlockingQueue<UpdateToReplay>();

  /**
   * The list of ReplayThread threads.
   * Only accessed from createReplayThreads() and stopReplayThreads(), which
   * are both synchronized on this class.
   */
  private static final List<ReplayThread> replayThreads =
    new ArrayList<ReplayThread>();

  /**
   * The configurable number of replay threads.
   */
  private static int replayThreadNumber = 10;

  /**
   * Set once completeSynchronizationProvider() has been called and cleared
   * by finalizeSynchronizationProvider(). Domains added while this flag is
   * set are started immediately.
   */
  private boolean isRegistered = false;

  /**
   * Finds the domain for a given DN.
   *
   * The lookup walks from the provided DN up through its parents (as
   * returned by getParentDNInSuffix()) and returns the first domain whose
   * base DN matches.
   *
   * @param dn   The DN for which the domain must be returned.
   *             A null DN yields a null result.
   * @param pluginOp An optional operation for which the check is done.
   *                 Can be null if the request has no associated operation.
   * @return The domain for this DN, or null if the operation must not be
   *         processed by the replication code (it is marked as
   *         "don't synchronize" or it carries the replication repair
   *         control) or if no domain covers this DN.
   */
  public static ReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
  {
    /*
     * Don't run the special replication code on Operation that are
     * specifically marked as don't synchronize.
     */
    if (pluginOp instanceof Operation)
    {
      Operation op = (Operation) pluginOp;

      if (op.dontSynchronize())
      {
        return null;
      }

      /*
       * Check if the provided operation is a repair operation and set
       * the synchronization flags if necessary.
       * The repair operations are tagged as synchronization operations
       * so that the core server let the operation modify the entryuuid
       * and ds-sync-hist attributes.
       * They are also tagged as dontSynchronize so that the replication
       * code running later do not generate ChangeNumber, solve conflicts
       * and forward the operation to the replication server.
       */
      List<Control> controls = op.getRequestControls();
      for (Control c : controls)
      {
        if (c.getOID().equals(OID_REPLICATION_REPAIR_CONTROL))
        {
          op.setSynchronizationOperation(true);
          op.setDontSynchronize(true);
          // remove this control from the list of controls since
          // it has now been processed and the local backend will
          // fail if it finds a control that it does not know about and
          // that is marked as critical.
          // Removing while iterating is safe here only because the loop
          // is left immediately afterwards.
          controls.remove(c);
          return null;
        }
      }
    }

    // Walk up the tree until a configured domain is found or the top of
    // the suffix is reached.
    for (DN current = dn; current != null;
         current = current.getParentDNInSuffix())
    {
      ReplicationDomain domain = domains.get(current);
      if (domain != null)
      {
        return domain;
      }
    }

    return null;
  }

  /**
   * Creates a new domain from its configEntry, do the
   * necessary initialization and registers it in the list of domains.
   * When it is the first registered domain, the replay threads are
   * created as well.
   *
   * @param configuration The entry with the configuration of this domain.
   * @return The domain created.
   * @throws ConfigException When the configuration is not valid.
   */
  public static ReplicationDomain createNewDomain(
      ReplicationDomainCfg configuration)
      throws ConfigException
  {
    ReplicationDomain domain =
      new ReplicationDomain(configuration, updateToReplayQueue);

    if (domains.isEmpty())
    {
      /*
       * Create the threads that will process incoming update messages
       */
      createReplayThreads();
    }

    domains.put(domain.getBaseDN(), domain);
    return domain;
  }

  /**
   * Deletes a domain.
   * The domain is shut down if it was registered, and the replay threads
   * are stopped when no domain remains.
   *
   * @param dn : the base DN of the domain to delete.
   */
  public static void deleteDomain(DN dn)
  {
    // The concurrent map rejects null keys, a null DN matches no domain.
    ReplicationDomain domain = (dn == null) ? null : domains.remove(dn);

    if (domain != null)
    {
      domain.shutdown();
    }

    // No replay threads running if no replication need
    if (domains.isEmpty())
    {
      stopReplayThreads();
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void initializeSynchronizationProvider(
      ReplicationSynchronizationProviderCfg configuration)
      throws ConfigException
  {
    domains.clear();
    replicationServerListener = new ReplicationServerListener(configuration);

    // Register as an add and delete listener with the root configuration so we
    // can be notified if Multimaster domain entries are added or removed.
    configuration.addReplicationDomainAddListener(this);
    configuration.addReplicationDomainDeleteListener(this);

    // Register as a root configuration listener so that we can be notified if
    // number of replay threads is changed and apply changes.
    configuration.addReplicationChangeListener(this);

    // Must be set before the first domain is created, since creating the
    // first domain also creates the replay threads.
    replayThreadNumber = configuration.getNumUpdateReplayThreads();

    //  Create the list of domains that are already defined.
    for (String name : configuration.listReplicationDomains())
    {
      ReplicationDomainCfg domain = configuration.getReplicationDomain(name);
      createNewDomain(domain);
    }

    /*
     * If any schema changes were made with the server offline, then handle them
     * now.
     */
    List<Modification> offlineSchemaChanges =
         DirectoryServer.getOfflineSchemaChanges();
    if ((offlineSchemaChanges != null) && (! offlineSchemaChanges.isEmpty()))
    {
      processSchemaChange(offlineSchemaChanges);
    }

    DirectoryServer.registerBackupTaskListener(this);
    DirectoryServer.registerRestoreTaskListener(this);
    DirectoryServer.registerExportTaskListener(this);
    DirectoryServer.registerImportTaskListener(this);

    DirectoryServer.registerSupportedControl(
        ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL);
  }

  /**
   * Create the threads that will wait for incoming update messages.
   * The number of threads created is the current value of
   * replayThreadNumber.
   */
  private synchronized static void createReplayThreads()
  {
    replayThreads.clear();

    for (int i = 0; i < replayThreadNumber; i++)
    {
      ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
      replayThread.start();
      replayThreads.add(replayThread);
    }
  }

  /**
   * Stop the threads that are waiting for incoming update messages.
   */
  private synchronized static void stopReplayThreads()
  {
    // Ask every thread to stop first, and only then wait for each of them,
    // so that the threads can terminate in parallel.
    for (ReplayThread replayThread : replayThreads)
    {
      replayThread.shutdown();
    }

    for (ReplayThread replayThread : replayThreads)
    {
      replayThread.waitForShutdown();
    }
    replayThreads.clear();
  }

  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationAddAcceptable(
      ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
  {
    return ReplicationDomain.isConfigurationAcceptable(
      configuration, unacceptableReasons);
  }

  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationAdd(
     ReplicationDomainCfg configuration)
  {
    try
    {
      ReplicationDomain rd = createNewDomain(configuration);
      // Domains created before completeSynchronizationProvider() are started
      // by that method, later ones must be started here.
      if (isRegistered)
      {
        rd.start();
      }
      return new ConfigChangeResult(ResultCode.SUCCESS, false);
    } catch (ConfigException e)
    {
      // we should never get to this point because the configEntry has
      // already been validated in isConfigurationAddAcceptable
      return new ConfigChangeResult(ResultCode.CONSTRAINT_VIOLATION, false);
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void doPostOperation(PostOperationAddOperation addOperation)
  {
    DN dn = addOperation.getEntryDN();
    genericPostOperation(addOperation, dn);
  }


  /**
   * {@inheritDoc}
   */
  @Override
  public void doPostOperation(PostOperationDeleteOperation deleteOperation)
  {
    DN dn = deleteOperation.getEntryDN();
    genericPostOperation(deleteOperation, dn);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation)
  {
    DN dn = modifyDNOperation.getEntryDN();
    genericPostOperation(modifyDNOperation, dn);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void doPostOperation(PostOperationModifyOperation modifyOperation)
  {
    DN dn = modifyOperation.getEntryDN();
    genericPostOperation(modifyOperation, dn);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyOperation modifyOperation)
  {
    ReplicationDomain domain =
      findDomain(modifyOperation.getEntryDN(), modifyOperation);
    if (domain == null)
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }

    return domain.handleConflictResolution(modifyOperation);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationAddOperation addOperation) throws DirectoryException
  {
    ReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }

    return domain.handleConflictResolution(addOperation);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationDeleteOperation deleteOperation) throws DirectoryException
  {
    ReplicationDomain domain =
      findDomain(deleteOperation.getEntryDN(), deleteOperation);
    if (domain == null)
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }

    return domain.handleConflictResolution(deleteOperation);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
  {
    ReplicationDomain domain =
      findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
    if (domain == null)
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }

    return domain.handleConflictResolution(modifyDNOperation);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult
         doPreOperation(PreOperationModifyOperation modifyOperation)
  {
    DN operationDN = modifyOperation.getEntryDN();
    ReplicationDomain domain = findDomain(operationDN, modifyOperation);

    if ((domain == null) || (!domain.solveConflict()))
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }

    // Reuse the historical information already attached to the operation
    // when there is one, otherwise load it from the modified entry and
    // attach it.
    Historical historicalInformation = (Historical)
                            modifyOperation.getAttachment(
                                    Historical.HISTORICAL);
    if (historicalInformation == null)
    {
      Entry entry = modifyOperation.getModifiedEntry();
      historicalInformation = Historical.load(entry);
      modifyOperation.setAttachment(Historical.HISTORICAL,
                                    historicalInformation);
    }

    historicalInformation.generateState(modifyOperation);

    return new SynchronizationProviderResult.ContinueProcessing();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult doPreOperation(
      PreOperationDeleteOperation deleteOperation) throws DirectoryException
  {
    return new SynchronizationProviderResult.ContinueProcessing();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult doPreOperation(
      PreOperationModifyDNOperation modifyDNOperation)
      throws DirectoryException
  {
    return new SynchronizationProviderResult.ContinueProcessing();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult doPreOperation(
      PreOperationAddOperation addOperation)
  {
    ReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }

    if (!addOperation.isSynchronizationOperation())
    {
      domain.doPreOperation(addOperation);
    }

    return new SynchronizationProviderResult.ContinueProcessing();
  }


  /**
   * {@inheritDoc}
   */
  @Override
  public void finalizeSynchronizationProvider()
  {
    isRegistered = false;

    // shutdown all the domains
    for (ReplicationDomain domain : domains.values())
    {
      domain.shutdown();
    }
    domains.clear();

    // Stop replay threads
    stopReplayThreads();

    // shutdown the ReplicationServer Service if necessary
    if (replicationServerListener != null)
    {
      replicationServerListener.shutdown();
    }

    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
    DirectoryServer.deregisterExportTaskListener(this);
    DirectoryServer.deregisterImportTaskListener(this);
  }

  /**
   * This method is called whenever the server detects a modification
   * of the schema done by directly modifying the backing files
   * of the schema backend.
   * Call the schema Domain if it exists.
   *
   * @param  modifications  The list of modifications that was
   *                                      applied to the schema.
   *
   */
  @Override
  public void processSchemaChange(List<Modification> modifications)
  {
    ReplicationDomain domain =
      findDomain(DirectoryServer.getSchemaDN(), null);
    if (domain != null)
    {
      domain.synchronizeModifications(modifications);
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processBackupBegin(Backend backend, BackupConfig config)
  {
    for (DN dn : backend.getBaseDNs())
    {
      ReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processBackupEnd(Backend backend, BackupConfig config,
                               boolean successful)
  {
    for (DN dn : backend.getBaseDNs())
    {
      ReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processRestoreBegin(Backend backend, RestoreConfig config)
  {
    for (DN dn : backend.getBaseDNs())
    {
      ReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processRestoreEnd(Backend backend, RestoreConfig config,
                                boolean successful)
  {
    for (DN dn : backend.getBaseDNs())
    {
      ReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processImportBegin(Backend backend, LDIFImportConfig config)
  {
    for (DN dn : backend.getBaseDNs())
    {
      ReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processImportEnd(Backend backend, LDIFImportConfig config,
                               boolean successful)
  {
    for (DN dn : backend.getBaseDNs())
    {
      ReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processExportBegin(Backend backend, LDIFExportConfig config)
  {
    for (DN dn : backend.getBaseDNs())
    {
      ReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processExportEnd(Backend backend, LDIFExportConfig config,
                               boolean successful)
  {
    for (DN dn : backend.getBaseDNs())
    {
      ReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationDelete(
      ReplicationDomainCfg configuration)
  {
    deleteDomain(configuration.getBaseDN());

    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }

  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationDeleteAcceptable(
      ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
  {
    return true;
  }

  /**
   * Generic code for all the postOperation entry point.
   * Hands the operation over to the domain covering the provided DN, and
   * does nothing when findDomain() returns no domain for it.
   *
   * @param operation The Operation for which the post-operation is called.
   * @param dn The Dn for which the post-operation is called.
   */
  private void genericPostOperation(PostOperationOperation operation, DN dn)
  {
    ReplicationDomain domain = findDomain(dn, operation);
    if (domain != null)
    {
      domain.synchronize(operation);
    }
  }

  /**
   * Returns the replication server listener associated to that Multimaster
   * Replication.
   * @return the listener.
   */
  public ReplicationServerListener getReplicationServerListener()
  {
    return replicationServerListener;
  }

  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
      ReplicationSynchronizationProviderCfg configuration,
      List<Message> unacceptableReasons)
  {
    return true;
  }

  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationChange(
      ReplicationSynchronizationProviderCfg configuration)
  {
    int numUpdateReplayThreads = configuration.getNumUpdateReplayThreads();

    // Stop threads then restart new number of threads
    stopReplayThreads();
    replayThreadNumber = numUpdateReplayThreads;
    // Replay threads are only needed when at least one domain exists.
    if (!domains.isEmpty())
    {
      createReplayThreads();
    }

    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }

  /**
   * {@inheritDoc}
   */
  public void completeSynchronizationProvider()
  {
    isRegistered = true;

    // start all the domains
    for (ReplicationDomain domain : domains.values())
    {
      domain.start();
    }
  }
}