WvStreams
wvistreamlist.cc
00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * WvIStreamList holds a list of IWvStream objects -- and its select() and
00006  * callback() functions know how to handle multiple simultaneous streams.
00007  */
00008 #include "wvistreamlist.h"
00009 #include "wvstringlist.h"
00010 #include "wvstreamsdebugger.h"
00011 #include "wvstrutils.h"
00012 
00013 #include "wvassert.h"
00014 #include "wvstrutils.h"
00015 
00016 #ifndef _WIN32
00017 #include "wvfork.h"
00018 #endif
00019 
00020 #ifdef HAVE_VALGRIND_MEMCHECK_H
00021 #include <valgrind/memcheck.h>
00022 #else
00023 #define RUNNING_ON_VALGRIND false
00024 #endif
00025 
00026 // enable this to add some read/write trace messages (this can be VERY
00027 // verbose)
00028 #define STREAMTRACE 0
00029 #if STREAMTRACE
00030 # define TRACE(x, y...) fprintf(stderr, x, ## y)
00031 #else
00032 #ifndef _MSC_VER
00033 # define TRACE(x, y...)
00034 #else
00035 # define TRACE
00036 #endif
00037 #endif
00038 
00039 WvIStreamList WvIStreamList::globallist;
00040 
00041 
00042 WvIStreamList::WvIStreamList():
00043     in_select(false), dead_stream(false)
00044 {
00045     readcb = writecb = exceptcb = 0;
00046     auto_prune = true;
00047     if (this == &globallist)
00048     {
00049         globalstream = this;
00050 #ifndef _WIN32
00051         add_wvfork_callback(WvIStreamList::onfork);
00052 #endif
00053         set_wsname("globallist");
00054         add_debugger_commands();
00055     }
00056 }
00057 
00058 
00059 WvIStreamList::~WvIStreamList()
00060 {
00061     close();
00062 }
00063 
00064 
00065 bool WvIStreamList::isok() const
00066 {
00067     return WvStream::isok();
00068 }
00069 
00070 
00071 class BoolGuard
00072 {
00073 public:
00074     BoolGuard(bool &_guard_bool):
00075         guard_bool(_guard_bool)
00076     {
00077         assert(!guard_bool);
00078         guard_bool = true;
00079     }
00080     ~BoolGuard()
00081     {
00082         guard_bool = false;
00083     }
00084 private:
00085     bool &guard_bool;
00086 };
00087 
00088 
00089 void WvIStreamList::pre_select(SelectInfo &si)
00090 {
00091     //BoolGuard guard(in_select);
00092     bool already_sure = false;
00093     SelectRequest oldwant = si.wants;
00094     
00095     sure_thing.zap();
00096     
00097     time_t alarmleft = alarm_remaining();
00098     if (alarmleft == 0)
00099         already_sure = true;
00100 
00101     IWvStream *old_in_stream = WvCrashInfo::in_stream;
00102     const char *old_in_stream_id = WvCrashInfo::in_stream_id;
00103     WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state;
00104     WvCrashInfo::in_stream_state = WvCrashInfo::PRE_SELECT;
00105 
00106     Iter i(*this);
00107     for (i.rewind(); i.next(); )
00108     {
00109         IWvStream &s(*i);
00110 #if I_ENJOY_FORMATTING_STRINGS
00111         WvCrashWill will("doing pre_select for \"%s\" (%s)\n%s",
00112                          i.link->id, ptr2str(&s), wvcrash_read_will());
00113 #else
00114         WvCrashInfo::in_stream = &s;
00115         WvCrashInfo::in_stream_id = i.link->id;
00116 #endif
00117         si.wants = oldwant;
00118         s.pre_select(si);
00119         
00120         if (!s.isok())
00121             already_sure = true;
00122 
00123         TRACE("after pre_select(%s): msec_timeout is %ld\n",
00124               i.link->id, (long)si.msec_timeout);
00125     }
00126 
00127     WvCrashInfo::in_stream = old_in_stream;
00128     WvCrashInfo::in_stream_id = old_in_stream_id;
00129     WvCrashInfo::in_stream_state = old_in_stream_state;
00130 
00131     if (alarmleft >= 0 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00132         si.msec_timeout = alarmleft;
00133     
00134     si.wants = oldwant;
00135 
00136     if (already_sure)
00137         si.msec_timeout = 0;
00138 }
00139 
00140 
00141 bool WvIStreamList::post_select(SelectInfo &si)
00142 {
00143     //BoolGuard guard(in_select);
00144     bool already_sure = false;
00145     SelectRequest oldwant = si.wants;
00146     
00147     time_t alarmleft = alarm_remaining();
00148     if (alarmleft == 0)
00149         already_sure = true;
00150 
00151     IWvStream *old_in_stream = WvCrashInfo::in_stream;
00152     const char *old_in_stream_id = WvCrashInfo::in_stream_id;
00153     WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state;
00154     WvCrashInfo::in_stream_state = WvCrashInfo::POST_SELECT;
00155 
00156     Iter i(*this);
00157     for (i.rewind(); i.cur() && i.next(); )
00158     {
00159         IWvStream &s(*i);
00160 #if I_ENJOY_FORMATTING_STRINGS
00161         WvCrashWill will("doing post_select for \"%s\" (%s)\n%s",
00162                          i.link->id, ptr2str(&s), wvcrash_read_will());
00163 #else
00164         WvCrashInfo::in_stream = &s;
00165         WvCrashInfo::in_stream_id = i.link->id;
00166 #endif
00167 
00168         si.wants = oldwant;
00169         if (s.post_select(si))
00170         {
00171             TRACE("post_select(%s) was true\n", i.link->id);
00172             sure_thing.unlink(&s); // don't add it twice!
00173             s.addRef();
00174             sure_thing.append(&s, true, i.link->id);
00175         }
00176         else
00177         {
00178             TRACE("post_select(%s) was false\n", i.link->id);
00179             WvIStreamListBase::Iter j(sure_thing);
00180             WvLink* link = j.find(&s);
00181             
00182             wvassert(!link, "stream \"%s\" (%s) was ready in "
00183                      "pre_select, but not in post_select",
00184                      link->id, ptr2str(link->data));
00185         }
00186         
00187         if (!s.isok())
00188         {
00189             already_sure = true;
00190             if (auto_prune)
00191                 i.xunlink();
00192         }
00193     }
00194     
00195     WvCrashInfo::in_stream = old_in_stream;
00196     WvCrashInfo::in_stream_id = old_in_stream_id;
00197     WvCrashInfo::in_stream_state = old_in_stream_state;
00198 
00199     si.wants = oldwant;
00200     return already_sure || !sure_thing.isempty();
00201 }
00202 
00203 
00204 // distribute the callback() request to all children that select 'true'
00205 void WvIStreamList::execute()
00206 {
00207     static int level = 0;
00208     const char *id;
00209     level++;
00210     
00211     WvStream::execute();
00212     
00213     TRACE("\n%*sList@%p: (%d sure) ", level, "", this, sure_thing.count());
00214     
00215     IWvStream *old_in_stream = WvCrashInfo::in_stream;
00216     const char *old_in_stream_id = WvCrashInfo::in_stream_id;
00217     WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state;
00218     WvCrashInfo::in_stream_state = WvCrashInfo::EXECUTE;
00219 
00220     WvIStreamListBase::Iter i(sure_thing);
00221     for (i.rewind(); i.next(); )
00222     {
00223 #if STREAMTRACE
00224         WvIStreamListBase::Iter x(*this);
00225         if (!x.find(&i()))
00226             TRACE("Yikes! %p in sure_thing, but not in main list!\n",
00227                   i.cur());
00228 #endif
00229         IWvStream &s(*i);
00230         s.addRef();
00231         
00232         id = i.link->id;
00233 
00234         TRACE("[%p:%s]", &s, id);
00235         
00236         i.xunlink();
00237         
00238 #if DEBUG
00239         if (!RUNNING_ON_VALGRIND)
00240         {
00241             WvString strace_node("%s: %s", s.wstype(), s.wsname());
00242             ::write(-1, strace_node, strace_node.len()); 
00243         }
00244 #endif
00245 #if I_ENJOY_FORMATTING_STRINGS
00246         WvCrashWill my_will("executing stream: %s\n%s",
00247                             id ? id : "unknown stream",
00248                             wvcrash_read_will());
00249 #else
00250         WvCrashInfo::in_stream = &s;
00251         WvCrashInfo::in_stream_id = id;
00252 #endif
00253         
00254         s.callback();
00255         s.release();
00256         
00257         // list might have changed!
00258         i.rewind();
00259     }
00260     
00261     WvCrashInfo::in_stream = old_in_stream;
00262     WvCrashInfo::in_stream_id = old_in_stream_id;
00263     WvCrashInfo::in_stream_state = old_in_stream_state;
00264 
00265     sure_thing.zap();
00266 
00267     level--;
00268     TRACE("[DONE %p]\n", this);
00269 }
00270 
00271 #ifndef _WIN32
00272 void WvIStreamList::onfork(pid_t p)
00273 {
00274     if (p == 0)
00275     {
00276         // this is a child process: don't inherit the global streamlist
00277         globallist.zap(false);
00278     }
00279 }
00280 #endif
00281 
00282 
00283 void WvIStreamList::add_debugger_commands()
00284 {
00285     WvStreamsDebugger::add_command("globallist", 0, debugger_globallist_run_cb, 0);
00286 }
00287 
00288 
00289 WvString WvIStreamList::debugger_globallist_run_cb(WvStringParm cmd,
00290     WvStringList &args,
00291     WvStreamsDebugger::ResultCallback result_cb, void *)
00292 {
00293     debugger_streams_display_header(cmd, result_cb);
00294     WvIStreamList::Iter i(globallist);
00295     for (i.rewind(); i.next(); )
00296         debugger_streams_maybe_display_one_stream(static_cast<WvStream *>(i.ptr()),
00297                 cmd, args, result_cb);
00298     
00299     return WvString::null;
00300 }
00301