001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.extensions;
028    
029    
030    
031    import java.util.ArrayList;
032    import java.util.Iterator;
033    import java.util.LinkedList;
034    import java.util.List;
035    import java.util.concurrent.LinkedBlockingQueue;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.atomic.AtomicLong;
038    
039    import org.opends.messages.Message;
040    import org.opends.server.admin.server.ConfigurationChangeListener;
041    import org.opends.server.admin.std.server.TraditionalWorkQueueCfg;
042    import org.opends.server.api.WorkQueue;
043    import org.opends.server.config.ConfigException;
044    import org.opends.server.core.DirectoryServer;
045    import org.opends.server.loggers.debug.DebugTracer;
046    import org.opends.server.monitors.TraditionalWorkQueueMonitor;
047    import org.opends.server.types.AbstractOperation;
048    import org.opends.server.types.CancelRequest;
049    import org.opends.server.types.ConfigChangeResult;
050    import org.opends.server.types.DebugLogLevel;
051    import org.opends.server.types.DirectoryException;
052    import org.opends.server.types.DN;
053    import org.opends.server.types.InitializationException;
054    import org.opends.server.types.Operation;
055    import org.opends.server.types.ResultCode;
056    
057    import static org.opends.messages.ConfigMessages.*;
058    import static org.opends.messages.CoreMessages.*;
059    import static org.opends.server.loggers.ErrorLogger.*;
060    import static org.opends.server.loggers.debug.DebugLogger.*;
061    
062    
063    
064    /**
065     * This class defines a data structure for storing and interacting with the
066     * Directory Server work queue.
067     */
068    public class TraditionalWorkQueue
069           extends WorkQueue<TraditionalWorkQueueCfg>
070           implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
071    {
072      /**
073       * The tracer object for the debug logger.
074       */
075      private static final DebugTracer TRACER = getTracer();
076    
077    
078    
079    
080      /**
081       * The maximum number of times to retry getting the next operation from the
082       * queue if an unexpected failure occurs.
083       */
084      private static final int MAX_RETRY_COUNT = 5;
085    
086    
087    
088      // The set of worker threads that will be used to process this work queue.
089      private ArrayList<TraditionalWorkerThread> workerThreads;
090    
091      // The number of operations that have been submitted to the work queue for
092      // processing.
093      private AtomicLong opsSubmitted;
094    
095      // The number of times that an attempt to submit a new request has been
096      // rejected because the work queue is already at its maximum capacity.
097      private AtomicLong queueFullRejects;
098    
099      // Indicates whether one or more of the worker threads needs to be killed at
100      // the next convenient opportunity.
101      private boolean killThreads;
102    
103      // Indicates whether the Directory Server is shutting down.
104      private boolean shutdownRequested;
105    
106      // The DN of the configuration entry with information to use to configure the
107      // work queue.
108      private DN configEntryDN;
109    
110      // The thread number used for the last worker thread that was created.
111      private int lastThreadNumber;
112    
113      // The maximum number of pending requests that this work queue will allow
114      // before it will start rejecting them.
115      private int maxCapacity;
116    
117      // The number of worker threads that should be active (or will be shortly if
118      // a configuration change has not been completely applied).
119      private int numWorkerThreads;
120    
121      // The queue that will be used to actually hold the pending operations.
122      private LinkedBlockingQueue<AbstractOperation> opQueue;
123    
124      // The lock used to provide threadsafe access for the queue.
125      private Object queueLock;
126    
127    
128    
129      /**
130       * Creates a new instance of this work queue.  All initialization should be
131       * performed in the <CODE>initializeWorkQueue</CODE> method.
132       */
133      public TraditionalWorkQueue()
134      {
135        // No implementation should be performed here.
136      }
137    
138    
139    
140      /**
141       * {@inheritDoc}
142       */
143      @Override()
144      public void initializeWorkQueue(TraditionalWorkQueueCfg configuration)
145             throws ConfigException, InitializationException
146      {
147        shutdownRequested = false;
148        killThreads       = false;
149        opsSubmitted      = new AtomicLong(0);
150        queueFullRejects  = new AtomicLong(0);
151        queueLock         = new Object();
152    
153    
154        // Register to be notified of any configuration changes.
155        configuration.addTraditionalChangeListener(this);
156    
157    
158        // Get the necessary configuration from the provided entry.
159        configEntryDN    = configuration.dn();
160        numWorkerThreads = configuration.getNumWorkerThreads();
161        maxCapacity      = configuration.getMaxWorkQueueCapacity();
162    
163    
164        // Create the actual work queue.
165        if (maxCapacity > 0)
166        {
167          opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
168        }
169        else
170        {
171          opQueue = new LinkedBlockingQueue<AbstractOperation>();
172        }
173    
174    
175        // Create the set of worker threads that should be used to service the
176        // work queue.
177        workerThreads = new ArrayList<TraditionalWorkerThread>(numWorkerThreads);
178        for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
179        lastThreadNumber++)
180        {
181          TraditionalWorkerThread t =
182               new TraditionalWorkerThread(this, lastThreadNumber);
183          t.start();
184          workerThreads.add(t);
185        }
186    
187    
188        // Create and register a monitor provider for the work queue.
189        try
190        {
191          TraditionalWorkQueueMonitor monitor =
192               new TraditionalWorkQueueMonitor(this);
193          monitor.initializeMonitorProvider(null);
194          monitor.start();
195          DirectoryServer.registerMonitorProvider(monitor);
196        }
197        catch (Exception e)
198        {
199          if (debugEnabled())
200          {
201            TRACER.debugCaught(DebugLogLevel.ERROR, e);
202          }
203    
204          Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
205              String.valueOf(TraditionalWorkQueueMonitor.class), String.valueOf(e));
206          logError(message);
207        }
208      }
209    
210    
211    
212      /**
213       * {@inheritDoc}
214       */
215      @Override()
216      public void finalizeWorkQueue(Message reason)
217      {
218        shutdownRequested = true;
219    
220    
221        // Send responses to any operations in the pending queue to indicate that
222        // they won't be processed because the server is shutting down.
223        CancelRequest cancelRequest = new CancelRequest(true, reason);
224        ArrayList<Operation> pendingOperations = new ArrayList<Operation>();
225        opQueue.removeAll(pendingOperations);
226        for (Operation o : pendingOperations)
227        {
228          try
229          {
230            // The operation has no chance of responding to the cancel
231            // request so avoid waiting for a cancel response.
232            if (o.getCancelResult() == null) {
233              o.abort(cancelRequest);
234            }
235          }
236          catch (Exception e)
237          {
238            if (debugEnabled())
239            {
240              TRACER.debugCaught(DebugLogLevel.ERROR, e);
241            }
242    
243            logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(
244                String.valueOf(o), String.valueOf(e)));
245          }
246        }
247    
248    
249        // Notify all the worker threads of the shutdown.
250        for (TraditionalWorkerThread t : workerThreads)
251        {
252          try
253          {
254            t.shutDown();
255          }
256          catch (Exception e)
257          {
258            if (debugEnabled())
259            {
260              TRACER.debugCaught(DebugLogLevel.ERROR, e);
261            }
262    
263            logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(
264                t.getName(), String.valueOf(e)));
265          }
266        }
267      }
268    
269    
270    
271      /**
272       * Indicates whether this work queue has received a request to shut down.
273       *
274       * @return  <CODE>true</CODE> if the work queue has recieved a request to shut
275       *          down, or <CODE>false</CODE> if not.
276       */
277      public boolean shutdownRequested()
278      {
279        return shutdownRequested;
280      }
281    
282    
283    
284      /**
285       * Submits an operation to be processed by one of the worker threads
286       * associated with this work queue.
287       *
288       * @param  operation  The operation to be processed.
289       *
290       * @throws  DirectoryException  If the provided operation is not accepted for
291       *                              some reason (e.g., if the server is shutting
292       *                              down or the pending operation queue is already
293       *                              at its maximum capacity).
294       */
295      public void submitOperation(AbstractOperation operation)
296             throws DirectoryException
297      {
298        if (shutdownRequested)
299        {
300          Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
301          throw new DirectoryException(ResultCode.UNAVAILABLE, message);
302        }
303    
304        if (! opQueue.offer(operation))
305        {
306          queueFullRejects.incrementAndGet();
307    
308          Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
309          throw new DirectoryException(ResultCode.UNAVAILABLE, message);
310        }
311    
312        opsSubmitted.incrementAndGet();
313      }
314    
315    
316    
317      /**
318       * Retrieves the next operation that should be processed by one of the worker
319       * threads, blocking if necessary until a new request arrives.  This method
320       * should only be called by a worker thread associated with this work queue.
321       *
322       * @param  workerThread  The worker thread that is requesting the operation.
323       *
324       * @return  The next operation that should be processed, or <CODE>null</CODE>
325       *          if the server is shutting down and no more operations will be
326       *          processed.
327       */
328      public AbstractOperation nextOperation(TraditionalWorkerThread workerThread)
329      {
330        return retryNextOperation(workerThread, 0);
331      }
332    
333    
334    
335      /**
336       * Retrieves the next operation that should be processed by one of the worker
337       * threads following a previous failure attempt.  A maximum of five
338       * consecutive failures will be allowed before returning <CODE>null</CODE>,
339       * which will cause the associated thread to exit.
340       *
341       * @param  workerThread  The worker thread that is requesting the operation.
342       * @param  numFailures   The number of consecutive failures that the worker
343       *                       thread has experienced so far.  If this gets too
344       *                       high, then this method will return <CODE>null</CODE>
345       *                       rather than retrying.
346       *
347       * @return  The next operation that should be processed, or <CODE>null</CODE>
348       *          if the server is shutting down and no more operations will be
349       *          processed, or if there have been too many consecutive failures.
350       */
351      private AbstractOperation retryNextOperation(
352                                           TraditionalWorkerThread workerThread,
353                                           int numFailures)
354      {
355        // See if we should kill off this thread.  This could be necessary if the
356        // number of worker threads has been decreased with the server online.  If
357        // so, then return null and the thread will exit.
358        if (killThreads)
359        {
360          synchronized (queueLock)
361          {
362            try
363            {
364              int currentThreads = workerThreads.size();
365              if (currentThreads > numWorkerThreads)
366              {
367                if (workerThreads.remove(Thread.currentThread()))
368                {
369                  currentThreads--;
370                }
371    
372                if (currentThreads <= numWorkerThreads)
373                {
374                  killThreads = false;
375                }
376    
377                workerThread.setStoppedByReducedThreadNumber();
378                return null;
379              }
380            }
381            catch (Exception e)
382            {
383              if (debugEnabled())
384              {
385                TRACER.debugCaught(DebugLogLevel.ERROR, e);
386              }
387            }
388          }
389        }
390    
391        if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
392        {
393          if (numFailures > MAX_RETRY_COUNT)
394          {
395            Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(
396                Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT);
397            logError(message);
398          }
399    
400          return null;
401        }
402    
403        try
404        {
405          while (true)
406          {
407            AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
408            if (nextOperation == null)
409            {
410              // There was no work to do in the specified length of time.  See if
411              // we should shutdown, and if not then just check again.
412              if (shutdownRequested)
413              {
414                return null;
415              }
416              else if (killThreads)
417              {
418                synchronized (queueLock)
419                {
420                  try
421                  {
422                    int currentThreads = workerThreads.size();
423                    if (currentThreads > numWorkerThreads)
424                    {
425                      if (workerThreads.remove(Thread.currentThread()))
426                      {
427                        currentThreads--;
428                      }
429    
430                      if (currentThreads <= numWorkerThreads)
431                      {
432                        killThreads = false;
433                      }
434    
435                      workerThread.setStoppedByReducedThreadNumber();
436                      return null;
437                    }
438                  }
439                  catch (Exception e)
440                  {
441                    if (debugEnabled())
442                    {
443                      TRACER.debugCaught(DebugLogLevel.ERROR, e);
444                    }
445                  }
446                }
447              }
448            }
449            else
450            {
451              return nextOperation;
452            }
453          }
454        }
455        catch (InterruptedException ie)
456        {
457          // This is somewhat expected so don't log.
458          //      assert debugException(CLASS_NAME, "retryNextOperation", ie);
459    
460          // If this occurs, then the worker thread must have been interrupted for
461          // some reason.  This could be because the Directory Server is shutting
462          // down, in which case we should return null.
463          if (shutdownRequested)
464          {
465            return null;
466          }
467    
468          // If we've gotten here, then the worker thread was interrupted for some
469          // other reason.  This should not happen, and we need to log a message.
470          logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(
471              Thread.currentThread().getName(), String.valueOf(ie)));
472          return retryNextOperation(workerThread, numFailures+1);
473        }
474        catch (Exception e)
475        {
476          if (debugEnabled())
477          {
478            TRACER.debugCaught(DebugLogLevel.ERROR, e);
479          }
480    
481          // This should not happen.  The only recourse we have is to log a message
482          // and try again.
483          logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(
484              Thread.currentThread().getName(), String.valueOf(e)));
485          return retryNextOperation(workerThread, numFailures + 1);
486        }
487      }
488    
489    
490    
491      /**
492       * Attempts to remove the specified operation from this queue if it has not
493       * yet been picked up for processing by one of the worker threads.
494       *
495       * @param  operation  The operation to remove from the queue.
496       *
497       * @return  <CODE>true</CODE> if the provided request was present in the queue
498       *          and was removed successfully, or <CODE>false</CODE> it not.
499       */
500      public boolean removeOperation(AbstractOperation operation)
501      {
502        return opQueue.remove(operation);
503      }
504    
505    
506    
507      /**
508       * Retrieves the total number of operations that have been successfully
509       * submitted to this work queue for processing since server startup.  This
510       * does not include operations that have been rejected for some reason like
511       * the queue already at its maximum capacity.
512       *
513       * @return  The total number of operations that have been successfully
514       *          submitted to this work queue since startup.
515       */
516      public long getOpsSubmitted()
517      {
518        return opsSubmitted.longValue();
519      }
520    
521    
522    
523      /**
524       * Retrieves the total number of operations that have been rejected because
525       * the work queue was already at its maximum capacity.
526       *
527       * @return  The total number of operations that have been rejected because the
528       *          work queue was already at its maximum capacity.
529       */
530      public long getOpsRejectedDueToQueueFull()
531      {
532        return queueFullRejects.longValue();
533      }
534    
535    
536    
537      /**
538       * Retrieves the number of pending operations in the queue that have not yet
539       * been picked up for processing.  Note that this method is not a
540       * constant-time operation and can be relatively inefficient, so it should be
541       * used sparingly.
542       *
543       * @return  The number of pending operations in the queue that have not yet
544       *          been picked up for processing.
545       */
546      public int size()
547      {
548        return opQueue.size();
549      }
550    
551    
552    
553      /**
554       * {@inheritDoc}
555       */
556      public boolean isConfigurationChangeAcceptable(
557                          TraditionalWorkQueueCfg configuration,
558                          List<Message> unacceptableReasons)
559      {
560        // The provided configuration will always be acceptable.
561        return true;
562      }
563    
564    
565    
566      /**
567       * {@inheritDoc}
568       */
569      public ConfigChangeResult applyConfigurationChange(
570                                     TraditionalWorkQueueCfg configuration)
571      {
572        ArrayList<Message> resultMessages = new ArrayList<Message>();
573        int newNumThreads  = configuration.getNumWorkerThreads();
574        int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
575    
576    
577        // Apply a change to the number of worker threads if appropriate.
578        int currentThreads = workerThreads.size();
579        if (newNumThreads != currentThreads)
580        {
581          synchronized (queueLock)
582          {
583            try
584            {
585              int threadsToAdd = newNumThreads - currentThreads;
586              if (threadsToAdd > 0)
587              {
588                for (int i=0; i < threadsToAdd; i++)
589                {
590                  TraditionalWorkerThread t =
591                       new TraditionalWorkerThread(this, lastThreadNumber++);
592                  workerThreads.add(t);
593                  t.start();
594                }
595    
596                killThreads = false;
597              }
598              else
599              {
600                killThreads = true;
601              }
602    
603              numWorkerThreads = newNumThreads;
604            }
605            catch (Exception e)
606            {
607              if (debugEnabled())
608              {
609                TRACER.debugCaught(DebugLogLevel.ERROR, e);
610              }
611            }
612          }
613        }
614    
615    
616        // Apply a change to the maximum capacity if appropriate.  Since we can't
617        // change capacity on the fly, then we'll have to create a new queue and
618        // transfer any remaining items into it.  Any thread that is waiting on the
619        // original queue will time out after at most a few seconds and further
620        // checks will be against the new queue.
621        if (newMaxCapacity != maxCapacity)
622        {
623          synchronized (queueLock)
624          {
625            try
626            {
627              LinkedBlockingQueue<AbstractOperation> newOpQueue;
628              if (newMaxCapacity > 0)
629              {
630                newOpQueue =
631                  new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
632              }
633              else
634              {
635                newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
636              }
637    
638              LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
639              opQueue = newOpQueue;
640    
641              LinkedList<AbstractOperation> pendingOps =
642                new LinkedList<AbstractOperation>();
643              oldOpQueue.drainTo(pendingOps);
644    
645    
646              // We have to be careful when adding any existing pending operations
647              // because the new capacity could be less than what was already
648              // backlogged in the previous queue.  If that happens, we may have to
649              // loop a few times to get everything in there.
650              while (! pendingOps.isEmpty())
651              {
652                Iterator<AbstractOperation> iterator = pendingOps.iterator();
653                while (iterator.hasNext())
654                {
655                  AbstractOperation o = iterator.next();
656                  try
657                  {
658                    if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
659                    {
660                      iterator.remove();
661                    }
662                  }
663                  catch (InterruptedException ie)
664                  {
665                    if (debugEnabled())
666                    {
667                      TRACER.debugCaught(DebugLogLevel.ERROR, ie);
668                    }
669                  }
670                }
671              }
672    
673              maxCapacity = newMaxCapacity;
674            }
675            catch (Exception e)
676            {
677              if (debugEnabled())
678              {
679                TRACER.debugCaught(DebugLogLevel.ERROR, e);
680              }
681            }
682          }
683        }
684    
685    
686        return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
687      }
688    
689    
690    
691      /**
692       * {@inheritDoc}
693       */
694      @Override()
695      public boolean isIdle()
696      {
697        if (opQueue.size() > 0)
698        {
699          return false;
700        }
701    
702        synchronized (queueLock)
703        {
704          for (TraditionalWorkerThread t : workerThreads)
705          {
706            if (t.isActive())
707            {
708              return false;
709            }
710          }
711    
712          return true;
713        }
714      }
715    }
716