001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.extensions;
028    import org.opends.messages.Message;
029    
030    
031    
032    import java.util.Map;
033    
034    import org.opends.server.api.DirectoryThread;
035    import org.opends.server.core.DirectoryServer;
036    import org.opends.server.types.AbstractOperation;
037    import org.opends.server.types.CancelRequest;
038    import org.opends.server.types.DebugLogLevel;
039    import org.opends.server.types.DisconnectReason;
040    
041    import static org.opends.server.loggers.ErrorLogger.*;
042    import static org.opends.server.loggers.debug.DebugLogger.*;
043    import org.opends.server.loggers.debug.DebugTracer;
044    import static org.opends.messages.CoreMessages.*;
045    import static org.opends.server.util.StaticUtils.*;
046    
047    
048    /**
049     * This class defines a data structure for storing and interacting with a
050     * Directory Server worker thread.
051     */
052    public class TraditionalWorkerThread
053           extends DirectoryThread
054    {
055      /**
056       * The tracer object for the debug logger.
057       */
058      private static final DebugTracer TRACER = getTracer();
059    
060      // Indicates whether the Directory Server is shutting down and this thread
061      // should stop running.
062      private boolean shutdownRequested;
063    
064      // Indicates whether this thread was stopped because the server threadnumber
065      // was reduced.
066      private boolean stoppedByReducedThreadNumber;
067    
068      // Indicates whether this thread is currently waiting for work.
069      private boolean waitingForWork;
070    
071      // The operation that this worker thread is currently processing.
072      private AbstractOperation operation;
073    
074      // The handle to the actual thread for this worker thread.
075      private Thread workerThread;
076    
077      // The work queue that this worker thread will service.
078      private TraditionalWorkQueue workQueue;
079    
080    
081    
082      /**
083       * Creates a new worker thread that will service the provided work queue and
084       * process any new requests that are submitted.
085       *
086       * @param  workQueue  The work queue with which this worker thread is
087       *                    associated.
088       * @param  threadID   The thread ID for this worker thread.
089       */
090      public TraditionalWorkerThread(TraditionalWorkQueue workQueue, int threadID)
091      {
092        super("Worker Thread " + threadID);
093    
094    
095        this.workQueue = workQueue;
096    
097        stoppedByReducedThreadNumber = false;
098        shutdownRequested            = false;
099        waitingForWork               = false;
100        operation                    = null;
101        workerThread                 = null;
102      }
103    
104    
105    
106      /**
107       * Indicates that this thread is about to be stopped because the Directory
108       * Server configuration has been updated to reduce the number of worker
109       * threads.
110       */
111      public void setStoppedByReducedThreadNumber()
112      {
113        stoppedByReducedThreadNumber = true;
114      }
115    
116    
117    
118      /**
119       * Indicates whether this worker thread is actively processing a request.
120       * Note that this is a point-in-time determination and if a reliable answer is
121       * expected then the server should impose some external constraint to ensure
122       * that no new requests are enqueued.
123       *
124       * @return  {@code true} if this worker thread is actively processing a
125       *          request, or {@code false} if it is idle.
126       */
127      public boolean isActive()
128      {
129        return (isAlive() && (operation != null));
130      }
131    
132    
133    
134      /**
135       * Operates in a loop, retrieving the next request from the work queue,
136       * processing it, and then going back to the queue for more.
137       */
138      public void run()
139      {
140        workerThread = currentThread();
141    
142        while (! shutdownRequested)
143        {
144          try
145          {
146            waitingForWork = true;
147            operation = null;
148            operation = workQueue.nextOperation(this);
149            waitingForWork = false;
150    
151    
152            if (operation == null)
153            {
154              // The operation may be null if the server is shutting down.  If that
155              // is the case, then break out of the while loop.
156              break;
157            }
158            else
159            {
160              // The operation is not null, so process it.  Make sure that when
161              // processing is complete.
162              operation.run();
163              operation.operationCompleted();
164            }
165          }
166          catch (Throwable t)
167          {
168            if (debugEnabled())
169            {
170              TRACER.debugWarning(
171                "Uncaught exception in worker thread while processing " +
172                    "operation %s: %s", String.valueOf(operation), t);
173    
174              TRACER.debugCaught(DebugLogLevel.ERROR, t);
175            }
176    
177            try
178            {
179              Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.
180                  get(getName(), String.valueOf(operation),
181                      stackTraceToSingleLineString(t));
182              logError(message);
183    
184              operation.setResultCode(DirectoryServer.getServerErrorResultCode());
185              operation.appendErrorMessage(message);
186              operation.getClientConnection().sendResponse(operation);
187            }
188            catch (Throwable t2)
189            {
190              if (debugEnabled())
191              {
192                TRACER.debugWarning(
193                  "Exception in worker thread while trying to log a " +
194                      "message about an uncaught exception %s: %s", t, t2);
195    
196                TRACER.debugCaught(DebugLogLevel.ERROR, t2);
197              }
198            }
199    
200    
201            try
202            {
203              Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(),
204                                          String.valueOf(operation),
205                                          stackTraceToSingleLineString(t));
206    
207              operation.disconnectClient(DisconnectReason.SERVER_ERROR,
208                                         true, message);
209            }
210            catch (Throwable t2)
211            {
212              if (debugEnabled())
213              {
214                TRACER.debugCaught(DebugLogLevel.ERROR, t2);
215              }
216            }
217          }
218        }
219    
220        // If we have gotten here, then we presume that the server thread is
221        // shutting down.  However, if that's not the case then that is a problem
222        // and we will want to log a message.
223        if (stoppedByReducedThreadNumber)
224        {
225          logError(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER.get(getName()));
226        }
227        else if (! workQueue.shutdownRequested())
228        {
229          logError(WARN_UNEXPECTED_WORKER_THREAD_EXIT.get(getName()));
230        }
231    
232    
233        if (debugEnabled())
234        {
235          TRACER.debugInfo(getName() + " exiting.");
236        }
237      }
238    
239    
240    
241      /**
242       * Indicates that the Directory Server has received a request to stop running
243       * and that this thread should stop running as soon as possible.
244       */
245      public void shutDown()
246      {
247        if (debugEnabled())
248        {
249          TRACER.debugInfo(getName() + " being signaled to shut down.");
250        }
251    
252        // Set a flag that indicates that the thread should stop running.
253        shutdownRequested = true;
254    
255    
256        // Check to see if the thread is waiting for work.  If so, then interrupt
257        // it.
258        if (waitingForWork)
259        {
260          try
261          {
262            workerThread.interrupt();
263          }
264          catch (Exception e)
265          {
266            if (debugEnabled())
267            {
268              TRACER.debugWarning(
269                "Caught an exception while trying to interrupt the worker " +
270                    "thread waiting for work: %s", e);
271              TRACER.debugCaught(DebugLogLevel.ERROR, e);
272            }
273          }
274        }
275        else
276        {
277          try
278          {
279            CancelRequest cancelRequest =
280              new CancelRequest(true, INFO_CANCELED_BY_SHUTDOWN.get());
281            operation.cancel(cancelRequest);
282          }
283          catch (Exception e)
284          {
285            if (debugEnabled())
286            {
287              TRACER.debugWarning(
288                "Caught an exception while trying to abandon the " +
289                    "operation in progress for the worker thread: %s", e);
290              TRACER.debugCaught(DebugLogLevel.ERROR, e);
291            }
292          }
293        }
294      }
295    
296      /**
297       * Retrieves any relevent debug information with which this tread is
298       * associated so they can be included in debug messages.
299       *
300       * @return debug information about this thread as a string.
301       */
302      public Map<String, String> getDebugProperties()
303      {
304        Map<String, String> properties = super.getDebugProperties();
305        properties.put("clientConnection",
306                       operation.getClientConnection().toString());
307        properties.put("operation", operation.toString());
308    
309        return properties;
310      }
311    }
312