WvStreams
|
00001 /* 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * WvIStreamList holds a list of IWvStream objects -- and its select() and 00006 * callback() functions know how to handle multiple simultaneous streams. 00007 */ 00008 #include "wvistreamlist.h" 00009 #include "wvstringlist.h" 00010 #include "wvstreamsdebugger.h" 00011 #include "wvstrutils.h" 00012 00013 #include "wvassert.h" 00014 #include "wvstrutils.h" 00015 00016 #ifndef _WIN32 00017 #include "wvfork.h" 00018 #endif 00019 00020 #ifdef HAVE_VALGRIND_MEMCHECK_H 00021 #include <valgrind/memcheck.h> 00022 #else 00023 #define RUNNING_ON_VALGRIND false 00024 #endif 00025 00026 // enable this to add some read/write trace messages (this can be VERY 00027 // verbose) 00028 #define STREAMTRACE 0 00029 #if STREAMTRACE 00030 # define TRACE(x, y...) fprintf(stderr, x, ## y) 00031 #else 00032 #ifndef _MSC_VER 00033 # define TRACE(x, y...) 00034 #else 00035 # define TRACE 00036 #endif 00037 #endif 00038 00039 WvIStreamList WvIStreamList::globallist; 00040 00041 00042 WvIStreamList::WvIStreamList(): 00043 in_select(false), dead_stream(false) 00044 { 00045 readcb = writecb = exceptcb = 0; 00046 auto_prune = true; 00047 if (this == &globallist) 00048 { 00049 globalstream = this; 00050 #ifndef _WIN32 00051 add_wvfork_callback(WvIStreamList::onfork); 00052 #endif 00053 set_wsname("globallist"); 00054 add_debugger_commands(); 00055 } 00056 } 00057 00058 00059 WvIStreamList::~WvIStreamList() 00060 { 00061 close(); 00062 } 00063 00064 00065 bool WvIStreamList::isok() const 00066 { 00067 return WvStream::isok(); 00068 } 00069 00070 00071 class BoolGuard 00072 { 00073 public: 00074 BoolGuard(bool &_guard_bool): 00075 guard_bool(_guard_bool) 00076 { 00077 assert(!guard_bool); 00078 guard_bool = true; 00079 } 00080 ~BoolGuard() 00081 { 00082 guard_bool = false; 00083 } 00084 private: 00085 bool &guard_bool; 00086 }; 00087 00088 00089 void WvIStreamList::pre_select(SelectInfo &si) 00090 { 00091 //BoolGuard guard(in_select); 00092 bool already_sure = false; 00093 SelectRequest oldwant = si.wants; 00094 00095 sure_thing.zap(); 00096 00097 time_t alarmleft = alarm_remaining(); 00098 if (alarmleft == 0) 00099 already_sure = true; 00100 00101 IWvStream *old_in_stream = WvCrashInfo::in_stream; 00102 const char *old_in_stream_id = WvCrashInfo::in_stream_id; 00103 WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state; 00104 WvCrashInfo::in_stream_state = WvCrashInfo::PRE_SELECT; 00105 00106 Iter i(*this); 00107 for (i.rewind(); i.next(); ) 00108 { 00109 IWvStream &s(*i); 00110 #if I_ENJOY_FORMATTING_STRINGS 00111 WvCrashWill will("doing pre_select for \"%s\" (%s)\n%s", 00112 i.link->id, ptr2str(&s), wvcrash_read_will()); 00113 #else 00114 WvCrashInfo::in_stream = &s; 00115 WvCrashInfo::in_stream_id = i.link->id; 00116 #endif 00117 si.wants = oldwant; 00118 s.pre_select(si); 00119 00120 if (!s.isok()) 00121 already_sure = true; 00122 00123 TRACE("after pre_select(%s): msec_timeout is %ld\n", 00124 i.link->id, (long)si.msec_timeout); 00125 } 00126 00127 WvCrashInfo::in_stream = old_in_stream; 00128 WvCrashInfo::in_stream_id = old_in_stream_id; 00129 WvCrashInfo::in_stream_state = old_in_stream_state; 00130 00131 if (alarmleft >= 0 && (alarmleft < si.msec_timeout || si.msec_timeout < 0)) 00132 si.msec_timeout = alarmleft; 00133 00134 si.wants = oldwant; 00135 00136 if (already_sure) 00137 si.msec_timeout = 0; 00138 } 00139 00140 00141 bool WvIStreamList::post_select(SelectInfo &si) 00142 { 00143 //BoolGuard guard(in_select); 00144 bool already_sure = false; 00145 SelectRequest oldwant = si.wants; 00146 00147 time_t alarmleft = alarm_remaining(); 00148 if (alarmleft == 0) 00149 already_sure = true; 00150 00151 IWvStream *old_in_stream = WvCrashInfo::in_stream; 00152 const char *old_in_stream_id = WvCrashInfo::in_stream_id; 00153 WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state; 00154 WvCrashInfo::in_stream_state = WvCrashInfo::POST_SELECT; 00155 00156 Iter i(*this); 00157 for (i.rewind(); i.cur() && i.next(); ) 00158 { 00159 IWvStream &s(*i); 00160 #if I_ENJOY_FORMATTING_STRINGS 00161 WvCrashWill will("doing post_select for \"%s\" (%s)\n%s", 00162 i.link->id, ptr2str(&s), wvcrash_read_will()); 00163 #else 00164 WvCrashInfo::in_stream = &s; 00165 WvCrashInfo::in_stream_id = i.link->id; 00166 #endif 00167 00168 si.wants = oldwant; 00169 if (s.post_select(si)) 00170 { 00171 TRACE("post_select(%s) was true\n", i.link->id); 00172 sure_thing.unlink(&s); // don't add it twice! 00173 s.addRef(); 00174 sure_thing.append(&s, true, i.link->id); 00175 } 00176 else 00177 { 00178 TRACE("post_select(%s) was false\n", i.link->id); 00179 WvIStreamListBase::Iter j(sure_thing); 00180 WvLink* link = j.find(&s); 00181 00182 wvassert(!link, "stream \"%s\" (%s) was ready in " 00183 "pre_select, but not in post_select", 00184 link->id, ptr2str(link->data)); 00185 } 00186 00187 if (!s.isok()) 00188 { 00189 already_sure = true; 00190 if (auto_prune) 00191 i.xunlink(); 00192 } 00193 } 00194 00195 WvCrashInfo::in_stream = old_in_stream; 00196 WvCrashInfo::in_stream_id = old_in_stream_id; 00197 WvCrashInfo::in_stream_state = old_in_stream_state; 00198 00199 si.wants = oldwant; 00200 return already_sure || !sure_thing.isempty(); 00201 } 00202 00203 00204 // distribute the callback() request to all children that select 'true' 00205 void WvIStreamList::execute() 00206 { 00207 static int level = 0; 00208 const char *id; 00209 level++; 00210 00211 WvStream::execute(); 00212 00213 TRACE("\n%*sList@%p: (%d sure) ", level, "", this, sure_thing.count()); 00214 00215 IWvStream *old_in_stream = WvCrashInfo::in_stream; 00216 const char *old_in_stream_id = WvCrashInfo::in_stream_id; 00217 WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state; 00218 WvCrashInfo::in_stream_state = WvCrashInfo::EXECUTE; 00219 00220 WvIStreamListBase::Iter i(sure_thing); 00221 for (i.rewind(); i.next(); ) 00222 { 00223 #if STREAMTRACE 00224 WvIStreamListBase::Iter x(*this); 00225 if (!x.find(&i())) 00226 TRACE("Yikes! %p in sure_thing, but not in main list!\n", 00227 i.cur()); 00228 #endif 00229 IWvStream &s(*i); 00230 s.addRef(); 00231 00232 id = i.link->id; 00233 00234 TRACE("[%p:%s]", &s, id); 00235 00236 i.xunlink(); 00237 00238 #if DEBUG 00239 if (!RUNNING_ON_VALGRIND) 00240 { 00241 WvString strace_node("%s: %s", s.wstype(), s.wsname()); 00242 ::write(-1, strace_node, strace_node.len()); 00243 } 00244 #endif 00245 #if I_ENJOY_FORMATTING_STRINGS 00246 WvCrashWill my_will("executing stream: %s\n%s", 00247 id ? id : "unknown stream", 00248 wvcrash_read_will()); 00249 #else 00250 WvCrashInfo::in_stream = &s; 00251 WvCrashInfo::in_stream_id = id; 00252 #endif 00253 00254 s.callback(); 00255 s.release(); 00256 00257 // list might have changed! 00258 i.rewind(); 00259 } 00260 00261 WvCrashInfo::in_stream = old_in_stream; 00262 WvCrashInfo::in_stream_id = old_in_stream_id; 00263 WvCrashInfo::in_stream_state = old_in_stream_state; 00264 00265 sure_thing.zap(); 00266 00267 level--; 00268 TRACE("[DONE %p]\n", this); 00269 } 00270 00271 #ifndef _WIN32 00272 void WvIStreamList::onfork(pid_t p) 00273 { 00274 if (p == 0) 00275 { 00276 // this is a child process: don't inherit the global streamlist 00277 globallist.zap(false); 00278 } 00279 } 00280 #endif 00281 00282 00283 void WvIStreamList::add_debugger_commands() 00284 { 00285 WvStreamsDebugger::add_command("globallist", 0, debugger_globallist_run_cb, 0); 00286 } 00287 00288 00289 WvString WvIStreamList::debugger_globallist_run_cb(WvStringParm cmd, 00290 WvStringList &args, 00291 WvStreamsDebugger::ResultCallback result_cb, void *) 00292 { 00293 debugger_streams_display_header(cmd, result_cb); 00294 WvIStreamList::Iter i(globallist); 00295 for (i.rewind(); i.next(); ) 00296 debugger_streams_maybe_display_one_stream(static_cast<WvStream *>(i.ptr()), 00297 cmd, args, result_cb); 00298 00299 return WvString::null; 00300 } 00301