WvStreams
|
00001 /* 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * Unified support for streams, that is, sequences of bytes that may or 00006 * may not be ready for read/write at any given time. 00007 * 00008 * We provide typical read and write routines, as well as a select() function 00009 * for each stream. 00010 */ 00011 #include <time.h> 00012 #include <sys/types.h> 00013 #include <assert.h> 00014 #define __WVSTREAM_UNIT_TEST 1 00015 #include "wvstream.h" 00016 #include "wvtimeutils.h" 00017 #include "wvcont.h" 00018 #include "wvstreamsdebugger.h" 00019 #include "wvstrutils.h" 00020 #include "wvistreamlist.h" 00021 #include "wvlinkerhack.h" 00022 #include "wvmoniker.h" 00023 00024 #ifdef _WIN32 00025 #define ENOBUFS WSAENOBUFS 00026 #undef errno 00027 #define errno GetLastError() 00028 #ifdef __GNUC__ 00029 #include <sys/socket.h> 00030 #endif 00031 #include "streams.h" 00032 #else 00033 #include <errno.h> 00034 #endif 00035 00036 #include <map> 00037 00038 using std::make_pair; 00039 using std::map; 00040 00041 00042 // enable this to add some read/write trace messages (this can be VERY 00043 // verbose) 00044 #if 0 00045 # ifndef _MSC_VER 00046 # define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr); 00047 # else 00048 # define TRACE printf 00049 # endif 00050 #else 00051 # ifndef _MSC_VER 00052 # define TRACE(x, y...) 00053 # else 00054 # define TRACE 00055 # endif 00056 #endif 00057 00058 WvStream *WvStream::globalstream = NULL; 00059 00060 UUID_MAP_BEGIN(WvStream) 00061 UUID_MAP_ENTRY(IObject) 00062 UUID_MAP_ENTRY(IWvStream) 00063 UUID_MAP_END 00064 00065 00066 static map<WSID, WvStream*> *wsid_map; 00067 static WSID next_wsid_to_try; 00068 00069 00070 WV_LINK(WvStream); 00071 00072 static IWvStream *create_null(WvStringParm, IObject *) 00073 { 00074 return new WvStream(); 00075 } 00076 00077 static WvMoniker<IWvStream> reg("null", create_null); 00078 00079 00080 IWvStream *IWvStream::create(WvStringParm moniker, IObject *obj) 00081 { 00082 IWvStream *s = wvcreate<IWvStream>(moniker, obj); 00083 if (!s) 00084 { 00085 s = new WvStream(); 00086 s->seterr_both(EINVAL, "Unknown moniker '%s'", moniker); 00087 WVRELEASE(obj); // we're not going to use it after all 00088 } 00089 return s; 00090 } 00091 00092 00093 static bool is_prefix_insensitive(const char *str, const char *prefix) 00094 { 00095 size_t len = strlen(prefix); 00096 return strlen(str) >= len && strncasecmp(str, prefix, len) == 0; 00097 } 00098 00099 00100 static const char *strstr_insensitive(const char *haystack, const char *needle) 00101 { 00102 while (*haystack != '\0') 00103 { 00104 if (is_prefix_insensitive(haystack, needle)) 00105 return haystack; 00106 ++haystack; 00107 } 00108 return NULL; 00109 } 00110 00111 00112 static bool contains_insensitive(const char *haystack, const char *needle) 00113 { 00114 return strstr_insensitive(haystack, needle) != NULL; 00115 } 00116 00117 00118 static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s"; 00119 static inline const char *Yes_No(bool val) 00120 { 00121 return val? "Yes": "No"; 00122 } 00123 00124 00125 void WvStream::debugger_streams_display_header(WvStringParm cmd, 00126 WvStreamsDebugger::ResultCallback result_cb) 00127 { 00128 WvStringList result; 00129 result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-", 00130 "----------------Type", "-", "Name--------------------"); 00131 result_cb(cmd, result); 00132 } 00133 00134 00135 // Set to fit in 6 chars 00136 static WvString friendly_ms(time_t ms) 00137 { 00138 if (ms <= 0) 00139 return WvString("(%s)", ms); 00140 else if (ms < 1000) 00141 return WvString("%sms", ms); 00142 else if (ms < 60*1000) 00143 return WvString("%ss", ms/1000); 00144 else if (ms < 60*60*1000) 00145 return WvString("%sm", ms/(60*1000)); 00146 else if (ms <= 24*60*60*1000) 00147 return WvString("%sh", ms/(60*60*1000)); 00148 else 00149 return WvString("%sd", ms/(24*60*60*1000)); 00150 } 00151 00152 void WvStream::debugger_streams_display_one_stream(WvStream *s, 00153 WvStringParm cmd, 00154 WvStreamsDebugger::ResultCallback result_cb) 00155 { 00156 WvStringList result; 00157 s->addRef(); 00158 unsigned refcount = s->release(); 00159 result.append(list_format, 00160 s->wsid(), " ", 00161 refcount, " ", 00162 Yes_No(s->isok()), " ", 00163 Yes_No(s->uses_continue_select), " ", 00164 friendly_ms(s->alarm_remaining()), " ", 00165 s->wstype(), " ", 00166 s->wsname()); 00167 result_cb(cmd, result); 00168 } 00169 00170 00171 void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s, 00172 WvStringParm cmd, 00173 const WvStringList &args, 00174 WvStreamsDebugger::ResultCallback result_cb) 00175 { 00176 bool show = args.isempty(); 00177 WvStringList::Iter arg(args); 00178 for (arg.rewind(); arg.next(); ) 00179 { 00180 WSID wsid; 00181 bool is_num = wvstring_to_num(*arg, wsid); 00182 00183 if (is_num) 00184 { 00185 if (s->wsid() == wsid) 00186 { 00187 show = true; 00188 break; 00189 } 00190 } 00191 else 00192 { 00193 if (s->wsname() && contains_insensitive(s->wsname(), *arg) 00194 || s->wstype() && contains_insensitive(s->wstype(), *arg)) 00195 { 00196 show = true; 00197 break; 00198 } 00199 } 00200 } 00201 if (show) 00202 debugger_streams_display_one_stream(s, cmd, result_cb); 00203 } 00204 00205 00206 WvString WvStream::debugger_streams_run_cb(WvStringParm cmd, 00207 WvStringList &args, 00208 WvStreamsDebugger::ResultCallback result_cb, void *) 00209 { 00210 debugger_streams_display_header(cmd, result_cb); 00211 if (wsid_map) 00212 { 00213 map<WSID, WvStream*>::iterator it; 00214 00215 for (it = wsid_map->begin(); it != wsid_map->end(); ++it) 00216 debugger_streams_maybe_display_one_stream(it->second, cmd, args, 00217 result_cb); 00218 } 00219 00220 return WvString::null; 00221 } 00222 00223 00224 WvString WvStream::debugger_close_run_cb(WvStringParm cmd, 00225 WvStringList &args, 00226 WvStreamsDebugger::ResultCallback result_cb, void *) 00227 { 00228 if (args.isempty()) 00229 return WvString("Usage: %s <WSID>", cmd); 00230 WSID wsid; 00231 WvString wsid_str = args.popstr(); 00232 if (!wvstring_to_num(wsid_str, wsid)) 00233 return WvString("Invalid WSID '%s'", wsid_str); 00234 IWvStream *s = WvStream::find_by_wsid(wsid); 00235 if (!s) 00236 return WvString("No such stream"); 00237 s->close(); 00238 return WvString::null; 00239 } 00240 00241 00242 void WvStream::add_debugger_commands() 00243 { 00244 WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0); 00245 WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0); 00246 } 00247 00248 00249 WvStream::WvStream(): 00250 read_requires_writable(NULL), 00251 write_requires_readable(NULL), 00252 uses_continue_select(false), 00253 personal_stack_size(131072), 00254 alarm_was_ticking(false), 00255 stop_read(false), 00256 stop_write(false), 00257 closed(false), 00258 readcb(wv::bind(&WvStream::legacy_callback, this)), 00259 max_outbuf_size(0), 00260 outbuf_delayed_flush(false), 00261 is_auto_flush(true), 00262 want_to_flush(true), 00263 is_flushing(false), 00264 queue_min(0), 00265 autoclose_time(0), 00266 alarm_time(wvtime_zero), 00267 last_alarm_check(wvtime_zero) 00268 { 00269 TRACE("Creating wvstream %p\n", this); 00270 00271 static bool first = true; 00272 if (first) 00273 { 00274 first = false; 00275 WvStream::add_debugger_commands(); 00276 } 00277 00278 // Choose a wsid; 00279 if (!wsid_map) 00280 wsid_map = new map<WSID, WvStream*>; 00281 WSID first_wsid_tried = next_wsid_to_try; 00282 do 00283 { 00284 if (wsid_map->find(next_wsid_to_try) == wsid_map->end()) 00285 break; 00286 ++next_wsid_to_try; 00287 } while (next_wsid_to_try != first_wsid_tried); 00288 my_wsid = next_wsid_to_try++; 00289 bool inserted = wsid_map->insert(make_pair(my_wsid, this)).second; 00290 assert(inserted); 00291 00292 #ifdef _WIN32 00293 WSAData wsaData; 00294 int result = WSAStartup(MAKEWORD(2,0), &wsaData); 00295 assert(result == 0); 00296 #endif 00297 } 00298 00299 00300 // FIXME: interfaces (IWvStream) shouldn't have implementations! 00301 IWvStream::IWvStream() 00302 { 00303 } 00304 00305 00306 IWvStream::~IWvStream() 00307 { 00308 } 00309 00310 00311 WvStream::~WvStream() 00312 { 00313 TRACE("destroying %p\n", this); 00314 close(); 00315 00316 // if this assertion fails, then uses_continue_select is true, but you 00317 // didn't call terminate_continue_select() or close() before destroying 00318 // your object. Shame on you! 00319 assert(!uses_continue_select || !call_ctx); 00320 00321 call_ctx = 0; // finish running the suspended callback, if any 00322 00323 assert(wsid_map); 00324 wsid_map->erase(my_wsid); 00325 if (wsid_map->empty()) 00326 { 00327 delete wsid_map; 00328 wsid_map = NULL; 00329 } 00330 00331 // eventually, streams will auto-add themselves to the globallist. But 00332 // even before then, it'll never be useful for them to be on the 00333 // globallist *after* they get destroyed, so we might as well auto-remove 00334 // them already. It's harmless for people to try to remove them twice. 00335 WvIStreamList::globallist.unlink(this); 00336 00337 TRACE("done destroying %p\n", this); 00338 } 00339 00340 00341 void WvStream::close() 00342 { 00343 TRACE("flushing in wvstream...\n"); 00344 flush(2000); // fixme: should not hardcode this stuff 00345 TRACE("(flushed)\n"); 00346 00347 closed = true; 00348 00349 if (!!closecb) 00350 { 00351 IWvStreamCallback cb = closecb; 00352 closecb = 0; // ensure callback is only called once 00353 cb(); 00354 } 00355 00356 // I would like to delete call_ctx here, but then if someone calls 00357 // close() from *inside* a continuable callback, we explode. Oops! 00358 //call_ctx = 0; // destroy the context, if necessary 00359 } 00360 00361 00362 void WvStream::autoforward(WvStream &s) 00363 { 00364 setcallback(wv::bind(autoforward_callback, wv::ref(*this), wv::ref(s))); 00365 read_requires_writable = &s; 00366 } 00367 00368 00369 void WvStream::noautoforward() 00370 { 00371 setcallback(0); 00372 read_requires_writable = NULL; 00373 } 00374 00375 00376 void WvStream::autoforward_callback(WvStream &input, WvStream &output) 00377 { 00378 char buf[1024]; 00379 size_t len; 00380 00381 len = input.read(buf, sizeof(buf)); 00382 output.write(buf, len); 00383 } 00384 00385 00386 void WvStream::_callback() 00387 { 00388 execute(); 00389 if (!!callfunc) 00390 callfunc(); 00391 } 00392 00393 00394 void *WvStream::_callwrap(void *) 00395 { 00396 _callback(); 00397 return NULL; 00398 } 00399 00400 00401 void WvStream::callback() 00402 { 00403 TRACE("(?)"); 00404 00405 // if the alarm has gone off and we're calling callback... good! 00406 if (alarm_remaining() == 0) 00407 { 00408 alarm_time = wvtime_zero; 00409 alarm_was_ticking = true; 00410 } 00411 else 00412 alarm_was_ticking = false; 00413 00414 assert(!uses_continue_select || personal_stack_size >= 1024); 00415 00416 #define TEST_CONTINUES_HARSHLY 0 00417 #if TEST_CONTINUES_HARSHLY 00418 #ifndef _WIN32 00419 # warning "Using WvCont for *all* streams for testing!" 00420 #endif 00421 if (1) 00422 #else 00423 if (uses_continue_select && personal_stack_size >= 1024) 00424 #endif 00425 { 00426 if (!call_ctx) // no context exists yet! 00427 { 00428 call_ctx = WvCont(wv::bind(&WvStream::_callwrap, this, _1), 00429 personal_stack_size); 00430 } 00431 00432 call_ctx(NULL); 00433 } 00434 else 00435 _callback(); 00436 00437 // if this assertion fails, a derived class's virtual execute() function 00438 // didn't call its parent's execute() function, and we didn't make it 00439 // all the way back up to WvStream::execute(). This doesn't always 00440 // matter right now, but it could lead to obscure bugs later, so we'll 00441 // enforce it. 00442 } 00443 00444 00445 bool WvStream::isok() const 00446 { 00447 return !closed && WvErrorBase::isok(); 00448 } 00449 00450 00451 void WvStream::seterr(int _errnum) 00452 { 00453 if (!geterr()) // no pre-existing error 00454 { 00455 WvErrorBase::seterr(_errnum); 00456 close(); 00457 } 00458 } 00459 00460 00461 size_t WvStream::read(WvBuf &outbuf, size_t count) 00462 { 00463 // for now, just wrap the older read function 00464 size_t free = outbuf.free(); 00465 if (count > free) 00466 count = free; 00467 00468 WvDynBuf tmp; 00469 unsigned char *buf = tmp.alloc(count); 00470 size_t len = read(buf, count); 00471 tmp.unalloc(count - len); 00472 outbuf.merge(tmp); 00473 return len; 00474 } 00475 00476 00477 size_t WvStream::write(WvBuf &inbuf, size_t count) 00478 { 00479 // for now, just wrap the older write function 00480 size_t avail = inbuf.used(); 00481 if (count > avail) 00482 count = avail; 00483 const unsigned char *buf = inbuf.get(count); 00484 size_t len = write(buf, count); 00485 inbuf.unget(count - len); 00486 return len; 00487 } 00488 00489 00490 size_t WvStream::read(void *buf, size_t count) 00491 { 00492 assert(!count || buf); 00493 00494 size_t bufu, i; 00495 unsigned char *newbuf; 00496 00497 bufu = inbuf.used(); 00498 if (bufu < queue_min) 00499 { 00500 newbuf = inbuf.alloc(queue_min - bufu); 00501 assert(newbuf); 00502 i = uread(newbuf, queue_min - bufu); 00503 inbuf.unalloc(queue_min - bufu - i); 00504 00505 bufu = inbuf.used(); 00506 } 00507 00508 if (bufu < queue_min) 00509 { 00510 maybe_autoclose(); 00511 return 0; 00512 } 00513 00514 // if buffer is empty, do a hard read 00515 if (!bufu) 00516 bufu = uread(buf, count); 00517 else 00518 { 00519 // otherwise just read from the buffer 00520 if (bufu > count) 00521 bufu = count; 00522 00523 memcpy(buf, inbuf.get(bufu), bufu); 00524 } 00525 00526 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count); 00527 maybe_autoclose(); 00528 return bufu; 00529 } 00530 00531 00532 size_t WvStream::write(const void *buf, size_t count) 00533 { 00534 assert(!count || buf); 00535 if (!isok() || !buf || !count || stop_write) return 0; 00536 00537 size_t wrote = 0; 00538 if (!outbuf_delayed_flush && !outbuf.used()) 00539 { 00540 wrote = uwrite(buf, count); 00541 count -= wrote; 00542 buf = (const unsigned char *)buf + wrote; 00543 // if (!count) return wrote; // short circuit if no buffering needed 00544 } 00545 if (max_outbuf_size != 0) 00546 { 00547 size_t canbuffer = max_outbuf_size - outbuf.used(); 00548 if (count > canbuffer) 00549 count = canbuffer; // can't write the whole amount 00550 } 00551 if (count != 0) 00552 { 00553 outbuf.put(buf, count); 00554 wrote += count; 00555 } 00556 00557 if (should_flush()) 00558 { 00559 if (is_auto_flush) 00560 flush(0); 00561 else 00562 flush_outbuf(0); 00563 } 00564 00565 return wrote; 00566 } 00567 00568 00569 void WvStream::noread() 00570 { 00571 stop_read = true; 00572 maybe_autoclose(); 00573 } 00574 00575 00576 void WvStream::nowrite() 00577 { 00578 stop_write = true; 00579 maybe_autoclose(); 00580 } 00581 00582 00583 void WvStream::maybe_autoclose() 00584 { 00585 if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed) 00586 close(); 00587 } 00588 00589 00590 bool WvStream::isreadable() 00591 { 00592 return isok() && select(0, true, false, false); 00593 } 00594 00595 00596 bool WvStream::iswritable() 00597 { 00598 return !stop_write && isok() && select(0, false, true, false); 00599 } 00600 00601 00602 char *WvStream::blocking_getline(time_t wait_msec, int separator, 00603 int readahead) 00604 { 00605 assert(separator >= 0); 00606 assert(separator <= 255); 00607 00608 //assert(uses_continue_select || wait_msec == 0); 00609 00610 WvTime timeout_time(0); 00611 if (wait_msec > 0) 00612 timeout_time = msecadd(wvtime(), wait_msec); 00613 00614 maybe_autoclose(); 00615 00616 // if we get here, we either want to wait a bit or there is data 00617 // available. 00618 while (isok()) 00619 { 00620 // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr); 00621 queuemin(0); 00622 00623 // if there is a newline already, we have enough data. 00624 if (inbuf.strchr(separator) > 0) 00625 break; 00626 else if (!isok() || stop_read) // uh oh, stream is in trouble. 00627 break; 00628 00629 // make select not return true until more data is available 00630 queuemin(inbuf.used() + 1); 00631 00632 // compute remaining timeout 00633 if (wait_msec > 0) 00634 { 00635 wait_msec = msecdiff(timeout_time, wvtime()); 00636 if (wait_msec < 0) 00637 wait_msec = 0; 00638 } 00639 00640 // FIXME: this is blocking_getline. It shouldn't 00641 // call continue_select()! 00642 bool hasdata; 00643 if (wait_msec != 0 && uses_continue_select) 00644 hasdata = continue_select(wait_msec); 00645 else 00646 hasdata = select(wait_msec, true, false); 00647 00648 if (!isok()) 00649 break; 00650 00651 if (hasdata) 00652 { 00653 // read a few bytes 00654 WvDynBuf tmp; 00655 unsigned char *buf = tmp.alloc(readahead); 00656 assert(buf); 00657 size_t len = uread(buf, readahead); 00658 tmp.unalloc(readahead - len); 00659 inbuf.put(tmp.get(len), len); 00660 hasdata = len > 0; // enough? 00661 } 00662 00663 if (!isok()) 00664 break; 00665 00666 if (!hasdata && wait_msec == 0) 00667 return NULL; // handle timeout 00668 } 00669 if (!inbuf.used()) 00670 return NULL; 00671 00672 // return the appropriate data 00673 size_t i = 0; 00674 i = inbuf.strchr(separator); 00675 if (i > 0) { 00676 char *eol = (char *)inbuf.mutablepeek(i - 1, 1); 00677 assert(eol && *eol == separator); 00678 *eol = 0; 00679 return const_cast<char*>((const char *)inbuf.get(i)); 00680 } else { 00681 // handle "EOF without newline" condition 00682 // FIXME: it's very silly that buffers can't return editable 00683 // char* arrays. 00684 inbuf.alloc(1)[0] = 0; // null-terminate it 00685 return const_cast<char *>((const char *)inbuf.get(inbuf.used())); 00686 } 00687 } 00688 00689 00690 char *WvStream::continue_getline(time_t wait_msec, int separator, 00691 int readahead) 00692 { 00693 assert(false && "not implemented, come back later!"); 00694 assert(uses_continue_select); 00695 return NULL; 00696 } 00697 00698 00699 void WvStream::drain() 00700 { 00701 char buf[1024]; 00702 while (isreadable()) 00703 read(buf, sizeof(buf)); 00704 } 00705 00706 00707 bool WvStream::flush(time_t msec_timeout) 00708 { 00709 if (is_flushing) return false; 00710 00711 TRACE("%p flush starts\n", this); 00712 00713 is_flushing = true; 00714 want_to_flush = true; 00715 bool done = flush_internal(msec_timeout) // any other internal buffers 00716 && flush_outbuf(msec_timeout); // our own outbuf 00717 is_flushing = false; 00718 00719 TRACE("flush stops (%d)\n", done); 00720 return done; 00721 } 00722 00723 00724 bool WvStream::should_flush() 00725 { 00726 return want_to_flush; 00727 } 00728 00729 00730 bool WvStream::flush_outbuf(time_t msec_timeout) 00731 { 00732 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok()); 00733 bool outbuf_was_used = outbuf.used(); 00734 00735 // do-nothing shortcut for speed 00736 // FIXME: definitely makes a "measurable" difference... 00737 // but is it worth the risk? 00738 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush) 00739 { 00740 maybe_autoclose(); 00741 return true; 00742 } 00743 00744 WvTime stoptime = msecadd(wvtime(), msec_timeout); 00745 00746 // flush outbuf 00747 while (outbuf_was_used && isok()) 00748 { 00749 // fprintf(stderr, "%p: fd:%d/%d, used:%d\n", 00750 // this, getrfd(), getwfd(), outbuf.used()); 00751 00752 size_t attempt = outbuf.optgettable(); 00753 size_t real = uwrite(outbuf.get(attempt), attempt); 00754 00755 // WARNING: uwrite() may have messed up our outbuf! 00756 // This probably only happens if uwrite() closed the stream because 00757 // of an error, so we'll check isok(). 00758 if (isok() && real < attempt) 00759 { 00760 TRACE("flush_outbuf: unget %d-%d\n", attempt, real); 00761 assert(outbuf.ungettable() >= attempt - real); 00762 outbuf.unget(attempt - real); 00763 } 00764 00765 // since post_select() can call us, and select() calls post_select(), 00766 // we need to be careful not to call select() if we don't need to! 00767 // post_select() will only call us with msec_timeout==0, and we don't 00768 // need to do select() in that case anyway. 00769 if (!msec_timeout) 00770 break; 00771 if (msec_timeout >= 0 00772 && (stoptime < wvtime() || !select(msec_timeout, false, true))) 00773 break; 00774 00775 outbuf_was_used = outbuf.used(); 00776 } 00777 00778 // handle autoclose 00779 if (autoclose_time && isok()) 00780 { 00781 time_t now = time(NULL); 00782 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n", 00783 this, now - autoclose_time, outbuf.used()); 00784 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time) 00785 { 00786 autoclose_time = 0; // avoid infinite recursion! 00787 close(); 00788 } 00789 } 00790 00791 TRACE("flush_outbuf: after autoclose chunk\n"); 00792 if (outbuf_delayed_flush && !outbuf_was_used) 00793 want_to_flush = false; 00794 00795 TRACE("flush_outbuf: now isok=%d\n", isok()); 00796 00797 // if we can't flush the outbuf, at least empty it! 00798 if (outbuf_was_used && !isok()) 00799 outbuf.zap(); 00800 00801 maybe_autoclose(); 00802 TRACE("flush_outbuf stops\n"); 00803 00804 return !outbuf_was_used; 00805 } 00806 00807 00808 bool WvStream::flush_internal(time_t msec_timeout) 00809 { 00810 // once outbuf emptied, that's it for most streams 00811 return true; 00812 } 00813 00814 00815 int WvStream::getrfd() const 00816 { 00817 return -1; 00818 } 00819 00820 00821 int WvStream::getwfd() const 00822 { 00823 return -1; 00824 } 00825 00826 00827 void WvStream::flush_then_close(int msec_timeout) 00828 { 00829 time_t now = time(NULL); 00830 autoclose_time = now + (msec_timeout + 999) / 1000; 00831 00832 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n", 00833 this, outbuf.used(), autoclose_time - now); 00834 00835 // as a fast track, we _could_ close here: but that's not a good idea, 00836 // since flush_then_close() deals with obscure situations, and we don't 00837 // want the caller to use it incorrectly. So we make things _always_ 00838 // break when the caller forgets to call select() later. 00839 00840 flush(0); 00841 } 00842 00843 00844 void WvStream::pre_select(SelectInfo &si) 00845 { 00846 maybe_autoclose(); 00847 00848 time_t alarmleft = alarm_remaining(); 00849 00850 if (!isok() || (!si.inherit_request && alarmleft == 0)) 00851 { 00852 si.msec_timeout = 0; 00853 return; // alarm has rung 00854 } 00855 00856 if (!si.inherit_request) 00857 { 00858 si.wants.readable |= static_cast<bool>(readcb); 00859 si.wants.writable |= static_cast<bool>(writecb); 00860 si.wants.isexception |= static_cast<bool>(exceptcb); 00861 } 00862 00863 // handle read-ahead buffering 00864 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min) 00865 { 00866 si.msec_timeout = 0; // already ready 00867 return; 00868 } 00869 if (alarmleft >= 0 00870 && (alarmleft < si.msec_timeout || si.msec_timeout < 0)) 00871 si.msec_timeout = alarmleft + 10; 00872 } 00873 00874 00875 bool WvStream::post_select(SelectInfo &si) 00876 { 00877 if (!si.inherit_request) 00878 { 00879 si.wants.readable |= static_cast<bool>(readcb); 00880 si.wants.writable |= static_cast<bool>(writecb); 00881 si.wants.isexception |= static_cast<bool>(exceptcb); 00882 } 00883 00884 // FIXME: need sane buffer flush support for non FD-based streams 00885 // FIXME: need read_requires_writable and write_requires_readable 00886 // support for non FD-based streams 00887 00888 // note: flush(nonzero) might call select(), but flush(0) never does, 00889 // so this is safe. 00890 if (should_flush()) 00891 flush(0); 00892 if (!si.inherit_request && alarm_remaining() == 0) 00893 return true; // alarm ticked 00894 if ((si.wants.readable || (!si.inherit_request && readcb)) 00895 && inbuf.used() && inbuf.used() >= queue_min) 00896 return true; // already ready 00897 return false; 00898 } 00899 00900 00901 void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout, 00902 bool readable, bool writable, bool isexcept, bool forceable) 00903 { 00904 FD_ZERO(&si.read); 00905 FD_ZERO(&si.write); 00906 FD_ZERO(&si.except); 00907 00908 if (forceable) 00909 { 00910 si.wants.readable = readcb; 00911 si.wants.writable = writecb; 00912 si.wants.isexception = exceptcb; 00913 } 00914 else 00915 { 00916 si.wants.readable = readable; 00917 si.wants.writable = writable; 00918 si.wants.isexception = isexcept; 00919 } 00920 00921 si.max_fd = -1; 00922 si.msec_timeout = msec_timeout; 00923 si.inherit_request = ! forceable; 00924 si.global_sure = false; 00925 00926 wvstime_sync(); 00927 00928 pre_select(si); 00929 if (globalstream && forceable && (globalstream != this)) 00930 { 00931 WvStream *s = globalstream; 00932 globalstream = NULL; // prevent recursion 00933 s->xpre_select(si, SelectRequest(false, false, false)); 00934 globalstream = s; 00935 } 00936 } 00937 00938 00939 int WvStream::_do_select(SelectInfo &si) 00940 { 00941 // prepare timeout 00942 timeval tv; 00943 tv.tv_sec = si.msec_timeout / 1000; 00944 tv.tv_usec = (si.msec_timeout % 1000) * 1000; 00945 00946 #ifdef _WIN32 00947 // selecting on an empty set of sockets doesn't cause a delay in win32. 00948 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0); 00949 FD_SET(fakefd, &si.except); 00950 #endif 00951 00952 // block 00953 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except, 00954 si.msec_timeout >= 0 ? &tv : (timeval*)NULL); 00955 00956 // handle errors. 00957 // EAGAIN and EINTR don't matter because they're totally normal. 00958 // ENOBUFS is hopefully transient. 00959 // EBADF is kind of gross and might imply that something is wrong, 00960 // but it happens sometimes... 00961 if (sel < 0 00962 && errno != EAGAIN && errno != EINTR 00963 && errno != EBADF 00964 && errno != ENOBUFS 00965 ) 00966 { 00967 seterr(errno); 00968 } 00969 #ifdef _WIN32 00970 ::close(fakefd); 00971 #endif 00972 TRACE("select() returned %d\n", sel); 00973 return sel; 00974 } 00975 00976 00977 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable) 00978 { 00979 // We cannot move the clock backward here, because timers that 00980 // were expired in pre_select could then not be expired anymore, 00981 // and while time going backward is rather unsettling in general, 00982 // for it to be happening between pre_select and post_select is 00983 // just outright insanity. 00984 wvstime_sync_forward(); 00985 00986 bool sure = post_select(si); 00987 if (globalstream && forceable && (globalstream != this)) 00988 { 00989 WvStream *s = globalstream; 00990 globalstream = NULL; // prevent recursion 00991 si.global_sure = s->xpost_select(si, SelectRequest(false, false, false)) 00992 || si.global_sure; 00993 globalstream = s; 00994 } 00995 return sure; 00996 } 00997 00998 00999 bool WvStream::_select(time_t msec_timeout, bool readable, bool writable, 01000 bool isexcept, bool forceable) 01001 { 01002 // Detect use of deleted stream 01003 assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end())); 01004 01005 SelectInfo si; 01006 _build_selectinfo(si, msec_timeout, readable, writable, isexcept, 01007 forceable); 01008 01009 bool sure = false; 01010 int sel = _do_select(si); 01011 if (sel >= 0) 01012 sure = _process_selectinfo(si, forceable); 01013 if (si.global_sure && globalstream && forceable && (globalstream != this)) 01014 globalstream->callback(); 01015 01016 return sure; 01017 } 01018 01019 01020 IWvStream::SelectRequest WvStream::get_select_request() 01021 { 01022 return IWvStream::SelectRequest(readcb, writecb, exceptcb); 01023 } 01024 01025 01026 void WvStream::force_select(bool readable, bool writable, bool isexception) 01027 { 01028 if (readable) 01029 readcb = wv::bind(&WvStream::legacy_callback, this); 01030 if (writable) 01031 writecb = wv::bind(&WvStream::legacy_callback, this); 01032 if (isexception) 01033 exceptcb = wv::bind(&WvStream::legacy_callback, this); 01034 } 01035 01036 01037 void WvStream::undo_force_select(bool readable, bool writable, bool isexception) 01038 { 01039 if (readable) 01040 readcb = 0; 01041 if (writable) 01042 writecb = 0; 01043 if (isexception) 01044 exceptcb = 0; 01045 } 01046 01047 01048 void WvStream::alarm(time_t msec_timeout) 01049 { 01050 if (msec_timeout >= 0) 01051 alarm_time = msecadd(wvstime(), msec_timeout); 01052 else 01053 alarm_time = wvtime_zero; 01054 } 01055 01056 01057 time_t WvStream::alarm_remaining() 01058 { 01059 if (alarm_time.tv_sec) 01060 { 01061 WvTime now = wvstime(); 01062 01063 // Time is going backward! 01064 if (now < last_alarm_check) 01065 { 01066 #if 0 // okay, I give up. Time just plain goes backwards on some systems. 01067 // warn only if it's a "big" difference (sigh...) 01068 if (msecdiff(last_alarm_check, now) > 200) 01069 fprintf(stderr, " ************* TIME WENT BACKWARDS! " 01070 "(%ld:%ld %ld:%ld)\n", 01071 last_alarm_check.tv_sec, last_alarm_check.tv_usec, 01072 now.tv_sec, now.tv_usec); 01073 #endif 01074 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now)); 01075 } 01076 01077 last_alarm_check = now; 01078 01079 time_t remaining = msecdiff(alarm_time, now); 01080 if (remaining < 0) 01081 remaining = 0; 01082 return remaining; 01083 } 01084 return -1; 01085 } 01086 01087 01088 bool WvStream::continue_select(time_t msec_timeout) 01089 { 01090 assert(uses_continue_select); 01091 01092 // if this assertion triggers, you probably tried to do continue_select() 01093 // while inside terminate_continue_select(). 01094 assert(call_ctx); 01095 01096 if (msec_timeout >= 0) 01097 alarm(msec_timeout); 01098 01099 alarm(msec_timeout); 01100 WvCont::yield(); 01101 alarm(-1); // cancel the still-pending alarm, or it might go off later! 01102 01103 // when we get here, someone has jumped back into our task. 01104 // We have to select(0) here because it's possible that the alarm was 01105 // ticking _and_ data was available. This is aggravated especially if 01106 // msec_delay was zero. Note that running select() here isn't 01107 // inefficient, because if the alarm was expired then pre_select() 01108 // returned true anyway and short-circuited the previous select(). 01109 TRACE("hello-%p\n", this); 01110 return !alarm_was_ticking || select(0, readcb, writecb, exceptcb); 01111 } 01112 01113 01114 void WvStream::terminate_continue_select() 01115 { 01116 close(); 01117 call_ctx = 0; // destroy the context, if necessary 01118 } 01119 01120 01121 const WvAddr *WvStream::src() const 01122 { 01123 return NULL; 01124 } 01125 01126 01127 void WvStream::setcallback(IWvStreamCallback _callfunc) 01128 { 01129 callfunc = _callfunc; 01130 call_ctx = 0; // delete any in-progress WvCont 01131 } 01132 01133 01134 void WvStream::legacy_callback() 01135 { 01136 execute(); 01137 if (!!callfunc) 01138 callfunc(); 01139 } 01140 01141 01142 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback) 01143 { 01144 IWvStreamCallback tmp = readcb; 01145 01146 readcb = _callback; 01147 01148 return tmp; 01149 } 01150 01151 01152 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback) 01153 { 01154 IWvStreamCallback tmp = writecb; 01155 01156 writecb = _callback; 01157 01158 return tmp; 01159 } 01160 01161 01162 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback) 01163 { 01164 IWvStreamCallback tmp = exceptcb; 01165 01166 exceptcb = _callback; 01167 01168 return tmp; 01169 } 01170 01171 01172 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback) 01173 { 01174 IWvStreamCallback tmp = closecb; 01175 if (isok()) 01176 closecb = _callback; 01177 else 01178 { 01179 // already closed? notify immediately! 01180 closecb = 0; 01181 if (!!_callback) 01182 _callback(); 01183 } 01184 return tmp; 01185 } 01186 01187 01188 void WvStream::unread(WvBuf &unreadbuf, size_t count) 01189 { 01190 WvDynBuf tmp; 01191 tmp.merge(unreadbuf, count); 01192 tmp.merge(inbuf); 01193 inbuf.zap(); 01194 inbuf.merge(tmp); 01195 } 01196 01197 01198 IWvStream *WvStream::find_by_wsid(WSID wsid) 01199 { 01200 IWvStream *retval = NULL; 01201 01202 if (wsid_map) 01203 { 01204 map<WSID, WvStream*>::iterator it = wsid_map->find(wsid); 01205 01206 if (it != wsid_map->end()) 01207 retval = it->second; 01208 } 01209 01210 return retval; 01211 }