WvStreams
|
00001 /* 00002 * Worldvisions Tunnel Vision Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * WvEncoderStream chains a series of encoders on the input and 00006 * output ports of the underlying stream to effect on-the-fly data 00007 * transformations. 00008 */ 00009 #include "wvencoderstream.h" 00010 00011 WvEncoderStream::WvEncoderStream(WvStream *_cloned) : WvStreamClone(_cloned) 00012 { 00013 is_closing = false; 00014 min_readsize = 0; 00015 } 00016 00017 00018 WvEncoderStream::~WvEncoderStream() 00019 { 00020 close(); 00021 } 00022 00023 00024 void WvEncoderStream::close() 00025 { 00026 // fprintf(stderr, "Encoderstream close!\n"); 00027 00028 // we want to finish the encoders even if !isok() since we 00029 // might just have encountered an EOF condition, and we want 00030 // to ensure that the remaining data is processed, but this 00031 // might cause recursion if the encoders set a new error condition 00032 if (is_closing) return; 00033 is_closing = true; 00034 00035 // finish encoders 00036 finish_read(); 00037 finish_write(); 00038 00039 // flush write chain and close the stream 00040 WvStreamClone::close(); 00041 } 00042 00043 00044 bool WvEncoderStream::isok() const 00045 { 00046 //fprintf(stderr, "encoderstream isok: %d %p %d %d\n", 00047 // WvStream::isok(), cloned, cloned->isok(), cloned->geterr()); 00048 00049 // handle encoder error conditions 00050 if (!WvStream::isok()) 00051 return false; 00052 00053 // handle substream error conditions 00054 // we don't check substream isok() because that is handled 00055 // during read operations to distinguish EOF from errors 00056 if (!cloned || cloned->geterr() != 0) 00057 return false; 00058 00059 return true; 00060 } 00061 00062 00063 bool WvEncoderStream::flush_internal(time_t msec_timeout) 00064 { 00065 flush_write(); 00066 return WvStreamClone::flush_internal(msec_timeout); 00067 } 00068 00069 00070 bool WvEncoderStream::flush_read() 00071 { 00072 bool success = readchain.flush(readinbuf, readoutbuf); 00073 checkreadisok(); 00074 inbuf.merge(readoutbuf); 00075 return success; 00076 } 00077 00078 00079 bool WvEncoderStream::flush_write() 00080 { 00081 bool success = push(true /*flush*/, false /*finish*/); 00082 return success; 00083 } 00084 00085 00086 bool WvEncoderStream::finish_read() 00087 { 00088 bool success = readchain.flush(readinbuf, readoutbuf); 00089 if (!readchain.finish(readoutbuf)) 00090 success = false; 00091 checkreadisok(); 00092 inbuf.merge(readoutbuf); 00093 // noread(); 00094 return success; 00095 } 00096 00097 00098 bool WvEncoderStream::finish_write() 00099 { 00100 return push(true /*flush*/, true /*finish*/); 00101 } 00102 00103 00104 void WvEncoderStream::pull(size_t size) 00105 { 00106 // fprintf(stderr, "encoder pull %d\n", size); 00107 00108 // pull a chunk of unencoded input 00109 bool finish = false; 00110 if (cloned) 00111 { 00112 if (size != 0) 00113 cloned->read(readinbuf, size); 00114 if (!cloned->isok()) 00115 finish = true; // underlying stream hit EOF or error 00116 } 00117 00118 // deal with any encoders that have been added recently 00119 WvDynBuf tmpbuf; 00120 tmpbuf.merge(readoutbuf); 00121 readchain.continue_encode(tmpbuf, readoutbuf); 00122 00123 // apenwarr 2004/11/06: always flush on read, because otherwise there's 00124 // no clear way to decide when we need to flush. Anyway, most "decoders" 00125 // (the kind of thing you'd put in the readchain) don't care whether you 00126 // flush or not. 00127 readchain.encode(readinbuf, readoutbuf, true); 00128 //readchain.encode(readinbuf, readoutbuf, finish /*flush*/); 00129 if (finish) 00130 { 00131 readchain.finish(readoutbuf); 00132 // if (readoutbuf.used() == 0 && inbuf.used() == 0) 00133 // noread(); 00134 close(); 00135 // otherwise defer EOF until the buffered data has been read 00136 } 00137 else if (!readoutbuf.used() && !inbuf.used() && readchain.isfinished()) 00138 { 00139 // only get EOF when the chain is finished and we have no 00140 // more data 00141 //noread(); 00142 close(); 00143 } 00144 checkreadisok(); 00145 } 00146 00147 00148 bool WvEncoderStream::push(bool flush, bool finish) 00149 { 00150 WvDynBuf writeoutbuf; 00151 00152 // encode the output 00153 if (flush) 00154 writeinbuf.merge(outbuf); 00155 bool success = writechain.encode(writeinbuf, writeoutbuf, flush); 00156 if (finish) 00157 if (!writechain.finish(writeoutbuf)) 00158 success = false; 00159 checkwriteisok(); 00160 00161 #if 0 00162 // push encoded output to cloned stream 00163 size_t size = writeoutbuf.used(); 00164 if (size != 0) 00165 { 00166 const unsigned char *writeout = writeoutbuf.get(size); 00167 size_t len = WvStreamClone::uwrite(writeout, size); 00168 writeoutbuf.unget(size - len); 00169 } 00170 #endif 00171 if (cloned) 00172 cloned->write(writeoutbuf, writeoutbuf.used()); 00173 00174 return success; 00175 } 00176 00177 00178 size_t WvEncoderStream::uread(void *buf, size_t size) 00179 { 00180 // fprintf(stderr, "encstream::uread(%d)\n", size); 00181 if (size && readoutbuf.used() == 0) 00182 pull(min_readsize > size ? min_readsize : size); 00183 size_t avail = readoutbuf.used(); 00184 if (size > avail) 00185 size = avail; 00186 readoutbuf.move(buf, size); 00187 return size; 00188 } 00189 00190 00191 size_t WvEncoderStream::uwrite(const void *buf, size_t size) 00192 { 00193 writeinbuf.put(buf, size); 00194 push(false /*flush*/, false /*finish*/); 00195 return size; 00196 } 00197 00198 00199 void WvEncoderStream::pre_select(SelectInfo &si) 00200 { 00201 WvStreamClone::pre_select(si); 00202 00203 if (si.wants.readable && readoutbuf.used() != 0) 00204 si.msec_timeout = 0; 00205 } 00206 00207 00208 bool WvEncoderStream::post_select(SelectInfo &si) 00209 { 00210 bool sure = false; 00211 00212 // if we have buffered input data and we want to check for 00213 // readability, then cause a callback to occur that will 00214 // hopefully ask us for more data via uread() 00215 if (si.wants.readable && readoutbuf.used() != 0) 00216 { 00217 pull(0); // try an encode 00218 if (readoutbuf.used() != 0) 00219 sure = true; 00220 } 00221 00222 // try to push pending encoded output to cloned stream 00223 // outbuf_delayed_flush condition already handled by uwrite() 00224 push(false /*flush*/, false /*finish*/); 00225 00226 // consult the underlying stream 00227 sure |= WvStreamClone::post_select(si); 00228 00229 return sure; 00230 } 00231 00232 00233 void WvEncoderStream::checkreadisok() 00234 { 00235 if (!readchain.isok()) 00236 { 00237 seterr(WvString("read chain: %s", readchain.geterror())); 00238 noread(); 00239 } 00240 } 00241 00242 00243 void WvEncoderStream::checkwriteisok() 00244 { 00245 if (!writechain.isok()) 00246 seterr(WvString("write chain: %s", writechain.geterror())); 00247 }