WvStreams
|
00001 /* 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * A way to enqueue a series of WvSubProc objects. See wvsubprocqueue.h. 00006 */ 00007 #include "wvsubprocqueue.h" 00008 #include <unistd.h> 00009 #include <assert.h> 00010 00011 00012 WvSubProcQueue::WvSubProcQueue(unsigned _maxrunning) 00013 { 00014 maxrunning = _maxrunning; 00015 } 00016 00017 00018 WvSubProcQueue::~WvSubProcQueue() 00019 { 00020 } 00021 00022 00023 void WvSubProcQueue::add(void *cookie, WvSubProc *proc) 00024 { 00025 assert(proc); 00026 assert(!proc->running); 00027 if (cookie) 00028 { 00029 // search for other enqueued objects with this cookie 00030 EntList::Iter i(waitq); 00031 for (i.rewind(); i.next(); ) 00032 { 00033 if (i->cookie == cookie) 00034 { 00035 // already enqueued; mark it as "redo" unless it's already 00036 // the last one. That way we guarantee it'll still run 00037 // in the future from now, and it'll come later than anything 00038 // else in the queue, but it won't pointlessly run twice at 00039 // the end. 00040 Ent *e = i.ptr(); 00041 if (i.next()) 00042 e->redo = true; 00043 delete proc; 00044 return; 00045 } 00046 } 00047 } 00048 00049 waitq.append(new Ent(cookie, proc), true); 00050 } 00051 00052 00053 void WvSubProcQueue::add(void *cookie, 00054 const char *cmd, const char * const *argv) 00055 { 00056 WvSubProc *p = new WvSubProc; 00057 p->preparev(cmd, argv); 00058 add(cookie, p); 00059 } 00060 00061 00062 bool WvSubProcQueue::cookie_running() 00063 { 00064 EntList::Iter i(runq); 00065 for (i.rewind(); i.next(); ) 00066 if (i->cookie) 00067 return true; 00068 return false; 00069 } 00070 00071 00072 int WvSubProcQueue::go() 00073 { 00074 int started = 0; 00075 00076 //fprintf(stderr, "go: %d waiting, %d running\n", 00077 // waitq.count(), runq.count()); 00078 00079 // first we need to clean up any finished processes 00080 { 00081 EntList::Iter i(runq); 00082 for (i.rewind(); i.next(); ) 00083 { 00084 Ent *e = i.ptr(); 00085 00086 e->proc->wait(0, true); 00087 if (!e->proc->running) 00088 { 00089 if (e->redo) 00090 { 00091 // someone re-enqueued this task while it was 00092 // waiting/running 00093 e->redo = false; 00094 i.xunlink(false); 00095 waitq.append(e, true); 00096 } 00097 else 00098 i.xunlink(); 00099 } 00100 } 00101 } 00102 00103 while (!waitq.isempty() && runq.count() < maxrunning) 00104 { 00105 EntList::Iter i(waitq); 00106 for (i.rewind(); i.next(); ) 00107 { 00108 // elements with cookies are "sync points" in the queue; 00109 // they guarantee that everything before that point has 00110 // finished running before they run, and don't let anything 00111 // after them run until they've finished. 00112 if (i->cookie && !runq.isempty()) 00113 goto out; 00114 if (cookie_running()) 00115 goto out; 00116 00117 // jump it into the running queue, but be careful not to 00118 // delete the object when removing! 00119 Ent *e = i.ptr(); 00120 i.xunlink(false); 00121 runq.append(e, true); 00122 e->proc->start_again(); 00123 started++; 00124 break; 00125 } 00126 } 00127 00128 out: 00129 assert(runq.count() <= maxrunning); 00130 return started; 00131 } 00132 00133 00134 unsigned WvSubProcQueue::running() const 00135 { 00136 return runq.count(); 00137 } 00138 00139 00140 unsigned WvSubProcQueue::remaining() const 00141 { 00142 return runq.count() + waitq.count(); 00143 } 00144 00145 00146 bool WvSubProcQueue::isempty() const 00147 { 00148 return runq.isempty() && waitq.isempty(); 00149 } 00150 00151 00152 void WvSubProcQueue::finish() 00153 { 00154 while (!isempty()) 00155 { 00156 go(); 00157 if (!isempty()) 00158 usleep(100*1000); 00159 } 00160 }