WvStreams
|
00001 /* -*- Mode: C++ -*- 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * Provides basic streaming I/O support. 00006 */ 00007 #ifndef __WVSTREAM_H 00008 #define __WVSTREAM_H 00009 00010 #include "iwvstream.h" 00011 #include "wvtimeutils.h" 00012 #include "wvstreamsdebugger.h" 00013 #include <errno.h> 00014 #include <limits.h> 00015 #include "wvattrs.h" 00016 00024 class WvStream: public IWvStream 00025 { 00026 IMPLEMENT_IOBJECT(WvStream); 00027 00028 WvString my_wsname; 00029 WSID my_wsid; 00030 WvAttrs attrs; 00031 public: 00036 WvStream *read_requires_writable; 00037 00042 WvStream *write_requires_readable; 00043 00045 bool uses_continue_select; 00046 00048 size_t personal_stack_size; 00049 00054 bool alarm_was_ticking; 00055 00057 bool stop_read, stop_write, closed; 00058 00060 WvStream(); 00061 virtual ~WvStream(); 00062 00070 virtual void close(); 00071 00073 virtual void seterr(int _errnum); 00074 void seterr(WvStringParm specialerr) 00075 { WvErrorBase::seterr(specialerr); } 00076 void seterr(WVSTRING_FORMAT_DECL) 00077 { seterr(WvString(WVSTRING_FORMAT_CALL)); } 00078 00080 virtual bool isok() const; 00081 00083 virtual size_t read(void *buf, size_t count); 00084 00094 virtual size_t read(WvBuf &outbuf, size_t count); 00095 00101 virtual void unread(WvBuf &outbuf, size_t count); 00102 00109 virtual size_t write(const void *buf, size_t count); 00110 00118 virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX); 00119 00129 void outbuf_limit(size_t size) 00130 { max_outbuf_size = size; } 00131 00132 virtual void noread(); 00133 virtual void nowrite(); 00134 virtual void maybe_autoclose(); 00135 00136 virtual bool isreadable(); 00137 virtual bool iswritable(); 00138 00146 virtual size_t uread(void *buf, size_t count) 00147 { return 0; /* basic WvStream doesn't actually do anything! */ } 00148 00156 virtual size_t uwrite(const void *buf, size_t count) 00157 { return count; /* basic WvStream doesn't actually do anything! */ } 00158 00175 char *getline(time_t wait_msec = 0, 00176 char separator = '\n', int readahead = 1024) 00177 { 00178 return blocking_getline(wait_msec, separator, readahead); 00179 } 00180 00182 char *getline(int wait_msec, 00183 char separator = '\n', int readahead = 1024) 00184 { 00185 return getline(time_t(wait_msec), separator, readahead); 00186 } 00187 00189 char *getline(double wait_msec, 00190 char separator = '\n', int readahead = 1024) 00191 { 00192 return getline(time_t(wait_msec), separator, readahead); 00193 } 00194 00195 private: 00200 char *getline(char, int i = 0); 00201 char *getline(bool, int i = 0); 00202 public: 00203 00215 char *blocking_getline(time_t wait_msec, int separator = '\n', 00216 int readahead = 1024); 00217 00222 char *continue_getline(time_t wait_msec, int separator = '\n', 00223 int readahead = 1024); 00224 00232 void queuemin(size_t count) 00233 { queue_min = count; } 00234 00239 void drain(); 00240 00246 void delay_output(bool is_delayed) 00247 { 00248 outbuf_delayed_flush = is_delayed; 00249 want_to_flush = !is_delayed; 00250 } 00251 00258 void auto_flush(bool is_automatic) 00259 { is_auto_flush = is_automatic; } 00260 00267 virtual bool flush(time_t msec_timeout); 00268 00269 virtual bool should_flush(); 00270 00277 void flush_then_close(int msec_timeout); 00278 00300 virtual void pre_select(SelectInfo &si); 00301 00306 void pre_select(SelectInfo &si, const SelectRequest &r) 00307 { 00308 SelectRequest oldwant = si.wants; 00309 si.wants = r; 00310 pre_select(si); 00311 si.wants = oldwant; 00312 } 00313 00318 void xpre_select(SelectInfo &si, const SelectRequest &r) 00319 { pre_select(si, r); } 00320 00333 virtual bool post_select(SelectInfo &si); 00334 00339 bool xpost_select(SelectInfo &si, const SelectRequest &r) 00340 { return post_select(si, r); } 00341 00346 bool post_select(SelectInfo &si, const SelectRequest &r) 00347 { 00348 SelectRequest oldwant = si.wants; 00349 si.wants = r; 00350 bool val = post_select(si); 00351 si.wants = oldwant; 00352 return val; 00353 } 00354 00376 bool select(time_t msec_timeout) 00377 { return _select(msec_timeout, false, false, false, true); } 00378 00391 void runonce(time_t msec_timeout = -1) 00392 { if (select(msec_timeout)) callback(); } 00393 00415 bool select(time_t msec_timeout, 00416 bool readable, bool writable, bool isex = false) 00417 { return _select(msec_timeout, readable, writable, isex, false); } 00418 00424 IWvStream::SelectRequest get_select_request(); 00425 00434 void force_select(bool readable, bool writable, bool isexception = false); 00435 00440 void undo_force_select(bool readable, bool writable, 00441 bool isexception = false); 00442 00460 bool continue_select(time_t msec_timeout); 00461 00467 void terminate_continue_select(); 00468 00473 virtual const WvAddr *src() const; 00474 00479 void setcallback(IWvStreamCallback _callfunc); 00480 00482 IWvStreamCallback setreadcallback(IWvStreamCallback _callback); 00483 00485 IWvStreamCallback setwritecallback(IWvStreamCallback _callback); 00486 00489 IWvStreamCallback setexceptcallback(IWvStreamCallback _callback); 00490 00492 IWvStreamCallback setclosecallback(IWvStreamCallback _callback); 00493 00499 void autoforward(WvStream &s); 00500 00502 void noautoforward(); 00503 static void autoforward_callback(WvStream &input, WvStream &output); 00504 00508 void *_callwrap(void *); 00509 00513 void _callback(); 00514 00519 virtual void callback(); 00520 00525 void alarm(time_t msec_timeout); 00526 00532 time_t alarm_remaining(); 00533 00538 size_t write(WvStringParm s) 00539 { return write(s.cstr(), s.len()); } 00540 size_t print(WvStringParm s) 00541 { return write(s); } 00542 size_t operator() (WvStringParm s) 00543 { return write(s); } 00544 00546 size_t print(WVSTRING_FORMAT_DECL) 00547 { return write(WvString(WVSTRING_FORMAT_CALL)); } 00548 size_t operator() (WVSTRING_FORMAT_DECL) 00549 { return write(WvString(WVSTRING_FORMAT_CALL)); } 00550 00551 const char *wsname() const 00552 { return my_wsname; } 00553 void set_wsname(WvStringParm wsname) 00554 { my_wsname = wsname; } 00555 void set_wsname(WVSTRING_FORMAT_DECL) 00556 { set_wsname(WvString(WVSTRING_FORMAT_CALL)); } 00557 00558 const char *wstype() const { return "WvStream"; } 00559 00560 WSID wsid() const { return my_wsid; } 00561 static IWvStream *find_by_wsid(WSID wsid); 00562 00563 virtual WvString getattr(WvStringParm name) const 00564 { return attrs.get(name); } 00565 00566 // ridiculous hackery for now so that the wvstream unit test can poke 00567 // around in the insides of WvStream. Eventually, inbuf will go away 00568 // from the base WvStream class, so nothing like this will be needed. 00569 #ifdef __WVSTREAM_UNIT_TEST 00570 public: 00571 size_t outbuf_used() 00572 { return outbuf.used(); } 00573 size_t inbuf_used() 00574 { return inbuf.used(); } 00575 void inbuf_putstr(WvStringParm t) 00576 { inbuf.putstr(t); } 00577 #endif 00578 00579 protected: 00580 void setattr(WvStringParm name, WvStringParm value) 00581 { attrs.set(name, value); } 00582 // builds the SelectInfo data structure (runs pre_select) 00583 // returns true if there are callbacks to be dispatched 00584 // 00585 // all of the fields are filled in with new values 00586 // si.msec_timeout contains the time until the next alarm expires 00587 void _build_selectinfo(SelectInfo &si, time_t msec_timeout, 00588 bool readable, bool writable, bool isexcept, 00589 bool forceable); 00590 00591 // runs the actual select() function over the given 00592 // SelectInfo data structure, returns the number of descriptors 00593 // in the set, and sets the error code if a problem occurs 00594 int _do_select(SelectInfo &si); 00595 00596 // processes the SelectInfo data structure (runs post_select) 00597 // returns true if there are callbacks to be dispatched 00598 bool _process_selectinfo(SelectInfo &si, bool forceable); 00599 00600 // tries to empty the output buffer if the stream is writable 00601 // not quite the same as flush() since it merely empties the output 00602 // buffer asynchronously whereas flush() might have other semantics 00603 // also handles autoclose (eg. after flush) 00604 bool flush_outbuf(time_t msec_timeout); 00605 00606 // called once flush() has emptied outbuf to ensure that any other 00607 // internal stream buffers actually do get flushed before it returns 00608 virtual bool flush_internal(time_t msec_timeout); 00609 00610 // the real implementations for these are actually in WvFDStream, which 00611 // is where they belong. By IWvStream needs them to exist for now, so 00612 // it's a hack. In standard WvStream they return -1. 00613 virtual int getrfd() const; 00614 virtual int getwfd() const; 00615 00616 // FIXME: this one is so bad, I'm not touching it. Quick hack to 00617 // make it work anyway. 00618 friend class WvHTTPClientProxyStream; 00619 00620 WvDynBuf inbuf, outbuf; 00621 00622 IWvStreamCallback callfunc; 00623 wv::function<void*(void*)> call_ctx; 00624 00625 IWvStreamCallback readcb, writecb, exceptcb, closecb; 00626 00627 size_t max_outbuf_size; 00628 bool outbuf_delayed_flush; 00629 bool is_auto_flush; 00630 00631 // Used to guard against excessive flushing when using delay_flush 00632 bool want_to_flush; 00633 00634 // Used to ensure we don't flush recursively. 00635 bool is_flushing; 00636 00637 size_t queue_min; // minimum bytes to read() 00638 time_t autoclose_time; // close eventually, even if output is queued 00639 WvTime alarm_time; // select() returns true at this time 00640 WvTime last_alarm_check; // last time we checked the alarm_remaining 00641 00652 virtual void execute() 00653 { } 00654 00655 // every call to select() selects on the globalstream. 00656 static WvStream *globalstream; 00657 00658 static void debugger_streams_display_header(WvStringParm cmd, 00659 WvStreamsDebugger::ResultCallback result_cb); 00660 static void debugger_streams_display_one_stream(WvStream *s, 00661 WvStringParm cmd, 00662 WvStreamsDebugger::ResultCallback result_cb); 00663 static void debugger_streams_maybe_display_one_stream(WvStream *s, 00664 WvStringParm cmd, 00665 const WvStringList &args, 00666 WvStreamsDebugger::ResultCallback result_cb); 00667 00668 private: 00670 bool _select(time_t msec_timeout, 00671 bool readable, bool writable, bool isexcept, 00672 bool forceable); 00673 00674 void legacy_callback(); 00675 00677 WvStream(const WvStream &s); 00678 WvStream& operator= (const WvStream &s); 00679 static void add_debugger_commands(); 00680 00681 static WvString debugger_streams_run_cb(WvStringParm cmd, 00682 WvStringList &args, 00683 WvStreamsDebugger::ResultCallback result_cb, void *); 00684 static WvString debugger_close_run_cb(WvStringParm cmd, 00685 WvStringList &args, 00686 WvStreamsDebugger::ResultCallback result_cb, void *); 00687 }; 00688 00695 extern WvStream *wvcon; // tied stdin and stdout stream 00696 extern WvStream *wvin; // stdin stream 00697 extern WvStream *wvout; // stdout stream 00698 extern WvStream *wverr; // stderr stream 00699 00700 #endif // __WVSTREAM_H