001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.plugin;
028    
029    import org.opends.messages.*;
030    
031    import static org.opends.server.loggers.ErrorLogger.logError;
032    import static org.opends.server.loggers.debug.DebugLogger.*;
033    import org.opends.server.loggers.debug.DebugTracer;
034    import static org.opends.messages.ReplicationMessages.*;
035    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
036    
037    import java.io.IOException;
038    import java.net.ConnectException;
039    import java.net.InetAddress;
040    import java.net.InetSocketAddress;
041    import java.net.Socket;
042    import java.net.SocketException;
043    import java.net.SocketTimeoutException;
044    import java.util.Collection;
045    import java.util.HashMap;
046    import java.util.Iterator;
047    import java.util.LinkedHashSet;
048    import java.util.TreeSet;
049    import java.util.concurrent.Semaphore;
050    import java.util.concurrent.TimeUnit;
051    
052    import org.opends.server.protocols.asn1.ASN1OctetString;
053    import org.opends.server.protocols.internal.InternalClientConnection;
054    import org.opends.server.protocols.internal.InternalSearchListener;
055    import org.opends.server.protocols.internal.InternalSearchOperation;
056    import org.opends.server.protocols.ldap.LDAPFilter;
057    import org.opends.server.replication.common.ChangeNumber;
058    import org.opends.server.replication.common.ServerState;
059    import org.opends.server.replication.protocol.*;
060    import org.opends.server.types.DN;
061    import org.opends.server.types.DereferencePolicy;
062    import org.opends.server.types.ResultCode;
063    import org.opends.server.types.SearchResultEntry;
064    import org.opends.server.types.SearchResultReference;
065    import org.opends.server.types.SearchScope;
066    
067    /**
068     * The broker for Multi-master Replication.
069     */
070    public class ReplicationBroker implements InternalSearchListener
071    {
072    
073      /**
074       * The tracer object for the debug logger.
075       */
076      private static final DebugTracer TRACER = getTracer();
077      private boolean shutdown = false;
078      private Collection<String> servers;
079      private boolean connected = false;
080      private String replicationServer = "Not connected";
081      private TreeSet<FakeOperation> replayOperations;
082      private ProtocolSession session = null;
083      private final ServerState state;
084      private final DN baseDn;
085      private final short serverID;
086      private int maxSendDelay;
087      private int maxReceiveDelay;
088      private int maxSendQueue;
089      private int maxReceiveQueue;
090      private Semaphore sendWindow;
091      private int maxSendWindow;
092      private int rcvWindow;
093      private int halfRcvWindow;
094      private int maxRcvWindow;
095      private int timeout = 0;
096      private short protocolVersion;
097      private long generationId = -1;
098      private ReplSessionSecurity replSessionSecurity;
099    
100      // Trick for avoiding a inner class for many parameters return for
101      // performHandshake method.
102      private String tmpReadableServerName = null;
103      /**
104       * The time in milliseconds between heartbeats from the replication
105       * server.  Zero means heartbeats are off.
106       */
107      private long heartbeatInterval = 0;
108      /**
109       * A thread to monitor heartbeats on the session.
110       */
111      private HeartbeatMonitor heartbeatMonitor = null;
112      /**
113       * The number of times the connection was lost.
114       */
115      private int numLostConnections = 0;
116      /**
117       * When the broker cannot connect to any replication server
118       * it log an error and keeps continuing every second.
119       * This boolean is set when the first failure happens and is used
120       * to avoid repeating the error message for further failure to connect
121       * and to know that it is necessary to print a new message when the broker
122       * finally succeed to connect.
123       */
124      private boolean connectionError = false;
125      private final Object connectPhaseLock = new Object();
126    
127      /**
128       * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
129       *
130       * @param state The ServerState that should be used by this broker
131       *              when negociating the session with the replicationServer.
132       * @param baseDn The base DN that should be used by this broker
133       *              when negociating the session with the replicationServer.
134       * @param serverID The server ID that should be used by this broker
135       *              when negociating the session with the replicationServer.
136       * @param maxReceiveQueue The maximum size of the receive queue to use on
137       *                         the replicationServer.
138       * @param maxReceiveDelay The maximum replication delay to use on the
139       *                        replicationServer.
140       * @param maxSendQueue The maximum size of the send queue to use on
141       *                     the replicationServer.
142       * @param maxSendDelay The maximum send delay to use on the replicationServer.
143       * @param window The size of the send and receive window to use.
144       * @param heartbeatInterval The interval between heartbeats requested of the
145       * replicationServer, or zero if no heartbeats are requested.
146       *
147       * @param generationId The generationId for the server associated to the
148       * provided serverID and for the domain associated to the provided baseDN.
149       * @param replSessionSecurity The session security configuration.
150       */
151      public ReplicationBroker(ServerState state, DN baseDn, short serverID,
152        int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
153        int maxSendDelay, int window, long heartbeatInterval,
154        long generationId, ReplSessionSecurity replSessionSecurity)
155      {
156        this.baseDn = baseDn;
157        this.serverID = serverID;
158        this.maxReceiveDelay = maxReceiveDelay;
159        this.maxSendDelay = maxSendDelay;
160        this.maxReceiveQueue = maxReceiveQueue;
161        this.maxSendQueue = maxSendQueue;
162        this.state = state;
163        replayOperations =
164          new TreeSet<FakeOperation>(new FakeOperationComparator());
165        this.rcvWindow = window;
166        this.maxRcvWindow = window;
167        this.halfRcvWindow = window / 2;
168        this.heartbeatInterval = heartbeatInterval;
169        this.protocolVersion = ProtocolVersion.currentVersion();
170        this.generationId = generationId;
171        this.replSessionSecurity = replSessionSecurity;
172      }
173    
174      /**
175       * Start the ReplicationBroker.
176       *
177       * @param servers list of servers used
178       */
179      public void start(Collection<String> servers)
180      {
181        /*
182         * Open Socket to the ReplicationServer
183         * Send the Start message
184         */
185        shutdown = false;
186        this.servers = servers;
187        if (servers.size() < 1)
188        {
189          Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
190          logError(message);
191        }
192    
193        this.rcvWindow = this.maxRcvWindow;
194        this.connect();
195      }
196    
197      /**
198       * Connect to a ReplicationServer.
199       *
200       * @throws NumberFormatException address was invalid
201       */
202      private void connect()
203      {
204        HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
205    
206        // Stop any existing heartbeat monitor from a previous session.
207        stopHeartBeat();
208    
209        synchronized (connectPhaseLock)
210        {
211          /*
212           * Connect to each replication server and get their ServerState then find
213           * out which one is the best to connect to.
214           */
215          for (String server : servers)
216          {
217            // Connect to server and get reply message
218            ReplServerStartMessage replServerStartMsg =
219              performHandshake(server, false);
220            tmpReadableServerName = null; // Not needed now
221    
222            // Store reply message in list
223            if (replServerStartMsg != null)
224            {
225              ServerState rsState = replServerStartMsg.getServerState();
226              rsStates.put(server, rsState);
227            }
228          } // for servers
229    
230          ReplServerStartMessage replServerStartMsg = null;
231    
232          if (rsStates.size() > 0)
233          {
234    
235            // At least one server answered, find the best one.
236            String bestServer = computeBestReplicationServer(state, rsStates,
237              serverID, baseDn);
238    
239            // Best found, now connect to this one
240            replServerStartMsg = performHandshake(bestServer, true);
241    
242            if (replServerStartMsg != null)
243            {
244              try
245              {
246                /*
247                 * We must not publish changes to a replicationServer that has not
248                 * seen all our previous changes because this could cause some
249                 * other ldap servers to miss those changes.
250                 * Check that the ReplicationServer has seen all our previous
251                 * changes.
252                 */
253                ChangeNumber replServerMaxChangeNumber =
254                  replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
255    
256                if (replServerMaxChangeNumber == null)
257                {
258                  replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
259                }
260                ChangeNumber ourMaxChangeNumber =
261                  state.getMaxChangeNumber(serverID);
262    
263                if ((ourMaxChangeNumber != null) &&
264                  (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
265                {
266    
267                  // Replication server is missing some of our changes: let's send
268                  // them to him.
269                  replayOperations.clear();
270    
271                  Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
272                  logError(message);
273    
274                  /*
275                   * Get all the changes that have not been seen by this
276                   * replication server and populate the replayOperations
277                   * list.
278                   */
279                  InternalSearchOperation op = searchForChangedEntries(
280                    baseDn, replServerMaxChangeNumber, this);
281                  if (op.getResultCode() != ResultCode.SUCCESS)
282                  {
283                    /*
284                     * An error happened trying to search for the updates
285                     * This server will start acepting again new updates but
286                     * some inconsistencies will stay between servers.
287                     * Log an error for the repair tool
288                     * that will need to resynchronize the servers.
289                     */
290                    message = ERR_CANNOT_RECOVER_CHANGES.get(
291                      baseDn.toNormalizedString());
292                    logError(message);
293                  } else
294                  {
295                    for (FakeOperation replayOp : replayOperations)
296                    {
297                      message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber().
298                        toString());
299                      logError(message);
300                      session.publish(replayOp.generateMessage());
301                    }
302                    message = DEBUG_CHANGES_SENT.get();
303                    logError(message);
304                  }
305                }
306    
307                replicationServer = tmpReadableServerName;
308                maxSendWindow = replServerStartMsg.getWindowSize();
309                connected = true;
310                startHeartBeat();
311              } catch (IOException e)
312              {
313                Message message = ERR_PUBLISHING_FAKE_OPS.get(
314                  baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
315                  stackTraceToSingleLineString(e));
316                logError(message);
317              } catch (Exception e)
318              {
319                Message message = ERR_COMPUTING_FAKE_OPS.get(
320                  baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
321                  stackTraceToSingleLineString(e));
322                logError(message);
323              } finally
324              {
325                if (connected == false)
326                {
327                  if (session != null)
328                  {
329                    try
330                    {
331                      session.close();
332                    } catch (IOException e)
333                    {
334                    // The session was already closed, just ignore.
335                    }
336                    session = null;
337                  }
338                }
339              }
340            } // Could perform handshake with best
341          } // Reached some servers
342    
343          if (connected)
344          {
345            // Log a message to let the administrator know that the failure was
346            // resolved.
347            // Wakeup all the thread that were waiting on the window
348            // on the previous connection.
349            connectionError = false;
350            if (sendWindow != null)
351            {
352              sendWindow.release(Integer.MAX_VALUE);
353            }
354            this.sendWindow = new Semaphore(maxSendWindow);
355            connectPhaseLock.notify();
356    
357            if ((replServerStartMsg.getGenerationId() == this.generationId) ||
358              (replServerStartMsg.getGenerationId() == -1))
359            {
360              Message message =
361                NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
362                baseDn.toString(),
363                replicationServer,
364                Long.toString(this.generationId));
365              logError(message);
366            } else
367            {
368              Message message =
369                NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
370                baseDn.toString(),
371                replicationServer,
372                Long.toString(this.generationId),
373                Long.toString(replServerStartMsg.getGenerationId()));
374              logError(message);
375            }
376          } else
377          {
378            /*
379             * This server could not find any replicationServer. It's going to start
380             * in degraded mode. Log a message.
381             */
382            if (!connectionError)
383            {
384              connectionError = true;
385              connectPhaseLock.notify();
386              Message message =
387                NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
388              logError(message);
389            }
390          }
391        }
392      }
393    
394      /**
395       * Connect to the provided server performing the handshake (start messages
396       * exchange) and return the reply message from the replication server.
397       *
398       * @param server Server to connect to.
399       * @param keepConnection Do we keep session opened or not after handshake.
400       * @return The ReplServerStartMessage the server replied. Null if could not
401       *         get an answer.
402       */
403      public ReplServerStartMessage performHandshake(String server,
404        boolean keepConnection)
405      {
406        ReplServerStartMessage replServerStartMsg = null;
407    
408        // Parse server string.
409        int separator = server.lastIndexOf(':');
410        String port = server.substring(separator + 1);
411        String hostname = server.substring(0, separator);
412    
413        boolean error = false;
414        try
415        {
416          /*
417           * Open a socket connection to the next candidate.
418           */
419          int intPort = Integer.parseInt(port);
420          InetSocketAddress serverAddr = new InetSocketAddress(
421            InetAddress.getByName(hostname), intPort);
422          tmpReadableServerName = serverAddr.toString();
423          Socket socket = new Socket();
424          socket.setReceiveBufferSize(1000000);
425          socket.setTcpNoDelay(true);
426          socket.connect(serverAddr, 500);
427          session = replSessionSecurity.createClientSession(server, socket);
428          boolean isSslEncryption =
429            replSessionSecurity.isSslEncryption(server);
430          /*
431           * Send our ServerStartMessage.
432           */
433          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
434            maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
435            halfRcvWindow * 2, heartbeatInterval, state,
436            protocolVersion, generationId, isSslEncryption, !keepConnection);
437          session.publish(msg);
438    
439          /*
440           * Read the ReplServerStartMessage that should come back.
441           */
442          session.setSoTimeout(1000);
443          replServerStartMsg = (ReplServerStartMessage) session.receive();
444    
445          /*
446           * We have sent our own protocol version to the replication server.
447           * The replication server will use the same one (or an older one
448           * if it is an old replication server).
449           */
450          protocolVersion = ProtocolVersion.minWithCurrent(
451            replServerStartMsg.getVersion());
452          session.setSoTimeout(timeout);
453    
454          if (!isSslEncryption)
455          {
456            session.stopEncryption();
457          }
458        } catch (ConnectException e)
459        {
460          /*
461           * There was no server waiting on this host:port
462           * Log a notice and try the next replicationServer in the list
463           */
464          if (!connectionError)
465          {
466            // the error message is only logged once to avoid overflowing
467            // the error log
468            Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
469            logError(message);
470          }
471          error = true;
472        } catch (Exception e)
473        {
474          Message message = ERR_EXCEPTION_STARTING_SESSION.get(
475            baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
476            stackTraceToSingleLineString(e));
477          logError(message);
478          error = true;
479        }
480    
481        // Close session if requested
482        if (!keepConnection || error)
483        {
484          if (session != null)
485          {
486            try
487            {
488              session.close();
489            } catch (IOException e)
490            {
491            // The session was already closed, just ignore.
492            }
493            session = null;
494          }
495          if (error)
496          {
497            replServerStartMsg = null;
498          } // Be sure to return null.
499        }
500    
501        return replServerStartMsg;
502      }
503    
504      /**
505       * Returns the replication server that best fits our need so that we can
506       * connect to it.
507       *
508       * Note: this method put as public static for unit testing purpose.
509       *
510       * @param myState The local server state.
511       * @param rsStates The list of available replication servers and their
512       *                 associated server state.
513       * @param serverId The server id for the suffix we are working for.
514       * @param baseDn The suffix for which we are working for.
515       * @return The computed best replication server.
516       */
517      public static String computeBestReplicationServer(ServerState myState,
518        HashMap<String, ServerState> rsStates, short serverId, DN baseDn)
519      {
520    
521        /*
522         * Find replication servers who are up to date (or more up to date than us,
523         * if for instance we failed and restarted, having sent some changes to the
524         * RS but without having time to store our own state) regarding our own
525         * server id. Then, among them, choose the server that is the most up to
526         * date regarding the whole topology.
527         *
528         * If no server is up to date regarding our own server id, find the one who
529         * is the most up to date regarding our server id.
530         */
531    
532        // Should never happen (sanity check)
533        if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) ||
534          (baseDn == null))
535        {
536          return null;
537        }
538    
539        String bestServer = null;
540        // Servers up to dates with regard to our changes
541        HashMap<String, ServerState> upToDateServers =
542          new HashMap<String, ServerState>();
543        // Servers late with regard to our changes
544        HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
545    
546        /*
547         * Start loop to differenciate up to date servers from late ones.
548         */
549        ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId);
550        if (myChangeNumber == null)
551        {
552          myChangeNumber = new ChangeNumber(0, 0, serverId);
553        }
554        for (String repServer : rsStates.keySet())
555        {
556    
557          ServerState rsState = rsStates.get(repServer);
558          ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId);
559          if (rsChangeNumber == null)
560          {
561            rsChangeNumber = new ChangeNumber(0, 0, serverId);
562          }
563    
564          // Store state in right list
565          if (myChangeNumber.olderOrEqual(rsChangeNumber))
566          {
567            upToDateServers.put(repServer, rsState);
568          } else
569          {
570            lateOnes.put(repServer, rsState);
571          }
572        }
573    
574        if (upToDateServers.size() > 0)
575        {
576    
577          /*
578           * Some up to date servers, among them, choose the one that has the
579           * maximum number of changes to send us. This is the most up to date one
580           * regarding the whole topology. This server is the one which has the less
581           * difference with the topology server state. For comparison, we need to
582           * compute the difference for each server id with the topology server
583           * state.
584           */
585    
586          Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
587            upToDateServers.size(),
588            baseDn.toNormalizedString());
589          logError(message);
590    
591          /*
592           * First of all, compute the virtual server state for the whole topology,
593           * which is composed of the most up to date change numbers for
594           * each server id in the topology.
595           */
596          ServerState topoState = new ServerState();
597          for (ServerState curState : upToDateServers.values())
598          {
599    
600            Iterator<Short> it = curState.iterator();
601            while (it.hasNext())
602            {
603              Short sId = it.next();
604              ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
605              if (curSidCn == null)
606              {
607                curSidCn = new ChangeNumber(0, 0, sId);
608              }
609              // Update topology state
610              topoState.update(curSidCn);
611            }
612          } // For up to date servers
613    
614          // Min of the max shifts
615          long minShift = -1L;
616          for (String upServer : upToDateServers.keySet())
617          {
618    
619            /*
620             * Compute the maximum difference between the time of a server id's
621             * change number and the time of the matching server id's change
622             * number in the topology server state.
623             *
624             * Note: we could have used the sequence number here instead of the
625             * timestamp, but this would have caused a problem when the sequence
626             * number loops and comes back to 0 (computation would have becomen
627             * meaningless).
628             */
629            long shift = -1L;
630            ServerState curState = upToDateServers.get(upServer);
631            Iterator<Short> it = curState.iterator();
632            while (it.hasNext())
633            {
634              Short sId = it.next();
635              ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
636              if (curSidCn == null)
637              {
638                curSidCn = new ChangeNumber(0, 0, sId);
639              }
640              // Cannot be null as checked at construction time
641              ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
642              // Cannot be negative as topoState computed as being the max CN
643              // for each server id in the topology
644              long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
645              if (tmpShift > shift)
646              {
647                shift = tmpShift;
648              }
649            }
650    
651            if ((minShift < 0) // First time in loop
652              || (shift < minShift))
653            {
654              // This sever is even closer to topo state
655              bestServer = upServer;
656              minShift = shift;
657            }
658          } // For up to date servers
659    
660        } else
661        {
662          /*
663           * We could not find a replication server that has seen all the
664           * changes that this server has already processed,
665           */
666          // lateOnes cannot be empty
667          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
668            baseDn.toNormalizedString(), lateOnes.size());
669          logError(message);
670    
671          // Min of the shifts
672          long minShift = -1L;
673          for (String lateServer : lateOnes.keySet())
674          {
675    
676            /*
677             * Choose the server who is the closest to us regarding our server id
678             * (this is the most up to date regarding our server id).
679             */
680            ServerState curState = lateOnes.get(lateServer);
681            ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId);
682            if (ourSidCn == null)
683            {
684              ourSidCn = new ChangeNumber(0, 0, serverId);
685            }
686            // Cannot be negative as our Cn for our server id is strictly
687            // greater than those of the servers in late server list
688            long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
689    
690            if ((minShift < 0) // First time in loop
691              || (tmpShift < minShift))
692            {
693              // This sever is even closer to topo state
694              bestServer = lateServer;
695              minShift = tmpShift;
696            }
697          } // For late servers
698        }
699    
700        return bestServer;
701      }
702    
703      /**
704       * Search for the changes that happened since fromChangeNumber
705       * based on the historical attribute.
706       * @param baseDn the base DN
707       * @param fromChangeNumber The change number from which we want the changes
708       * @param resultListener that will process the entries returned.
709       * @return the internal search operation
710       * @throws Exception when raised.
711       */
712      public static InternalSearchOperation searchForChangedEntries(
713        DN baseDn,
714        ChangeNumber fromChangeNumber,
715        InternalSearchListener resultListener)
716        throws Exception
717      {
718        InternalClientConnection conn =
719          InternalClientConnection.getRootConnection();
720        LDAPFilter filter = LDAPFilter.decode(
721          "(" + Historical.HISTORICALATTRIBUTENAME +
722          ">=dummy:" + fromChangeNumber + ")");
723        LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
724        attrs.add(Historical.HISTORICALATTRIBUTENAME);
725        attrs.add(Historical.ENTRYUIDNAME);
726        return conn.processSearch(
727          new ASN1OctetString(baseDn.toString()),
728          SearchScope.WHOLE_SUBTREE,
729          DereferencePolicy.NEVER_DEREF_ALIASES,
730          0, 0, false, filter,
731          attrs,
732          resultListener);
733      }
734    
735      /**
736       * Start the heartbeat monitor thread.
737       */
738      private void startHeartBeat()
739      {
740        // Start a heartbeat monitor thread.
741        if (heartbeatInterval > 0)
742        {
743          heartbeatMonitor =
744            new HeartbeatMonitor("Replication Heartbeat Monitor on " +
745            baseDn + " with " + getReplicationServer(),
746            session, heartbeatInterval);
747          heartbeatMonitor.start();
748        }
749      }
750    
751      /**
752       * Stop the heartbeat monitor thread.
753       */
754      void stopHeartBeat()
755      {
756        if (heartbeatMonitor != null)
757        {
758          heartbeatMonitor.shutdown();
759          heartbeatMonitor = null;
760        }
761      }
762    
763      /**
764       * restart the ReplicationBroker.
765       */
766      public void reStart()
767      {
768        reStart(this.session);
769      }
770    
771      /**
772       * Restart the ReplicationServer broker after a failure.
773       *
774       * @param failingSession the socket which failed
775       */
776      public void reStart(ProtocolSession failingSession)
777      {
778        try
779        {
780          if (failingSession != null)
781          {
782            failingSession.close();
783            numLostConnections++;
784          }
785        } catch (IOException e1)
786        {
787        // ignore
788        }
789    
790        if (failingSession == session)
791        {
792          this.connected = false;
793        }
794        while (!this.connected && (!this.shutdown))
795        {
796          try
797          {
798            this.connect();
799          } catch (Exception e)
800          {
801            MessageBuilder mb = new MessageBuilder();
802            mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
803              baseDn.toNormalizedString(), e.getLocalizedMessage()));
804            mb.append(stackTraceToSingleLineString(e));
805            logError(mb.toMessage());
806          }
807          if ((!connected) && (!shutdown))
808          {
809            try
810            {
811              Thread.sleep(500);
812            } catch (InterruptedException e)
813            {
814            // ignore
815            }
816          }
817        }
818      }
819    
820      /**
821       * Publish a message to the other servers.
822       * @param msg the message to publish
823       */
824      public void publish(ReplicationMessage msg)
825      {
826        boolean done = false;
827    
828        while (!done && !shutdown)
829        {
830          if (connectionError)
831          {
832            // It was not possible to connect to any replication server.
833            // Since the operation was already processed, we have no other
834            // choice than to return without sending the ReplicationMessage
835            // and relying on the resend procedure of the connect phase to
836            // fix the problem when we finally connect.
837    
838            if (debugEnabled())
839            {
840              debugInfo("ReplicationBroker.publish() Publishing a " +
841                " message is not possible due to existing connection error.");
842            }
843    
844            return;
845          }
846    
847          try
848          {
849            boolean credit;
850            ProtocolSession current_session;
851            Semaphore currentWindowSemaphore;
852    
853            // save the session at the time when we acquire the
854            // sendwindow credit so that we can make sure later
855            // that the session did not change in between.
856            // This is necessary to make sure that we don't publish a message
857            // on a session with a credit that was acquired from a previous
858            // session.
859            synchronized (connectPhaseLock)
860            {
861              current_session = session;
862              currentWindowSemaphore = sendWindow;
863            }
864    
865            if (msg instanceof UpdateMessage)
866            {
867              // Acquiring the window credit must be done outside of the
868              // connectPhaseLock because it can be blocking and we don't
869              // want to hold off reconnection in case the connection dropped.
870              credit =
871                currentWindowSemaphore.tryAcquire(
872                (long) 500, TimeUnit.MILLISECONDS);
873            } else
874            {
875              credit = true;
876            }
877            if (credit)
878            {
879              synchronized (connectPhaseLock)
880              {
881                // check the session. If it has changed, some
882                // deconnection/reconnection happened and we need to restart from
883                // scratch.
884                if (session == current_session)
885                {
886                  session.publish(msg);
887                  done = true;
888                }
889              }
890            }
891            if (!credit)
892            {
893              // the window is still closed.
894              // Send a WindowProbe message to wakeup the receiver in case the
895              // window update message was lost somehow...
896              // then loop to check again if connection was closed.
897              session.publish(new WindowProbe());
898            }
899          } catch (IOException e)
900          {
901            // The receive threads should handle reconnection or
902            // mark this broker in error. Just retry.
903            synchronized (connectPhaseLock)
904            {
905              try
906              {
907                connectPhaseLock.wait(100);
908              } catch (InterruptedException e1)
909              {
910                // ignore
911                if (debugEnabled())
912                {
913                  debugInfo("ReplicationBroker.publish() " +
914                    "IO exception raised : " + e.getLocalizedMessage());
915                }
916              }
917            }
918          } catch (InterruptedException e)
919          {
920            // just loop.
921            if (debugEnabled())
922            {
923              debugInfo("ReplicationBroker.publish() " +
924                "Interrupted exception raised." + e.getLocalizedMessage());
925            }
926          }
927        }
928      }
929    
930      /**
931       * Receive a message.
932       * This method is not multithread safe and should either always be
933       * called in a single thread or protected by a locking mechanism
934       * before being called.
935       *
936       * @return the received message
937       * @throws SocketTimeoutException if the timeout set by setSoTimeout
938       *         has expired
939       */
940      public ReplicationMessage receive() throws SocketTimeoutException
941      {
942        while (shutdown == false)
943        {
944          if (!connected)
945          {
946            reStart(null);
947          }
948    
949          ProtocolSession failingSession = session;
950          try
951          {
952            ReplicationMessage msg = session.receive();
953            if (msg instanceof WindowMessage)
954            {
955              WindowMessage windowMsg = (WindowMessage) msg;
956              sendWindow.release(windowMsg.getNumAck());
957            }
958            else
959            {
960              return msg;
961            }
962          } catch (SocketTimeoutException e)
963          {
964            throw e;
965          } catch (Exception e)
966          {
967            if (shutdown == false)
968            {
969              Message message =
970                NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
971              logError(message);
972    
973              debugInfo("ReplicationBroker.receive() " + baseDn +
974                " Exception raised." + e + e.getLocalizedMessage());
975              this.reStart(failingSession);
976            }
977          }
978        }
979        return null;
980      }
981    
982      /**
983       * This method allows to do the necessary computing for the window
984       * management after treatment by the worker threads.
985       *
986       * This should be called once the replay thread have done their job
987       * and the window can be open again.
988       */
989      public synchronized void updateWindowAfterReplay()
990      {
991        try
992        {
993          rcvWindow--;
994          if (rcvWindow < halfRcvWindow)
995          {
996            session.publish(new WindowMessage(halfRcvWindow));
997            rcvWindow += halfRcvWindow;
998          }
999        } catch (IOException e)
1000        {
1001          // Any error on the socket will be handled by the thread calling receive()
1002          // just ignore.
1003        }
1004      }
1005    
1006      /**
1007       * stop the server.
1008       */
1009      public void stop()
1010      {
1011        replicationServer = "stopped";
1012        shutdown = true;
1013        connected = false;
1014        try
1015        {
1016          if (debugEnabled())
1017          {
1018            debugInfo("ReplicationBroker is stopping. and will" +
1019              " close the connection");
1020          }
1021    
1022          if (session != null)
1023          {
1024            session.close();
1025          }
1026        } catch (IOException e)
1027        {
1028        }
1029      }
1030    
1031      /**
1032       * Set a timeout value.
1033       * With this option set to a non-zero value, calls to the receive() method
1034       * block for only this amount of time after which a
1035       * java.net.SocketTimeoutException is raised.
1036       * The Broker is valid and useable even after such an Exception is raised.
1037       *
1038       * @param timeout the specified timeout, in milliseconds.
1039       * @throws SocketException if there is an error in the underlying protocol,
1040       *         such as a TCP error.
1041       */
1042      public void setSoTimeout(int timeout) throws SocketException
1043      {
1044        this.timeout = timeout;
1045        if (session != null)
1046        {
1047          session.setSoTimeout(timeout);
1048        }
1049      }
1050    
1051      /**
1052       * Set the value of the generationId for that broker. Normally the
1053       * generationId is set through the constructor but there are cases
1054       * where the value of the generationId must be changed while the broker
1055       * already exist for example after an on-line import.
1056       *
1057       * @param generationId The value of the generationId.
1058       *
1059       */
1060      public void setGenerationId(long generationId)
1061      {
1062        this.generationId = generationId;
1063      }
1064    
1065      /**
1066       * Get the name of the replicationServer to which this broker is currently
1067       * connected.
1068       *
1069       * @return the name of the replicationServer to which this domain
1070       *         is currently connected.
1071       */
1072      public String getReplicationServer()
1073      {
1074        return replicationServer;
1075      }
1076    
1077      /**
1078       * {@inheritDoc}
1079       */
1080      public void handleInternalSearchEntry(
1081        InternalSearchOperation searchOperation,
1082        SearchResultEntry searchEntry)
1083      {
1084        /*
1085         * Only deal with modify operation so far
1086         * TODO : implement code for ADD, DEL, MODDN operation
1087         *
1088         * Parse all ds-sync-hist attribute values
1089         *   - for each Changenumber > replication server MaxChangeNumber :
1090         *          build an attribute mod
1091         *
1092         */
1093        Iterable<FakeOperation> updates =
1094          Historical.generateFakeOperations(searchEntry);
1095        for (FakeOperation op : updates)
1096        {
1097          replayOperations.add(op);
1098        }
1099      }
1100    
1101      /**
1102       * {@inheritDoc}
1103       */
1104      public void handleInternalSearchReference(
1105        InternalSearchOperation searchOperation,
1106        SearchResultReference searchReference)
1107      {
1108      // TODO to be implemented
1109      }
1110    
1111      /**
1112       * Get the maximum receive window size.
1113       *
1114       * @return The maximum receive window size.
1115       */
1116      public int getMaxRcvWindow()
1117      {
1118        return maxRcvWindow;
1119      }
1120    
1121      /**
1122       * Get the current receive window size.
1123       *
1124       * @return The current receive window size.
1125       */
1126      public int getCurrentRcvWindow()
1127      {
1128        return rcvWindow;
1129      }
1130    
1131      /**
1132       * Get the maximum send window size.
1133       *
1134       * @return The maximum send window size.
1135       */
1136      public int getMaxSendWindow()
1137      {
1138        return maxSendWindow;
1139      }
1140    
1141      /**
1142       * Get the current send window size.
1143       *
1144       * @return The current send window size.
1145       */
1146      public int getCurrentSendWindow()
1147      {
1148        if (connected)
1149        {
1150          return sendWindow.availablePermits();
1151        } else
1152        {
1153          return 0;
1154        }
1155      }
1156    
1157      /**
1158       * Get the number of times the connection was lost.
1159       * @return The number of times the connection was lost.
1160       */
1161      public int getNumLostConnections()
1162      {
1163        return numLostConnections;
1164      }
1165    
1166      /**
1167       * Change some config parameters.
1168       *
1169       * @param replicationServers    The new list of replication servers.
1170       * @param maxReceiveQueue     The max size of receive queue.
1171       * @param maxReceiveDelay     The max receive delay.
1172       * @param maxSendQueue        The max send queue.
1173       * @param maxSendDelay        The max Send Delay.
1174       * @param window              The max window size.
1175       * @param heartbeatInterval   The heartbeat interval.
1176       */
1177      public void changeConfig(Collection<String> replicationServers,
1178        int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
1179        int maxSendDelay, int window, long heartbeatInterval)
1180      {
1181        this.servers = replicationServers;
1182        this.maxRcvWindow = window;
1183        this.heartbeatInterval = heartbeatInterval;
1184        this.maxReceiveDelay = maxReceiveDelay;
1185        this.maxReceiveQueue = maxReceiveQueue;
1186        this.maxSendDelay = maxSendDelay;
1187        this.maxSendQueue = maxSendQueue;
1188      // TODO : Changing those parameters requires to either restart a new
1189      // session with the replicationServer or renegociate the parameters that
1190      // were sent in the ServerStart message
1191      }
1192    
1193      /**
1194       * Get the version of the replication protocol.
1195       * @return The version of the replication protocol.
1196       */
1197      public short getProtocolVersion()
1198      {
1199        return protocolVersion;
1200      }
1201    
1202      /**
1203       * Check if the broker is connected to a ReplicationServer and therefore
1204       * ready to received and send Replication Messages.
1205       *
1206       * @return true if the server is connected, false if not.
1207       */
1208      public boolean isConnected()
1209      {
1210        return !connectionError;
1211      }
1212    
1213      private boolean debugEnabled()
1214      {
1215        return true;
1216      }
1217    
1218      private static final void debugInfo(String s)
1219      {
1220        logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
1221        TRACER.debugInfo(s);
1222      }
1223    
1224      /**
1225       * Determine whether the connection to the replication server is encrypted.
1226       * @return true if the connection is encrypted, false otherwise.
1227       */
1228      public boolean isSessionEncrypted()
1229      {
1230        boolean isEncrypted = false;
1231        if (session != null)
1232        {
1233          return session.isEncrypted();
1234        }
1235        return isEncrypted;
1236      }
1237    }