WvStreams
|
00001 /* 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * Base class for streams built on Unix file descriptors. 00006 */ 00007 #include "wvfdstream.h" 00008 #include "wvmoniker.h" 00009 #include <fcntl.h> 00010 00011 #ifndef _WIN32 00012 #include <sys/socket.h> 00013 00014 inline bool isselectable(int fd) 00015 { 00016 return true; 00017 } 00018 00019 #else // _WIN32 00020 00021 #define getsockopt(a,b,c,d,e) getsockopt(a,b,c,(char *)d, e) 00022 #define SHUT_RD SD_RECEIVE 00023 #define SHUT_WR SD_SEND 00024 #define ENOBUFS WSAENOBUFS 00025 #undef EAGAIN 00026 #define EAGAIN WSAEWOULDBLOCK 00027 00028 #include "streams.h" 00029 00030 #undef errno 00031 #define errno GetLastError() 00032 00033 // in win32, only sockets can be in the FD_SET for select() 00034 static inline bool isselectable(int s) 00035 { 00036 // if _get_osfhandle() works, it's a msvcrt fd, not a winsock handle. 00037 // msvcrt fds can't be select()ed on correctly. 00038 return ((HANDLE)_get_osfhandle(s) == INVALID_HANDLE_VALUE) 00039 ? true : false; 00040 } 00041 00042 #endif // _WIN32 00043 00044 00045 /***** WvFdStream *****/ 00046 00047 static IWvStream *creator(WvStringParm s, IObject *) 00048 { 00049 return new WvFdStream(s.num()); 00050 } 00051 00052 static WvMoniker<IWvStream> reg("fd", creator); 00053 00054 WvFdStream::WvFdStream(int _rwfd) 00055 : rfd(_rwfd), wfd(_rwfd) 00056 { 00057 shutdown_read = shutdown_write = false; 00058 } 00059 00060 00061 WvFdStream::WvFdStream(int _rfd, int _wfd) 00062 : rfd(_rfd), wfd(_wfd) 00063 { 00064 shutdown_read = shutdown_write = false; 00065 } 00066 00067 00068 WvFdStream::~WvFdStream() 00069 { 00070 close(); 00071 } 00072 00073 00074 static int _cloexec(int fd, bool close_on_exec) 00075 { 00076 #ifndef _WIN32 // there is no exec() in win32, so this is meaningless there 00077 return fcntl(fd, F_SETFD, close_on_exec ? FD_CLOEXEC : 0); 00078 #else 00079 return 0; 00080 #endif 00081 } 00082 00083 00084 static int _nonblock(int fd, bool nonblock) 00085 { 00086 #ifndef _WIN32 00087 int flag = fcntl(fd, F_GETFL); 00088 return fcntl(fd, F_SETFL, 00089 (flag & ~O_NONBLOCK) | (nonblock ? O_NONBLOCK : 0)); 00090 #else 00091 u_long arg = nonblock ? 1 : 0; 00092 return ioctlsocket(fd, FIONBIO, &arg); 00093 #endif 00094 } 00095 00096 00097 void WvFdStream::set_nonblock(bool nonblock) 00098 { 00099 int rfd = getrfd(), wfd = getwfd(); 00100 if (rfd >= 0) 00101 _nonblock(rfd, nonblock); 00102 if (wfd >= 0 && rfd != wfd) 00103 _nonblock(wfd, nonblock); 00104 } 00105 00106 00107 void WvFdStream::set_close_on_exec(bool close_on_exec) 00108 { 00109 int rfd = getrfd(), wfd = getwfd(); 00110 if (rfd >= 0) 00111 _cloexec(rfd, close_on_exec); 00112 if (wfd >= 0 && rfd != wfd) 00113 _cloexec(wfd, close_on_exec); 00114 } 00115 00116 00117 void WvFdStream::close() 00118 { 00119 // fprintf(stderr, "closing fdstream!\n"); 00120 if (!closed) 00121 { 00122 WvStream::close(); 00123 //fprintf(stderr, "closing%d:%d/%d\n", (int)this, rfd, wfd); 00124 if (rfd >= 0) 00125 ::close(rfd); 00126 if (wfd >= 0 && wfd != rfd) 00127 ::close(wfd); 00128 rfd = wfd = -1; 00129 //fprintf(stderr, "closed!\n"); 00130 } 00131 } 00132 00133 00134 bool WvFdStream::isok() const 00135 { 00136 return WvStream::isok() && (rfd != -1 || wfd != -1); 00137 } 00138 00139 00140 size_t WvFdStream::uread(void *buf, size_t count) 00141 { 00142 assert(!count || buf); 00143 if (!count || !buf || !isok()) return 0; 00144 00145 int in = ::read(rfd, buf, count); 00146 00147 // a read that returns zero bytes signifies end-of-file (EOF). 00148 if (in <= 0) 00149 { 00150 if (in < 0 && (errno==EINTR || errno==EAGAIN || errno==ENOBUFS)) 00151 return 0; // interrupted 00152 00153 seterr(in < 0 ? errno : 0); 00154 return 0; 00155 } 00156 00157 // fprintf(stderr, "read %d bytes\n", in); 00158 return in; 00159 } 00160 00161 00162 size_t WvFdStream::uwrite(const void *buf, size_t count) 00163 { 00164 assert(!count || buf); 00165 if (!buf || !count || !isok()) return 0; 00166 // fprintf(stderr, "write %d bytes\n", count); 00167 00168 int out = ::write(wfd, buf, count); 00169 00170 if (out <= 0) 00171 { 00172 int err = errno; 00173 // fprintf(stderr, "(fd%d-err-%d)", wfd, err); 00174 if (out < 0 && (err == ENOBUFS || err==EAGAIN)) 00175 return 0; // kernel buffer full - data not written (yet!) 00176 00177 seterr(out < 0 ? err : 0); // a more critical error 00178 return 0; 00179 } 00180 00181 //TRACE("write obj 0x%08x, bytes %d/%d\n", (unsigned int)this, out, count); 00182 return out; 00183 } 00184 00185 00186 void WvFdStream::maybe_autoclose() 00187 { 00188 if (stop_write && !shutdown_write && !outbuf.used()) 00189 { 00190 shutdown_write = true; 00191 if (wfd < 0) 00192 return; 00193 if (rfd != wfd) 00194 ::close(wfd); 00195 else 00196 ::shutdown(wfd, SHUT_WR); // might be a socket 00197 wfd = -1; 00198 } 00199 00200 if (stop_read && !shutdown_read && !inbuf.used()) 00201 { 00202 shutdown_read = true; 00203 if (rfd != wfd) 00204 ::close(rfd); 00205 else 00206 ::shutdown(rfd, SHUT_RD); // might be a socket 00207 rfd = -1; 00208 } 00209 00210 WvStream::maybe_autoclose(); 00211 } 00212 00213 00214 void WvFdStream::pre_select(SelectInfo &si) 00215 { 00216 WvStream::pre_select(si); 00217 00218 #if 0 00219 fprintf(stderr, "%d/%d wr:%d ww:%d wx:%d inh:%d\n", rfd, wfd, 00220 si.wants.readable, si.wants.writable, si.wants.isexception, 00221 si.inherit_request); 00222 #endif 00223 if (si.wants.readable && (rfd >= 0)) 00224 { 00225 if (isselectable(rfd)) 00226 FD_SET(rfd, &si.read); 00227 else 00228 si.msec_timeout = 0; // not selectable -> *always* readable 00229 } 00230 00231 // FIXME: outbuf flushing should really be in WvStream::pre_select() 00232 // instead! But it's hard to get the equivalent behaviour there. 00233 if ((si.wants.writable || outbuf.used() || autoclose_time) && (wfd >= 0)) 00234 { 00235 if (isselectable(wfd)) 00236 FD_SET(wfd, &si.write); 00237 else 00238 si.msec_timeout = 0; // not selectable -> *always* writable 00239 } 00240 00241 if (si.wants.isexception) 00242 { 00243 if (rfd >= 0 && isselectable(rfd)) FD_SET(rfd, &si.except); 00244 if (wfd >= 0 && isselectable(wfd)) FD_SET(wfd, &si.except); 00245 } 00246 00247 if (si.max_fd < rfd) 00248 si.max_fd = rfd; 00249 if (si.max_fd < wfd) 00250 si.max_fd = wfd; 00251 } 00252 00253 00254 bool WvFdStream::post_select(SelectInfo &si) 00255 { 00256 bool result = WvStream::post_select(si); 00257 00258 // flush the output buffer if possible 00259 size_t outbuf_used = outbuf.used(); 00260 if (wfd >= 0 && (outbuf_used || autoclose_time) 00261 && FD_ISSET(wfd, &si.write) && should_flush()) 00262 { 00263 flush_outbuf(0); 00264 00265 // flush_outbuf() might have closed the file! 00266 if (!isok()) 00267 return result; 00268 } 00269 00270 bool rforce = si.wants.readable && !isselectable(rfd), 00271 wforce = si.wants.writable && !isselectable(wfd); 00272 bool val = 00273 (rfd >= 0 && (rforce || FD_ISSET(rfd, &si.read))) 00274 || (wfd >= 0 && (wforce || FD_ISSET(wfd, &si.write))) 00275 || (rfd >= 0 && (FD_ISSET(rfd, &si.except))) 00276 || (wfd >= 0 && (FD_ISSET(wfd, &si.except))); 00277 00278 // fprintf(stderr, "fds_post_select: %d/%d %d/%d %d\n", 00279 // rfd, wfd, rforce, wforce, val); 00280 00281 if (val && si.wants.readable && read_requires_writable 00282 && read_requires_writable->isok() 00283 && !read_requires_writable->select(0, false, true)) 00284 return result; 00285 if (val && si.wants.writable && write_requires_readable 00286 && write_requires_readable->isok() 00287 && !write_requires_readable->select(0, true, false)) 00288 return result; 00289 return val || result; 00290 }