001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2008 Sun Microsystems, Inc.
026     */
027    
028    package org.opends.server.replication.protocol;
029    
030    import org.opends.server.api.DirectoryThread;
031    import static org.opends.server.loggers.debug.DebugLogger.*;
032    
033    import org.opends.server.loggers.debug.DebugTracer;
034    
035    import java.io.IOException;
036    
037    /**
038     * This thread publishes a heartbeat message on a given protocol session at
039     * regular intervals when there are no other replication messages being
040     * published.
041     */
042    public class HeartbeatThread extends DirectoryThread
043    {
044      /**
045       * The tracer object for the debug logger.
046       */
047      private static final DebugTracer TRACER = getTracer();
048    
049    
050      /**
051       * For test purposes only to simulate loss of heartbeats.
052       */
053      static private boolean heartbeatsDisabled = false;
054    
055      /**
056       * The session on which heartbeats are to be sent.
057       */
058      private ProtocolSession session;
059    
060    
061      /**
062       * The time in milliseconds between heartbeats.
063       */
064      private long heartbeatInterval;
065    
066    
067      /**
068       * Set this to stop the thread.
069       */
070      private Boolean shutdown = false;
071      private final Object shutdown_lock = new Object();
072    
073    
074      /**
075       * Create a heartbeat thread.
076       * @param threadName The name of the heartbeat thread.
077       * @param session The session on which heartbeats are to be sent.
078       * @param heartbeatInterval The desired interval between heartbeats in
079       * milliseconds.
080       */
081      public HeartbeatThread(String threadName, ProtocolSession session,
082                      long heartbeatInterval)
083      {
084        super(threadName);
085        this.session = session;
086        this.heartbeatInterval = heartbeatInterval;
087      }
088    
089      /**
090       * {@inheritDoc}
091       */
092      @Override
093      public void run()
094      {
095        try
096        {
097          if (debugEnabled())
098          {
099            TRACER.debugInfo("Heartbeat thread is starting, interval is %d",
100                      heartbeatInterval);
101          }
102          HeartbeatMessage heartbeatMessage = new HeartbeatMessage();
103    
104          while (!shutdown)
105          {
106            long now = System.currentTimeMillis();
107            if (debugEnabled())
108            {
109              TRACER.debugVerbose("Heartbeat thread awoke at %d, last message " +
110                  "was sent at %d", now, session.getLastPublishTime());
111            }
112    
113            if (now > session.getLastPublishTime() + heartbeatInterval)
114            {
115              if (!heartbeatsDisabled)
116              {
117                if (debugEnabled())
118                {
119                  TRACER.debugVerbose("Heartbeat sent at %d", now);
120                }
121                session.publish(heartbeatMessage);
122              }
123            }
124    
125            try
126            {
127              long sleepTime = session.getLastPublishTime() +
128                  heartbeatInterval - now;
129              if (sleepTime <= 0)
130              {
131                sleepTime = heartbeatInterval;
132              }
133    
134              if (debugEnabled())
135              {
136                TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
137              }
138    
139              synchronized (shutdown_lock)
140              {
141                if (!shutdown)
142                {
143                  shutdown_lock.wait(sleepTime);
144                }
145              }
146            }
147            catch (InterruptedException e)
148            {
149              // Keep looping.
150            }
151          }
152        }
153        catch (IOException e)
154        {
155          if (debugEnabled())
156          {
157            TRACER.debugInfo("Heartbeat thread could not send a heartbeat.");
158          }
159          // This will be caught in another thread.
160        }
161        finally
162        {
163          if (debugEnabled())
164          {
165            TRACER.debugInfo("Heartbeat thread is exiting.");
166          }
167        }
168      }
169    
170    
171      /**
172       * Call this method to stop the thread.
173       * This method is blocking until the thread has stopped.
174       */
175      public void shutdown()
176      {
177        synchronized (shutdown_lock)
178        {
179          shutdown = true;
180          shutdown_lock.notifyAll();
181          if (debugEnabled())
182          {
183            TRACER.debugInfo("Going to notify Heartbeat thread.");
184          }
185        }
186        if (debugEnabled())
187        {
188          TRACER.debugInfo("Returning from Heartbeat shutdown.");
189        }
190      }
191    
192    
193      /**
194       * For testing purposes only to simulate loss of heartbeats.
195       * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
196       */
197      public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
198      {
199        HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled;
200      }
201    }