WvStreams
|
00001 /* 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever. 00006 * 00007 * See wvhttppool.h. 00008 */ 00009 #include "wvhttppool.h" 00010 #include "wvtcp.h" 00011 #include "wvsslstream.h" 00012 #include "wvbuf.h" 00013 #include "wvbase64.h" 00014 #include "strutils.h" 00015 #ifdef HAVE_EXECINFO_H 00016 #include <execinfo.h> // FIXME: add a WvCrash feature for explicit dumps 00017 #endif 00018 00019 #ifdef _WIN32 00020 #define ETIMEDOUT WSAETIMEDOUT 00021 #endif 00022 00023 WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username, 00024 bool _ssl, WvIPPortAddrTable &_pipeline_incompatible) 00025 : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)), 00026 pipeline_incompatible(_pipeline_incompatible), 00027 in_doneurl(false) 00028 { 00029 log("Opening server connection.\n"); 00030 http_response = ""; 00031 encoding = Unknown; 00032 bytes_remaining = 0; 00033 in_chunk_trailer = false; 00034 pipeline_test_count = 0; 00035 last_was_pipeline_test = false; 00036 00037 enable_pipelining = global_enable_pipelining 00038 && !pipeline_incompatible[target.remaddr]; 00039 ssl = _ssl; 00040 00041 if (ssl) 00042 cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned)); 00043 00044 sent_url_request = false; 00045 00046 alarm(60000); // timeout if no connection, or something goes wrong 00047 } 00048 00049 00050 WvHttpStream::~WvHttpStream() 00051 { 00052 log(WvLog::Debug2, "Deleting.\n"); 00053 00054 #if 0 00055 #ifdef HAVE_EXECINFO_H 00056 void* trace[10]; 00057 int count = backtrace(trace, sizeof(trace)/sizeof(trace[0])); 00058 char** tracedump = backtrace_symbols(trace, count); 00059 log(WvLog::Debug, "TRACE"); 00060 for (int i = 0; i < count; ++i) 00061 log(WvLog::Debug, ":%s", tracedump[i]); 00062 log(WvLog::Debug, "\n"); 00063 free(tracedump); 00064 #endif 00065 #endif 00066 00067 if (geterr()) 00068 log("Error was: %s\n", errstr()); 00069 close(); 00070 } 00071 00072 00073 void WvHttpStream::close() 00074 { 00075 log("close called\n"); 00076 00077 #if 0 00078 #ifdef HAVE_EXECINFO_H 00079 void *trace[10]; 00080 int count = backtrace(trace, sizeof(trace)/sizeof(trace[0])); 00081 char** tracedump = backtrace_symbols(trace, count); 00082 log(WvLog::Debug, "TRACE"); 00083 for (int i = 0; i < count; ++i) 00084 log(WvLog::Debug, ":%s", tracedump[i]); 00085 log(WvLog::Debug, "\n"); 00086 free(tracedump); 00087 #endif 00088 #endif 00089 00090 // assume pipelining is broken if we're closing without doing at least 00091 // one successful pipelining test and a following non-test request. 00092 if (enable_pipelining && max_requests > 1 00093 && (pipeline_test_count < 1 00094 || (pipeline_test_count == 1 && last_was_pipeline_test))) 00095 pipelining_is_broken(2); 00096 00097 if (isok()) 00098 log("Closing.\n"); 00099 WvStreamClone::close(); 00100 00101 if (geterr()) 00102 { 00103 // if there was an error, count the first URL as done. This prevents 00104 // retrying indefinitely. 00105 WvUrlRequest *msgurl = curl; 00106 if (!msgurl && !urls.isempty()) 00107 msgurl = urls.first(); 00108 if (!msgurl && !waiting_urls.isempty()) 00109 msgurl = waiting_urls.first(); 00110 00111 if (msgurl) 00112 { 00113 log("URL '%s' is FAILED (%s (%s))\n", msgurl->url, geterr(), 00114 errstr()); 00115 curl = msgurl; 00116 doneurl(); 00117 } 00118 } 00119 waiting_urls.zap(); 00120 if (curl) 00121 { 00122 log("curl is %s\n", curl->url); 00123 doneurl(); 00124 } 00125 log("close done\n"); 00126 } 00127 00128 00129 void WvHttpStream::doneurl() 00130 { 00131 // There is a slight chance that we might receive an error during or just before 00132 // this function is called, which means that the write occuring during 00133 // start_pipeline_test() would be called, which would call close() because of the 00134 // error, which would call doneurl() again. We don't want to execute doneurl() 00135 // a second time when we're already in the middle. 00136 if (in_doneurl) 00137 return; 00138 in_doneurl = true; 00139 00140 assert(curl != NULL); 00141 WvString last_response(http_response); 00142 log("Done URL: %s\n", curl->url); 00143 00144 http_response = ""; 00145 encoding = Unknown; 00146 in_chunk_trailer = false; 00147 bytes_remaining = 0; 00148 00149 last_was_pipeline_test = curl->pipeline_test; 00150 bool broken = false; 00151 if (last_was_pipeline_test) 00152 { 00153 pipeline_test_count++; 00154 if (pipeline_test_count == 1) 00155 start_pipeline_test(&curl->url); 00156 else if (pipeline_test_response != last_response) 00157 { 00158 // getting a bit late in the game to be detecting brokenness :( 00159 // However, if the response code isn't the same for both tests, 00160 // something's definitely screwy. 00161 pipelining_is_broken(4); 00162 broken = true; 00163 } 00164 pipeline_test_response = last_response; 00165 } 00166 00167 assert(curl == urls.first()); 00168 curl->done(); 00169 curl = NULL; 00170 sent_url_request = false; 00171 urls.unlink_first(); 00172 00173 if (broken) 00174 close(); 00175 00176 request_next(); 00177 in_doneurl = false; 00178 } 00179 00180 00181 static WvString encode64(WvStringParm user, WvStringParm password) 00182 { 00183 WvBase64Encoder encoder; 00184 WvString ret; 00185 encoder.flushstrstr(WvString("%s:%s", user, password), ret); 00186 return ret; 00187 } 00188 00189 00190 static WvString fixnl(WvStringParm nonl) 00191 { 00192 WvDynBuf b; 00193 const char *cptr; 00194 00195 for (cptr = nonl; cptr && *cptr; cptr++) 00196 { 00197 if (*cptr == '\r') 00198 continue; 00199 else if (*cptr == '\n') 00200 b.put("\r", 1); // put BOTH \r and \n 00201 b.put(cptr, 1); 00202 } 00203 00204 return b.getstr(); 00205 } 00206 00207 00208 WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive) 00209 { 00210 WvString request; 00211 WvString auth(""); 00212 if(!!url->url.getuser() && !!url->url.getpassword()) 00213 auth = WvString("Authorization: Basic %s\n", 00214 encode64(url->url.getuser(), url->url.getpassword())); 00215 00216 request = fixnl( 00217 WvString( 00218 "%s %s HTTP/1.1\n" 00219 "Host: %s:%s\n" 00220 "Connection: %s\n" 00221 "%s" 00222 "%s" 00223 "%s%s" 00224 "\n", 00225 url->method, 00226 url->url.getfile(), 00227 url->url.gethost(), url->url.getport(), 00228 keepalive ? "keep-alive" : "close", 00229 auth, 00230 (putstream_data.used() > 0 ? WvString( 00231 "Content-Length: %s\n", putstream_data.used()) : ""), 00232 trim_string(url->headers.edit()), 00233 !!url->headers ? "\n" : "")); 00234 return request; 00235 } 00236 00237 00238 void WvHttpStream::send_request(WvUrlRequest *url) 00239 { 00240 request_count++; 00241 log("Request #%s: %s\n", request_count, url->url); 00242 write(request_str(url, url->pipeline_test 00243 || request_count < max_requests)); 00244 write(putstream_data); 00245 sent_url_request = true; 00246 alarm(60000); 00247 } 00248 00249 00250 void WvHttpStream::start_pipeline_test(WvUrl *url) 00251 { 00252 WvUrl location(WvString( 00253 "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/", 00254 url->getproto(), url->gethost(), url->getport())); 00255 WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL, 00256 false, true); 00257 testurl->instream = this; 00258 send_request(testurl); 00259 urls.append(testurl, true, "sent_running_url"); 00260 } 00261 00262 00263 void WvHttpStream::request_next() 00264 { 00265 // Clear the putstream buffer before we start any new requests 00266 putstream_data.zap(); 00267 00268 // don't do a request if we've done too many already or we have none 00269 // waiting. 00270 if (request_count >= max_requests || waiting_urls.isempty()) 00271 return; 00272 00273 // don't do more than one request at a time if we're not pipelining. 00274 if (!enable_pipelining && !urls.isempty()) 00275 return; 00276 00277 // okay then, we really do want to send a new request. 00278 WvUrlRequest *url = waiting_urls.first(); 00279 00280 waiting_urls.unlink_first(); 00281 if (!url->putstream) 00282 { 00283 if (enable_pipelining && !request_count && max_requests > 1) 00284 start_pipeline_test(&url->url); 00285 send_request(url); 00286 } 00287 urls.append(url, false, "sent_running_url"); 00288 } 00289 00290 00291 void WvHttpStream::pipelining_is_broken(int why) 00292 { 00293 if (!pipeline_incompatible[target.remaddr]) 00294 { 00295 pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true); 00296 log("Pipelining is broken on this server (%s)! Disabling.\n", why); 00297 } 00298 } 00299 00300 00301 void WvHttpStream::pre_select(SelectInfo &si) 00302 { 00303 SelectRequest oldwant = si.wants; 00304 WvUrlRequest *url; 00305 00306 WvUrlStream::pre_select(si); 00307 00308 if (!urls.isempty()) 00309 { 00310 url = urls.first(); 00311 if(url && url->putstream) 00312 url->putstream->pre_select(si); 00313 } 00314 00315 si.wants = oldwant; 00316 } 00317 00318 00319 bool WvHttpStream::post_select(SelectInfo &si) 00320 { 00321 SelectRequest oldwant = si.wants; 00322 WvUrlRequest *url; 00323 00324 if (WvUrlStream::post_select(si)) 00325 return true; 00326 00327 if (!urls.isempty()) 00328 { 00329 url = urls.first(); 00330 if(url && url->putstream && url->putstream->post_select(si)) 00331 return true; 00332 } 00333 00334 si.wants = oldwant; 00335 return false; 00336 } 00337 00338 00339 void WvHttpStream::execute() 00340 { 00341 char buf[1024], *line; 00342 size_t len; 00343 00344 WvStreamClone::execute(); 00345 00346 // make connections timeout after some idleness 00347 if (alarm_was_ticking) 00348 { 00349 log(WvLog::Debug4, "urls count: %s\n", urls.count()); 00350 if (!urls.isempty()) 00351 { 00352 seterr(ETIMEDOUT); 00353 00354 // Must check again here since seterr() 00355 // will close our stream and if we only 00356 // had one url then it'll be gone. 00357 if (!urls.isempty()) 00358 { 00359 WvUrlRequest *url = urls.first(); 00360 if (url->outstream) 00361 url->outstream->seterr(ETIMEDOUT); 00362 } 00363 } 00364 else 00365 close(); // timed out, but not really an error 00366 return; 00367 } 00368 00369 // Die if somebody closed our outstream. This is so that if we were 00370 // downloading a really big file, they can stop it in the middle and 00371 // our next url request can start downloading immediately. 00372 if (curl && !curl->outstream) 00373 { 00374 if (!(encoding == PostHeadInfinity 00375 || encoding == PostHeadChunked 00376 || encoding == PostHeadStream)) 00377 { 00378 // don't complain about pipelining failures 00379 pipeline_test_count++; 00380 last_was_pipeline_test = false; 00381 close(); 00382 } 00383 00384 if (curl) 00385 doneurl(); 00386 return; 00387 } 00388 else if (curl) 00389 curl->inuse = true; 00390 00391 if(!sent_url_request && !urls.isempty()) 00392 { 00393 WvUrlRequest *url = urls.first(); 00394 if(url) 00395 { 00396 if(url->putstream) 00397 { 00398 int len = 0; 00399 if(url->putstream->isok()) 00400 len = url->putstream->read(putstream_data, 1024); 00401 00402 if(!url->putstream->isok() || len == 0) 00403 { 00404 url->putstream = NULL; 00405 send_request(url); 00406 } 00407 } 00408 } 00409 } 00410 00411 if (!curl) 00412 { 00413 // in the header section 00414 line = getline(); 00415 if (line) 00416 { 00417 line = trim_string(line); 00418 log(WvLog::Debug4, "Header: '%s'\n", line); 00419 if (!http_response) 00420 { 00421 http_response = line; 00422 00423 // there are never two pipeline test requests in a row, so 00424 // a second response string exactly like the pipeline test 00425 // response implies that everything between the first and 00426 // second test requests was lost: bad! 00427 if (last_was_pipeline_test 00428 && http_response == pipeline_test_response) 00429 { 00430 pipelining_is_broken(1); 00431 close(); 00432 return; 00433 } 00434 00435 // http response #400 is "invalid request", which we 00436 // shouldn't be sending. If we get one of these right after 00437 // a test, it probably means the stuff that came after it 00438 // was mangled in some way during transmission ...and we 00439 // should throw it away. 00440 if (last_was_pipeline_test && !!http_response) 00441 { 00442 const char *cptr = strchr(http_response, ' '); 00443 if (cptr && atoi(cptr+1) == 400) 00444 { 00445 pipelining_is_broken(3); 00446 close(); 00447 return; 00448 } 00449 } 00450 } 00451 00452 if (urls.isempty()) 00453 { 00454 log("got unsolicited data.\n"); 00455 seterr("unsolicited data from server!"); 00456 return; 00457 } 00458 00459 if (!strncasecmp(line, "Content-length: ", 16)) 00460 { 00461 bytes_remaining = atoi(line+16); 00462 encoding = ContentLength; 00463 } 00464 else if (!strncasecmp(line, "Transfer-Encoding: ", 19) 00465 && strstr(line+19, "chunked")) 00466 { 00467 encoding = Chunked; 00468 } 00469 00470 if (line[0]) 00471 { 00472 char *p; 00473 WvBufUrlStream *outstream = urls.first()->outstream; 00474 00475 if ((p = strchr(line, ':')) != NULL) 00476 { 00477 *p = 0; 00478 p = trim_string(p+1); 00479 if (outstream) { 00480 struct WvHTTPHeader *h; 00481 h = new struct WvHTTPHeader(line, p); 00482 outstream->headers.add(h, true); 00483 } 00484 } 00485 else if (strncasecmp(line, "HTTP/", 5) == 0) 00486 { 00487 char *p = strchr(line, ' '); 00488 if (p) 00489 { 00490 *p = 0; 00491 if (outstream) 00492 { 00493 outstream->version = line+5; 00494 outstream->status = atoi(p+1); 00495 } 00496 } 00497 } 00498 } 00499 else 00500 { 00501 // blank line is the beginning of data section 00502 curl = urls.first(); 00503 in_chunk_trailer = false; 00504 log(WvLog::Debug4, 00505 "Starting data: %s (enc=%s)\n", bytes_remaining, encoding); 00506 00507 if (encoding == Unknown) 00508 encoding = Infinity; // go until connection closes itself 00509 00510 if (curl->method == "HEAD") 00511 { 00512 log("Got all headers.\n"); 00513 if (!enable_pipelining) 00514 doneurl(); 00515 00516 if (encoding == Infinity) 00517 encoding = PostHeadInfinity; 00518 else if (encoding == Chunked) 00519 encoding = PostHeadChunked; 00520 else 00521 encoding = PostHeadStream; 00522 } 00523 } 00524 } 00525 } 00526 else if (encoding == PostHeadInfinity 00527 || encoding == PostHeadChunked 00528 || encoding == PostHeadStream) 00529 { 00530 WvDynBuf chkbuf; 00531 len = read(chkbuf, 5); 00532 00533 // If there is more data available right away, and it isn't an 00534 // HTTP header from another request, then it's a stupid web 00535 // server that likes to send bodies with HEAD requests. 00536 if (len && strncmp(reinterpret_cast<const char *>(chkbuf.peek(0, 5)), 00537 "HTTP/", 5)) 00538 { 00539 if (encoding == PostHeadInfinity) 00540 encoding = ChuckInfinity; 00541 else if (encoding == PostHeadChunked) 00542 encoding = ChuckChunked; 00543 else if (encoding == PostHeadStream) 00544 encoding = ChuckStream; 00545 else 00546 log(WvLog::Warning, "WvHttpStream: inconsistent state.\n"); 00547 } 00548 else 00549 doneurl(); 00550 00551 unread(chkbuf, len); 00552 } 00553 else if (encoding == ChuckInfinity) 00554 { 00555 len = read(buf, sizeof(buf)); 00556 if (len) 00557 log(WvLog::Debug5, "Chucking %s bytes.\n", len); 00558 if (!isok()) 00559 doneurl(); 00560 } 00561 else if (encoding == ChuckChunked && !bytes_remaining) 00562 { 00563 encoding = Chunked; 00564 } 00565 else if (encoding == ChuckChunked || encoding == ChuckStream) 00566 { 00567 if (bytes_remaining > sizeof(buf)) 00568 len = read(buf, sizeof(buf)); 00569 else 00570 len = read(buf, bytes_remaining); 00571 bytes_remaining -= len; 00572 if (len) 00573 log(WvLog::Debug5, 00574 "Chucked %s bytes (%s bytes left).\n", len, bytes_remaining); 00575 if (!bytes_remaining && encoding == ContentLength) 00576 doneurl(); 00577 if (bytes_remaining && !isok()) 00578 seterr("connection interrupted"); 00579 } 00580 else if (encoding == Chunked && !bytes_remaining) 00581 { 00582 line = getline(); 00583 if (line) 00584 { 00585 line = trim_string(line); 00586 00587 if (in_chunk_trailer) 00588 { 00589 // in the trailer section of a chunked encoding 00590 log(WvLog::Debug4, "Trailer: '%s'\n", line); 00591 00592 // a blank line means we're finally done! 00593 if (!line[0]) 00594 doneurl(); 00595 } 00596 else 00597 { 00598 // in the "length line" section of a chunked encoding 00599 if (line[0]) 00600 { 00601 bytes_remaining = (size_t)strtoul(line, NULL, 16); 00602 if (!bytes_remaining) 00603 in_chunk_trailer = true; 00604 log(WvLog::Debug4, "Chunk length is %s ('%s').\n", 00605 bytes_remaining, line); 00606 } 00607 } 00608 } 00609 } 00610 else if (encoding == Infinity) 00611 { 00612 // just read data until the connection closes, and assume all was 00613 // well. It sucks, but there's no way to tell if all the data arrived 00614 // okay... that's why Chunked or ContentLength encoding is better. 00615 len = read(buf, sizeof(buf)); 00616 if (!isok()) 00617 return; 00618 00619 if (len) 00620 log(WvLog::Debug5, "Infinity: read %s bytes.\n", len); 00621 if (curl && curl->outstream) 00622 curl->outstream->write(buf, len); 00623 00624 if (!isok() && curl) 00625 doneurl(); 00626 } 00627 else // not chunked or currently in a chunk - read 'bytes_remaining' bytes. 00628 { 00629 // in the data section of a chunked or content-length encoding, 00630 // with 'bytes_remaining' bytes of data left. 00631 00632 if (bytes_remaining > sizeof(buf)) 00633 len = read(buf, sizeof(buf)); 00634 else 00635 len = read(buf, bytes_remaining); 00636 if (!isok()) 00637 return; 00638 00639 bytes_remaining -= len; 00640 if (len) 00641 log(WvLog::Debug5, 00642 "Read %s bytes (%s bytes left).\n", len, bytes_remaining); 00643 if (curl && curl->outstream) 00644 curl->outstream->write(buf, len); 00645 00646 if (!bytes_remaining && encoding == ContentLength && curl) 00647 doneurl(); 00648 00649 if (bytes_remaining && !isok()) 00650 seterr("connection interrupted"); 00651 00652 if (!isok()) 00653 doneurl(); 00654 } 00655 00656 if (urls.isempty()) 00657 alarm(5000); // just wait a few seconds before closing connection 00658 else 00659 alarm(60000); // give the server a minute to respond, if we're waiting 00660 }