WvStreams
wvsubprocqueue.cc
00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * A way to enqueue a series of WvSubProc objects.  See wvsubprocqueue.h.
00006  */
00007 #include "wvsubprocqueue.h"
00008 #include <unistd.h>
00009 #include <assert.h>
00010 
00011 
00012 WvSubProcQueue::WvSubProcQueue(unsigned _maxrunning)
00013 {
00014     maxrunning = _maxrunning;
00015 }
00016 
00017 
00018 WvSubProcQueue::~WvSubProcQueue()
00019 {
00020 }
00021 
00022 
00023 void WvSubProcQueue::add(void *cookie, WvSubProc *proc)
00024 {
00025     assert(proc);
00026     assert(!proc->running);
00027     if (cookie)
00028     {
00029         // search for other enqueued objects with this cookie
00030         EntList::Iter i(waitq);
00031         for (i.rewind(); i.next(); )
00032         {
00033             if (i->cookie == cookie)
00034             {
00035                 // already enqueued; mark it as "redo" unless it's already
00036                 // the last one.  That way we guarantee it'll still run
00037                 // in the future from now, and it'll come later than anything
00038                 // else in the queue, but it won't pointlessly run twice at
00039                 // the end.
00040                 Ent *e = i.ptr();
00041                 if (i.next())
00042                     e->redo = true;
00043                 delete proc;
00044                 return;
00045             }
00046         }
00047     }
00048     
00049     waitq.append(new Ent(cookie, proc), true);
00050 }
00051 
00052 
00053 void WvSubProcQueue::add(void *cookie,
00054                          const char *cmd, const char * const *argv)
00055 {
00056     WvSubProc *p = new WvSubProc;
00057     p->preparev(cmd, argv);
00058     add(cookie, p);
00059 }
00060 
00061 
00062 bool WvSubProcQueue::cookie_running()
00063 {
00064     EntList::Iter i(runq);
00065     for (i.rewind(); i.next(); )
00066         if (i->cookie)
00067             return true;
00068     return false;
00069 }
00070 
00071 
00072 int WvSubProcQueue::go()
00073 {
00074     int started = 0;
00075     
00076     //fprintf(stderr, "go: %d waiting, %d running\n",
00077     //  waitq.count(), runq.count());
00078     
00079     // first we need to clean up any finished processes
00080     {
00081         EntList::Iter i(runq);
00082         for (i.rewind(); i.next(); )
00083         {
00084             Ent *e = i.ptr();
00085             
00086             e->proc->wait(0, true);
00087             if (!e->proc->running)
00088             {
00089                 if (e->redo)
00090                 {
00091                     // someone re-enqueued this task while it was
00092                     // waiting/running
00093                     e->redo = false;
00094                     i.xunlink(false);
00095                     waitq.append(e, true);
00096                 }
00097                 else
00098                     i.xunlink();
00099             }
00100         }
00101     }
00102     
00103     while (!waitq.isempty() && runq.count() < maxrunning)
00104     {
00105         EntList::Iter i(waitq);
00106         for (i.rewind(); i.next(); )
00107         {
00108             // elements with cookies are "sync points" in the queue;
00109             // they guarantee that everything before that point has
00110             // finished running before they run, and don't let anything
00111             // after them run until they've finished.
00112             if (i->cookie && !runq.isempty())
00113                 goto out;
00114             if (cookie_running())
00115                 goto out;
00116             
00117             // jump it into the running queue, but be careful not to
00118             // delete the object when removing!
00119             Ent *e = i.ptr();
00120             i.xunlink(false);
00121             runq.append(e, true);
00122             e->proc->start_again();
00123             started++;
00124             break;
00125         }
00126     }
00127     
00128 out:
00129     assert(runq.count() <= maxrunning);
00130     return started;
00131 }
00132 
00133 
00134 unsigned WvSubProcQueue::running() const
00135 {
00136     return runq.count();
00137 }
00138 
00139 
00140 unsigned WvSubProcQueue::remaining() const
00141 {
00142     return runq.count() + waitq.count();
00143 }
00144 
00145 
00146 bool WvSubProcQueue::isempty() const
00147 {
00148     return runq.isempty() && waitq.isempty();
00149 }
00150 
00151 
00152 void WvSubProcQueue::finish()
00153 {
00154     while (!isempty())
00155     {
00156         go();
00157         if (!isempty())
00158             usleep(100*1000);
00159     }
00160 }