WvStreams
wvencoderstream.cc
00001 /*
00002  * Worldvisions Tunnel Vision Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * WvEncoderStream chains a series of encoders on the input and
00006  * output ports of the underlying stream to effect on-the-fly data
00007  * transformations.
00008  */
00009 #include "wvencoderstream.h"
00010 
00011 WvEncoderStream::WvEncoderStream(WvStream *_cloned) : WvStreamClone(_cloned)
00012 {
00013     is_closing = false;
00014     min_readsize = 0;
00015 }
00016 
00017 
00018 WvEncoderStream::~WvEncoderStream()
00019 {
00020     close();
00021 }
00022 
00023 
00024 void WvEncoderStream::close()
00025 {
00026     // fprintf(stderr, "Encoderstream close!\n");
00027     
00028     // we want to finish the encoders even if !isok() since we
00029     // might just have encountered an EOF condition, and we want
00030     // to ensure that the remaining data is processed, but this
00031     // might cause recursion if the encoders set a new error condition
00032     if (is_closing) return;
00033     is_closing = true;
00034     
00035     // finish encoders
00036     finish_read();
00037     finish_write();
00038     
00039     // flush write chain and close the stream
00040     WvStreamClone::close();
00041 }
00042 
00043 
00044 bool WvEncoderStream::isok() const
00045 {
00046     //fprintf(stderr, "encoderstream isok: %d %p %d %d\n",
00047     //      WvStream::isok(), cloned, cloned->isok(), cloned->geterr());
00048     
00049     // handle encoder error conditions
00050     if (!WvStream::isok())
00051         return false;
00052 
00053     // handle substream error conditions
00054     // we don't check substream isok() because that is handled
00055     // during read operations to distinguish EOF from errors
00056     if (!cloned || cloned->geterr() != 0)
00057         return false;
00058         
00059     return true;
00060 }
00061 
00062 
00063 bool WvEncoderStream::flush_internal(time_t msec_timeout)
00064 {
00065     flush_write();
00066     return WvStreamClone::flush_internal(msec_timeout);
00067 }
00068 
00069 
00070 bool WvEncoderStream::flush_read()
00071 {
00072     bool success = readchain.flush(readinbuf, readoutbuf);
00073     checkreadisok();
00074     inbuf.merge(readoutbuf);
00075     return success;
00076 }
00077 
00078 
00079 bool WvEncoderStream::flush_write()
00080 {
00081     bool success = push(true /*flush*/, false /*finish*/);
00082     return success;
00083 }
00084 
00085 
00086 bool WvEncoderStream::finish_read()
00087 {
00088     bool success = readchain.flush(readinbuf, readoutbuf);
00089     if (!readchain.finish(readoutbuf))
00090         success = false;
00091     checkreadisok();
00092     inbuf.merge(readoutbuf);
00093     // noread();
00094     return success;
00095 }
00096 
00097 
00098 bool WvEncoderStream::finish_write()
00099 {
00100     return push(true /*flush*/, true /*finish*/);
00101 }
00102 
00103 
00104 void WvEncoderStream::pull(size_t size)
00105 {
00106     // fprintf(stderr, "encoder pull %d\n", size);
00107     
00108     // pull a chunk of unencoded input
00109     bool finish = false;
00110     if (cloned)
00111     {
00112         if (size != 0)
00113             cloned->read(readinbuf, size);
00114         if (!cloned->isok())
00115             finish = true; // underlying stream hit EOF or error
00116     }
00117     
00118     // deal with any encoders that have been added recently
00119     WvDynBuf tmpbuf;
00120     tmpbuf.merge(readoutbuf);
00121     readchain.continue_encode(tmpbuf, readoutbuf);
00122     
00123     // apenwarr 2004/11/06: always flush on read, because otherwise there's
00124     // no clear way to decide when we need to flush.  Anyway, most "decoders"
00125     // (the kind of thing you'd put in the readchain) don't care whether you
00126     // flush or not.
00127     readchain.encode(readinbuf, readoutbuf, true);
00128     //readchain.encode(readinbuf, readoutbuf, finish /*flush*/);
00129     if (finish)
00130     {
00131         readchain.finish(readoutbuf);
00132         // if (readoutbuf.used() == 0 && inbuf.used() == 0)
00133         //    noread();
00134         close();
00135         // otherwise defer EOF until the buffered data has been read
00136     }
00137     else if (!readoutbuf.used() && !inbuf.used() && readchain.isfinished())
00138     {
00139         // only get EOF when the chain is finished and we have no
00140         // more data
00141         //noread();
00142         close();
00143     }
00144     checkreadisok();
00145 }
00146 
00147 
00148 bool WvEncoderStream::push(bool flush, bool finish)
00149 {
00150     WvDynBuf writeoutbuf;
00151     
00152     // encode the output
00153     if (flush)
00154         writeinbuf.merge(outbuf);
00155     bool success = writechain.encode(writeinbuf, writeoutbuf, flush);
00156     if (finish)
00157         if (!writechain.finish(writeoutbuf))
00158             success = false;
00159     checkwriteisok();
00160 
00161 #if 0
00162     // push encoded output to cloned stream
00163     size_t size = writeoutbuf.used();
00164     if (size != 0)
00165     {
00166         const unsigned char *writeout = writeoutbuf.get(size);
00167         size_t len = WvStreamClone::uwrite(writeout, size);
00168         writeoutbuf.unget(size - len);
00169     }
00170 #endif
00171     if (cloned)
00172         cloned->write(writeoutbuf, writeoutbuf.used());
00173     
00174     return success;
00175 }
00176 
00177 
00178 size_t WvEncoderStream::uread(void *buf, size_t size)
00179 {
00180     // fprintf(stderr, "encstream::uread(%d)\n", size);
00181     if (size && readoutbuf.used() == 0)
00182         pull(min_readsize > size ? min_readsize : size);
00183     size_t avail = readoutbuf.used();
00184     if (size > avail)
00185         size = avail;
00186     readoutbuf.move(buf, size);
00187     return size;
00188 }
00189 
00190 
00191 size_t WvEncoderStream::uwrite(const void *buf, size_t size)
00192 {
00193     writeinbuf.put(buf, size);
00194     push(false /*flush*/, false /*finish*/);
00195     return size;
00196 }
00197 
00198 
00199 void WvEncoderStream::pre_select(SelectInfo &si)
00200 {
00201     WvStreamClone::pre_select(si);
00202 
00203     if (si.wants.readable && readoutbuf.used() != 0)
00204         si.msec_timeout = 0;     
00205 }
00206 
00207 
00208 bool WvEncoderStream::post_select(SelectInfo &si)
00209 {
00210     bool sure = false;
00211     
00212     // if we have buffered input data and we want to check for
00213     // readability, then cause a callback to occur that will
00214     // hopefully ask us for more data via uread()
00215     if (si.wants.readable && readoutbuf.used() != 0)
00216     {
00217         pull(0); // try an encode
00218         if (readoutbuf.used() != 0)
00219             sure = true;
00220     }
00221     
00222     // try to push pending encoded output to cloned stream
00223     // outbuf_delayed_flush condition already handled by uwrite()
00224     push(false /*flush*/, false /*finish*/);
00225     
00226     // consult the underlying stream
00227     sure |= WvStreamClone::post_select(si);
00228 
00229     return sure;
00230 }
00231 
00232 
00233 void WvEncoderStream::checkreadisok()
00234 {
00235     if (!readchain.isok())
00236     {
00237         seterr(WvString("read chain: %s", readchain.geterror()));
00238         noread();
00239     }
00240 }
00241 
00242 
00243 void WvEncoderStream::checkwriteisok()
00244 {
00245     if (!writechain.isok())
00246         seterr(WvString("write chain: %s", writechain.geterror()));
00247 }