// WvStreams
00001 /* 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * UniClientGen is a UniConfGen for retrieving data from the 00006 * UniConfDaemon. 00007 */ 00008 #include "uniclientgen.h" 00009 #include "unilistiter.h" 00010 #include "wvaddr.h" 00011 #include "wvfile.h" 00012 #include "wvlinkerhack.h" 00013 #include "wvmoniker.h" 00014 #include "wvresolver.h" 00015 #include "wvsslstream.h" 00016 #include "wvstrutils.h" 00017 #include "wvstringmask.h" 00018 #include "wvtclstring.h" 00019 #include "wvtcp.h" 00020 00021 WV_LINK(UniClientGen); 00022 00023 00024 #ifndef _WIN32 00025 #include "wvunixsocket.h" 00026 static IUniConfGen *unixcreator(WvStringParm s, IObject *) 00027 { 00028 WvConstInPlaceBuf buf(s, s.len()); 00029 WvString dst(wvtcl_getword(buf)); 00030 if (!dst) dst = ""; 00031 00032 return new UniClientGen(new WvUnixConn(dst), dst); 00033 } 00034 static WvMoniker<IUniConfGen> unixreg("unix", unixcreator); 00035 #endif 00036 00037 00038 static IUniConfGen *tcpcreator(WvStringParm _s, IObject *) 00039 { 00040 WvConstInPlaceBuf buf(_s, _s.len()); 00041 WvString dst(wvtcl_getword(buf)); 00042 if (!dst) dst = ""; 00043 00044 WvString s = dst; 00045 char *cptr = s.edit(); 00046 00047 if (!strchr(cptr, ':')) // no default port 00048 s.append(":%s", DEFAULT_UNICONF_DAEMON_TCP_PORT); 00049 00050 return new UniClientGen(new WvTCPConn(s), dst); 00051 } 00052 00053 00054 static IUniConfGen *sslcreator(WvStringParm _s, IObject *) 00055 { 00056 WvConstInPlaceBuf buf(_s, _s.len()); 00057 WvString dst(wvtcl_getword(buf)); 00058 if (!dst) dst = ""; 00059 00060 WvString s = dst; 00061 char *cptr = s.edit(); 00062 00063 if (!strchr(cptr, ':')) // no default port 00064 s.append(":%s", DEFAULT_UNICONF_DAEMON_SSL_PORT); 00065 00066 return new UniClientGen(new WvSSLStream(new WvTCPConn(s), NULL), dst); 00067 } 00068 00069 00070 static IUniConfGen *wvstreamcreator(WvStringParm s, IObject *_obj) 00071 { 00072 return 
new UniClientGen(wvcreate<IWvStream>(s, _obj)); 00073 } 00074 00075 #ifdef WITH_SLP 00076 #include "wvslp.h" 00077 00078 // FIXME: Only gets the first 00079 static IUniConfGen *slpcreator(WvStringParm s, IObject *) 00080 { 00081 WvStringList serverlist; 00082 00083 if (slp_get_servs("uniconf.niti", serverlist)) 00084 { 00085 WvString server = serverlist.popstr(); 00086 printf("Creating connection to: %s\n", server.cstr()); 00087 return new UniClientGen(new WvTCPConn(server), s); 00088 } 00089 00090 return NULL; 00091 } 00092 00093 static WvMoniker<IUniConfGen> slpreg("slp", slpcreator); 00094 #endif 00095 00096 static WvMoniker<IUniConfGen> tcpreg("tcp", tcpcreator); 00097 static WvMoniker<IUniConfGen> sslreg("ssl", sslcreator); 00098 static WvMoniker<IUniConfGen> wvstreamreg1("wvstream", wvstreamcreator); 00099 static WvMoniker<IUniConfGen> wvstreamreg2("wv", wvstreamcreator); 00100 00101 00102 00103 00104 /***** UniClientGen *****/ 00105 00106 UniClientGen::UniClientGen(IWvStream *stream, WvStringParm dst) 00107 : log(WvString("UniClientGen to %s", 00108 dst.isnull() && stream->src() 00109 ? 
*stream->src() : WvString(dst))), 00110 timeout(60*1000), 00111 version(0) 00112 { 00113 cmdinprogress = cmdsuccess = false; 00114 result_list = NULL; 00115 00116 conn = new UniClientConn(stream, dst); 00117 conn->setcallback(wv::bind(&UniClientGen::conncallback, this)); 00118 WvIStreamList::globallist.append(conn, false, "uniclientconn-via-gen"); 00119 } 00120 00121 00122 UniClientGen::~UniClientGen() 00123 { 00124 if (isok()) 00125 conn->writecmd(UniClientConn::REQ_QUIT, ""); 00126 WvIStreamList::globallist.unlink(conn); 00127 WVRELEASE(conn); 00128 } 00129 00130 00131 time_t UniClientGen::set_timeout(time_t _timeout) 00132 { 00133 if (_timeout < 1000) 00134 timeout = 1000; 00135 else 00136 timeout = _timeout; 00137 return timeout; 00138 } 00139 00140 00141 bool UniClientGen::isok() 00142 { 00143 return (conn && conn->isok()); 00144 } 00145 00146 00147 bool UniClientGen::refresh() 00148 { 00149 conn->writecmd(UniClientConn::REQ_REFRESH); 00150 return do_select(); 00151 } 00152 00153 void UniClientGen::flush_buffers() 00154 { 00155 // this ensures that all keys pending notifications are dealt with 00156 while (conn->isok() && conn->isreadable()) 00157 conn->callback(); 00158 } 00159 00160 void UniClientGen::commit() 00161 { 00162 conn->writecmd(UniClientConn::REQ_COMMIT); 00163 do_select(); 00164 } 00165 00166 WvString UniClientGen::get(const UniConfKey &key) 00167 { 00168 WvString value; 00169 conn->writecmd(UniClientConn::REQ_GET, wvtcl_escape(key)); 00170 00171 if (do_select()) 00172 { 00173 if (result_key == key) 00174 value = result; 00175 // else 00176 // seterror("Error: server sent wrong key pair."); 00177 } 00178 return value; 00179 } 00180 00181 00182 void UniClientGen::set(const UniConfKey &key, WvStringParm newvalue) 00183 { 00184 //set_queue.append(new WvString(key), true); 00185 hold_delta(); 00186 00187 if (newvalue.isnull()) 00188 conn->writecmd(UniClientConn::REQ_REMOVE, wvtcl_escape(key)); 00189 else 00190 conn->writecmd(UniClientConn::REQ_SET, 
00191 spacecat(wvtcl_escape(key), 00192 wvtcl_escape(newvalue), ' ')); 00193 00194 flush_buffers(); 00195 unhold_delta(); 00196 } 00197 00198 00199 void UniClientGen::setv(const UniConfPairList &pairs) 00200 { 00201 hold_delta(); 00202 00203 UniConfPairList::Iter i(pairs); 00204 if (version >= 19) 00205 { 00206 // Much like how VAL works, SETV continues sending key-value pairs 00207 // until it sends a terminating SETV, which has no arguments. 00208 for (i.rewind(); i.next(); ) 00209 { 00210 conn->writecmd(UniClientConn::REQ_SETV, 00211 spacecat(wvtcl_escape(i->key()), 00212 wvtcl_escape(i->value()), ' ')); 00213 } 00214 conn->writecmd(UniClientConn::REQ_SETV); 00215 } 00216 else 00217 { 00218 for (i.rewind(); i.next(); ) 00219 { 00220 set(i->key(), i->value()); 00221 } 00222 } 00223 00224 unhold_delta(); 00225 } 00226 00227 00228 bool UniClientGen::haschildren(const UniConfKey &key) 00229 { 00230 conn->writecmd(UniClientConn::REQ_HASCHILDREN, wvtcl_escape(key)); 00231 00232 if (do_select()) 00233 { 00234 if (result_key == key && result == "TRUE") 00235 return true; 00236 } 00237 00238 return false; 00239 } 00240 00241 00242 UniClientGen::Iter *UniClientGen::do_iterator(const UniConfKey &key, 00243 bool recursive) 00244 { 00245 assert(!result_list); 00246 result_list = new UniListIter(this); 00247 conn->writecmd(UniClientConn::REQ_SUBTREE, 00248 WvString("%s %s", wvtcl_escape(key), WvString(recursive))); 00249 00250 if (do_select()) 00251 { 00252 ListIter *it = result_list; 00253 result_list = NULL; 00254 return it; 00255 } 00256 else 00257 { 00258 delete result_list; 00259 result_list = NULL; 00260 return NULL; 00261 } 00262 } 00263 00264 00265 UniClientGen::Iter *UniClientGen::iterator(const UniConfKey &key) 00266 { 00267 return do_iterator(key, false); 00268 } 00269 00270 00271 UniClientGen::Iter *UniClientGen::recursiveiterator(const UniConfKey &key) 00272 { 00273 return do_iterator(key, true); 00274 } 00275 00276 00277 void UniClientGen::conncallback() 00278 { 
00279 UniClientConn::Command command = conn->readcmd(); 00280 static const WvStringMask nasty_space(' '); 00281 switch (command) 00282 { 00283 case UniClientConn::NONE: 00284 // do nothing 00285 break; 00286 00287 case UniClientConn::REPLY_OK: 00288 cmdsuccess = true; 00289 cmdinprogress = false; 00290 break; 00291 00292 case UniClientConn::REPLY_FAIL: 00293 result_key = WvString::null; 00294 cmdsuccess = false; 00295 cmdinprogress = false; 00296 break; 00297 00298 case UniClientConn::REPLY_CHILD: 00299 { 00300 WvString key(wvtcl_getword(conn->payloadbuf, nasty_space)); 00301 WvString value(wvtcl_getword(conn->payloadbuf, nasty_space)); 00302 00303 if (!key.isnull() && !value.isnull()) 00304 { 00305 result_key = key; 00306 result = value; 00307 cmdsuccess = true; 00308 } 00309 cmdinprogress = false; 00310 break; 00311 00312 } 00313 00314 case UniClientConn::REPLY_ONEVAL: 00315 { 00316 WvString key(wvtcl_getword(conn->payloadbuf, nasty_space)); 00317 WvString value(wvtcl_getword(conn->payloadbuf, nasty_space)); 00318 00319 if (!key.isnull() && !value.isnull()) 00320 { 00321 result_key = key; 00322 result = value; 00323 cmdsuccess = true; 00324 } 00325 00326 cmdinprogress = false; 00327 break; 00328 } 00329 00330 case UniClientConn::PART_VALUE: 00331 { 00332 WvString key(wvtcl_getword(conn->payloadbuf, nasty_space)); 00333 WvString value(wvtcl_getword(conn->payloadbuf, nasty_space)); 00334 00335 if (!key.isnull() && !value.isnull()) 00336 { 00337 if (result_list) 00338 result_list->add(key, value); 00339 } 00340 break; 00341 } 00342 00343 case UniClientConn::EVENT_HELLO: 00344 { 00345 WvStringList greeting; 00346 wvtcl_decode(greeting, conn->payloadbuf.getstr(), nasty_space); 00347 WvString server(greeting.popstr()); 00348 WvString version_string(greeting.popstr()); 00349 00350 if (server.isnull() || strncmp(server, "UniConf", 7)) 00351 { 00352 // wrong type of server! 
00353 log(WvLog::Error, "Connected to a non-UniConf server!\n"); 00354 00355 cmdinprogress = false; 00356 cmdsuccess = false; 00357 conn->close(); 00358 } 00359 else 00360 { 00361 version = 0; 00362 sscanf(version_string, "%d", &version); 00363 log(WvLog::Debug3, "UniConf version %s.\n", version); 00364 } 00365 break; 00366 } 00367 00368 case UniClientConn::EVENT_NOTICE: 00369 { 00370 WvString key(wvtcl_getword(conn->payloadbuf, nasty_space)); 00371 WvString value(wvtcl_getword(conn->payloadbuf, nasty_space)); 00372 delta(key, value); 00373 } 00374 00375 default: 00376 // discard unrecognized commands 00377 break; 00378 } 00379 } 00380 00381 00382 // FIXME: horribly horribly evil!! 00383 bool UniClientGen::do_select() 00384 { 00385 wvstime_sync(); 00386 00387 hold_delta(); 00388 00389 cmdinprogress = true; 00390 cmdsuccess = false; 00391 00392 time_t remaining = timeout; 00393 const time_t clock_error = 10*1000; 00394 WvTime timeout_at = msecadd(wvstime(), timeout); 00395 while (conn->isok() && cmdinprogress) 00396 { 00397 // We would really like to run the "real" wvstreams globallist 00398 // select loop here, but we can't because we may already be inside 00399 // someone else's callback or something. So we'll wait on *only* this 00400 // connection. 
00401 // 00402 // We could do this using alarm()s, but because of very strage behaviour 00403 // due to inherit_request in post_select when calling the long WvStream::select() 00404 // prototype as we do here we have to do the remaining stuff outselves 00405 time_t last_remaining = remaining; 00406 bool result = conn->select(remaining, true, false); 00407 remaining = msecdiff(timeout_at, wvstime()); 00408 if (result) 00409 conn->callback(); 00410 else if (remaining <= 0 && remaining > -clock_error) 00411 { 00412 log(WvLog::Warning, "Command timeout; connection closed.\n"); 00413 cmdinprogress = false; 00414 cmdsuccess = false; 00415 conn->close(); 00416 } 00417 00418 if (result 00419 || remaining <= -clock_error 00420 || remaining >= last_remaining + clock_error) 00421 { 00422 if (!result) 00423 log(WvLog::Debug, 00424 "Clock appears to have jumped; resetting" 00425 " connection remaining.\n"); 00426 remaining = timeout; 00427 timeout_at = msecadd(wvstime(), timeout); 00428 } 00429 } 00430 00431 // if (!cmdsuccess) 00432 // seterror("Error: server timed out on response."); 00433 00434 unhold_delta(); 00435 00436 return cmdsuccess; 00437 }