WvStreams
wvhttppool.cc
00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever.
00006  * 
00007  * See wvhttppool.h.
00008  */
00009 #include <ctype.h>
00010 #include <time.h>
00011 #include "wvhttppool.h"
00012 #include "wvbufstream.h"
00013 #include "wvtcp.h"
00014 #include "strutils.h"
00015 
00016 bool WvHttpStream::global_enable_pipelining = true;
00017 int WvUrlStream::max_requests = 100;
00018 
00019 unsigned WvHash(const WvUrlStream::Target &n)
00020 {
00021     WvString key("%s%s", n.remaddr, n.username);
00022     return (WvHash(key));
00023 }
00024 
00025 
00026 WvUrlRequest::WvUrlRequest(WvStringParm _url, WvStringParm _method,
00027                 WvStringParm _headers, WvStream *content_source,
00028                 bool _create_dirs, bool _pipeline_test)
00029     : url(_url), headers(_headers)
00030 { 
00031     instream = NULL;
00032     create_dirs = _create_dirs;
00033     pipeline_test = _pipeline_test;
00034     method = _method;
00035     is_dir = false;    // for ftp primarily; set later
00036 
00037     if (pipeline_test)
00038     {
00039         outstream = NULL;
00040         putstream = NULL;
00041     }
00042     else
00043     {
00044         WvBufUrlStream *x = new WvBufUrlStream;
00045         outstream = x;
00046         x->url = url;
00047 
00048         putstream = content_source;
00049     }
00050     inuse = false;
00051 }
00052 
00053 
00054 WvUrlRequest::~WvUrlRequest()
00055 {
00056     done();
00057 }
00058 
00059 
00060 void WvUrlRequest::done()
00061 {
00062     if (outstream)
00063     {
00064         outstream->seteof();
00065         outstream = NULL; 
00066     }
00067     if (putstream)
00068         putstream = NULL;
00069     inuse = false;
00070 }
00071 
00072 
00073 void WvUrlStream::addurl(WvUrlRequest *url)
00074 {
00075     log(WvLog::Debug4, "Adding a new url: '%s'\n", url->url);
00076 
00077     assert(url->outstream);
00078 
00079     if (!url->url.isok())
00080         return;
00081 
00082     waiting_urls.append(url, false, "waiting_url");
00083     request_next();
00084 }
00085 
00086 
00087 void WvUrlStream::delurl(WvUrlRequest *url)
00088 {
00089     log(WvLog::Debug4, "Removing an url: '%s'\n", url->url);
00090 
00091     if (url == curl)
00092         doneurl();
00093     waiting_urls.unlink(url);
00094     urls.unlink(url);
00095 }
00096 
00097 
00098 WvHttpPool::WvHttpPool() 
00099     : log("HTTP Pool", WvLog::Debug), conns(10),
00100       pipeline_incompatible(50)
00101 {
00102     log("Pool initializing.\n");
00103     num_streams_created = 0;
00104 }
00105 
00106 
00107 WvHttpPool::~WvHttpPool()
00108 {
00109     log("Created %s individual session%s during this run.\n",
00110             num_streams_created, num_streams_created == 1 ? "" : "s");
00111     if (geterr())
00112         log("Error was: %s\n", errstr());
00113 
00114     // these must get zapped before the URL list, since they have pointers
00115     // to URLs.
00116     zap();
00117     conns.zap();
00118 }
00119 
00120 
00121 void WvHttpPool::pre_select(SelectInfo &si)
00122 {
00123     //    log(WvLog::Debug5, "pre_select: main:%s conns:%s urls:%s\n",
00124     //         count(), conns.count(), urls.count());
00125 
00126     WvIStreamList::pre_select(si);
00127 
00128     WvUrlStreamDict::Iter ci(conns);
00129     for (ci.rewind(); ci.next(); )
00130     {
00131         if (!ci->isok())
00132             si.msec_timeout = 0;
00133     }
00134     
00135     WvUrlRequestList::Iter i(urls);
00136     for (i.rewind(); i.next(); )
00137     {
00138         if (!i->instream)
00139         {
00140             log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost());
00141             if (i->url.resolve())
00142                 si.msec_timeout = 0;
00143             else
00144                 dns.pre_select(i->url.gethost(), si);    
00145         }
00146     }
00147 }
00148 
00149 
00150 bool WvHttpPool::post_select(SelectInfo &si)
00151 {
00152     bool sure = false;
00153 
00154     WvUrlStreamDict::Iter ci(conns);
00155     for (ci.rewind(); ci.next(); )
00156     {
00157         if (!ci->isok())
00158         {
00159             log(WvLog::Debug4, "Selecting true because of a dead stream.\n");
00160             unconnect(ci.ptr());
00161             ci.rewind();
00162             sure = true;
00163         }
00164     }
00165 
00166     WvUrlRequestList::Iter i(urls);
00167     for (i.rewind(); i.next(); )
00168     {
00169         if ((!i->outstream && !i->inuse) || !i->url.isok())
00170         {
00171             //log("'%s' is dead: %s/%s\n", 
00172             //  i->url, i->url.isok(), i.outstream->isok());
00173             if (!i->url.isok())
00174             {
00175                 log("URL not okay: '%s'\n", i->url);
00176                 i->done();
00177             }
00178             // nicely delete the url request
00179             WvUrlStream::Target target(i->url.getaddr(), i->url.getuser());
00180             WvUrlStream *s = conns[target];
00181             if (s)
00182                 s->delurl(i.ptr());
00183             i.xunlink();
00184             continue;
00185         }
00186 
00187         if (!i->instream)
00188         {
00189             log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost());
00190             if (i->url.resolve() || dns.post_select(i->url.gethost(), si))
00191             {
00192                 log(WvLog::Debug4, "Selecting true because of '%s'\n", i->url);
00193                 sure = true;
00194             }
00195         }
00196     }
00197 
00198     return WvIStreamList::post_select(si) || sure;
00199 }
00200 
00201 
00202 void WvHttpPool::execute()
00203 {
00204     WvIStreamList::execute();
00205 
00206     WvUrlRequestList::Iter i(urls);
00207     for (i.rewind(); i.next(); )
00208     {
00209         WvUrlStream *s;
00210 
00211         if (!i->outstream || !i->url.isok() || !i->url.resolve())
00212             continue; // skip it for now
00213 
00214         WvUrlStream::Target target(i->url.getaddr(), i->url.getuser());
00215 
00216         //log(WvLog::Info, "remaddr is %s; username is %s\n", target.remaddr,
00217         //    target.username);
00218         s = conns[target];
00219         //if (!s) log("conn for '%s' is not found.\n", ip);
00220 
00221         if (s && !s->isok())
00222         {
00223             unconnect(s);
00224             s = NULL;
00225         }
00226 
00227         if (!i->outstream)
00228             continue; // unconnect might have caused this URL to be marked bad
00229 
00230         if (!s)
00231         {
00232             num_streams_created++;
00233             if (!strncasecmp(i->url.getproto(), "http", 4))
00234                 s = new WvHttpStream(target.remaddr, target.username,
00235                         i->url.getproto() == "https",
00236                         pipeline_incompatible);
00237             else if (!strcasecmp(i->url.getproto(), "ftp"))
00238                 s = new WvFtpStream(target.remaddr, target.username,
00239                         i->url.getpassword());
00240             conns.add(s, true);
00241 
00242             // add it to the streamlist, so it can do things
00243             append(s, false, "http/ftp stream");
00244         }
00245 
00246         if (!i->instream)
00247         {
00248             s->addurl(i.ptr());
00249             i->instream = s;
00250         }
00251     }
00252 }
00253 
00254 
00255 WvBufUrlStream *WvHttpPool::addurl(WvStringParm _url, WvStringParm _method,
00256         WvStringParm _headers, WvStream *content_source, bool create_dirs)
00257 {
00258     log(WvLog::Debug4, "Adding a new url to pool: '%s'\n", _url);
00259     WvUrlRequest *url = new WvUrlRequest(_url, _method, _headers, content_source,
00260                                          create_dirs, false);
00261     urls.append(url, true, "addurl");
00262 
00263     return url->outstream;
00264 }
00265 
00266 
00267 void WvHttpPool::unconnect(WvUrlStream *s)
00268 {
00269     if (!s->target.username)
00270         log("Unconnecting stream to %s.\n", s->target.remaddr);
00271     else
00272         log("Unconnecting stream to %s@%s.\n", s->target.username,
00273                 s->target.remaddr);
00274 
00275     WvUrlRequestList::Iter i(urls);
00276     for (i.rewind(); i.next(); )
00277     {
00278         if (i->instream == s)
00279             i->instream = NULL;
00280     }
00281 
00282     unlink(s);
00283     conns.remove(s);
00284 }