WvStreams
|
00001 /* 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever. 00006 * 00007 * See wvhttppool.h. 00008 */ 00009 #include <ctype.h> 00010 #include <time.h> 00011 #include "wvhttppool.h" 00012 #include "wvbufstream.h" 00013 #include "wvtcp.h" 00014 #include "strutils.h" 00015 00016 bool WvHttpStream::global_enable_pipelining = true; 00017 int WvUrlStream::max_requests = 100; 00018 00019 unsigned WvHash(const WvUrlStream::Target &n) 00020 { 00021 WvString key("%s%s", n.remaddr, n.username); 00022 return (WvHash(key)); 00023 } 00024 00025 00026 WvUrlRequest::WvUrlRequest(WvStringParm _url, WvStringParm _method, 00027 WvStringParm _headers, WvStream *content_source, 00028 bool _create_dirs, bool _pipeline_test) 00029 : url(_url), headers(_headers) 00030 { 00031 instream = NULL; 00032 create_dirs = _create_dirs; 00033 pipeline_test = _pipeline_test; 00034 method = _method; 00035 is_dir = false; // for ftp primarily; set later 00036 00037 if (pipeline_test) 00038 { 00039 outstream = NULL; 00040 putstream = NULL; 00041 } 00042 else 00043 { 00044 WvBufUrlStream *x = new WvBufUrlStream; 00045 outstream = x; 00046 x->url = url; 00047 00048 putstream = content_source; 00049 } 00050 inuse = false; 00051 } 00052 00053 00054 WvUrlRequest::~WvUrlRequest() 00055 { 00056 done(); 00057 } 00058 00059 00060 void WvUrlRequest::done() 00061 { 00062 if (outstream) 00063 { 00064 outstream->seteof(); 00065 outstream = NULL; 00066 } 00067 if (putstream) 00068 putstream = NULL; 00069 inuse = false; 00070 } 00071 00072 00073 void WvUrlStream::addurl(WvUrlRequest *url) 00074 { 00075 log(WvLog::Debug4, "Adding a new url: '%s'\n", url->url); 00076 00077 assert(url->outstream); 00078 00079 if (!url->url.isok()) 00080 return; 00081 00082 waiting_urls.append(url, false, "waiting_url"); 00083 request_next(); 00084 } 00085 00086 00087 void WvUrlStream::delurl(WvUrlRequest *url) 00088 { 00089 log(WvLog::Debug4, "Removing an url: '%s'\n", url->url); 00090 00091 if (url == curl) 00092 doneurl(); 00093 waiting_urls.unlink(url); 00094 urls.unlink(url); 00095 } 00096 00097 00098 WvHttpPool::WvHttpPool() 00099 : log("HTTP Pool", WvLog::Debug), conns(10), 00100 pipeline_incompatible(50) 00101 { 00102 log("Pool initializing.\n"); 00103 num_streams_created = 0; 00104 } 00105 00106 00107 WvHttpPool::~WvHttpPool() 00108 { 00109 log("Created %s individual session%s during this run.\n", 00110 num_streams_created, num_streams_created == 1 ? "" : "s"); 00111 if (geterr()) 00112 log("Error was: %s\n", errstr()); 00113 00114 // these must get zapped before the URL list, since they have pointers 00115 // to URLs. 00116 zap(); 00117 conns.zap(); 00118 } 00119 00120 00121 void WvHttpPool::pre_select(SelectInfo &si) 00122 { 00123 // log(WvLog::Debug5, "pre_select: main:%s conns:%s urls:%s\n", 00124 // count(), conns.count(), urls.count()); 00125 00126 WvIStreamList::pre_select(si); 00127 00128 WvUrlStreamDict::Iter ci(conns); 00129 for (ci.rewind(); ci.next(); ) 00130 { 00131 if (!ci->isok()) 00132 si.msec_timeout = 0; 00133 } 00134 00135 WvUrlRequestList::Iter i(urls); 00136 for (i.rewind(); i.next(); ) 00137 { 00138 if (!i->instream) 00139 { 00140 log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost()); 00141 if (i->url.resolve()) 00142 si.msec_timeout = 0; 00143 else 00144 dns.pre_select(i->url.gethost(), si); 00145 } 00146 } 00147 } 00148 00149 00150 bool WvHttpPool::post_select(SelectInfo &si) 00151 { 00152 bool sure = false; 00153 00154 WvUrlStreamDict::Iter ci(conns); 00155 for (ci.rewind(); ci.next(); ) 00156 { 00157 if (!ci->isok()) 00158 { 00159 log(WvLog::Debug4, "Selecting true because of a dead stream.\n"); 00160 unconnect(ci.ptr()); 00161 ci.rewind(); 00162 sure = true; 00163 } 00164 } 00165 00166 WvUrlRequestList::Iter i(urls); 00167 for (i.rewind(); i.next(); ) 00168 { 00169 if ((!i->outstream && !i->inuse) || !i->url.isok()) 00170 { 00171 //log("'%s' is dead: %s/%s\n", 00172 // i->url, i->url.isok(), i.outstream->isok()); 00173 if (!i->url.isok()) 00174 { 00175 log("URL not okay: '%s'\n", i->url); 00176 i->done(); 00177 } 00178 // nicely delete the url request 00179 WvUrlStream::Target target(i->url.getaddr(), i->url.getuser()); 00180 WvUrlStream *s = conns[target]; 00181 if (s) 00182 s->delurl(i.ptr()); 00183 i.xunlink(); 00184 continue; 00185 } 00186 00187 if (!i->instream) 00188 { 00189 log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost()); 00190 if (i->url.resolve() || dns.post_select(i->url.gethost(), si)) 00191 { 00192 log(WvLog::Debug4, "Selecting true because of '%s'\n", i->url); 00193 sure = true; 00194 } 00195 } 00196 } 00197 00198 return WvIStreamList::post_select(si) || sure; 00199 } 00200 00201 00202 void WvHttpPool::execute() 00203 { 00204 WvIStreamList::execute(); 00205 00206 WvUrlRequestList::Iter i(urls); 00207 for (i.rewind(); i.next(); ) 00208 { 00209 WvUrlStream *s; 00210 00211 if (!i->outstream || !i->url.isok() || !i->url.resolve()) 00212 continue; // skip it for now 00213 00214 WvUrlStream::Target target(i->url.getaddr(), i->url.getuser()); 00215 00216 //log(WvLog::Info, "remaddr is %s; username is %s\n", target.remaddr, 00217 // target.username); 00218 s = conns[target]; 00219 //if (!s) log("conn for '%s' is not found.\n", ip); 00220 00221 if (s && !s->isok()) 00222 { 00223 unconnect(s); 00224 s = NULL; 00225 } 00226 00227 if (!i->outstream) 00228 continue; // unconnect might have caused this URL to be marked bad 00229 00230 if (!s) 00231 { 00232 num_streams_created++; 00233 if (!strncasecmp(i->url.getproto(), "http", 4)) 00234 s = new WvHttpStream(target.remaddr, target.username, 00235 i->url.getproto() == "https", 00236 pipeline_incompatible); 00237 else if (!strcasecmp(i->url.getproto(), "ftp")) 00238 s = new WvFtpStream(target.remaddr, target.username, 00239 i->url.getpassword()); 00240 conns.add(s, true); 00241 00242 // add it to the streamlist, so it can do things 00243 append(s, false, "http/ftp stream"); 00244 } 00245 00246 if (!i->instream) 00247 { 00248 s->addurl(i.ptr()); 00249 i->instream = s; 00250 } 00251 } 00252 } 00253 00254 00255 WvBufUrlStream *WvHttpPool::addurl(WvStringParm _url, WvStringParm _method, 00256 WvStringParm _headers, WvStream *content_source, bool create_dirs) 00257 { 00258 log(WvLog::Debug4, "Adding a new url to pool: '%s'\n", _url); 00259 WvUrlRequest *url = new WvUrlRequest(_url, _method, _headers, content_source, 00260 create_dirs, false); 00261 urls.append(url, true, "addurl"); 00262 00263 return url->outstream; 00264 } 00265 00266 00267 void WvHttpPool::unconnect(WvUrlStream *s) 00268 { 00269 if (!s->target.username) 00270 log("Unconnecting stream to %s.\n", s->target.remaddr); 00271 else 00272 log("Unconnecting stream to %s@%s.\n", s->target.username, 00273 s->target.remaddr); 00274 00275 WvUrlRequestList::Iter i(urls); 00276 for (i.rewind(); i.next(); ) 00277 { 00278 if (i->instream == s) 00279 i->instream = NULL; 00280 } 00281 00282 unlink(s); 00283 conns.remove(s); 00284 }