WvStreams
wvfdstream.cc
00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  *
00005  * Base class for streams built on Unix file descriptors.
00006  */
00007 #include "wvfdstream.h"
00008 #include "wvmoniker.h"
00009 #include <fcntl.h>
00010 
00011 #ifndef _WIN32
00012 #include <sys/socket.h>
00013 
00014 inline bool isselectable(int fd)
00015 {
00016     return true;
00017 }
00018 
00019 #else // _WIN32
00020 
00021 #define getsockopt(a,b,c,d,e) getsockopt(a,b,c,(char *)d, e) 
00022 #define SHUT_RD SD_RECEIVE
00023 #define SHUT_WR SD_SEND
00024 #define ENOBUFS WSAENOBUFS
00025 #undef EAGAIN
00026 #define EAGAIN WSAEWOULDBLOCK
00027 
00028 #include "streams.h"
00029 
00030 #undef errno
00031 #define errno GetLastError()
00032 
00033 // in win32, only sockets can be in the FD_SET for select()
00034 static inline bool isselectable(int s)
00035 {
00036     // if _get_osfhandle() works, it's a msvcrt fd, not a winsock handle.
00037     // msvcrt fds can't be select()ed on correctly.
00038     return ((HANDLE)_get_osfhandle(s) == INVALID_HANDLE_VALUE) 
00039         ? true : false;
00040 }
00041 
00042 #endif // _WIN32
00043 
00044 
00045 /***** WvFdStream *****/
00046 
00047 static IWvStream *creator(WvStringParm s, IObject *)
00048 {
00049     return new WvFdStream(s.num());
00050 }
00051 
00052 static WvMoniker<IWvStream> reg("fd", creator);
00053 
00054 WvFdStream::WvFdStream(int _rwfd)
00055     : rfd(_rwfd), wfd(_rwfd)
00056 {
00057     shutdown_read = shutdown_write = false;
00058 }
00059 
00060 
00061 WvFdStream::WvFdStream(int _rfd, int _wfd)
00062     : rfd(_rfd), wfd(_wfd)
00063 {
00064     shutdown_read = shutdown_write = false;
00065 }
00066 
00067 
00068 WvFdStream::~WvFdStream()
00069 {
00070     close();
00071 }
00072 
00073 
00074 static int _cloexec(int fd, bool close_on_exec)
00075 {
00076 #ifndef _WIN32 // there is no exec() in win32, so this is meaningless there
00077     return fcntl(fd, F_SETFD, close_on_exec ? FD_CLOEXEC : 0);
00078 #else
00079     return 0;
00080 #endif
00081 }
00082 
00083 
00084 static int _nonblock(int fd, bool nonblock)
00085 {
00086 #ifndef _WIN32
00087     int flag = fcntl(fd, F_GETFL);
00088     return fcntl(fd, F_SETFL,
00089                  (flag & ~O_NONBLOCK) | (nonblock ? O_NONBLOCK : 0));
00090 #else
00091     u_long arg = nonblock ? 1 : 0;
00092     return ioctlsocket(fd, FIONBIO, &arg);
00093 #endif    
00094 }
00095 
00096 
00097 void WvFdStream::set_nonblock(bool nonblock)
00098 {
00099     int rfd = getrfd(), wfd = getwfd();
00100     if (rfd >= 0)
00101         _nonblock(rfd, nonblock);
00102     if (wfd >= 0 && rfd != wfd)
00103         _nonblock(wfd, nonblock);
00104 }
00105     
00106 
00107 void WvFdStream::set_close_on_exec(bool close_on_exec)
00108 {
00109     int rfd = getrfd(), wfd = getwfd();
00110     if (rfd >= 0)
00111         _cloexec(rfd, close_on_exec);
00112     if (wfd >= 0 && rfd != wfd)
00113         _cloexec(wfd, close_on_exec);
00114 }
00115 
00116 
00117 void WvFdStream::close()
00118 {
00119     // fprintf(stderr, "closing fdstream!\n");
00120     if (!closed)
00121     {
00122         WvStream::close();
00123         //fprintf(stderr, "closing%d:%d/%d\n", (int)this, rfd, wfd);
00124         if (rfd >= 0)
00125             ::close(rfd);
00126         if (wfd >= 0 && wfd != rfd)
00127             ::close(wfd);
00128         rfd = wfd = -1;
00129         //fprintf(stderr, "closed!\n");
00130     }
00131 }
00132 
00133 
00134 bool WvFdStream::isok() const
00135 {
00136     return WvStream::isok() && (rfd != -1 || wfd != -1);
00137 }
00138 
00139 
00140 size_t WvFdStream::uread(void *buf, size_t count)
00141 {
00142     assert(!count || buf);
00143     if (!count || !buf || !isok()) return 0;
00144     
00145     int in = ::read(rfd, buf, count);
00146     
00147     // a read that returns zero bytes signifies end-of-file (EOF).
00148     if (in <= 0)
00149     {
00150         if (in < 0 && (errno==EINTR || errno==EAGAIN || errno==ENOBUFS))
00151             return 0; // interrupted
00152 
00153         seterr(in < 0 ? errno : 0);
00154         return 0;
00155     }
00156 
00157     // fprintf(stderr, "read %d bytes\n", in);
00158     return in;
00159 }
00160 
00161 
00162 size_t WvFdStream::uwrite(const void *buf, size_t count)
00163 {
00164     assert(!count || buf);
00165     if (!buf || !count || !isok()) return 0;
00166     // fprintf(stderr, "write %d bytes\n", count);
00167     
00168     int out = ::write(wfd, buf, count);
00169     
00170     if (out <= 0)
00171     {
00172         int err = errno;
00173         // fprintf(stderr, "(fd%d-err-%d)", wfd, err);
00174         if (out < 0 && (err == ENOBUFS || err==EAGAIN))
00175             return 0; // kernel buffer full - data not written (yet!)
00176     
00177         seterr(out < 0 ? err : 0); // a more critical error
00178         return 0;
00179     }
00180 
00181     //TRACE("write obj 0x%08x, bytes %d/%d\n", (unsigned int)this, out, count);
00182     return out;
00183 }
00184 
00185 
00186 void WvFdStream::maybe_autoclose()
00187 {
00188     if (stop_write && !shutdown_write && !outbuf.used())
00189     {
00190         shutdown_write = true;
00191         if (wfd < 0)
00192             return;
00193         if (rfd != wfd)
00194             ::close(wfd);
00195         else
00196             ::shutdown(wfd, SHUT_WR); // might be a socket        
00197         wfd = -1;
00198     }
00199     
00200     if (stop_read && !shutdown_read && !inbuf.used())
00201     {
00202         shutdown_read = true;
00203         if (rfd != wfd)
00204             ::close(rfd);
00205         else
00206             ::shutdown(rfd, SHUT_RD); // might be a socket
00207         rfd = -1;
00208     }
00209     
00210     WvStream::maybe_autoclose();
00211 }
00212 
00213 
00214 void WvFdStream::pre_select(SelectInfo &si)
00215 {
00216     WvStream::pre_select(si);
00217     
00218 #if 0
00219     fprintf(stderr, "%d/%d wr:%d ww:%d wx:%d inh:%d\n", rfd, wfd,
00220             si.wants.readable, si.wants.writable, si.wants.isexception,
00221             si.inherit_request);
00222 #endif
00223     if (si.wants.readable && (rfd >= 0))
00224     {
00225         if (isselectable(rfd))
00226             FD_SET(rfd, &si.read);
00227         else
00228             si.msec_timeout = 0; // not selectable -> *always* readable
00229     } 
00230     
00231     // FIXME: outbuf flushing should really be in WvStream::pre_select()
00232     // instead!  But it's hard to get the equivalent behaviour there.
00233     if ((si.wants.writable || outbuf.used() || autoclose_time) && (wfd >= 0))
00234     {
00235         if (isselectable(wfd))
00236             FD_SET(wfd, &si.write);
00237         else
00238             si.msec_timeout = 0; // not selectable -> *always* writable
00239     }
00240     
00241     if (si.wants.isexception)
00242     {
00243         if (rfd >= 0 && isselectable(rfd)) FD_SET(rfd, &si.except);
00244         if (wfd >= 0 && isselectable(wfd)) FD_SET(wfd, &si.except);
00245     }
00246     
00247     if (si.max_fd < rfd)
00248         si.max_fd = rfd;
00249     if (si.max_fd < wfd)
00250         si.max_fd = wfd;
00251 }
00252 
00253 
00254 bool WvFdStream::post_select(SelectInfo &si)
00255 {
00256     bool result = WvStream::post_select(si);
00257     
00258     // flush the output buffer if possible
00259     size_t outbuf_used = outbuf.used();
00260     if (wfd >= 0 && (outbuf_used || autoclose_time)
00261         && FD_ISSET(wfd, &si.write) && should_flush())
00262     {
00263         flush_outbuf(0);
00264         
00265         // flush_outbuf() might have closed the file!
00266         if (!isok())
00267             return result;
00268     }
00269     
00270     bool rforce = si.wants.readable && !isselectable(rfd),
00271          wforce = si.wants.writable && !isselectable(wfd);
00272     bool val = 
00273            (rfd >= 0 && (rforce || FD_ISSET(rfd, &si.read)))
00274         || (wfd >= 0 && (wforce || FD_ISSET(wfd, &si.write)))
00275         || (rfd >= 0 && (FD_ISSET(rfd, &si.except)))
00276         || (wfd >= 0 && (FD_ISSET(wfd, &si.except)));
00277     
00278     // fprintf(stderr, "fds_post_select: %d/%d %d/%d %d\n", 
00279     //          rfd, wfd, rforce, wforce, val);
00280     
00281     if (val && si.wants.readable && read_requires_writable
00282       && read_requires_writable->isok()
00283       && !read_requires_writable->select(0, false, true))
00284         return result;
00285     if (val && si.wants.writable && write_requires_readable
00286       && write_requires_readable->isok()
00287       && !write_requires_readable->select(0, true, false))
00288         return result;
00289     return val || result;
00290 }