/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2007-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;

import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.JebMessages.NOTE_JEB_EXPORT_FINAL_STATUS;
import static org.opends.messages.JebMessages.NOTE_JEB_EXPORT_PROGRESS_REPORT;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.*;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

import org.opends.messages.Message;
import org.opends.server.admin.Configuration;
import org.opends.server.admin.server.ServerManagementContext;
import org.opends.server.admin.std.server.BackendCfg;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.admin.std.server.RootCfg;
import org.opends.server.admin.std.server.SynchronizationProviderCfg;
import org.opends.server.api.Backend;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackupManager;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.ConditionResult;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.Entry;
import org.opends.server.types.IndexType;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.RawAttribute;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.ObjectClass;
import org.opends.server.util.AddChangeRecordEntry;
import org.opends.server.util.DeleteChangeRecordEntry;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.LDIFWriter;
import org.opends.server.util.ModifyChangeRecordEntry;
import org.opends.server.util.ModifyDNChangeRecordEntry;
import org.opends.server.util.Validator;
import static org.opends.server.config.ConfigConstants.ATTR_OBJECTCLASSES_LC;
import org.opends.server.protocols.internal.InternalSearchOperation;
import static org.opends.server.util.ServerConstants.*;


/**
 * This class defines a backend that stores its information in an
 * associated replication server object.
 * This is primarily intended to take advantage of the backup/restore/
 * import/export of the backend API, and to provide LDAP access
 * to the replication server database.
 * <BR><BR>
 * Entries stored in this backend are held in the DB associated with
 * the replication server.
 * <BR><BR>
 * Currently, only the backup creation and restore features are implemented.
 */
public class ReplicationBackend
       extends Backend
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();

  private static final String BASE_DN = "dc=replicationchanges";

  // The base DNs for this backend.
  private DN[] baseDNs;

  // The base DNs for this backend, in a hash set.
  private HashSet<DN> baseDNSet;

  // The set of supported controls for this backend.
  private HashSet<String> supportedControls;

  // The set of supported features for this backend.
  private HashSet<String> supportedFeatures;

  private ReplicationServer server;

  /**
   * The configuration of this backend.
   */
  private BackendCfg cfg;

  /**
   * The number of milliseconds between job progress reports.
   */
  private long progressInterval = 10000;

  /**
   * The current number of entries exported.
   */
  private long exportedCount = 0;

  /**
   * The current number of entries skipped.
   */
  private long skippedCount = 0;

  // Objectclasses used for getEntry root entries.
  private HashMap<ObjectClass,String> rootObjectclasses;

  // Attributes used for getEntry root entries.
  private LinkedHashMap<AttributeType,List<Attribute>> attributes;

  // Operational attributes used for getEntry root entries.
  private Map<AttributeType,List<Attribute>> operationalAttributes;


  /**
   * Creates a new backend with the provided information.  All backend
   * implementations must implement a default constructor that uses
   * <CODE>super()</CODE> to invoke this constructor.
   */
  public ReplicationBackend()
  {
    super();
    // Perform all initialization in initializeBackend.
  }


  /**
   * Set the base DNs for this backend.  This is used by the unit tests
   * to set the base DNs without having to provide a configuration
   * object when initializing the backend.
   * @param baseDNs The set of base DNs to be served by this backend.
   */
  public void setBaseDNs(DN[] baseDNs)
  {
    this.baseDNs = baseDNs;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public void configureBackend(Configuration config) throws ConfigException
  {
    if (config != null)
    {
      Validator.ensureTrue(config instanceof BackendCfg);
      cfg = (BackendCfg)config;
      DN[] baseDNs = new DN[cfg.getBaseDN().size()];
      cfg.getBaseDN().toArray(baseDNs);
      setBaseDNs(baseDNs);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized void initializeBackend()
       throws ConfigException, InitializationException
  {
    if ((baseDNs == null) || (baseDNs.length != 1))
    {
      Message message = ERR_MEMORYBACKEND_REQUIRE_EXACTLY_ONE_BASE.get();
      throw new ConfigException(message);
    }

    baseDNSet = new HashSet<DN>();
    for (DN dn : baseDNs)
    {
      baseDNSet.add(dn);
    }

    supportedControls = new HashSet<String>();
    supportedFeatures = new HashSet<String>();

    for (DN dn : baseDNs)
    {
      try
      {
        DirectoryServer.registerBaseDN(dn, this, true);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }

        Message message = ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(
            dn.toString(), getExceptionMessage(e));
        throw new InitializationException(message, e);
      }
    }

    // Build the objectclasses and attributes used to fabricate the synthetic
    // root entries returned by getEntry() and search().
    rootObjectclasses = new LinkedHashMap<ObjectClass,String>(3);
    rootObjectclasses.put(DirectoryServer.getTopObjectClass(), OC_TOP);
    ObjectClass domainOC = DirectoryServer.getObjectClass("domain", true);
    rootObjectclasses.put(domainOC, "domain");
    ObjectClass objectclassOC =
         DirectoryServer.getObjectClass(ATTR_OBJECTCLASSES_LC, true);
    rootObjectclasses.put(objectclassOC, ATTR_OBJECTCLASSES_LC);

    attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
    AttributeType changeType =
         DirectoryServer.getAttributeType("changetype", true);
    LinkedHashSet<AttributeValue> valueSet =
         new LinkedHashSet<AttributeValue>(1);
    valueSet.add(new AttributeValue(changeType, "add"));
    Attribute a = new Attribute(changeType, "changetype", valueSet);
    ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
    attrList.add(a);
    attributes.put(changeType, attrList);

    operationalAttributes = new LinkedHashMap<AttributeType,List<Attribute>>();
  }


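  // For illustration: with the objectclasses and attributes assembled in
  // initializeBackend() above, the synthetic entry returned for the backend
  // base DN by getEntry() and search() looks roughly like the following LDIF
  // (a sketch only; the exact set and ordering of attributes may differ):
  //
  //   dn: dc=replicationchanges
  //   objectclass: top
  //   objectclass: domain
  //   changetype: add

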
  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized void finalizeBackend()
  {
    for (DN dn : baseDNs)
    {
      try
      {
        DirectoryServer.deregisterBaseDN(dn);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public DN[] getBaseDNs()
  {
    return baseDNs;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized long getEntryCount()
  {
    if (server == null)
    {
      try
      {
        server = getReplicationServer();
        if (server == null)
        {
          return 0;
        }
      }
      catch (Exception e)
      {
        return 0;
      }
    }

    // This method only returns the number of actual change entries; the
    // domain and any baseDN entries are not counted.
    long retNum = 0;
    Iterator<ReplicationServerDomain> rcachei = server.getCacheIterator();
    if (rcachei != null)
    {
      while (rcachei.hasNext())
      {
        ReplicationServerDomain rsd = rcachei.next();
        retNum += rsd.getChangesCount();
      }
    }
    return retNum;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public boolean isLocal()
  {
    return true;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public boolean isIndexed(AttributeType attributeType, IndexType indexType)
  {
    return true;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized Entry getEntry(DN entryDN)
  {
    Entry e = null;
    try
    {
      if (baseDNSet.contains(entryDN))
      {
        return new Entry(entryDN, rootObjectclasses, attributes,
                         operationalAttributes);
      }
      else
      {
        InternalClientConnection conn =
             InternalClientConnection.getRootConnection();
        SearchFilter filter =
             SearchFilter.createFilterFromString("(changetype=*)");
        InternalSearchOperation searchOperation =
             new InternalSearchOperation(conn,
                 InternalClientConnection.nextOperationID(),
                 InternalClientConnection.nextMessageID(), null, entryDN,
                 SearchScope.BASE_OBJECT,
                 DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false,
                 filter, null, null);
        search(searchOperation);
        LinkedList<SearchResultEntry> resultEntries =
             searchOperation.getSearchEntries();
        if (resultEntries.size() != 0)
        {
          e = resultEntries.getFirst();
        }
      }
    }
    catch (DirectoryException ex)
    {
      e = null;
    }
    return e;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized boolean entryExists(DN entryDN)
  {
    return getEntry(entryDN) != null;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized void addEntry(Entry entry, AddOperation addOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_ADD_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized void deleteEntry(DN entryDN,
                                       DeleteOperation deleteOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_DELETE_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized void replaceEntry(Entry entry,
                                        ModifyOperation modifyOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_MODIFY_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized void renameEntry(DN currentDN, Entry entry,
                                       ModifyDNOperation modifyDNOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_MODIFY_DN_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public HashSet<String> getSupportedControls()
  {
    return supportedControls;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public HashSet<String> getSupportedFeatures()
  {
    return supportedFeatures;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public boolean supportsLDIFExport()
  {
    return true;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized void exportLDIF(LDIFExportConfig exportConfig)
         throws DirectoryException
  {
    List<DN> includeBranches = exportConfig.getIncludeBranches();
    DN baseDN;
    ArrayList<ReplicationServerDomain> exportContainers =
        new ArrayList<ReplicationServerDomain>();

    if (server == null)
    {
      Message message = ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED.get();
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
    }

    Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
    if (rsdi != null)
    {
      while (rsdi.hasNext())
      {
        ReplicationServerDomain rc = rsdi.next();

        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rc.getBaseDn().toString() + "," + BASE_DN);

        if (includeBranches == null || includeBranches.isEmpty())
        {
          exportContainers.add(rc);
        }
        else
        {
          for (DN includeBranch : includeBranches)
          {
            if (includeBranch.isDescendantOf(baseDN) ||
                includeBranch.isAncestorOf(baseDN))
            {
              exportContainers.add(rc);
            }
          }
        }
      }
    }

    // Make a note of the time we started.
    long startTime = System.currentTimeMillis();

    // Start a timer for the progress report.
    Timer timer = new Timer();
    TimerTask progressTask = new ProgressTask();
    timer.scheduleAtFixedRate(progressTask, progressInterval,
                              progressInterval);

    // Create the LDIF writer.
    LDIFWriter ldifWriter;
    try
    {
      ldifWriter = new LDIFWriter(exportConfig);
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }

      Message message =
          ERR_BACKEND_CANNOT_CREATE_LDIF_WRITER.get(String.valueOf(e));
      throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
                                   message, e);
    }

    exportRootChanges(exportContainers, exportConfig, ldifWriter);

    // Iterate through the containers.
    try
    {
      for (ReplicationServerDomain exportContainer : exportContainers)
      {
        if (exportConfig.isCancelled())
        {
          break;
        }
        processContainer(exportContainer, exportConfig, ldifWriter, null);
      }
    }
    finally
    {
      timer.cancel();

      // Close the LDIF writer.
      try
      {
        ldifWriter.close();
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }

    long finishTime = System.currentTimeMillis();
    long totalTime = (finishTime - startTime);

    float rate = 0;
    if (totalTime > 0)
    {
      rate = 1000f * exportedCount / totalTime;
    }

    Message message = NOTE_JEB_EXPORT_FINAL_STATUS.get(
        exportedCount, skippedCount, totalTime/1000, rate);
    logError(message);
  }


  /*
   * Exports the root changes of the export, and one entry per domain.
   */
  private void exportRootChanges(
      List<ReplicationServerDomain> exportContainers,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter)
  {
    Map<AttributeType,List<Attribute>> attributes =
        new HashMap<AttributeType,List<Attribute>>();
    ArrayList<Attribute> ldapAttrList = new ArrayList<Attribute>();

    AttributeType ocType =
        DirectoryServer.getAttributeType("objectclass", true);
    LinkedHashSet<AttributeValue> ocValues =
        new LinkedHashSet<AttributeValue>();
    ocValues.add(new AttributeValue(ocType, "top"));
    ocValues.add(new AttributeValue(ocType, "domain"));
    Attribute ocAttr = new Attribute(ocType, "objectclass", ocValues);
    ldapAttrList.add(ocAttr);
    attributes.put(ocType, ldapAttrList);

    // Write the top-level entry for the backend base DN.
    try
    {
      AddChangeRecordEntry changeRecord =
          new AddChangeRecordEntry(DN.decode(BASE_DN), attributes);
      ldifWriter.writeChangeRecord(changeRecord);
    }
    catch (Exception e) {}

    // Write one entry per replicated domain, carrying its server state and
    // generation ID.
    for (ReplicationServerDomain exportContainer : exportContainers)
    {
      if (exportConfig != null && exportConfig.isCancelled())
      {
        break;
      }

      attributes.clear();
      ldapAttrList.clear();

      ldapAttrList.add(ocAttr);

      AttributeType stateType =
          DirectoryServer.getAttributeType("state", true);
      LinkedHashSet<AttributeValue> stateValues =
          new LinkedHashSet<AttributeValue>();
      stateValues.add(new AttributeValue(stateType,
          exportContainer.getDbServerState().toString()));
      TRACER.debugInfo("State=" +
          exportContainer.getDbServerState().toString());
      Attribute stateAttr = new Attribute(stateType, "state", stateValues);
      ldapAttrList.add(stateAttr);

      AttributeType genidType =
          DirectoryServer.getAttributeType("generation-id", true);
      LinkedHashSet<AttributeValue> genidValues =
          new LinkedHashSet<AttributeValue>();
      genidValues.add(new AttributeValue(genidType,
          String.valueOf(exportContainer.getGenerationId()) +
          exportContainer.getBaseDn()));
      Attribute genidAttr =
          new Attribute(genidType, "generation-id", genidValues);
      ldapAttrList.add(genidAttr);
      attributes.put(genidType, ldapAttrList);

      try
      {
        AddChangeRecordEntry changeRecord =
            new AddChangeRecordEntry(DN.decode(
                exportContainer.getBaseDn() + "," + BASE_DN),
                attributes);
        ldifWriter.writeChangeRecord(changeRecord);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message = ERR_BACKEND_EXPORT_ENTRY.get(
            exportContainer.getBaseDn() + "," + BASE_DN,
            String.valueOf(e));
        logError(message);
      }
    }
  }
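
  // For illustration: exportRootChanges() above emits one add record for the
  // backend base DN and one per replicated domain, roughly as follows. The
  // values shown are invented placeholders; the real values come from each
  // domain's server state and generation ID.
  //
  //   dn: dc=replicationchanges
  //   changetype: add
  //   objectclass: top
  //   objectclass: domain
  //
  //   dn: dc=example,dc=com,dc=replicationchanges
  //   changetype: add
  //   objectclass: top
  //   objectclass: domain
  //   state: <server state>
  //   generation-id: <generation id><base DN>
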

  /**
   * Processes the changes for a given ReplicationServerDomain.
   */
  private void processContainer(ReplicationServerDomain rsd,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation)
  {
    // Walk through the servers.
    for (Short serverId : rsd.getServers())
    {
      if (exportConfig != null && exportConfig.isCancelled())
      {
        break;
      }

      ReplicationIterator ri = rsd.getChangelogIterator(serverId, null);

      if (ri != null)
      {
        try
        {
          // Walk through the changes.
          while (ri.getChange() != null)
          {
            if (exportConfig != null && exportConfig.isCancelled())
            {
              break;
            }
            UpdateMessage msg = ri.getChange();
            processChange(msg, exportConfig, ldifWriter, searchOperation);
            if (!ri.next())
              break;
          }
        }
        finally
        {
          ri.releaseCursor();
        }
      }
    }
  }

  /**
   * Export one change.
   */
  private void processChange(UpdateMessage msg,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation)
  {
    InternalClientConnection conn =
        InternalClientConnection.getRootConnection();
    Entry entry = null;
    DN dn = null;

    try
    {
      if (msg instanceof AddMsg)
      {
        AddMsg addMsg = (AddMsg)msg;
        AddOperation addOperation = (AddOperation)msg.createOperation(conn);

        dn = DN.decode("puid=" + addMsg.getParentUid() + "," +
            "changeNumber=" + msg.getChangeNumber().toString() + "," +
            msg.getDn() + "," + BASE_DN);

        Map<AttributeType,List<Attribute>> attributes =
            new HashMap<AttributeType,List<Attribute>>();

        for (RawAttribute a : addOperation.getRawAttributes())
        {
          Attribute attr = a.toAttribute();
          AttributeType attrType = attr.getAttributeType();
          List<Attribute> attrs = attributes.get(attrType);
          if (attrs == null)
          {
            attrs = new ArrayList<Attribute>(1);
            attrs.add(attr);
            attributes.put(attrType, attrs);
          }
          else
          {
            attrs.add(attr);
          }
        }

        AddChangeRecordEntry changeRecord =
            new AddChangeRecordEntry(dn, attributes);
        if (exportConfig != null)
        {
          ldifWriter.writeChangeRecord(changeRecord);
        }
        else
        {
          Writer writer = new Writer();
          LDIFWriter ldifWriter2 = writer.getLDIFWriter();
          ldifWriter2.writeChangeRecord(changeRecord);
          LDIFReader reader = writer.getLDIFReader();
          entry = reader.readEntry();
        }
      }
      else if (msg instanceof DeleteMsg)
      {
        DeleteMsg delMsg = (DeleteMsg)msg;

        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + delMsg.getChangeNumber().toString() + "," +
            msg.getDn() + "," + BASE_DN);

        DeleteChangeRecordEntry changeRecord =
            new DeleteChangeRecordEntry(dn);
        if (exportConfig != null)
        {
          ldifWriter.writeChangeRecord(changeRecord);
        }
        else
        {
          Writer writer = new Writer();
          LDIFWriter ldifWriter2 = writer.getLDIFWriter();
          ldifWriter2.writeChangeRecord(changeRecord);
          LDIFReader reader = writer.getLDIFReader();
          entry = reader.readEntry();
        }
      }
      else if (msg instanceof ModifyMsg)
      {
        ModifyOperation op = (ModifyOperation)msg.createOperation(conn);

        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + msg.getChangeNumber().toString() + "," +
            msg.getDn() + "," + BASE_DN);
        op.setInternalOperation(true);

        ModifyChangeRecordEntry changeRecord =
            new ModifyChangeRecordEntry(dn, op.getRawModifications());
        if (exportConfig != null)
        {
          ldifWriter.writeChangeRecord(changeRecord);
        }
        else
        {
          Writer writer = new Writer();
          LDIFWriter ldifWriter2 = writer.getLDIFWriter();
          ldifWriter2.writeChangeRecord(changeRecord);
          LDIFReader reader = writer.getLDIFReader();
          entry = reader.readEntry();
        }
      }
      else if (msg instanceof ModifyDNMsg)
      {
        ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn);

        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + msg.getChangeNumber().toString() + "," +
            msg.getDn() + "," + BASE_DN);
        op.setInternalOperation(true);

        ModifyDNChangeRecordEntry changeRecord =
            new ModifyDNChangeRecordEntry(dn, op.getNewRDN(),
                op.deleteOldRDN(), op.getNewSuperior());

        if (exportConfig != null)
        {
          ldifWriter.writeChangeRecord(changeRecord);
        }
        else
        {
          Writer writer = new Writer();
          LDIFWriter ldifWriter2 = writer.getLDIFWriter();
          ldifWriter2.writeChangeRecord(changeRecord);
          LDIFReader reader = writer.getLDIFReader();
          entry = reader.readEntry();
        }
      }

      if (exportConfig != null)
      {
        this.exportedCount++;
      }
      else
      {
        // Get the base DN, scope, and filter for the search.
        DN searchBaseDN = searchOperation.getBaseDN();
        SearchScope scope = searchOperation.getScope();
        SearchFilter filter = searchOperation.getFilter();

        boolean ms = entry.matchesBaseAndScope(searchBaseDN, scope);
        boolean mf = filter.matchesEntry(entry);
        if (ms && mf)
        {
          searchOperation.returnEntry(entry, new LinkedList<Control>());
        }
      }
    }
    catch (Exception e)
    {
      this.skippedCount++;
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      Message message = null;
      if (exportConfig != null)
      {
        message = ERR_BACKEND_EXPORT_ENTRY.get(
            dn.toNormalizedString(), String.valueOf(e));
      }
      else
      {
        message = ERR_BACKEND_SEARCH_ENTRY.get(
            dn.toNormalizedString(), e.getLocalizedMessage());
      }
      logError(message);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public boolean supportsLDIFImport()
  {
    return false;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized LDIFImportResult importLDIF(
      LDIFImportConfig importConfig) throws DirectoryException
  {
    Message message = ERR_REPLICATONBACKEND_IMPORT_LDIF_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public boolean supportsBackup()
  {
    // This backend does provide a backup/restore mechanism.
    return true;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public boolean supportsBackup(BackupConfig backupConfig,
                                StringBuilder unsupportedReason)
  {
    return true;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public void createBackup(BackupConfig backupConfig)
         throws DirectoryException
  {
    BackupManager backupManager = new BackupManager(getBackendID());
    File backendDir = getFileForPath(getReplicationServerCfg()
        .getReplicationDBDirectory());
    backupManager.createBackup(backendDir, backupConfig);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public void removeBackup(BackupDirectory backupDirectory,
                           String backupID)
         throws DirectoryException
  {
    BackupManager backupManager = new BackupManager(getBackendID());
    backupManager.removeBackup(backupDirectory, backupID);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public boolean supportsRestore()
  {
    return true;
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public void restoreBackup(RestoreConfig restoreConfig)
         throws DirectoryException
  {
    BackupManager backupManager = new BackupManager(getBackendID());
    File backendDir = getFileForPath(getReplicationServerCfg()
        .getReplicationDBDirectory());
    backupManager.restoreBackup(backendDir, restoreConfig);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public long numSubordinates(DN entryDN, boolean subtree)
         throws DirectoryException
  {
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
        ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get());
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public ConditionResult hasSubordinates(DN entryDN)
         throws DirectoryException
  {
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
        ERR_HAS_SUBORDINATES_NOT_SUPPORTED.get());
  }

  /**
   * Set the replication server associated with this backend.
   * @param server The replication server.
   */
  public void setServer(ReplicationServer server)
  {
    this.server = server;
  }

  /**
   * This class reports progress of the export job at fixed intervals.
   */
  private final class ProgressTask extends TimerTask
  {
    /**
     * The number of entries that had been exported at the time of the
     * previous progress report.
     */
    private long previousCount = 0;

    /**
     * The time in milliseconds of the previous progress report.
     */
    private long previousTime;

    /**
     * Create a new export progress task.
     */
    public ProgressTask()
    {
      previousTime = System.currentTimeMillis();
    }

    /**
     * The action to be performed by this timer task.
     */
    public void run()
    {
      long latestCount = exportedCount;
      long deltaCount = (latestCount - previousCount);
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;

      if (deltaTime == 0)
      {
        return;
      }

      float rate = 1000f * deltaCount / deltaTime;

      Message message =
          NOTE_JEB_EXPORT_PROGRESS_REPORT.get(latestCount, skippedCount, rate);
      logError(message);

      previousCount = latestCount;
      previousTime = latestTime;
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public synchronized void search(SearchOperation searchOperation)
         throws DirectoryException
  {
    // Get the base DN, scope, and filter for the search.
    DN searchBaseDN = searchOperation.getBaseDN();
    DN baseDN;
    ArrayList<ReplicationServerDomain> searchContainers =
        new ArrayList<ReplicationServerDomain>();

    // This check is for GroupManager initialization. It currently doesn't
    // come into play because the replication server variable is null in
    // the check above. But if the order of initialization of the server
    // variable is ever changed, the following check will keep replication
    // change entries from being added to the groupmanager cache erroneously.
    List<Control> requestControls = searchOperation.getRequestControls();
    if (requestControls != null)
    {
      for (Control c : requestControls)
      {
        if (c.getOID().equals(OID_INTERNAL_GROUP_MEMBERSHIP_UPDATE))
        {
          return;
        }
      }
    }

    // Make sure the base entry exists if it's supposed to be in this backend.
    if (!handlesEntry(searchBaseDN))
    {
      DN matchedDN = searchBaseDN.getParentDNInSuffix();
      while (matchedDN != null)
      {
        if (handlesEntry(matchedDN))
        {
          break;
        }
        matchedDN = matchedDN.getParentDNInSuffix();
      }

      Message message = ERR_REPLICATIONBACKEND_ENTRY_DOESNT_EXIST.
          get(String.valueOf(searchBaseDN));
      throw new DirectoryException(
          ResultCode.NO_SUCH_OBJECT, message, matchedDN, null);
    }

    if (server == null)
    {
      server = getReplicationServer();

      if (server == null)
      {
        if (baseDNSet.contains(searchBaseDN))
        {
          // Get the scope and filter for the search, and return the
          // synthetic base entry if it matches.
          SearchScope scope = searchOperation.getScope();
          SearchFilter filter = searchOperation.getFilter();
          Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
                               operationalAttributes);

          if (re.matchesBaseAndScope(searchBaseDN, scope) &&
              filter.matchesEntry(re))
          {
            searchOperation.returnEntry(re, new LinkedList<Control>());
          }
          return;
        }
        else
        {
          Message message = ERR_REPLICATIONBACKEND_ENTRY_DOESNT_EXIST.
              get(String.valueOf(searchBaseDN));
          throw new DirectoryException(
              ResultCode.NO_SUCH_OBJECT, message, null, null);
        }
      }
    }

    // Get the scope and filter for the search.
    SearchScope scope = searchOperation.getScope();
    SearchFilter filter = searchOperation.getFilter();
    Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
                         operationalAttributes);

    if (re.matchesBaseAndScope(searchBaseDN, scope) &&
        filter.matchesEntry(re))
    {
      searchOperation.returnEntry(re, new LinkedList<Control>());
    }

    // Walk through all entries and send the ones that match.
    Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
    if (rsdi != null)
    {
      while (rsdi.hasNext())
      {
        ReplicationServerDomain rsd = rsdi.next();

        // Skip containers that are not covered by the search base DN.
        baseDN = DN.decode(rsd.getBaseDn().toString() + "," + BASE_DN);

        if (searchBaseDN.isDescendantOf(baseDN) ||
            searchBaseDN.isAncestorOf(baseDN))
        {
          searchContainers.add(rsd);
        }
      }
    }

    for (ReplicationServerDomain exportContainer : searchContainers)
    {
      processContainer(exportContainer, null, null, searchOperation);
    }
  }


  /**
   * Retrieves the replication server associated with this backend.
   *
   * @return The replication server, or <CODE>null</CODE> if none is
   *         configured.
   * @throws DirectoryException If an error occurs while retrieving the
   *         replication server.
   */
  private ReplicationServer getReplicationServer() throws DirectoryException
  {
    ReplicationServer replicationServer = null;

    for (SynchronizationProvider<?> provider :
         DirectoryServer.getSynchronizationProviders())
    {
      if (provider instanceof MultimasterReplication)
      {
        MultimasterReplication mmp = (MultimasterReplication)provider;
        ReplicationServerListener list = mmp.getReplicationServerListener();
        if (list != null)
        {
          replicationServer = list.getReplicationServer();
          break;
        }
      }
    }
    return replicationServer;
  }

  // Find the replication server configuration associated with this
  // replication backend.
  private ReplicationServerCfg getReplicationServerCfg()
      throws DirectoryException
  {
    RootCfg root =
        ServerManagementContext.getInstance().getRootConfiguration();

    for (String name : root.listSynchronizationProviders())
    {
      SynchronizationProviderCfg cfg;
      try
      {
        cfg = root.getSynchronizationProvider(name);
      }
      catch (ConfigException e)
      {
        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
            ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND.get(), e);
      }
      if (cfg instanceof ReplicationSynchronizationProviderCfg)
      {
        ReplicationSynchronizationProviderCfg scfg =
            (ReplicationSynchronizationProviderCfg) cfg;
        try
        {
          return scfg.getReplicationServer();
        }
        catch (ConfigException e)
        {
          throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
              ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND.get(), e);
        }
      }
    }

    // No replication server found.
    throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
        ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND.get());
  }

  /**
   * Writer class that buffers LDIF records in a byte array so that they can
   * be read back as entries.
   */
  private static final class Writer
  {
    // The underlying output stream.
    private final ByteArrayOutputStream stream;

    // The underlying LDIF config.
    private final LDIFExportConfig config;

    // The LDIF writer.
    private final LDIFWriter writer;

    /**
     * Create a new writer backed by a byte array.
     */
    public Writer()
    {
      this.stream = new ByteArrayOutputStream();
      this.config = new LDIFExportConfig(stream);
      try
      {
        this.writer = new LDIFWriter(config);
      }
      catch (IOException e)
      {
        // Should not happen.
        throw new RuntimeException(e);
      }
    }

    /**
     * Get the LDIF writer.
     *
     * @return Returns the LDIF writer.
     */
    public LDIFWriter getLDIFWriter()
    {
      return writer;
    }

    /**
     * Close the writer and get a buffered reader for the LDIF content.
     *
     * @return Returns a buffered reader over the contents of the writer.
     * @throws Exception If an error occurred closing the writer.
     */
    public BufferedReader getLDIFBufferedReader() throws Exception
    {
      writer.close();
      String ldif = stream.toString("UTF-8");
      StringReader reader = new StringReader(ldif);
      return new BufferedReader(reader);
    }

    /**
     * Close the writer and get an LDIF reader for the LDIF content.
     *
     * @return Returns an LDIF reader.
     * @throws Exception If an error occurred closing the writer.
     */
    public LDIFReader getLDIFReader() throws Exception
    {
      writer.close();
      // Strip the "-" separator lines so the change records can be read
      // back as entries.
      String ldif = stream.toString("UTF-8");
      ldif = ldif.replace("\n-\n", "\n");
      ByteArrayInputStream istream =
          new ByteArrayInputStream(ldif.getBytes());
      LDIFImportConfig config = new LDIFImportConfig(istream);
      return new LDIFReader(config);
    }
  }



  /**
   * {@inheritDoc}
   */
  public void preloadEntryCache() throws UnsupportedOperationException
  {
    throw new UnsupportedOperationException("Operation not supported.");
  }
}