WvStreams
wvhttpstream.cc
00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever.
00006  * 
00007  * See wvhttppool.h.
00008  */
00009 #include "wvhttppool.h"
00010 #include "wvtcp.h"
00011 #include "wvsslstream.h"
00012 #include "wvbuf.h"
00013 #include "wvbase64.h"
00014 #include "strutils.h"
00015 #ifdef HAVE_EXECINFO_H
00016 #include <execinfo.h> // FIXME: add a WvCrash feature for explicit dumps
00017 #endif
00018 
00019 #ifdef _WIN32
00020 #define ETIMEDOUT WSAETIMEDOUT
00021 #endif
00022 
00023 WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username,
00024                 bool _ssl, WvIPPortAddrTable &_pipeline_incompatible)
00025     : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)),
00026       pipeline_incompatible(_pipeline_incompatible),
00027       in_doneurl(false)
00028 {
00029     log("Opening server connection.\n");
00030     http_response = "";
00031     encoding = Unknown;
00032     bytes_remaining = 0;
00033     in_chunk_trailer = false;
00034     pipeline_test_count = 0;
00035     last_was_pipeline_test = false;
00036 
00037     enable_pipelining = global_enable_pipelining 
00038         && !pipeline_incompatible[target.remaddr];
00039     ssl = _ssl;
00040 
00041     if (ssl)
00042         cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned));
00043 
00044     sent_url_request = false;
00045 
00046     alarm(60000); // timeout if no connection, or something goes wrong
00047 }
00048 
00049 
00050 WvHttpStream::~WvHttpStream()
00051 {
00052     log(WvLog::Debug2, "Deleting.\n");
00053 
00054 #if 0
00055 #ifdef HAVE_EXECINFO_H
00056     void* trace[10];
00057     int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00058     char** tracedump = backtrace_symbols(trace, count);
00059     log(WvLog::Debug, "TRACE");
00060     for (int i = 0; i < count; ++i)
00061         log(WvLog::Debug, ":%s", tracedump[i]);
00062     log(WvLog::Debug, "\n");
00063     free(tracedump);
00064 #endif
00065 #endif
00066 
00067     if (geterr())
00068         log("Error was: %s\n", errstr());
00069     close();
00070 }
00071 
00072 
00073 void WvHttpStream::close()
00074 {
00075     log("close called\n");
00076 
00077 #if 0    
00078 #ifdef HAVE_EXECINFO_H
00079     void *trace[10];
00080     int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00081     char** tracedump = backtrace_symbols(trace, count);
00082     log(WvLog::Debug, "TRACE");
00083     for (int i = 0; i < count; ++i)
00084         log(WvLog::Debug, ":%s", tracedump[i]);
00085     log(WvLog::Debug, "\n");
00086     free(tracedump);
00087 #endif
00088 #endif
00089 
00090     // assume pipelining is broken if we're closing without doing at least
00091     // one successful pipelining test and a following non-test request.
00092     if (enable_pipelining && max_requests > 1
00093             && (pipeline_test_count < 1
00094             || (pipeline_test_count == 1 && last_was_pipeline_test)))
00095         pipelining_is_broken(2);
00096 
00097     if (isok())
00098         log("Closing.\n");
00099     WvStreamClone::close();
00100 
00101     if (geterr())
00102     {
00103         // if there was an error, count the first URL as done.  This prevents
00104         // retrying indefinitely.
00105         WvUrlRequest *msgurl = curl;
00106         if (!msgurl && !urls.isempty())
00107             msgurl = urls.first();
00108         if (!msgurl && !waiting_urls.isempty())
00109             msgurl = waiting_urls.first();
00110 
00111         if (msgurl)
00112         {
00113             log("URL '%s' is FAILED (%s (%s))\n", msgurl->url, geterr(),
00114                 errstr());        
00115             curl = msgurl;
00116             doneurl();
00117         }
00118     }
00119     waiting_urls.zap();
00120     if (curl)
00121     {
00122         log("curl is %s\n", curl->url);
00123         doneurl();
00124     }
00125     log("close done\n");
00126 }
00127 
00128 
00129 void WvHttpStream::doneurl()
00130 {
00131     // There is a slight chance that we might receive an error during or just before
00132     // this function is called, which means that the write occuring during
00133     // start_pipeline_test() would be called, which would call close() because of the
00134     // error, which would call doneurl() again.  We don't want to execute doneurl()
00135     // a second time when we're already in the middle.
00136     if (in_doneurl)
00137         return;
00138     in_doneurl = true;
00139 
00140     assert(curl != NULL);
00141     WvString last_response(http_response);
00142     log("Done URL: %s\n", curl->url);
00143 
00144     http_response = "";
00145     encoding = Unknown;
00146     in_chunk_trailer = false;
00147     bytes_remaining = 0;
00148 
00149     last_was_pipeline_test = curl->pipeline_test;
00150     bool broken = false;
00151     if (last_was_pipeline_test)
00152     {
00153         pipeline_test_count++;
00154         if (pipeline_test_count == 1)
00155             start_pipeline_test(&curl->url);
00156         else if (pipeline_test_response != last_response)
00157         {
00158             // getting a bit late in the game to be detecting brokenness :(
00159             // However, if the response code isn't the same for both tests,
00160             // something's definitely screwy.
00161             pipelining_is_broken(4);
00162             broken = true;
00163         }
00164         pipeline_test_response = last_response;
00165     }
00166 
00167     assert(curl == urls.first());
00168     curl->done();
00169     curl = NULL;
00170     sent_url_request = false;
00171     urls.unlink_first();
00172 
00173     if (broken)
00174         close();
00175 
00176     request_next();
00177     in_doneurl = false;
00178 }
00179 
00180 
00181 static WvString encode64(WvStringParm user, WvStringParm password)
00182 {
00183     WvBase64Encoder encoder;
00184     WvString ret;
00185     encoder.flushstrstr(WvString("%s:%s", user, password), ret);
00186     return ret;
00187 }
00188 
00189 
00190 static WvString fixnl(WvStringParm nonl)
00191 {
00192     WvDynBuf b;
00193     const char *cptr;
00194 
00195     for (cptr = nonl; cptr && *cptr; cptr++)
00196     {
00197         if (*cptr == '\r')
00198             continue;
00199         else if (*cptr == '\n')
00200             b.put("\r", 1); // put BOTH \r and \n
00201         b.put(cptr, 1);
00202     }
00203 
00204     return b.getstr();
00205 }
00206 
00207 
00208 WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive)
00209 {
00210     WvString request;
00211     WvString auth("");
00212     if(!!url->url.getuser() && !!url->url.getpassword())
00213         auth = WvString("Authorization: Basic %s\n",
00214                     encode64(url->url.getuser(), url->url.getpassword()));
00215 
00216     request = fixnl(
00217         WvString(
00218             "%s %s HTTP/1.1\n"
00219             "Host: %s:%s\n"
00220             "Connection: %s\n"
00221             "%s"
00222             "%s"
00223             "%s%s"
00224             "\n",
00225             url->method,
00226             url->url.getfile(),
00227             url->url.gethost(), url->url.getport(),
00228             keepalive ? "keep-alive" : "close",
00229             auth,
00230             (putstream_data.used() > 0 ? WvString(
00231                 "Content-Length: %s\n", putstream_data.used()) : ""),
00232             trim_string(url->headers.edit()),
00233             !!url->headers ? "\n" : ""));
00234     return request;
00235 }
00236 
00237 
00238 void WvHttpStream::send_request(WvUrlRequest *url)
00239 {
00240     request_count++;
00241     log("Request #%s: %s\n", request_count, url->url);
00242     write(request_str(url, url->pipeline_test
00243                 || request_count < max_requests));
00244     write(putstream_data);
00245     sent_url_request = true;
00246     alarm(60000);
00247 }
00248 
00249 
00250 void WvHttpStream::start_pipeline_test(WvUrl *url)
00251 {
00252     WvUrl location(WvString(
00253                 "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/",
00254                 url->getproto(), url->gethost(), url->getport()));
00255     WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL,
00256                                              false, true);
00257     testurl->instream = this;
00258     send_request(testurl);
00259     urls.append(testurl, true, "sent_running_url");
00260 }
00261 
00262 
00263 void WvHttpStream::request_next()
00264 {
00265     // Clear the putstream buffer before we start any new requests
00266     putstream_data.zap();
00267 
00268     // don't do a request if we've done too many already or we have none
00269     // waiting.
00270     if (request_count >= max_requests || waiting_urls.isempty())
00271         return;
00272 
00273     // don't do more than one request at a time if we're not pipelining.
00274     if (!enable_pipelining && !urls.isempty())
00275         return;
00276 
00277     // okay then, we really do want to send a new request.
00278     WvUrlRequest *url = waiting_urls.first();
00279 
00280     waiting_urls.unlink_first();
00281     if (!url->putstream)
00282     {
00283         if (enable_pipelining && !request_count && max_requests > 1)
00284             start_pipeline_test(&url->url);
00285         send_request(url);
00286     }
00287     urls.append(url, false, "sent_running_url");
00288 }
00289 
00290 
00291 void WvHttpStream::pipelining_is_broken(int why)
00292 {
00293     if (!pipeline_incompatible[target.remaddr])
00294     {
00295         pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true);
00296         log("Pipelining is broken on this server (%s)!  Disabling.\n", why);
00297     }
00298 }
00299 
00300 
00301 void WvHttpStream::pre_select(SelectInfo &si)
00302 {
00303     SelectRequest oldwant = si.wants;
00304     WvUrlRequest *url;
00305 
00306     WvUrlStream::pre_select(si);
00307 
00308     if (!urls.isempty())
00309     {
00310         url = urls.first();
00311         if(url && url->putstream) 
00312             url->putstream->pre_select(si);
00313     }
00314    
00315     si.wants = oldwant;
00316 }
00317 
00318 
00319 bool WvHttpStream::post_select(SelectInfo &si)
00320 {
00321     SelectRequest oldwant = si.wants;
00322     WvUrlRequest *url;
00323 
00324     if (WvUrlStream::post_select(si))
00325         return true;
00326 
00327     if (!urls.isempty())
00328     {
00329         url = urls.first();
00330         if(url && url->putstream && url->putstream->post_select(si))
00331             return true;
00332     }
00333 
00334     si.wants = oldwant;
00335     return false;
00336 }
00337 
00338 
00339 void WvHttpStream::execute()
00340 {
00341     char buf[1024], *line;
00342     size_t len;
00343 
00344     WvStreamClone::execute();
00345 
00346     // make connections timeout after some idleness
00347     if (alarm_was_ticking)
00348     {
00349         log(WvLog::Debug4, "urls count: %s\n", urls.count());
00350         if (!urls.isempty())
00351         {
00352             seterr(ETIMEDOUT);
00353 
00354             // Must check again here since seterr()
00355             // will close our stream and if we only 
00356             // had one url then it'll be gone.
00357             if (!urls.isempty())
00358             {
00359                 WvUrlRequest *url = urls.first();
00360                 if (url->outstream)
00361                     url->outstream->seterr(ETIMEDOUT);
00362             }
00363         }
00364         else
00365             close(); // timed out, but not really an error
00366         return;
00367     }
00368 
00369     // Die if somebody closed our outstream.  This is so that if we were
00370     // downloading a really big file, they can stop it in the middle and
00371     // our next url request can start downloading immediately.
00372     if (curl && !curl->outstream)
00373     {
00374         if (!(encoding == PostHeadInfinity
00375               || encoding == PostHeadChunked
00376               || encoding == PostHeadStream))
00377         {
00378             // don't complain about pipelining failures
00379             pipeline_test_count++;
00380             last_was_pipeline_test = false;
00381             close();
00382         }
00383 
00384         if (curl)
00385             doneurl();
00386         return;
00387     }
00388     else if (curl)
00389         curl->inuse = true;
00390 
00391     if(!sent_url_request && !urls.isempty())
00392     {
00393         WvUrlRequest *url = urls.first();
00394         if(url)
00395         {
00396             if(url->putstream)
00397             {
00398                 int len = 0;
00399                 if(url->putstream->isok())
00400                     len = url->putstream->read(putstream_data, 1024);
00401 
00402                 if(!url->putstream->isok() || len == 0)
00403                 {
00404                     url->putstream = NULL;
00405                     send_request(url);
00406                 }
00407             }
00408         }
00409     }
00410 
00411     if (!curl)
00412     {
00413         // in the header section
00414         line = getline();
00415         if (line)
00416         {
00417             line = trim_string(line);
00418             log(WvLog::Debug4, "Header: '%s'\n", line);
00419             if (!http_response)
00420             {
00421                 http_response = line;
00422 
00423                 // there are never two pipeline test requests in a row, so
00424                 // a second response string exactly like the pipeline test
00425                 // response implies that everything between the first and
00426                 // second test requests was lost: bad!
00427                 if (last_was_pipeline_test
00428                         && http_response == pipeline_test_response)
00429                 {
00430                     pipelining_is_broken(1);
00431                     close();
00432                     return;
00433                 }
00434 
00435                 // http response #400 is "invalid request", which we
00436                 // shouldn't be sending. If we get one of these right after
00437                 // a test, it probably means the stuff that came after it
00438                 // was mangled in some way during transmission ...and we
00439                 // should throw it away.
00440                 if (last_was_pipeline_test && !!http_response)
00441                 {
00442                     const char *cptr = strchr(http_response, ' ');
00443                     if (cptr && atoi(cptr+1) == 400)
00444                     {
00445                         pipelining_is_broken(3);
00446                         close();
00447                         return;
00448                     }
00449                 }
00450             }
00451 
00452             if (urls.isempty())
00453             {
00454                 log("got unsolicited data.\n");
00455                 seterr("unsolicited data from server!");
00456                 return;
00457             }
00458 
00459             if (!strncasecmp(line, "Content-length: ", 16))
00460             {
00461                 bytes_remaining = atoi(line+16);
00462                 encoding = ContentLength;
00463             }
00464             else if (!strncasecmp(line, "Transfer-Encoding: ", 19)
00465                     && strstr(line+19, "chunked"))
00466             {
00467                 encoding = Chunked;
00468             }
00469 
00470             if (line[0])
00471             {
00472                 char *p;
00473                 WvBufUrlStream *outstream = urls.first()->outstream;
00474 
00475                 if ((p = strchr(line, ':')) != NULL)
00476                 {
00477                     *p = 0;
00478                     p = trim_string(p+1);
00479                     if (outstream) {
00480                         struct WvHTTPHeader *h;
00481                         h = new struct WvHTTPHeader(line, p);
00482                         outstream->headers.add(h, true);
00483                     }
00484                 }
00485                 else if (strncasecmp(line, "HTTP/", 5) == 0)
00486                 {
00487                     char *p = strchr(line, ' ');
00488                     if (p)
00489                     {
00490                         *p = 0;
00491                         if (outstream)
00492                         {
00493                             outstream->version = line+5;
00494                             outstream->status = atoi(p+1);
00495                         }
00496                     }
00497                 }
00498             }
00499             else
00500             {
00501                 // blank line is the beginning of data section
00502                 curl = urls.first();
00503                 in_chunk_trailer = false;
00504                 log(WvLog::Debug4,
00505                         "Starting data: %s (enc=%s)\n", bytes_remaining, encoding);
00506 
00507                 if (encoding == Unknown)
00508                     encoding = Infinity; // go until connection closes itself
00509 
00510                 if (curl->method == "HEAD")
00511                 {
00512                     log("Got all headers.\n");
00513                     if (!enable_pipelining)
00514                         doneurl();
00515 
00516                     if (encoding == Infinity)
00517                         encoding = PostHeadInfinity;
00518                     else if (encoding == Chunked)
00519                         encoding = PostHeadChunked;
00520                     else
00521                         encoding = PostHeadStream;
00522                 }
00523             }
00524         }
00525     }
00526     else if (encoding == PostHeadInfinity
00527              || encoding == PostHeadChunked
00528              || encoding == PostHeadStream)
00529     {
00530         WvDynBuf chkbuf;
00531         len = read(chkbuf, 5);
00532 
00533         // If there is more data available right away, and it isn't an
00534         // HTTP header from another request, then it's a stupid web
00535         // server that likes to send bodies with HEAD requests.
00536         if (len && strncmp(reinterpret_cast<const char *>(chkbuf.peek(0, 5)),
00537                            "HTTP/", 5))
00538         {
00539             if (encoding == PostHeadInfinity)
00540                 encoding = ChuckInfinity;
00541             else if (encoding == PostHeadChunked)
00542                 encoding = ChuckChunked;
00543             else if (encoding == PostHeadStream)
00544                 encoding = ChuckStream;
00545             else
00546                 log(WvLog::Warning, "WvHttpStream: inconsistent state.\n");
00547         }
00548         else
00549             doneurl();
00550 
00551         unread(chkbuf, len);
00552     }
00553     else if (encoding == ChuckInfinity)
00554     {
00555         len = read(buf, sizeof(buf));
00556         if (len)
00557             log(WvLog::Debug5, "Chucking %s bytes.\n", len);
00558         if (!isok())
00559             doneurl();
00560     }
00561     else if (encoding == ChuckChunked && !bytes_remaining)
00562     {
00563         encoding = Chunked;
00564     }
00565     else if (encoding == ChuckChunked || encoding == ChuckStream)
00566     {
00567         if (bytes_remaining > sizeof(buf))
00568             len = read(buf, sizeof(buf));
00569         else
00570             len = read(buf, bytes_remaining);
00571         bytes_remaining -= len;
00572         if (len)
00573             log(WvLog::Debug5,
00574                 "Chucked %s bytes (%s bytes left).\n", len, bytes_remaining);
00575         if (!bytes_remaining && encoding == ContentLength)
00576             doneurl();
00577         if (bytes_remaining && !isok())
00578             seterr("connection interrupted");
00579     }
00580     else if (encoding == Chunked && !bytes_remaining)
00581     {
00582         line = getline();
00583         if (line)
00584         {
00585             line = trim_string(line);
00586 
00587             if (in_chunk_trailer)
00588             {
00589                 // in the trailer section of a chunked encoding
00590                 log(WvLog::Debug4, "Trailer: '%s'\n", line);
00591 
00592                 // a blank line means we're finally done!
00593                 if (!line[0])
00594                     doneurl();
00595             }
00596             else
00597             {
00598                 // in the "length line" section of a chunked encoding
00599                 if (line[0])
00600                 {
00601                     bytes_remaining = (size_t)strtoul(line, NULL, 16);
00602                     if (!bytes_remaining)
00603                         in_chunk_trailer = true;
00604                     log(WvLog::Debug4, "Chunk length is %s ('%s').\n",
00605                             bytes_remaining, line);
00606                 }
00607             }
00608         }
00609     }
00610     else if (encoding == Infinity)
00611     {
00612         // just read data until the connection closes, and assume all was
00613         // well.  It sucks, but there's no way to tell if all the data arrived
00614         // okay... that's why Chunked or ContentLength encoding is better.
00615         len = read(buf, sizeof(buf));
00616         if (!isok())
00617             return;
00618 
00619         if (len)
00620             log(WvLog::Debug5, "Infinity: read %s bytes.\n", len);
00621         if (curl && curl->outstream)
00622             curl->outstream->write(buf, len);
00623 
00624         if (!isok() && curl)
00625             doneurl();
00626     }
00627     else // not chunked or currently in a chunk - read 'bytes_remaining' bytes.
00628     {
00629         // in the data section of a chunked or content-length encoding,
00630         // with 'bytes_remaining' bytes of data left.
00631 
00632         if (bytes_remaining > sizeof(buf))
00633             len = read(buf, sizeof(buf));
00634         else
00635             len = read(buf, bytes_remaining);
00636         if (!isok())
00637             return;
00638 
00639         bytes_remaining -= len;
00640         if (len)
00641             log(WvLog::Debug5, 
00642                     "Read %s bytes (%s bytes left).\n", len, bytes_remaining);
00643         if (curl && curl->outstream)
00644             curl->outstream->write(buf, len);
00645 
00646         if (!bytes_remaining && encoding == ContentLength && curl)
00647             doneurl();
00648 
00649         if (bytes_remaining && !isok())
00650             seterr("connection interrupted");
00651 
00652         if (!isok())
00653             doneurl();
00654     }
00655 
00656     if (urls.isempty())
00657         alarm(5000); // just wait a few seconds before closing connection
00658     else
00659         alarm(60000); // give the server a minute to respond, if we're waiting
00660 }