WvStreams
wvstream.cc
00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * Unified support for streams, that is, sequences of bytes that may or
00006  * may not be ready for read/write at any given time.
00007  * 
00008  * We provide typical read and write routines, as well as a select() function
00009  * for each stream.
00010  */
00011 #include <time.h>
00012 #include <sys/types.h>
00013 #include <assert.h>
00014 #define __WVSTREAM_UNIT_TEST 1
00015 #include "wvstream.h"
00016 #include "wvtimeutils.h"
00017 #include "wvcont.h"
00018 #include "wvstreamsdebugger.h"
00019 #include "wvstrutils.h"
00020 #include "wvistreamlist.h"
00021 #include "wvlinkerhack.h"
00022 #include "wvmoniker.h"
00023 
00024 #ifdef _WIN32
00025 #define ENOBUFS WSAENOBUFS
00026 #undef errno
00027 #define errno GetLastError()
00028 #ifdef __GNUC__
00029 #include <sys/socket.h>
00030 #endif
00031 #include "streams.h"
00032 #else
00033 #include <errno.h>
00034 #endif
00035 
00036 #include <map>
00037 
00038 using std::make_pair;
00039 using std::map;
00040 
00041 
00042 // Debug tracing switch: change the "#if 0" below to "#if 1" to make TRACE()
00043 // print read/write trace messages (this can be VERY verbose).
// When enabled, non-MSVC builds print to stderr and flush after every
// message; MSVC builds simply alias TRACE to printf.
00044 #if 0
00045 # ifndef _MSC_VER
00046 #  define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
00047 # else
00048 #  define TRACE printf
00049 # endif
00050 #else
// Tracing disabled.  The non-MSVC form swallows its arguments entirely; the
// MSVC form expands to nothing, which leaves the parenthesized argument list
// behind as an ordinary expression statement.
00051 # ifndef _MSC_VER
00052 #  define TRACE(x, y...)
00053 # else
00054 #  define TRACE
00055 # endif
00056 #endif
00057 
// The process-wide "global" stream.  Starts out NULL; the select helpers in
// this file run its pre/post-select hooks (and possibly its callback)
// whenever a forceable select is performed on some other stream.
00058 WvStream *WvStream::globalstream = NULL;
00059 
// NOTE(review): presumably declares the interfaces (IObject, IWvStream) that
// WvStream answers to for interface lookup -- confirm against the UUID_MAP
// macro definitions.
00060 UUID_MAP_BEGIN(WvStream)
00061   UUID_MAP_ENTRY(IObject)
00062   UUID_MAP_ENTRY(IWvStream)
00063   UUID_MAP_END
00064 
00065 
// Registry of every live WvStream, keyed by its WSID.  Allocated by the
// first WvStream constructor that finds it missing and deleted again by the
// destructor once the last entry has been erased.
00066 static map<WSID, WvStream*> *wsid_map;
// Next WSID the constructor will try to hand out.
00067 static WSID next_wsid_to_try;
00068 
00069 
// NOTE(review): WV_LINK presumably keeps this translation unit from being
// dropped by the linker (see wvlinkerhack.h) -- confirm.
00070 WV_LINK(WvStream);
00071 
00072 static IWvStream *create_null(WvStringParm, IObject *)
00073 {
00074     return new WvStream();
00075 }
00076 
00077 static WvMoniker<IWvStream> reg("null",  create_null);
00078 
00079 
00080 IWvStream *IWvStream::create(WvStringParm moniker, IObject *obj)
00081 {
00082     IWvStream *s = wvcreate<IWvStream>(moniker, obj);
00083     if (!s)
00084     {
00085         s = new WvStream();
00086         s->seterr_both(EINVAL, "Unknown moniker '%s'", moniker);
00087         WVRELEASE(obj); // we're not going to use it after all
00088     }
00089     return s;
00090 }
00091 
00092 
// Returns true when 'str' begins with 'prefix', ignoring ASCII case.
// An empty prefix matches every string.
//
// No separate length check on 'str' is needed: strncasecmp() stops at the
// first NUL, so a 'str' shorter than 'prefix' compares unequal by itself.
// Leaving out strlen(str) keeps each call proportional to the prefix
// length, which matters when a caller probes every offset of a long string.
static bool is_prefix_insensitive(const char *str, const char *prefix)
{
    const size_t prefix_len = strlen(prefix);
    return strncasecmp(str, prefix, prefix_len) == 0;
}
00098 
00099 
00100 static const char *strstr_insensitive(const char *haystack, const char *needle)
00101 {
00102     while (*haystack != '\0')
00103     {
00104         if (is_prefix_insensitive(haystack, needle))
00105             return haystack;
00106         ++haystack;
00107     }
00108     return NULL;
00109 }
00110 
00111 
00112 static bool contains_insensitive(const char *haystack, const char *needle)
00113 {
00114     return strstr_insensitive(haystack, needle) != NULL;
00115 }
00116 
00117 
// Column layout shared by the header row and every stream row of the
// "streams" debugger listing: thirteen string fields, where the wide ones
// are data columns and the unsized %s fields between them are separators.
static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";

