WvStreams
wvstream.h
00001 /* -*- Mode: C++ -*-
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * Provides basic streaming I/O support.
00006  */ 
00007 #ifndef __WVSTREAM_H
00008 #define __WVSTREAM_H
00009 
00010 #include "iwvstream.h"
00011 #include "wvtimeutils.h"
00012 #include "wvstreamsdebugger.h"
00013 #include <errno.h>
00014 #include <limits.h>
00015 #include "wvattrs.h"
00016 
00024 class WvStream: public IWvStream
00025 {
00026     IMPLEMENT_IOBJECT(WvStream);
00027 
00028     WvString my_wsname;
00029     WSID my_wsid;
00030     WvAttrs attrs;
00031 public:
00036     WvStream *read_requires_writable;
00037 
00042     WvStream *write_requires_readable;
00043     
00045     bool uses_continue_select;
00046 
00048     size_t personal_stack_size;
00049 
00054     bool alarm_was_ticking;
00055     
00057     bool stop_read, stop_write, closed;
00058     
00060     WvStream();
00061     virtual ~WvStream();
00062 
00070     virtual void close();
00071 
00073     virtual void seterr(int _errnum);
00074     void seterr(WvStringParm specialerr)
00075         { WvErrorBase::seterr(specialerr); }
00076     void seterr(WVSTRING_FORMAT_DECL)
00077         { seterr(WvString(WVSTRING_FORMAT_CALL)); }
00078     
00080     virtual bool isok() const;
00081     
00083     virtual size_t read(void *buf, size_t count);
00084 
00094     virtual size_t read(WvBuf &outbuf, size_t count);
00095 
00101     virtual void unread(WvBuf &outbuf, size_t count);
00102 
00109     virtual size_t write(const void *buf, size_t count);
00110 
00118     virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);
00119 
00129     void outbuf_limit(size_t size)
00130         { max_outbuf_size = size; }
00131 
00132     virtual void noread();
00133     virtual void nowrite();
00134     virtual void maybe_autoclose();
00135     
00136     virtual bool isreadable();
00137     virtual bool iswritable();
00138     
00146     virtual size_t uread(void *buf, size_t count)
00147         { return 0; /* basic WvStream doesn't actually do anything! */ }
00148 
00156     virtual size_t uwrite(const void *buf, size_t count)
00157         { return count; /* basic WvStream doesn't actually do anything! */ }
00158 
00175     char *getline(time_t wait_msec = 0,
00176                   char separator = '\n', int readahead = 1024)
00177     {
00178         return blocking_getline(wait_msec, separator, readahead);
00179     }
00180 
00182     char *getline(int wait_msec,
00183                   char separator = '\n', int readahead = 1024)
00184     {
00185         return getline(time_t(wait_msec), separator, readahead);
00186     }
00187 
00189     char *getline(double wait_msec,
00190                   char separator = '\n', int readahead = 1024)
00191     {
00192         return getline(time_t(wait_msec), separator, readahead);
00193     }
00194 
00195 private:
00200     char *getline(char, int i = 0);
00201     char *getline(bool, int i = 0);
00202 public:
00203 
00215     char *blocking_getline(time_t wait_msec, int separator = '\n',
00216                            int readahead = 1024);
00217 
00222     char *continue_getline(time_t wait_msec, int separator = '\n',
00223                            int readahead = 1024);
00224 
00232     void queuemin(size_t count)
00233         { queue_min = count; }
00234 
00239     void drain();
00240     
00246     void delay_output(bool is_delayed)
00247     {
00248         outbuf_delayed_flush = is_delayed;
00249         want_to_flush = !is_delayed;
00250     }
00251 
00258     void auto_flush(bool is_automatic)
00259         { is_auto_flush = is_automatic; }
00260 
00267     virtual bool flush(time_t msec_timeout);
00268 
00269     virtual bool should_flush();
00270 
00277     void flush_then_close(int msec_timeout);
00278     
00300     virtual void pre_select(SelectInfo &si);
00301     
00306     void pre_select(SelectInfo &si, const SelectRequest &r)
00307     {
00308         SelectRequest oldwant = si.wants;
00309         si.wants = r;
00310         pre_select(si);
00311         si.wants = oldwant;
00312     }
00313     
00318     void xpre_select(SelectInfo &si, const SelectRequest &r)
00319         { pre_select(si, r); }
00320     
00333     virtual bool post_select(SelectInfo &si);
00334 
00339     bool xpost_select(SelectInfo &si, const SelectRequest &r)
00340         { return post_select(si, r); }
00341     
00346     bool post_select(SelectInfo &si, const SelectRequest &r)
00347     {
00348         SelectRequest oldwant = si.wants;
00349         si.wants = r;
00350         bool val = post_select(si);
00351         si.wants = oldwant;
00352         return val;
00353     }
00354     
00376     bool select(time_t msec_timeout)
00377         { return _select(msec_timeout, false, false, false, true); }
00378     
00391     void runonce(time_t msec_timeout = -1)
00392         { if (select(msec_timeout)) callback(); }
00393     
00415     bool select(time_t msec_timeout,
00416                 bool readable, bool writable, bool isex = false)
00417         { return _select(msec_timeout, readable, writable, isex, false); }
00418 
00424     IWvStream::SelectRequest get_select_request();
00425 
00434     void force_select(bool readable, bool writable, bool isexception = false);
00435     
00440     void undo_force_select(bool readable, bool writable,
00441                            bool isexception = false);
00442     
00460     bool continue_select(time_t msec_timeout);
00461     
00467     void terminate_continue_select();
00468 
00473     virtual const WvAddr *src() const;
00474     
00479     void setcallback(IWvStreamCallback _callfunc);
00480         
00482     IWvStreamCallback setreadcallback(IWvStreamCallback _callback);
00483 
00485     IWvStreamCallback setwritecallback(IWvStreamCallback _callback);
00486 
00489     IWvStreamCallback setexceptcallback(IWvStreamCallback _callback);
00490 
00492     IWvStreamCallback setclosecallback(IWvStreamCallback _callback);
00493 
00499     void autoforward(WvStream &s);
00500 
00502     void noautoforward();
00503     static void autoforward_callback(WvStream &input, WvStream &output);
00504     
00508     void *_callwrap(void *);
00509     
00513     void _callback();
00514     
00519     virtual void callback();
00520     
00525     void alarm(time_t msec_timeout);
00526 
00532     time_t alarm_remaining();
00533 
00538     size_t write(WvStringParm s)
00539         { return write(s.cstr(), s.len()); }
00540     size_t print(WvStringParm s)
00541         { return write(s); }
00542     size_t operator() (WvStringParm s)
00543         { return write(s); }
00544 
00546     size_t print(WVSTRING_FORMAT_DECL)
00547         { return write(WvString(WVSTRING_FORMAT_CALL)); }
00548     size_t operator() (WVSTRING_FORMAT_DECL)
00549         { return write(WvString(WVSTRING_FORMAT_CALL)); }
00550 
00551     const char *wsname() const
00552         { return my_wsname; }
00553     void set_wsname(WvStringParm wsname)
00554         { my_wsname = wsname; }
00555     void set_wsname(WVSTRING_FORMAT_DECL)
00556         { set_wsname(WvString(WVSTRING_FORMAT_CALL)); }
00557         
00558     const char *wstype() const { return "WvStream"; }
00559     
00560     WSID wsid() const { return my_wsid; }
00561     static IWvStream *find_by_wsid(WSID wsid);
00562 
00563     virtual WvString getattr(WvStringParm name) const
00564         { return attrs.get(name); }
00565 
00566     // ridiculous hackery for now so that the wvstream unit test can poke
00567     // around in the insides of WvStream.  Eventually, inbuf will go away
00568     // from the base WvStream class, so nothing like this will be needed.
00569 #ifdef __WVSTREAM_UNIT_TEST
00570 public:
00571     size_t outbuf_used() 
00572         { return outbuf.used(); }
00573     size_t inbuf_used()
00574         { return inbuf.used(); }
00575     void inbuf_putstr(WvStringParm t)
00576         { inbuf.putstr(t); }
00577 #endif
00578 
00579 protected:
00580     void setattr(WvStringParm name, WvStringParm value)
00581         {  attrs.set(name, value); }
00582     // builds the SelectInfo data structure (runs pre_select)
00583     // returns true if there are callbacks to be dispatched
00584     //
00585     // all of the fields are filled in with new values
00586     // si.msec_timeout contains the time until the next alarm expires
00587     void _build_selectinfo(SelectInfo &si, time_t msec_timeout,
00588         bool readable, bool writable, bool isexcept,
00589         bool forceable);
00590 
00591     // runs the actual select() function over the given
00592     // SelectInfo data structure, returns the number of descriptors
00593     // in the set, and sets the error code if a problem occurs
00594     int _do_select(SelectInfo &si);
00595 
00596     // processes the SelectInfo data structure (runs post_select)
00597     // returns true if there are callbacks to be dispatched
00598     bool _process_selectinfo(SelectInfo &si, bool forceable);
00599 
00600     // tries to empty the output buffer if the stream is writable
00601     // not quite the same as flush() since it merely empties the output
00602     // buffer asynchronously whereas flush() might have other semantics
00603     // also handles autoclose (eg. after flush)
00604     bool flush_outbuf(time_t msec_timeout);
00605 
00606     // called once flush() has emptied outbuf to ensure that any other
00607     // internal stream buffers actually do get flushed before it returns
00608     virtual bool flush_internal(time_t msec_timeout);
00609     
00610     // the real implementations for these are actually in WvFDStream, which
00611     // is where they belong.  By IWvStream needs them to exist for now, so
00612     // it's a hack.  In standard WvStream they return -1.
00613     virtual int getrfd() const;
00614     virtual int getwfd() const;
00615     
00616     // FIXME: this one is so bad, I'm not touching it. Quick hack to
00617     // make it work anyway.
00618     friend class WvHTTPClientProxyStream;
00619 
00620     WvDynBuf inbuf, outbuf;
00621 
00622     IWvStreamCallback callfunc;
00623     wv::function<void*(void*)> call_ctx;
00624 
00625     IWvStreamCallback readcb, writecb, exceptcb, closecb;
00626 
00627     size_t max_outbuf_size;
00628     bool outbuf_delayed_flush;
00629     bool is_auto_flush;
00630 
00631     // Used to guard against excessive flushing when using delay_flush
00632     bool want_to_flush;
00633 
00634     // Used to ensure we don't flush recursively.
00635     bool is_flushing;
00636 
00637     size_t queue_min;           // minimum bytes to read()
00638     time_t autoclose_time;      // close eventually, even if output is queued
00639     WvTime alarm_time;          // select() returns true at this time
00640     WvTime last_alarm_check;    // last time we checked the alarm_remaining
00641     
00652     virtual void execute()
00653         { }
00654     
00655     // every call to select() selects on the globalstream.
00656     static WvStream *globalstream;
00657 
00658     static void debugger_streams_display_header(WvStringParm cmd,
00659             WvStreamsDebugger::ResultCallback result_cb);
00660     static void debugger_streams_display_one_stream(WvStream *s,
00661             WvStringParm cmd,
00662             WvStreamsDebugger::ResultCallback result_cb);
00663     static void debugger_streams_maybe_display_one_stream(WvStream *s,
00664             WvStringParm cmd,
00665             const WvStringList &args,
00666             WvStreamsDebugger::ResultCallback result_cb);
00667  
00668 private:
00670     bool _select(time_t msec_timeout,
00671                  bool readable, bool writable, bool isexcept,
00672                  bool forceable);
00673 
00674     void legacy_callback();
00675 
00677     WvStream(const WvStream &s);
00678     WvStream& operator= (const WvStream &s);
00679     static void add_debugger_commands();
00680 
00681     static WvString debugger_streams_run_cb(WvStringParm cmd,
00682         WvStringList &args,
00683         WvStreamsDebugger::ResultCallback result_cb, void *);
00684     static WvString debugger_close_run_cb(WvStringParm cmd,
00685         WvStringList &args,
00686         WvStreamsDebugger::ResultCallback result_cb, void *);
00687 };
00688 
00695 extern WvStream *wvcon; // tied stdin and stdout stream
00696 extern WvStream *wvin;  // stdin stream
00697 extern WvStream *wvout; // stdout stream
00698 extern WvStream *wverr; // stderr stream
00699 
00700 #endif // __WVSTREAM_H