001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.server; 028 import static org.opends.messages.ReplicationMessages.*; 029 import static org.opends.server.loggers.ErrorLogger.logError; 030 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; 031 import static org.opends.server.loggers.debug.DebugLogger.getTracer; 032 import static org.opends.server.util.ServerConstants.EOL; 033 import static org.opends.server.util.StaticUtils.getFileForPath; 034 035 import java.io.File; 036 import java.io.IOException; 037 import java.io.StringReader; 038 import java.net.InetAddress; 039 import java.net.InetSocketAddress; 040 import java.net.ServerSocket; 041 import java.net.Socket; 042 import java.net.UnknownHostException; 043 import java.util.ArrayList; 044 import java.util.Collection; 045 import java.util.concurrent.ConcurrentHashMap; 046 import java.util.Iterator; 047 import java.util.LinkedHashSet; 048 import java.util.List; 049 import java.util.Set; 050 051 import org.opends.messages.Message; 052 import org.opends.messages.MessageBuilder; 053 import org.opends.server.admin.server.ConfigurationChangeListener; 054 import org.opends.server.admin.std.server.MonitorProviderCfg; 055 import org.opends.server.admin.std.server.ReplicationServerCfg; 056 import org.opends.server.api.Backend; 057 import org.opends.server.api.BackupTaskListener; 058 import org.opends.server.api.ExportTaskListener; 059 import org.opends.server.api.ImportTaskListener; 060 import org.opends.server.api.MonitorProvider; 061 import org.opends.server.api.RestoreTaskListener; 062 import org.opends.server.config.ConfigException; 063 import org.opends.server.core.DirectoryServer; 064 import org.opends.server.loggers.LogLevel; 065 import org.opends.server.loggers.debug.DebugTracer; 066 import org.opends.server.replication.protocol.ProtocolSession; 067 import org.opends.server.replication.protocol.ReplSessionSecurity; 068 import org.opends.server.types.Attribute; 069 import org.opends.server.types.AttributeType; 070 import org.opends.server.types.AttributeValue; 071 import org.opends.server.types.BackupConfig; 072 import org.opends.server.types.ConfigChangeResult; 073 import org.opends.server.types.DN; 074 import org.opends.server.types.Entry; 075 import org.opends.server.types.LDIFExportConfig; 076 import org.opends.server.types.LDIFImportConfig; 077 import org.opends.server.types.RestoreConfig; 078 import org.opends.server.types.ResultCode; 079 import org.opends.server.util.LDIFReader; 080 081 import com.sleepycat.je.DatabaseException; 082 083 /** 084 * ReplicationServer Listener. 085 * 086 * This singleton is the main object of the replication server 087 * It waits for the incoming connections and create listener 088 * and publisher objects for 089 * connection with LDAP servers and with replication servers 090 * 091 * It is responsible for creating the replication server replicationServerDomain 092 * and managing it 093 */ 094 public class ReplicationServer extends MonitorProvider<MonitorProviderCfg> 095 implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>, 096 BackupTaskListener, RestoreTaskListener, ImportTaskListener, 097 ExportTaskListener 098 { 099 private short serverId; 100 private String serverURL; 101 102 private ServerSocket listenSocket; 103 private Thread listenThread; 104 private Thread connectThread; 105 106 /* The list of replication servers configured by the administrator */ 107 private Collection<String> replicationServers; 108 109 /* This table is used to store the list of dn for which we are currently 110 * handling servers. 111 */ 112 private ConcurrentHashMap<DN, ReplicationServerDomain> baseDNs = 113 new ConcurrentHashMap<DN, ReplicationServerDomain>(); 114 115 private String localURL = "null"; 116 private boolean shutdown = false; 117 private short replicationServerId; 118 private ReplicationDbEnv dbEnv; 119 private int rcvWindow; 120 private int queueSize; 121 private String dbDirname = null; 122 123 // The delay (in sec) after which the changes must 124 // be deleted from the persistent storage. 125 private long purgeDelay; 126 127 private int replicationPort; 128 private boolean stopListen = false; 129 private ReplSessionSecurity replSessionSecurity; 130 131 // For the backend associated to this replication server, 132 // DN of the config entry of the backend 133 private DN backendConfigEntryDN; 134 // ID of the backend 135 private static final String backendId = "replicationChanges"; 136 137 // At startup, the listen thread wait on this flag for the connet 138 // thread to look for other servers in the topology. 139 private boolean connectedInTopology = false; 140 private final Object connectedInTopologyLock = new Object(); 141 142 /** 143 * The tracer object for the debug logger. 144 */ 145 private static final DebugTracer TRACER = getTracer(); 146 147 /** 148 * Creates a new Replication server using the provided configuration entry. 149 * 150 * @param configuration The configuration of this replication server. 151 * @throws ConfigException When Configuration is invalid. 152 */ 153 public ReplicationServer(ReplicationServerCfg configuration) 154 throws ConfigException 155 { 156 super("Replication Server" + configuration.getReplicationPort()); 157 158 replicationPort = configuration.getReplicationPort(); 159 replicationServerId = (short) configuration.getReplicationServerId(); 160 replicationServers = configuration.getReplicationServer(); 161 if (replicationServers == null) 162 replicationServers = new ArrayList<String>(); 163 queueSize = configuration.getQueueSize(); 164 purgeDelay = configuration.getReplicationPurgeDelay(); 165 dbDirname = configuration.getReplicationDBDirectory(); 166 rcvWindow = configuration.getWindowSize(); 167 if (dbDirname == null) 168 { 169 dbDirname = "changelogDb"; 170 } 171 // Check that this path exists or create it. 172 File f = getFileForPath(dbDirname); 173 try 174 { 175 if (!f.exists()) 176 { 177 f.mkdir(); 178 } 179 } 180 catch (Exception e) 181 { 182 183 MessageBuilder mb = new MessageBuilder(); 184 mb.append(e.getLocalizedMessage()); 185 mb.append(" "); 186 mb.append(String.valueOf(getFileForPath(dbDirname))); 187 Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()); 188 throw new ConfigException(msg, e); 189 } 190 191 replSessionSecurity = new ReplSessionSecurity(configuration); 192 initialize(replicationServerId, replicationPort); 193 configuration.addChangeListener(this); 194 DirectoryServer.registerMonitorProvider(this); 195 196 try 197 { 198 backendConfigEntryDN = DN.decode( 199 "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config"); 200 } catch (Exception e) {} 201 202 // Creates the backend associated to this ReplicationServer 203 // if it does not exist. 204 createBackend(); 205 206 DirectoryServer.registerBackupTaskListener(this); 207 DirectoryServer.registerRestoreTaskListener(this); 208 DirectoryServer.registerExportTaskListener(this); 209 DirectoryServer.registerImportTaskListener(this); 210 } 211 212 213 /** 214 * The run method for the Listen thread. 215 * This thread accept incoming connections on the replication server 216 * ports from other replication servers or from LDAP servers 217 * and spawn further thread responsible for handling those connections 218 */ 219 220 void runListen() 221 { 222 Socket newSocket; 223 224 // wait for the connect thread to find other replication 225 // servers in the topology before starting to accept connections 226 // from the ldap servers. 227 synchronized (connectedInTopologyLock) 228 { 229 if (connectedInTopology == false) 230 { 231 try 232 { 233 connectedInTopologyLock.wait(1000); 234 } catch (InterruptedException e) 235 { 236 } 237 } 238 } 239 240 while ((shutdown == false) && (stopListen == false)) 241 { 242 // Wait on the replicationServer port. 243 // Read incoming messages and create LDAP or ReplicationServer listener 244 // and Publisher. 245 246 try 247 { 248 newSocket = listenSocket.accept(); 249 newSocket.setReceiveBufferSize(1000000); 250 newSocket.setTcpNoDelay(true); 251 newSocket.setKeepAlive(true); 252 ProtocolSession session = 253 replSessionSecurity.createServerSession(newSocket); 254 if (session == null) // Error, go back to accept 255 continue; 256 ServerHandler handler = new ServerHandler(session, queueSize); 257 handler.start(null, serverId, serverURL, rcvWindow, 258 false, this); 259 } 260 catch (Exception e) 261 { 262 // The socket has probably been closed as part of the 263 // shutdown or changing the port number process. 264 // just log debug information and loop. 265 Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()); 266 logError(message); 267 } 268 } 269 } 270 271 /** 272 * This method manages the connection with the other replication servers. 273 * It periodically checks that this replication server is indeed connected 274 * to all the other replication servers and if not attempts to 275 * make the connection. 276 */ 277 void runConnect() 278 { 279 while (shutdown == false) 280 { 281 /* 282 * periodically check that we are connected to all other 283 * replication servers and if not establish the connection 284 */ 285 for (ReplicationServerDomain replicationServerDomain: baseDNs.values()) 286 { 287 Set<String> connectedReplServers = 288 replicationServerDomain.getChangelogs(); 289 /* 290 * check that all replication server in the config are in the connected 291 * Set. If not create the connection 292 */ 293 for (String serverURL : replicationServers) 294 { 295 int separator = serverURL.lastIndexOf(':'); 296 String port = serverURL.substring(separator + 1); 297 String hostname = serverURL.substring(0, separator); 298 299 try 300 { 301 InetAddress inetAddress = InetAddress.getByName(hostname); 302 String serverAddress = inetAddress.getHostAddress() + ":" + port; 303 304 if ((serverAddress.compareTo("127.0.0.1:" + replicationPort) != 0) 305 && (serverAddress.compareTo(this.localURL) != 0) 306 && (!connectedReplServers.contains(serverAddress))) 307 { 308 this.connect(serverURL, replicationServerDomain.getBaseDn()); 309 } 310 } 311 catch (IOException e) 312 { 313 Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hostname); 314 logError(message); 315 } 316 } 317 } 318 synchronized (connectedInTopologyLock) 319 { 320 // wake up the listen thread if necessary. 321 if (connectedInTopology == false) 322 { 323 connectedInTopologyLock.notify(); 324 connectedInTopology = true; 325 } 326 } 327 try 328 { 329 synchronized (this) 330 { 331 /* check if we are connected every second */ 332 int randomizer = (int) Math.random()*100; 333 wait(1000 + randomizer); 334 } 335 } catch (InterruptedException e) 336 { 337 // ignore error, will try to connect again or shutdown 338 } 339 } 340 } 341 342 /** 343 * Establish a connection to the server with the address and port. 344 * 345 * @param serverURL The address and port for the server, separated by a 346 * colon. 347 * @param baseDn The baseDn of the connection 348 */ 349 private void connect(String serverURL, DN baseDn) 350 { 351 int separator = serverURL.lastIndexOf(':'); 352 String port = serverURL.substring(separator + 1); 353 String hostname = serverURL.substring(0, separator); 354 boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL); 355 356 if (debugEnabled()) 357 TRACER.debugInfo("RS " + this.getMonitorInstanceName() + 358 " connects to " + serverURL); 359 360 try 361 { 362 InetSocketAddress ServerAddr = new InetSocketAddress( 363 InetAddress.getByName(hostname), Integer.parseInt(port)); 364 Socket socket = new Socket(); 365 socket.setReceiveBufferSize(1000000); 366 socket.setTcpNoDelay(true); 367 socket.connect(ServerAddr, 500); 368 369 ServerHandler handler = new ServerHandler( 370 replSessionSecurity.createClientSession(serverURL, socket), 371 queueSize); 372 handler.start(baseDn, serverId, this.serverURL, rcvWindow, 373 sslEncryption, this); 374 } 375 catch (Exception e) 376 { 377 // ignore 378 } 379 380 } 381 382 /** 383 * initialization function for the replicationServer. 384 * 385 * @param changelogId The unique identifier for this replicationServer. 386 * @param changelogPort The port on which the replicationServer should 387 * listen. 388 * 389 */ 390 private void initialize(short changelogId, int changelogPort) 391 { 392 shutdown = false; 393 394 try 395 { 396 /* 397 * Initialize the replicationServer database. 398 */ 399 dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(), 400 this); 401 402 /* 403 * create replicationServer replicationServerDomain 404 */ 405 serverId = changelogId; 406 407 /* 408 * Open replicationServer socket 409 */ 410 String localhostname = InetAddress.getLocalHost().getHostName(); 411 String localAdddress = InetAddress.getLocalHost().getHostAddress(); 412 serverURL = localhostname + ":" + String.valueOf(changelogPort); 413 localURL = localAdddress + ":" + String.valueOf(changelogPort); 414 listenSocket = new ServerSocket(); 415 listenSocket.setReceiveBufferSize(1000000); 416 listenSocket.bind(new InetSocketAddress(changelogPort)); 417 418 /* 419 * creates working threads 420 * We must first connect, then start to listen. 421 */ 422 if (debugEnabled()) 423 TRACER.debugInfo("RS " +getMonitorInstanceName()+ 424 " creates connect threads"); 425 connectThread = 426 new ReplicationServerConnectThread("Replication Server Connect", this); 427 connectThread.start(); 428 429 // FIXME : Is it better to have the time to receive the ReplServerInfo 430 // from all the other replication servers since this info is necessary 431 // to route an early received total update request. 432 try { Thread.sleep(300);} catch(Exception e) {} 433 if (debugEnabled()) 434 TRACER.debugInfo("RS " +getMonitorInstanceName()+ 435 " creates listen threads"); 436 437 listenThread = 438 new ReplicationServerListenThread("Replication Server Listener", this); 439 listenThread.start(); 440 441 if (debugEnabled()) 442 TRACER.debugInfo("RS " +getMonitorInstanceName()+ 443 " successfully initialized"); 444 445 } catch (DatabaseException e) 446 { 447 Message message = ERR_COULD_NOT_INITIALIZE_DB.get( 448 getFileForPath(dbDirname).getAbsolutePath()); 449 logError(message); 450 } catch (ReplicationDBException e) 451 { 452 Message message = ERR_COULD_NOT_READ_DB.get(dbDirname, 453 e.getLocalizedMessage()); 454 logError(message); 455 } catch (UnknownHostException e) 456 { 457 Message message = ERR_UNKNOWN_HOSTNAME.get(); 458 logError(message); 459 } catch (IOException e) 460 { 461 Message message = 462 ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage()); 463 logError(message); 464 } 465 } 466 467 /** 468 * Get the ReplicationServerDomain associated to the base DN given in 469 * parameter. 470 * 471 * @param baseDn The base Dn for which the ReplicationServerDomain must be 472 * returned. 473 * @param create Specifies whether to create the ReplicationServerDomain if 474 * it does not already exist. 475 * @return The ReplicationServerDomain associated to the base DN given in 476 * parameter. 477 */ 478 public ReplicationServerDomain getReplicationServerDomain(DN baseDn, 479 boolean create) 480 { 481 ReplicationServerDomain replicationServerDomain; 482 483 synchronized (baseDNs) 484 { 485 replicationServerDomain = baseDNs.get(baseDn); 486 if ((replicationServerDomain == null) && (create)) 487 { 488 replicationServerDomain = new ReplicationServerDomain(baseDn, this); 489 baseDNs.put(baseDn, replicationServerDomain); 490 } 491 } 492 493 return replicationServerDomain; 494 } 495 496 /** 497 * Shutdown the Replication Server service and all its connections. 498 */ 499 public void shutdown() 500 { 501 if (shutdown) 502 return; 503 504 shutdown = true; 505 506 // shutdown the connect thread 507 if (connectThread != null) 508 { 509 connectThread.interrupt(); 510 } 511 512 // shutdown the listener thread 513 try 514 { 515 if (listenSocket != null) 516 { 517 listenSocket.close(); 518 } 519 } catch (IOException e) 520 { 521 // replication Server service is closing anyway. 522 } 523 524 // shutdown the listen thread 525 if (listenThread != null) 526 { 527 listenThread.interrupt(); 528 } 529 530 // shutdown all the ChangelogCaches 531 for (ReplicationServerDomain replicationServerDomain : baseDNs.values()) 532 { 533 replicationServerDomain.shutdown(); 534 } 535 536 if (dbEnv != null) 537 { 538 dbEnv.shutdown(); 539 } 540 DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); 541 } 542 543 544 /** 545 * Creates a new DB handler for this ReplicationServer and the serverId and 546 * DN given in parameter. 547 * 548 * @param id The serverId for which the dbHandler must be created. 549 * @param baseDn The DN for which the dbHandler muste be created. 550 * @return The new DB handler for this ReplicationServer and the serverId and 551 * DN given in parameter. 552 * @throws DatabaseException in case of underlying database problem. 553 */ 554 public DbHandler newDbHandler(short id, DN baseDn) 555 throws DatabaseException 556 { 557 return new DbHandler(id, baseDn, this, dbEnv); 558 } 559 560 /** 561 * Clears the generationId for the replicationServerDomain related to the 562 * provided baseDn. 563 * @param baseDn The baseDn for which to delete the generationId. 564 * @throws DatabaseException When it occurs. 565 */ 566 public void clearGenerationId(DN baseDn) 567 throws DatabaseException 568 { 569 try 570 { 571 dbEnv.clearGenerationId(baseDn); 572 } 573 catch(Exception e) 574 { 575 TRACER.debugCaught(LogLevel.ALL, e); 576 } 577 } 578 579 /** 580 * Retrieves the time after which changes must be deleted from the 581 * persistent storage (in milliseconds). 582 * 583 * @return The time after which changes must be deleted from the 584 * persistent storage (in milliseconds). 585 */ 586 long getTrimage() 587 { 588 return purgeDelay * 1000; 589 } 590 591 /** 592 * Check if the provided configuration is acceptable for add. 593 * 594 * @param configuration The configuration to check. 595 * @param unacceptableReasons When the configuration is not acceptable, this 596 * table is use to return the reasons why this 597 * configuration is not acceptbale. 598 * 599 * @return true if the configuration is acceptable, false other wise. 600 */ 601 public static boolean isConfigurationAcceptable( 602 ReplicationServerCfg configuration, List<Message> unacceptableReasons) 603 { 604 int port = configuration.getReplicationPort(); 605 606 try 607 { 608 ServerSocket tmpSocket = new ServerSocket(); 609 tmpSocket.bind(new InetSocketAddress(port)); 610 tmpSocket.close(); 611 } 612 catch (Exception e) 613 { 614 Message message = ERR_COULD_NOT_BIND_CHANGELOG.get(port, e.getMessage()); 615 unacceptableReasons.add(message); 616 return false; 617 } 618 619 return true; 620 } 621 622 /** 623 * {@inheritDoc} 624 */ 625 public ConfigChangeResult applyConfigurationChange( 626 ReplicationServerCfg configuration) 627 { 628 // Changing those properties don't need specific code. 629 // They will be applied for next connections. 630 replicationServers = configuration.getReplicationServer(); 631 if (replicationServers == null) 632 replicationServers = new ArrayList<String>(); 633 queueSize = configuration.getQueueSize(); 634 long newPurgeDelay = configuration.getReplicationPurgeDelay(); 635 if (newPurgeDelay != purgeDelay) 636 { 637 purgeDelay = newPurgeDelay; 638 // propagate 639 for (ReplicationServerDomain domain : baseDNs.values()) 640 { 641 domain.setPurgeDelay(purgeDelay); 642 } 643 } 644 645 rcvWindow = configuration.getWindowSize(); 646 647 // changing the listen port requires to stop the listen thread 648 // and restart it. 649 int newPort = configuration.getReplicationPort(); 650 if (newPort != replicationPort) 651 { 652 stopListen = true; 653 try 654 { 655 listenSocket.close(); 656 listenThread.join(); 657 stopListen = false; 658 659 replicationPort = newPort; 660 String localhostname = InetAddress.getLocalHost().getHostName(); 661 String localAdddress = InetAddress.getLocalHost().getHostAddress(); 662 serverURL = localhostname + ":" + String.valueOf(replicationPort); 663 localURL = localAdddress + ":" + String.valueOf(replicationPort); 664 listenSocket = new ServerSocket(); 665 listenSocket.setReceiveBufferSize(1000000); 666 listenSocket.bind(new InetSocketAddress(replicationPort)); 667 668 listenThread = 669 new ReplicationServerListenThread( 670 "Replication Server Listener", this); 671 listenThread.start(); 672 } 673 catch (IOException e) 674 { 675 Message message = ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString()); 676 logError(message); 677 new ConfigChangeResult(DirectoryServer.getServerErrorResultCode(), 678 false); 679 } 680 catch (InterruptedException e) 681 { 682 Message message = ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString()); 683 logError(message); 684 new ConfigChangeResult(DirectoryServer.getServerErrorResultCode(), 685 false); 686 } 687 } 688 689 if ((configuration.getReplicationDBDirectory() != null) && 690 (!dbDirname.equals(configuration.getReplicationDBDirectory()))) 691 { 692 return new ConfigChangeResult(ResultCode.SUCCESS, true); 693 } 694 695 return new ConfigChangeResult(ResultCode.SUCCESS, false); 696 } 697 698 /** 699 * {@inheritDoc} 700 */ 701 public boolean isConfigurationChangeAcceptable( 702 ReplicationServerCfg configuration, List<Message> unacceptableReasons) 703 { 704 return true; 705 } 706 707 /** 708 * {@inheritDoc} 709 */ 710 @Override 711 public void initializeMonitorProvider(MonitorProviderCfg configuraiton) 712 { 713 // Nothing to do for now 714 } 715 716 /** 717 * {@inheritDoc} 718 */ 719 @Override 720 public String getMonitorInstanceName() 721 { 722 return "Replication Server " + this.replicationPort + " " 723 + replicationServerId; 724 } 725 726 /** 727 * {@inheritDoc} 728 */ 729 @Override 730 public long getUpdateInterval() 731 { 732 /* we don't wont to do polling on this monitor */ 733 return 0; 734 } 735 736 /** 737 * {@inheritDoc} 738 */ 739 @Override 740 public void updateMonitorData() 741 { 742 // As long as getUpdateInterval() returns 0, this will never get called 743 744 } 745 746 /** 747 * {@inheritDoc} 748 */ 749 @Override 750 public ArrayList<Attribute> getMonitorData() 751 { 752 /* 753 * publish the server id and the port number. 754 */ 755 ArrayList<Attribute> attributes = new ArrayList<Attribute>(); 756 attributes.add(new Attribute("replication server id", 757 String.valueOf(serverId))); 758 attributes.add(new Attribute("replication server port", 759 String.valueOf(replicationPort))); 760 761 /* 762 * Add all the base DNs that are known by this replication server. 763 */ 764 AttributeType baseType= 765 DirectoryServer.getAttributeType("base-dn", true); 766 LinkedHashSet<AttributeValue> baseValues = 767 new LinkedHashSet<AttributeValue>(); 768 for (DN base : baseDNs.keySet()) 769 { 770 baseValues.add(new AttributeValue(baseType, base. toString())); 771 } 772 773 Attribute bases = new Attribute(baseType, "base-dn", baseValues); 774 attributes.add(bases); 775 776 // Publish to monitor the generation ID by replicationServerDomain 777 AttributeType generationIdType= 778 DirectoryServer.getAttributeType("base-dn-generation-id", true); 779 LinkedHashSet<AttributeValue> generationIdValues = 780 new LinkedHashSet<AttributeValue>(); 781 for (DN base : baseDNs.keySet()) 782 { 783 long generationId=-1; 784 ReplicationServerDomain replicationServerDomain = 785 getReplicationServerDomain(base, false); 786 if (replicationServerDomain != null) 787 generationId = replicationServerDomain.getGenerationId(); 788 generationIdValues.add(new AttributeValue(generationIdType, 789 base.toString() + " " + generationId)); 790 } 791 Attribute generationIds = new Attribute(generationIdType, "generation-id", 792 generationIdValues); 793 attributes.add(generationIds); 794 795 return attributes; 796 } 797 798 /** 799 * Get the value of generationId for the replication replicationServerDomain 800 * associated with the provided baseDN. 801 * 802 * @param baseDN The baseDN of the replicationServerDomain. 803 * @return The value of the generationID. 804 */ 805 public long getGenerationId(DN baseDN) 806 { 807 ReplicationServerDomain rsd = 808 this.getReplicationServerDomain(baseDN, false); 809 if (rsd!=null) 810 return rsd.getGenerationId(); 811 return -1; 812 } 813 814 /** 815 * Get the serverId for this replication server. 816 * 817 * @return The value of the serverId. 818 * 819 */ 820 public short getServerId() 821 { 822 return serverId; 823 } 824 825 /** 826 * Creates the backend associated to this replication server. 827 * @throws ConfigException 828 */ 829 private void createBackend() 830 throws ConfigException 831 { 832 try 833 { 834 String ldif = makeLdif( 835 "dn: ds-cfg-backend-id="+backendId+",cn=Backends,cn=config", 836 "objectClass: top", 837 "objectClass: ds-cfg-backend", 838 "ds-cfg-base-dn: dc="+backendId, 839 "ds-cfg-enabled: true", 840 "ds-cfg-writability-mode: enabled", 841 "ds-cfg-java-class: " + 842 "org.opends.server.replication.server.ReplicationBackend", 843 "ds-cfg-backend-id: " + backendId); 844 845 LDIFImportConfig ldifImportConfig = new LDIFImportConfig( 846 new StringReader(ldif)); 847 LDIFReader reader = new LDIFReader(ldifImportConfig); 848 Entry backendConfigEntry = reader.readEntry(); 849 if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN)) 850 { 851 // Add the replication backend 852 DirectoryServer.getConfigHandler().addEntry(backendConfigEntry, null); 853 } 854 ldifImportConfig.close(); 855 } 856 catch(Exception e) 857 { 858 MessageBuilder mb = new MessageBuilder(); 859 mb.append(e.getLocalizedMessage()); 860 Message msg = ERR_CHECK_CREATE_REPL_BACKEND_FAILED.get(mb.toString()); 861 throw new ConfigException(msg, e); 862 863 } 864 } 865 866 private static String makeLdif(String... lines) 867 { 868 StringBuilder buffer = new StringBuilder(); 869 for (String line : lines) { 870 buffer.append(line).append(EOL); 871 } 872 // Append an extra line so we can append LDIF Strings. 873 buffer.append(EOL); 874 return buffer.toString(); 875 } 876 877 /** 878 * Do what needed when the config object related to this replication server 879 * is deleted from the server configuration. 880 */ 881 public void remove() 882 { 883 if (debugEnabled()) 884 TRACER.debugInfo("RS " +getMonitorInstanceName()+ 885 " starts removing"); 886 887 shutdown(); 888 removeBackend(); 889 890 DirectoryServer.deregisterBackupTaskListener(this); 891 DirectoryServer.deregisterRestoreTaskListener(this); 892 DirectoryServer.deregisterExportTaskListener(this); 893 DirectoryServer.deregisterImportTaskListener(this); 894 } 895 896 /** 897 * Removes the backend associated to this Replication Server that has been 898 * created when this replication server was created. 899 */ 900 protected void removeBackend() 901 { 902 try 903 { 904 if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN)) 905 { 906 // Delete the replication backend 907 DirectoryServer.getConfigHandler().deleteEntry(backendConfigEntryDN, 908 null); 909 } 910 } 911 catch(Exception e) 912 { 913 MessageBuilder mb = new MessageBuilder(); 914 mb.append(e.getLocalizedMessage()); 915 Message msg = ERR_DELETE_REPL_BACKEND_FAILED.get(mb.toString()); 916 logError(msg); 917 } 918 } 919 /** 920 * {@inheritDoc} 921 */ 922 public void processBackupBegin(Backend backend, BackupConfig config) 923 { 924 // Nothing is needed at the moment 925 } 926 927 /** 928 * {@inheritDoc} 929 */ 930 public void processBackupEnd(Backend backend, BackupConfig config, 931 boolean successful) 932 { 933 // Nothing is needed at the moment 934 } 935 936 /** 937 * {@inheritDoc} 938 */ 939 public void processRestoreBegin(Backend backend, RestoreConfig config) 940 { 941 if (backend.getBackendID().equals(backendId)) 942 shutdown(); 943 } 944 945 /** 946 * {@inheritDoc} 947 */ 948 public void processRestoreEnd(Backend backend, RestoreConfig config, 949 boolean successful) 950 { 951 if (backend.getBackendID().equals(backendId)) 952 initialize(this.replicationServerId, this.replicationPort); 953 } 954 955 /** 956 * {@inheritDoc} 957 */ 958 public void processImportBegin(Backend backend, LDIFImportConfig config) 959 { 960 // Nothing is needed at the moment 961 } 962 963 /** 964 * {@inheritDoc} 965 */ 966 public void processImportEnd(Backend backend, LDIFImportConfig config, 967 boolean successful) 968 { 969 // Nothing is needed at the moment 970 } 971 972 /** 973 * {@inheritDoc} 974 */ 975 public void processExportBegin(Backend backend, LDIFExportConfig config) 976 { 977 if (debugEnabled()) 978 TRACER.debugInfo("RS " +getMonitorInstanceName()+ 979 " Export starts"); 980 if (backend.getBackendID().equals(backendId)) 981 { 982 // Retrieves the backend related to this replicationServerDomain 983 // backend = 984 ReplicationBackend b = 985 (ReplicationBackend)DirectoryServer.getBackend(backendId); 986 b.setServer(this); 987 } 988 } 989 990 /** 991 * {@inheritDoc} 992 */ 993 public void processExportEnd(Backend backend, LDIFExportConfig config, 994 boolean successful) 995 { 996 // Nothing is needed at the moment 997 } 998 999 /** 1000 * Returns an iterator on the list of replicationServerDomain. 1001 * Returns null if none. 1002 * @return the iterator. 1003 */ 1004 public Iterator<ReplicationServerDomain> getCacheIterator() 1005 { 1006 if (!baseDNs.isEmpty()) 1007 return baseDNs.values().iterator(); 1008 else 1009 return null; 1010 } 1011 1012 /** 1013 * Clears the Db associated with that server. 1014 */ 1015 public void clearDb() 1016 { 1017 Iterator<ReplicationServerDomain> rcachei = getCacheIterator(); 1018 if (rcachei != null) 1019 { 1020 while (rcachei.hasNext()) 1021 { 1022 ReplicationServerDomain rsd = rcachei.next(); 1023 rsd.clearDbs(); 1024 } 1025 } 1026 } 1027 }