// Renders a boolean as "Yes" or "No" for the listing.
static inline const char *Yes_No(bool val)
{
    if (val)
        return "Yes";
    return "No";
}
00123 
00124 
00125 void WvStream::debugger_streams_display_header(WvStringParm cmd,
00126         WvStreamsDebugger::ResultCallback result_cb)
00127 {
00128     WvStringList result;
00129     result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
00130             "----------------Type", "-", "Name--------------------");
00131     result_cb(cmd, result);
00132 }
00133 
00134 
00135 // Set to fit in 6 chars
00136 static WvString friendly_ms(time_t ms)
00137 {
00138     if (ms <= 0)
00139         return WvString("(%s)", ms);
00140     else if (ms < 1000)
00141         return WvString("%sms", ms);
00142     else if (ms < 60*1000)
00143         return WvString("%ss", ms/1000);
00144     else if (ms < 60*60*1000)
00145         return WvString("%sm", ms/(60*1000));
00146     else if (ms <= 24*60*60*1000)
00147         return WvString("%sh", ms/(60*60*1000));
00148     else
00149         return WvString("%sd", ms/(24*60*60*1000));
00150 }
00151 
00152 void WvStream::debugger_streams_display_one_stream(WvStream *s,
00153         WvStringParm cmd,
00154         WvStreamsDebugger::ResultCallback result_cb)
00155 {
00156     WvStringList result;
00157     s->addRef();
00158     unsigned refcount = s->release();
00159     result.append(list_format,
00160             s->wsid(), " ",
00161             refcount, " ",
00162             Yes_No(s->isok()), " ",
00163             Yes_No(s->uses_continue_select), " ",
00164             friendly_ms(s->alarm_remaining()), " ",
00165             s->wstype(), " ",
00166             s->wsname());
00167     result_cb(cmd, result);
00168 }
00169 
00170 
00171 void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s,
00172         WvStringParm cmd,
00173         const WvStringList &args,
00174         WvStreamsDebugger::ResultCallback result_cb)
00175 {
00176     bool show = args.isempty();
00177     WvStringList::Iter arg(args);
00178     for (arg.rewind(); arg.next(); )
00179     {
00180         WSID wsid;
00181         bool is_num = wvstring_to_num(*arg, wsid);
00182         
00183         if (is_num)
00184         {
00185             if (s->wsid() == wsid)
00186             {
00187                 show = true;
00188                 break;
00189             }
00190         }
00191         else
00192         {
00193             if (s->wsname() && contains_insensitive(s->wsname(), *arg)
00194                     || s->wstype() && contains_insensitive(s->wstype(), *arg))
00195             {
00196                 show = true;
00197                 break;
00198             }
00199         }
00200     }
00201     if (show)
00202         debugger_streams_display_one_stream(s, cmd, result_cb);
00203 }
00204 
00205 
00206 WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
00207         WvStringList &args,
00208         WvStreamsDebugger::ResultCallback result_cb, void *)
00209 {
00210     debugger_streams_display_header(cmd, result_cb);
00211     if (wsid_map)
00212     {
00213         map<WSID, WvStream*>::iterator it;
00214 
00215         for (it = wsid_map->begin(); it != wsid_map->end(); ++it)
00216             debugger_streams_maybe_display_one_stream(it->second, cmd, args,
00217                                                       result_cb);
00218     }
00219     
00220     return WvString::null;
00221 }
00222 
00223 
00224 WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
00225         WvStringList &args,
00226         WvStreamsDebugger::ResultCallback result_cb, void *)
00227 {
00228     if (args.isempty())
00229         return WvString("Usage: %s <WSID>", cmd);
00230     WSID wsid;
00231     WvString wsid_str = args.popstr();
00232     if (!wvstring_to_num(wsid_str, wsid))
00233         return WvString("Invalid WSID '%s'", wsid_str);
00234     IWvStream *s = WvStream::find_by_wsid(wsid);
00235     if (!s)
00236         return WvString("No such stream");
00237     s->close();
00238     return WvString::null;
00239 }
00240 
00241 
00242 void WvStream::add_debugger_commands()
00243 {
00244     WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0);
00245     WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0);
00246 }
00247 
00248 
// Constructs a plain stream.
//
// Besides initializing its members (the read callback starts out bound to
// legacy_callback), the constructor registers the debugger commands the
// first time any WvStream is built, claims an unused WSID in the file-level
// wsid_map registry, and on Windows calls WSAStartup().
00249 WvStream::WvStream():
00250     read_requires_writable(NULL),
00251     write_requires_readable(NULL),
00252     uses_continue_select(false),
00253     personal_stack_size(131072),
00254     alarm_was_ticking(false),
00255     stop_read(false),
00256     stop_write(false),
00257     closed(false),
00258     readcb(wv::bind(&WvStream::legacy_callback, this)),
00259     max_outbuf_size(0),
00260     outbuf_delayed_flush(false),
00261     is_auto_flush(true),
00262     want_to_flush(true),
00263     is_flushing(false),
00264     queue_min(0),
00265     autoclose_time(0),
00266     alarm_time(wvtime_zero),
00267     last_alarm_check(wvtime_zero)
00268 {
00269     TRACE("Creating wvstream %p\n", this);
00270     
    // One-time setup, performed by the first stream ever constructed.
00271     static bool first = true;
00272     if (first)
00273     {
00274         first = false;
00275         WvStream::add_debugger_commands();
00276     }
00277     
00278     // Choose a wsid: create the registry on demand, then probe upwards from
    // next_wsid_to_try until a free id is found.  The loop gives up once the
    // counter comes back to where it started (every id taken), in which
    // case the insert below fails and the assertion fires.
00279     if (!wsid_map)
00280         wsid_map = new map<WSID, WvStream*>;
00281     WSID first_wsid_tried = next_wsid_to_try;
00282     do
00283     {
00284         if (wsid_map->find(next_wsid_to_try) == wsid_map->end())
00285             break;
00286         ++next_wsid_to_try;
00287     } while (next_wsid_to_try != first_wsid_tried);
00288     my_wsid = next_wsid_to_try++;
00289     bool inserted = wsid_map->insert(make_pair(my_wsid, this)).second;
00290     assert(inserted);
00291     
00292 #ifdef _WIN32
    // Winsock startup (version 2.0 requested); must succeed.
00293     WSAData wsaData;
00294     int result = WSAStartup(MAKEWORD(2,0), &wsaData); 
00295     assert(result == 0);
00296 #endif
00297 }
00298 
00299 
00300 // FIXME: interfaces (IWvStream) shouldn't have implementations!
00301 IWvStream::IWvStream()
00302 {
00303 }
00304 
00305 
00306 IWvStream::~IWvStream()
00307 {
00308 }
00309 
00310 
// Tears the stream down: closes it, drops the continuation context,
// removes the stream from the WSID registry (deleting the registry when it
// becomes empty) and unlinks the stream from the global stream list.
00311 WvStream::~WvStream()
00312 {
00313     TRACE("destroying %p\n", this);
00314     close();
00315     
00316     // if this assertion fails, then uses_continue_select is true, but you
00317     // didn't call terminate_continue_select() or close() before destroying
00318     // your object.  Shame on you!
00319     assert(!uses_continue_select || !call_ctx);
00320     
00321     call_ctx = 0; // finish running the suspended callback, if any
00322 
    // Unregister our WSID; the last stream to go also frees the registry.
00323     assert(wsid_map);
00324     wsid_map->erase(my_wsid);
00325     if (wsid_map->empty())
00326     {
00327         delete wsid_map;
00328         wsid_map = NULL;
00329     }
00330     
00331     // eventually, streams will auto-add themselves to the globallist.  But
00332     // even before then, it'll never be useful for them to be on the
00333     // globallist *after* they get destroyed, so we might as well auto-remove
00334     // them already.  It's harmless for people to try to remove them twice.
00335     WvIStreamList::globallist.unlink(this);
00336     
00337     TRACE("done destroying %p\n", this);
00338 }
00339 
00340 
00341 void WvStream::close()
00342 {
00343     TRACE("flushing in wvstream...\n");
00344     flush(2000); // fixme: should not hardcode this stuff
00345     TRACE("(flushed)\n");
00346 
00347     closed = true;
00348     
00349     if (!!closecb)
00350     {
00351         IWvStreamCallback cb = closecb;
00352         closecb = 0; // ensure callback is only called once
00353         cb();
00354     }
00355     
00356     // I would like to delete call_ctx here, but then if someone calls
00357     // close() from *inside* a continuable callback, we explode.  Oops!
00358     //call_ctx = 0; // destroy the context, if necessary
00359 }
00360 
00361 
00362 void WvStream::autoforward(WvStream &s)
00363 {
00364     setcallback(wv::bind(autoforward_callback, wv::ref(*this), wv::ref(s)));
00365     read_requires_writable = &s;
00366 }
00367 
00368 
00369 void WvStream::noautoforward()
00370 {
00371     setcallback(0);
00372     read_requires_writable = NULL;
00373 }
00374 
00375 
00376 void WvStream::autoforward_callback(WvStream &input, WvStream &output)
00377 {
00378     char buf[1024];
00379     size_t len;
00380     
00381     len = input.read(buf, sizeof(buf));
00382     output.write(buf, len);
00383 }
00384 
00385 
00386 void WvStream::_callback()
00387 {
00388     execute();
00389     if (!!callfunc)
00390         callfunc();
00391 }
00392 
00393 
00394 void *WvStream::_callwrap(void *)
00395 {
00396     _callback();
00397     return NULL;
00398 }
00399 
00400 
// Public entry point for running the stream's callback.
//
// Records whether the alarm had expired (clearing it if so), then runs
// _callback() either directly or, for streams that use continue_select,
// inside a WvCont continuation with its own stack so that the callback can
// be suspended and resumed.
00401 void WvStream::callback()
00402 {
00403     TRACE("(?)");
00404     
00405     // if the alarm has gone off and we're calling callback... good!
00406     if (alarm_remaining() == 0)
00407     {
00408         alarm_time = wvtime_zero;
00409         alarm_was_ticking = true;
00410     }
00411     else
00412         alarm_was_ticking = false;
00413     
    // A continuation needs a usable stack: at least 1024 bytes.
00414     assert(!uses_continue_select || personal_stack_size >= 1024);
00415 
    // Set TEST_CONTINUES_HARSHLY to 1 to route *every* stream through
    // WvCont, which is useful for stress-testing the continuation code.
00416 #define TEST_CONTINUES_HARSHLY 0
00417 #if TEST_CONTINUES_HARSHLY
00418 #ifndef _WIN32
00419 # warning "Using WvCont for *all* streams for testing!"
00420 #endif
00421     if (1)
00422 #else
00423     if (uses_continue_select && personal_stack_size >= 1024)
00424 #endif
00425     {
00426         if (!call_ctx) // no context exists yet!
00427         {
00428             call_ctx = WvCont(wv::bind(&WvStream::_callwrap, this, _1),
00429                               personal_stack_size);
00430         }
00431         
        // Start the continuation, or resume it where it last suspended.
00432         call_ctx(NULL);
00433     }
00434     else
00435         _callback();
00436 
00437     // NOTE(review): this comment block originally described an assertion
00438     // verifying that a derived class's virtual execute() chained all the
00439     // way back up to WvStream::execute().  No such assertion is present
00440     // here, so nothing currently enforces that rule; derived classes
00441     // should still call their parent's execute().
00442 }
00443 
00444 
00445 bool WvStream::isok() const
00446 {
00447     return !closed && WvErrorBase::isok();
00448 }
00449 
00450 
00451 void WvStream::seterr(int _errnum)
00452 {
00453     if (!geterr()) // no pre-existing error
00454     {
00455         WvErrorBase::seterr(_errnum);
00456         close();
00457     }
00458 }
00459 
00460 
00461 size_t WvStream::read(WvBuf &outbuf, size_t count)
00462 {
00463     // for now, just wrap the older read function
00464     size_t free = outbuf.free();
00465     if (count > free)
00466         count = free;
00467 
00468     WvDynBuf tmp;
00469     unsigned char *buf = tmp.alloc(count);
00470     size_t len = read(buf, count);
00471     tmp.unalloc(count - len);
00472     outbuf.merge(tmp);
00473     return len;
00474 }
00475 
00476 
00477 size_t WvStream::write(WvBuf &inbuf, size_t count)
00478 {
00479     // for now, just wrap the older write function
00480     size_t avail = inbuf.used();
00481     if (count > avail)
00482         count = avail;
00483     const unsigned char *buf = inbuf.get(count);
00484     size_t len = write(buf, count);
00485     inbuf.unget(count - len);
00486     return len;
00487 }
00488 
00489 
00490 size_t WvStream::read(void *buf, size_t count)
00491 {
00492     assert(!count || buf);
00493     
00494     size_t bufu, i;
00495     unsigned char *newbuf;
00496 
00497     bufu = inbuf.used();
00498     if (bufu < queue_min)
00499     {
00500         newbuf = inbuf.alloc(queue_min - bufu);
00501         assert(newbuf);
00502         i = uread(newbuf, queue_min - bufu);
00503         inbuf.unalloc(queue_min - bufu - i);
00504         
00505         bufu = inbuf.used();
00506     }
00507     
00508     if (bufu < queue_min)
00509     {
00510         maybe_autoclose();
00511         return 0;
00512     }
00513         
00514     // if buffer is empty, do a hard read
00515     if (!bufu)
00516         bufu = uread(buf, count);
00517     else
00518     {
00519         // otherwise just read from the buffer
00520         if (bufu > count)
00521             bufu = count;
00522     
00523         memcpy(buf, inbuf.get(bufu), bufu);
00524     }
00525     
00526     TRACE("read  obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00527     maybe_autoclose();
00528     return bufu;
00529 }
00530 
00531 
00532 size_t WvStream::write(const void *buf, size_t count)
00533 {
00534     assert(!count || buf);
00535     if (!isok() || !buf || !count || stop_write) return 0;
00536     
00537     size_t wrote = 0;
00538     if (!outbuf_delayed_flush && !outbuf.used())
00539     {
00540         wrote = uwrite(buf, count);
00541         count -= wrote;
00542         buf = (const unsigned char *)buf + wrote;
00543         // if (!count) return wrote; // short circuit if no buffering needed
00544     }
00545     if (max_outbuf_size != 0)
00546     {
00547         size_t canbuffer = max_outbuf_size - outbuf.used();
00548         if (count > canbuffer)
00549             count = canbuffer; // can't write the whole amount
00550     }
00551     if (count != 0)
00552     {
00553         outbuf.put(buf, count);
00554         wrote += count;
00555     }
00556 
00557     if (should_flush())
00558     {
00559         if (is_auto_flush)
00560             flush(0);
00561         else 
00562             flush_outbuf(0);
00563     }
00564 
00565     return wrote;
00566 }
00567 
00568 
00569 void WvStream::noread()
00570 {
00571     stop_read = true;
00572     maybe_autoclose();
00573 }
00574 
00575 
00576 void WvStream::nowrite()
00577 {
00578     stop_write = true;
00579     maybe_autoclose();
00580 }
00581 
00582 
00583 void WvStream::maybe_autoclose()
00584 {
00585     if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
00586         close();
00587 }
00588 
00589 
00590 bool WvStream::isreadable()
00591 {
00592     return isok() && select(0, true, false, false);
00593 }
00594 
00595 
00596 bool WvStream::iswritable()
00597 {
00598     return !stop_write && isok() && select(0, false, true, false);
00599 }
00600 
00601 
// Reads one line (text up to 'separator') from the stream, waiting for
// more data if necessary.
//
//   wait_msec  how long to wait for a complete line, in milliseconds;
//              0 means a single non-waiting attempt.  A negative value is
//              never counted down by the loop below, so it never produces
//              a timeout here.
//   separator  line terminator, as a byte value in 0..255.
//   readahead  how many bytes to request from uread() per attempt.
//
// Returns a pointer to the line inside inbuf with the separator replaced
// by a NUL, or NULL on timeout or when no data is available.  If the
// stream stops being ok (or reading was shut down) while unterminated data
// is buffered, that remaining data is returned NUL-terminated.
// NOTE(review): the returned pointer refers to inbuf storage; presumably
// it is only valid until the next operation on inbuf -- confirm.
00602 char *WvStream::blocking_getline(time_t wait_msec, int separator,
00603                                  int readahead)
00604 {
00605     assert(separator >= 0);
00606     assert(separator <= 255);
00607     
00608     //assert(uses_continue_select || wait_msec == 0);
00609 
    // Absolute deadline; only meaningful for a positive wait_msec.
00610     WvTime timeout_time(0);
00611     if (wait_msec > 0)
00612         timeout_time = msecadd(wvtime(), wait_msec);
00613     
00614     maybe_autoclose();
00615 
00616     // if we get here, we either want to wait a bit or there is data
00617     // available.
00618     while (isok())
00619     {
00620         // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr);
00621         queuemin(0);
00622     
00623         // if there is a newline already, we have enough data.
00624         if (inbuf.strchr(separator) > 0)
00625             break;
00626         else if (!isok() || stop_read)    // uh oh, stream is in trouble.
00627             break;
00628 
00629         // make select not return true until more data is available
00630         queuemin(inbuf.used() + 1);
00631 
00632         // compute remaining timeout
00633         if (wait_msec > 0)
00634         {
00635             wait_msec = msecdiff(timeout_time, wvtime());
00636             if (wait_msec < 0)
00637                 wait_msec = 0;
00638         }
00639         
00640         // FIXME: this is blocking_getline.  It shouldn't
00641         // call continue_select()!
00642         bool hasdata;
00643         if (wait_msec != 0 && uses_continue_select)
00644             hasdata = continue_select(wait_msec);
00645         else
00646             hasdata = select(wait_msec, true, false);
00647         
00648         if (!isok())
00649             break;
00650 
00651         if (hasdata)
00652         {
00653             // read a few bytes
00654             WvDynBuf tmp;
00655             unsigned char *buf = tmp.alloc(readahead);
00656             assert(buf);
00657             size_t len = uread(buf, readahead);
00658             tmp.unalloc(readahead - len);
00659             inbuf.put(tmp.get(len), len);
00660             hasdata = len > 0; // enough?
00661         }
00662 
00663         if (!isok())
00664             break;
00665         
        // Note that this early return leaves queue_min at the value set by
        // queuemin(inbuf.used() + 1) above.
00666         if (!hasdata && wait_msec == 0)
00667             return NULL; // handle timeout
00668     }
00669     if (!inbuf.used())
00670         return NULL;
00671 
00672     // return the appropriate data
00673     size_t i = 0;
00674     i = inbuf.strchr(separator);
00675     if (i > 0) {
        // i counts the bytes up to and including the separator; overwrite
        // the separator with a NUL and hand out those i bytes.
00676         char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00677         assert(eol && *eol == separator);
00678         *eol = 0;
00679         return const_cast<char*>((const char *)inbuf.get(i));
00680     } else {
00681         // handle "EOF without newline" condition
00682         // FIXME: it's very silly that buffers can't return editable
00683         // char* arrays.
00684         inbuf.alloc(1)[0] = 0; // null-terminate it
00685         return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
00686     }
00687 }
00688 
00689 
00690 char *WvStream::continue_getline(time_t wait_msec, int separator,
00691                                  int readahead)
00692 {
00693     assert(false && "not implemented, come back later!");
00694     assert(uses_continue_select);
00695     return NULL;
00696 }
00697 
00698 
00699 void WvStream::drain()
00700 {
00701     char buf[1024];
00702     while (isreadable())
00703         read(buf, sizeof(buf));
00704 }
00705 
00706 
00707 bool WvStream::flush(time_t msec_timeout)
00708 {
00709     if (is_flushing) return false;
00710     
00711     TRACE("%p flush starts\n", this);
00712 
00713     is_flushing = true;
00714     want_to_flush = true;
00715     bool done = flush_internal(msec_timeout) // any other internal buffers
00716         && flush_outbuf(msec_timeout);  // our own outbuf
00717     is_flushing = false;
00718 
00719     TRACE("flush stops (%d)\n", done);
00720     return done;
00721 }
00722 
00723 
00724 bool WvStream::should_flush()
00725 {
00726     return want_to_flush;
00727 }
00728 
00729 
// Tries to push the contents of outbuf out through uwrite().
//
//   msec_timeout  0 means a single write attempt with no select(); a
//                 positive value bounds the waiting time in milliseconds;
//                 a negative value skips the deadline check, so the loop
//                 continues until outbuf is empty or the stream stops
//                 being ok.
//
// Also handles the pending "flush then close" deadline (autoclose_time),
// resets want_to_flush for delayed-flush streams once the buffer is empty,
// and discards the buffer contents if the stream is no longer ok.
// Returns true when outbuf was found empty.
00730 bool WvStream::flush_outbuf(time_t msec_timeout)
00731 {
00732     TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00733     bool outbuf_was_used = outbuf.used();
00734     
00735     // do-nothing shortcut for speed
00736     // FIXME: definitely makes a "measurable" difference...
00737     //   but is it worth the risk?
00738     if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
00739     {
00740         maybe_autoclose();
00741         return true;
00742     }
00743     
00744     WvTime stoptime = msecadd(wvtime(), msec_timeout);
00745     
00746     // flush outbuf
00747     while (outbuf_was_used && isok())
00748     {
00749 //      fprintf(stderr, "%p: fd:%d/%d, used:%d\n", 
00750 //              this, getrfd(), getwfd(), outbuf.used());
00751         
        // Take one chunk out of the buffer and try to write it.
00752         size_t attempt = outbuf.optgettable();
00753         size_t real = uwrite(outbuf.get(attempt), attempt);
00754         
00755         // WARNING: uwrite() may have messed up our outbuf!
00756         // This probably only happens if uwrite() closed the stream because
00757         // of an error, so we'll check isok().
00758         if (isok() && real < attempt)
00759         {
            // Partial write: put the unwritten tail back into the buffer.
00760             TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00761             assert(outbuf.ungettable() >= attempt - real);
00762             outbuf.unget(attempt - real);
00763         }
00764         
00765         // since post_select() can call us, and select() calls post_select(),
00766         // we need to be careful not to call select() if we don't need to!
00767         // post_select() will only call us with msec_timeout==0, and we don't
00768         // need to do select() in that case anyway.
00769         if (!msec_timeout)
00770             break;
        // Stop once the deadline has passed or the stream does not become
        // writable within msec_timeout.
00771         if (msec_timeout >= 0 
00772           && (stoptime < wvtime() || !select(msec_timeout, false, true)))
00773             break;
00774         
00775         outbuf_was_used = outbuf.used();
00776     }
00777 
00778     // handle autoclose: close once everything has been flushed or the
    // deadline recorded in autoclose_time has passed
00779     if (autoclose_time && isok())
00780     {
00781         time_t now = time(NULL);
00782         TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n", 
00783               this, now - autoclose_time, outbuf.used());
00784         if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00785         {
00786             autoclose_time = 0; // avoid infinite recursion!
00787             close();
00788         }
00789     }
00790 
00791     TRACE("flush_outbuf: after autoclose chunk\n");
    // Delayed-flush streams stop wanting a flush once the buffer is empty.
00792     if (outbuf_delayed_flush && !outbuf_was_used)
00793         want_to_flush = false;
00794     
00795     TRACE("flush_outbuf: now isok=%d\n", isok());
00796 
00797     // if we can't flush the outbuf, at least empty it!
00798     if (outbuf_was_used && !isok())
00799         outbuf.zap();
00800 
00801     maybe_autoclose();
00802     TRACE("flush_outbuf stops\n");
00803     
    // Reflects the last sample of outbuf.used() taken above.
00804     return !outbuf_was_used;
00805 }
00806 
00807 
00808 bool WvStream::flush_internal(time_t msec_timeout)
00809 {
00810     // once outbuf emptied, that's it for most streams
00811     return true;
00812 }
00813 
00814 
00815 int WvStream::getrfd() const
00816 {
00817     return -1;
00818 }
00819 
00820 
00821 int WvStream::getwfd() const
00822 {
00823     return -1;
00824 }
00825 
00826 
00827 void WvStream::flush_then_close(int msec_timeout)
00828 {
00829     time_t now = time(NULL);
00830     autoclose_time = now + (msec_timeout + 999) / 1000;
00831     
00832     TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n", 
00833             this, outbuf.used(), autoclose_time - now);
00834 
00835     // as a fast track, we _could_ close here: but that's not a good idea,
00836     // since flush_then_close() deals with obscure situations, and we don't
00837     // want the caller to use it incorrectly.  So we make things _always_
00838     // break when the caller forgets to call select() later.
00839     
00840     flush(0);
00841 }
00842 
00843 
00844 void WvStream::pre_select(SelectInfo &si)
00845 {
00846     maybe_autoclose();
00847     
00848     time_t alarmleft = alarm_remaining();
00849     
00850     if (!isok() || (!si.inherit_request && alarmleft == 0))
00851     {
00852         si.msec_timeout = 0;
00853         return; // alarm has rung
00854     }
00855 
00856     if (!si.inherit_request)
00857     {
00858         si.wants.readable |= static_cast<bool>(readcb);
00859         si.wants.writable |= static_cast<bool>(writecb);
00860         si.wants.isexception |= static_cast<bool>(exceptcb);
00861     }
00862     
00863     // handle read-ahead buffering
00864     if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00865     {
00866         si.msec_timeout = 0; // already ready
00867         return;
00868     }
00869     if (alarmleft >= 0
00870       && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00871         si.msec_timeout = alarmleft + 10;
00872 }
00873 
00874 
00875 bool WvStream::post_select(SelectInfo &si)
00876 {
00877     if (!si.inherit_request)
00878     {
00879         si.wants.readable |= static_cast<bool>(readcb);
00880         si.wants.writable |= static_cast<bool>(writecb);
00881         si.wants.isexception |= static_cast<bool>(exceptcb);
00882     }
00883     
00884     // FIXME: need sane buffer flush support for non FD-based streams
00885     // FIXME: need read_requires_writable and write_requires_readable
00886     //        support for non FD-based streams
00887     
00888     // note: flush(nonzero) might call select(), but flush(0) never does,
00889     // so this is safe.
00890     if (should_flush())
00891         flush(0);
00892     if (!si.inherit_request && alarm_remaining() == 0)
00893         return true; // alarm ticked
00894     if ((si.wants.readable || (!si.inherit_request && readcb))
00895         && inbuf.used() && inbuf.used() >= queue_min)
00896         return true; // already ready
00897     return false;
00898 }
00899 
00900 
00901 void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00902     bool readable, bool writable, bool isexcept, bool forceable)
00903 {
00904     FD_ZERO(&si.read);
00905     FD_ZERO(&si.write);
00906     FD_ZERO(&si.except);
00907     
00908     if (forceable)
00909     {
00910         si.wants.readable = readcb;
00911         si.wants.writable = writecb;
00912         si.wants.isexception = exceptcb;
00913     }
00914     else
00915     {
00916         si.wants.readable = readable;
00917         si.wants.writable = writable;
00918         si.wants.isexception = isexcept;
00919     }
00920     
00921     si.max_fd = -1;
00922     si.msec_timeout = msec_timeout;
00923     si.inherit_request = ! forceable;
00924     si.global_sure = false;
00925 
00926     wvstime_sync();
00927 
00928     pre_select(si);
00929     if (globalstream && forceable && (globalstream != this))
00930     {
00931         WvStream *s = globalstream;
00932         globalstream = NULL; // prevent recursion
00933         s->xpre_select(si, SelectRequest(false, false, false));
00934         globalstream = s;
00935     }
00936 }
00937 
00938 
// Performs the actual ::select() call described by 'si' and returns its
// result.  A negative si.msec_timeout means "wait indefinitely" (a NULL
// timeout is passed).  Errors other than EAGAIN, EINTR, EBADF and ENOBUFS
// are recorded on the stream with seterr(), which also closes it.
00939 int WvStream::_do_select(SelectInfo &si)
00940 {
00941     // prepare timeout (only passed to ::select() when msec_timeout >= 0)
00942     timeval tv;
00943     tv.tv_sec = si.msec_timeout / 1000;
00944     tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00945     
00946 #ifdef _WIN32
00947     // selecting on an empty set of sockets doesn't cause a delay in win32.
    // So create a throwaway socket and watch it for exceptions, which
    // guarantees that the set is never empty.
00948     SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
00949     FD_SET(fakefd, &si.except);
00950 #endif    
00951     
00952     // block
00953     int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00954         si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00955 
00956     // handle errors.
00957     //   EAGAIN and EINTR don't matter because they're totally normal.
00958     //   ENOBUFS is hopefully transient.
00959     //   EBADF is kind of gross and might imply that something is wrong,
00960     //      but it happens sometimes...
00961     if (sel < 0 
00962       && errno != EAGAIN && errno != EINTR 
00963       && errno != EBADF
00964       && errno != ENOBUFS
00965       )
00966     {
00967         seterr(errno);
00968     }
00969 #ifdef _WIN32
    // NOTE(review): fakefd is a Winsock SOCKET, which is normally released
    // with closesocket(); confirm that ::close() is mapped appropriately
    // in this build (for example by streams.h).
00970     ::close(fakefd);
00971 #endif
00972     TRACE("select() returned %d\n", sel);
00973     return sel;
00974 }
00975 
00976 
00977 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00978 {
00979     // We cannot move the clock backward here, because timers that
00980     // were expired in pre_select could then not be expired anymore,
00981     // and while time going backward is rather unsettling in general,
00982     // for it to be happening between pre_select and post_select is
00983     // just outright insanity.
00984     wvstime_sync_forward();
00985         
00986     bool sure = post_select(si);
00987     if (globalstream && forceable && (globalstream != this))
00988     {
00989         WvStream *s = globalstream;
00990         globalstream = NULL; // prevent recursion
00991         si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
00992                                          || si.global_sure;
00993         globalstream = s;
00994     }
00995     return sure;
00996 }
00997 
00998 
00999 bool WvStream::_select(time_t msec_timeout, bool readable, bool writable,
01000                        bool isexcept, bool forceable)
01001 {
01002     // Detect use of deleted stream
01003     assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end()));
01004         
01005     SelectInfo si;
01006     _build_selectinfo(si, msec_timeout, readable, writable, isexcept,
01007                       forceable);
01008     
01009     bool sure = false;
01010     int sel = _do_select(si);
01011     if (sel >= 0)
01012         sure = _process_selectinfo(si, forceable); 
01013     if (si.global_sure && globalstream && forceable && (globalstream != this))
01014         globalstream->callback();
01015 
01016     return sure;
01017 }
01018 
01019 
01020 IWvStream::SelectRequest WvStream::get_select_request()
01021 {
01022     return IWvStream::SelectRequest(readcb, writecb, exceptcb);
01023 }
01024 
01025 
01026 void WvStream::force_select(bool readable, bool writable, bool isexception)
01027 {
01028     if (readable)
01029         readcb = wv::bind(&WvStream::legacy_callback, this);
01030     if (writable)
01031         writecb = wv::bind(&WvStream::legacy_callback, this);
01032     if (isexception)
01033         exceptcb = wv::bind(&WvStream::legacy_callback, this);
01034 }
01035 
01036 
01037 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
01038 {
01039     if (readable)
01040         readcb = 0;
01041     if (writable)
01042         writecb = 0;
01043     if (isexception)
01044         exceptcb = 0;
01045 }
01046 
01047 
01048 void WvStream::alarm(time_t msec_timeout)
01049 {
01050     if (msec_timeout >= 0)
01051         alarm_time = msecadd(wvstime(), msec_timeout);
01052     else
01053         alarm_time = wvtime_zero;
01054 }
01055 
01056 
01057 time_t WvStream::alarm_remaining()
01058 {
01059     if (alarm_time.tv_sec)
01060     {
01061         WvTime now = wvstime();
01062 
01063         // Time is going backward!
01064         if (now < last_alarm_check)
01065         {
01066 #if 0 // okay, I give up.  Time just plain goes backwards on some systems.
01067             // warn only if it's a "big" difference (sigh...)
01068             if (msecdiff(last_alarm_check, now) > 200)
01069                 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
01070                         "(%ld:%ld %ld:%ld)\n",
01071                         last_alarm_check.tv_sec, last_alarm_check.tv_usec,
01072                         now.tv_sec, now.tv_usec);
01073 #endif
01074             alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
01075         }
01076 
01077         last_alarm_check = now;
01078 
01079         time_t remaining = msecdiff(alarm_time, now);
01080         if (remaining < 0)
01081             remaining = 0;
01082         return remaining;
01083     }
01084     return -1;
01085 }
01086 
01087 
01088 bool WvStream::continue_select(time_t msec_timeout)
01089 {
01090     assert(uses_continue_select);
01091     
01092     // if this assertion triggers, you probably tried to do continue_select()
01093     // while inside terminate_continue_select().
01094     assert(call_ctx);
01095     
01096     if (msec_timeout >= 0)
01097         alarm(msec_timeout);
01098 
01099     alarm(msec_timeout);
01100     WvCont::yield();
01101     alarm(-1); // cancel the still-pending alarm, or it might go off later!
01102     
01103     // when we get here, someone has jumped back into our task.
01104     // We have to select(0) here because it's possible that the alarm was 
01105     // ticking _and_ data was available.  This is aggravated especially if
01106     // msec_delay was zero.  Note that running select() here isn't
01107     // inefficient, because if the alarm was expired then pre_select()
01108     // returned true anyway and short-circuited the previous select().
01109     TRACE("hello-%p\n", this);
01110     return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
01111 }
01112 
01113 
01114 void WvStream::terminate_continue_select()
01115 {
01116     close();
01117     call_ctx = 0; // destroy the context, if necessary
01118 }
01119 
01120 
01121 const WvAddr *WvStream::src() const
01122 {
01123     return NULL;
01124 }
01125 
01126 
01127 void WvStream::setcallback(IWvStreamCallback _callfunc)
01128 { 
01129     callfunc = _callfunc;
01130     call_ctx = 0; // delete any in-progress WvCont
01131 }
01132 
01133 
01134 void WvStream::legacy_callback()
01135 {
01136     execute();
01137     if (!!callfunc)
01138         callfunc();
01139 }
01140 
01141 
01142 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
01143 {
01144     IWvStreamCallback tmp = readcb;
01145 
01146     readcb = _callback;
01147 
01148     return tmp;
01149 }
01150 
01151 
01152 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
01153 {
01154     IWvStreamCallback tmp = writecb;
01155 
01156     writecb = _callback;
01157 
01158     return tmp;
01159 }
01160 
01161 
01162 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
01163 {
01164     IWvStreamCallback tmp = exceptcb;
01165 
01166     exceptcb = _callback;
01167 
01168     return tmp;
01169 }
01170 
01171 
01172 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
01173 {
01174     IWvStreamCallback tmp = closecb;
01175     if (isok())
01176         closecb = _callback;
01177     else
01178     {
01179         // already closed?  notify immediately!
01180         closecb = 0;
01181         if (!!_callback)
01182             _callback();
01183     }
01184     return tmp;
01185 }
01186 
01187 
01188 void WvStream::unread(WvBuf &unreadbuf, size_t count)
01189 {
01190     WvDynBuf tmp;
01191     tmp.merge(unreadbuf, count);
01192     tmp.merge(inbuf);
01193     inbuf.zap();
01194     inbuf.merge(tmp);
01195 }
01196 
01197 
01198 IWvStream *WvStream::find_by_wsid(WSID wsid)
01199 {
01200     IWvStream *retval = NULL;
01201 
01202     if (wsid_map)
01203     {
01204         map<WSID, WvStream*>::iterator it = wsid_map->find(wsid);
01205 
01206         if (it != wsid_map->end())
01207             retval = it->second;
01208     }
01209 
01210     return retval;
01211 }