WvStreams
|
00001 /* -*- Mode: C++ -*- 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 2004-2006 Net Integration Technologies, Inc. 00004 * 00005 * Pathfinder Software: 00006 * Copyright (C) 2007, Carillon Information Security Inc. 00007 * 00008 * This library is licensed under the LGPL, please read LICENSE for details. 00009 * 00010 */ 00011 #include "wvdbusconn.h" 00012 #include "wvmoniker.h" 00013 #include "wvstrutils.h" 00014 #undef interface // windows 00015 #include <dbus/dbus.h> 00016 00017 00018 static WvString translate(WvStringParm dbus_moniker) 00019 { 00020 WvStringList l; 00021 WvStringList::Iter i(l); 00022 00023 if (!strncasecmp(dbus_moniker, "unix:", 5)) 00024 { 00025 WvString path, tmpdir; 00026 l.split(dbus_moniker+5, ","); 00027 for (i.rewind(); i.next(); ) 00028 { 00029 if (!strncasecmp(*i, "path=", 5)) 00030 path = *i + 5; 00031 else if (!strncasecmp(*i, "abstract=", 9)) 00032 path = WvString("@%s", *i + 9); 00033 else if (!strncasecmp(*i, "tmpdir=", 7)) 00034 tmpdir = *i + 7; 00035 } 00036 if (!!path) 00037 return WvString("unix:%s", path); 00038 else if (!!tmpdir) 00039 return WvString("unix:%s/dbus.sock", tmpdir); 00040 } 00041 else if (!strncasecmp(dbus_moniker, "tcp:", 4)) 00042 { 00043 WvString host, port, family; 00044 l.split(dbus_moniker+4, ","); 00045 for (i.rewind(); i.next(); ) 00046 { 00047 if (!strncasecmp(*i, "family=", 7)) 00048 family = *i + 7; 00049 else if (!strncasecmp(*i, "host=", 5)) 00050 host = *i + 5; 00051 else if (!strncasecmp(*i, "port=", 5)) 00052 port = *i + 5; 00053 } 00054 if (!!host && !!port) 00055 return WvString("tcp:%s:%s", host, port); 00056 else if (!!host) 00057 return WvString("tcp:%s", host); 00058 else if (!!port) 00059 return WvString("tcp:0.0.0.0:%s", port); // localhost 00060 } 00061 00062 return dbus_moniker; // unrecognized 00063 } 00064 00065 00066 static IWvStream *stream_creator(WvStringParm _s, IObject *) 00067 { 00068 WvString s(_s); 00069 00070 if (!strcasecmp(s, "starter")) 00071 { 00072 WvString startbus(getenv("DBUS_STARTER_ADDRESS")); 00073 if (!!startbus) 00074 return IWvStream::create(translate(startbus)); 00075 else 00076 { 00077 WvString starttype(getenv("DBUS_STARTER_BUS_TYPE")); 00078 if (!!starttype && !strcasecmp(starttype, "system")) 00079 s = "system"; 00080 else if (!!starttype && !strcasecmp(starttype, "session")) 00081 s = "session"; 00082 } 00083 } 00084 00085 if (!strcasecmp(s, "system")) 00086 { 00087 // NOTE: the environment variable for the address of the system 00088 // bus is very often not set-- in that case, look in your dbus 00089 // system bus config file (e.g. /etc/dbus-1/system.conf) for the 00090 // raw address and either set this environment variable to that, or 00091 // pass in the address directly 00092 WvString bus(getenv("DBUS_SYSTEM_BUS_ADDRESS")); 00093 if (!!bus) 00094 return IWvStream::create(translate(bus)); 00095 } 00096 00097 if (!strcasecmp(s, "session")) 00098 { 00099 WvString bus(getenv("DBUS_SESSION_BUS_ADDRESS")); 00100 if (!!bus) 00101 return IWvStream::create(translate(bus)); 00102 } 00103 00104 return IWvStream::create(translate(s)); 00105 } 00106 00107 static WvMoniker<IWvStream> reg("dbus", stream_creator); 00108 00109 00110 static int conncount; 00111 00112 WvDBusConn::WvDBusConn(IWvStream *_cloned, IWvDBusAuth *_auth, bool _client) 00113 : WvStreamClone(_cloned), 00114 log(WvString("DBus %s%s", 00115 _client ? "" : "s", 00116 ++conncount), WvLog::Debug5), 00117 pending(10) 00118 { 00119 init(_auth, _client); 00120 } 00121 00122 00123 WvDBusConn::WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth, bool _client) 00124 : WvStreamClone(IWvStream::create(moniker)), 00125 log(WvString("DBus %s%s", 00126 _client ? "" : "s", 00127 ++conncount), WvLog::Debug5), 00128 pending(10) 00129 { 00130 log("Connecting to '%s'\n", moniker); 00131 init(_auth, _client); 00132 } 00133 00134 00135 void WvDBusConn::init(IWvDBusAuth *_auth, bool _client) 00136 { 00137 log("Initializing.\n"); 00138 client = _client; 00139 auth = _auth ? _auth : new WvDBusClientAuth; 00140 authorized = in_post_select = false; 00141 if (!client) set_uniquename(WvString(":%s.0", conncount)); 00142 00143 if (!isok()) return; 00144 00145 delay_output(true); 00146 00147 // this will get enqueued until later, but we want to make sure it 00148 // comes before anything the user tries to send - including anything 00149 // goofy they enqueue in the authorization part. 00150 if (client) 00151 send_hello(); 00152 00153 try_auth(); 00154 } 00155 00156 WvDBusConn::~WvDBusConn() 00157 { 00158 log("Shutting down.\n"); 00159 if (geterr()) 00160 log("Error was: %s\n", errstr()); 00161 00162 close(); 00163 00164 delete auth; 00165 } 00166 00167 00168 void WvDBusConn::close() 00169 { 00170 if (!closed) 00171 log("Closing.\n"); 00172 WvStreamClone::close(); 00173 } 00174 00175 00176 WvString WvDBusConn::uniquename() const 00177 { 00178 return _uniquename; 00179 } 00180 00181 00182 void WvDBusConn::request_name(WvStringParm name, const WvDBusCallback &onreply, 00183 time_t msec_timeout) 00184 { 00185 uint32_t flags = (DBUS_NAME_FLAG_ALLOW_REPLACEMENT | 00186 DBUS_NAME_FLAG_REPLACE_EXISTING); 00187 WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus", 00188 "org.freedesktop.DBus", "RequestName"); 00189 msg.append(name).append(flags); 00190 send(msg, onreply, msec_timeout); 00191 } 00192 00193 00194 uint32_t WvDBusConn::send(WvDBusMsg msg) 00195 { 00196 msg.marshal(out_queue); 00197 if (authorized) 00198 { 00199 log(" >> %s\n", msg); 00200 write(out_queue); 00201 } 00202 else 00203 log(" .> %s\n", msg); 00204 return msg.get_serial(); 00205 } 00206 00207 00208 void WvDBusConn::send(WvDBusMsg msg, const WvDBusCallback &onreply, 00209 time_t msec_timeout) 00210 { 00211 send(msg); 00212 if (onreply) 00213 add_pending(msg, onreply, msec_timeout); 00214 } 00215 00216 00217 class xxReplyWaiter 00218 { 00219 public: 00220 WvDBusMsg *reply; 00221 00222 xxReplyWaiter() 00223 { reply = NULL; } 00224 ~xxReplyWaiter() 00225 { delete reply; } 00226 bool reply_wait(WvDBusMsg &msg) 00227 { reply = new WvDBusMsg(msg); return true; } 00228 }; 00229 00230 00231 WvDBusMsg WvDBusConn::send_and_wait(WvDBusMsg msg, time_t msec_timeout, 00232 wv::function<void(uint32_t)> serial_cb) 00233 { 00234 xxReplyWaiter rw; 00235 00236 send(msg, wv::bind(&xxReplyWaiter::reply_wait, &rw, _1), 00237 msec_timeout); 00238 if (serial_cb) 00239 serial_cb(msg.get_serial()); 00240 while (!rw.reply && isok()) 00241 runonce(); 00242 if (!rw.reply) 00243 return WvDBusError(msg, DBUS_ERROR_FAILED, 00244 WvString("Connection closed (%s) " 00245 "while waiting for reply.", 00246 errstr())); 00247 else 00248 return *rw.reply; 00249 } 00250 00251 00252 void WvDBusConn::out(WvStringParm s) 00253 { 00254 log(" >> %s", s); 00255 print(s); 00256 } 00257 00258 00259 const char *WvDBusConn::in() 00260 { 00261 const char *s = trim_string(getline(0)); 00262 if (s) 00263 log("<< %s\n", s); 00264 return s; 00265 } 00266 00267 00268 void WvDBusConn::send_hello() 00269 { 00270 WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus", 00271 "org.freedesktop.DBus", "Hello"); 00272 send(msg, wv::bind(&WvDBusConn::_registered, this, _1)); 00273 WvDBusMsg msg2("org.freedesktop.DBus", "/org/freedesktop/DBus", 00274 "org.freedesktop.DBus", "AddMatch"); 00275 msg2.append("type='signal'"); 00276 send(msg2); // don't need to monitor this for completion 00277 } 00278 00279 00280 void WvDBusConn::set_uniquename(WvStringParm s) 00281 { 00282 // we want to print the message before switching log.app, so that we 00283 // can trace which log.app turned into which 00284 log("Assigned name '%s'\n", s); 00285 _uniquename = s; 00286 log.app = WvString("DBus %s%s", client ? "" : "s", uniquename()); 00287 } 00288 00289 00290 void WvDBusConn::try_auth() 00291 { 00292 bool done = auth->authorize(*this); 00293 if (done) 00294 { 00295 // ready to send messages! 00296 if (out_queue.used()) 00297 { 00298 log(" >> (sending enqueued messages)\n"); 00299 write(out_queue); 00300 } 00301 00302 authorized = true; 00303 } 00304 } 00305 00306 00307 void WvDBusConn::add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie) 00308 { 00309 callbacks.append(new CallbackInfo(pri, cb, cookie), true); 00310 } 00311 00312 00313 void WvDBusConn::del_callback(void *cookie) 00314 { 00315 // remember, there might be more than one callback with the same cookie. 00316 CallbackInfoList::Iter i(callbacks); 00317 for (i.rewind(); i.next(); ) 00318 if (i->cookie == cookie) 00319 i.xunlink(); 00320 } 00321 00322 00323 int WvDBusConn::priority_order(const CallbackInfo *a, const CallbackInfo *b) 00324 { 00325 return a->pri - b->pri; 00326 } 00327 00328 bool WvDBusConn::filter_func(WvDBusMsg &msg) 00329 { 00330 log("<< %s\n", msg); 00331 00332 // handle replies 00333 uint32_t rserial = msg.get_replyserial(); 00334 if (rserial) 00335 { 00336 Pending *p = pending[rserial]; 00337 if (p) 00338 { 00339 p->cb(msg); 00340 pending.remove(p); 00341 return true; // handled it 00342 } 00343 } 00344 00345 // handle all the generic filters 00346 CallbackInfoList::Sorter i(callbacks, priority_order); 00347 for (i.rewind(); i.next(); ) 00348 { 00349 bool handled = i->cb(msg); 00350 if (handled) return true; 00351 } 00352 00353 return false; // couldn't handle the message, sorry 00354 } 00355 00356 00357 WvDBusClientAuth::WvDBusClientAuth() 00358 { 00359 sent_request = false; 00360 } 00361 00362 00363 wvuid_t WvDBusClientAuth::get_uid() 00364 { 00365 return wvgetuid(); 00366 } 00367 00368 00369 bool WvDBusClientAuth::authorize(WvDBusConn &c) 00370 { 00371 if (!sent_request) 00372 { 00373 c.write("\0", 1); 00374 WvString uid = get_uid(); 00375 c.out("AUTH EXTERNAL %s\r\n\0", WvHexEncoder().strflushstr(uid)); 00376 sent_request = true; 00377 } 00378 else 00379 { 00380 const char *line = c.in(); 00381 if (line) 00382 { 00383 if (!strncasecmp(line, "OK ", 3)) 00384 { 00385 c.out("BEGIN\r\n"); 00386 return true; 00387 } 00388 else if (!strncasecmp(line, "ERROR ", 6)) 00389 c.seterr("Auth failed: %s", line); 00390 else 00391 c.seterr("Unknown AUTH response: '%s'", line); 00392 } 00393 } 00394 00395 return false; 00396 } 00397 00398 00399 time_t WvDBusConn::mintimeout_msec() 00400 { 00401 WvTime when = 0; 00402 PendingDict::Iter i(pending); 00403 for (i.rewind(); i.next(); ) 00404 { 00405 if (!when || when > i->valid_until) 00406 when = i->valid_until; 00407 } 00408 if (!when) 00409 return -1; 00410 else if (when <= wvstime()) 00411 return 0; 00412 else 00413 return msecdiff(when, wvstime()); 00414 } 00415 00416 00417 bool WvDBusConn::post_select(SelectInfo &si) 00418 { 00419 bool ready = WvStreamClone::post_select(si); 00420 if (si.inherit_request) return ready; 00421 00422 if (in_post_select) return false; 00423 in_post_select = true; 00424 00425 if (!authorized && ready) 00426 try_auth(); 00427 00428 if (!alarm_remaining()) 00429 { 00430 WvTime now = wvstime(); 00431 PendingDict::Iter i(pending); 00432 for (i.rewind(); i.next(); ) 00433 { 00434 if (now > i->valid_until) 00435 { 00436 log("Expiring %s\n", i->msg); 00437 expire_pending(i.ptr()); 00438 i.rewind(); 00439 } 00440 } 00441 } 00442 00443 if (authorized && ready) 00444 { 00445 // put this in a loop so that wvdbusd can forward packets rapidly. 00446 // Otherwise TCP_NODELAY kicks in, because we do a select() loop 00447 // between packets, which causes delay_output() to flush. 00448 bool ran; 00449 do 00450 { 00451 ran = false; 00452 size_t needed = WvDBusMsg::demarshal_bytes_needed(in_queue); 00453 size_t amt = needed - in_queue.used(); 00454 if (amt < 4096) 00455 amt = 4096; 00456 read(in_queue, amt); 00457 WvDBusMsg *m; 00458 while ((m = WvDBusMsg::demarshal(in_queue)) != NULL) 00459 { 00460 ran = true; 00461 filter_func(*m); 00462 delete m; 00463 } 00464 } while (ran); 00465 } 00466 00467 alarm(mintimeout_msec()); 00468 in_post_select = false; 00469 return false; 00470 } 00471 00472 00473 bool WvDBusConn::isidle() 00474 { 00475 return !out_queue.used() && pending.isempty(); 00476 } 00477 00478 00479 void WvDBusConn::expire_pending(Pending *p) 00480 { 00481 if (p) 00482 { 00483 WvDBusCallback xcb(p->cb); 00484 pending.remove(p); // prevent accidental recursion 00485 WvDBusError e(p->msg, DBUS_ERROR_FAILED, 00486 "Timed out while waiting for reply"); 00487 xcb(e); 00488 } 00489 } 00490 00491 00492 void WvDBusConn::cancel_pending(uint32_t serial) 00493 { 00494 Pending *p = pending[serial]; 00495 if (p) 00496 { 00497 WvDBusCallback xcb(p->cb); 00498 WvDBusMsg msg(p->msg); 00499 pending.remove(p); // prevent accidental recursion 00500 WvDBusError e(msg, DBUS_ERROR_FAILED, 00501 "Canceled while waiting for reply"); 00502 xcb(e); 00503 } 00504 } 00505 00506 00507 void WvDBusConn::add_pending(WvDBusMsg &msg, WvDBusCallback cb, 00508 time_t msec_timeout) 00509 { 00510 uint32_t serial = msg.get_serial(); 00511 assert(serial); 00512 if (pending[serial]) 00513 cancel_pending(serial); 00514 pending.add(new Pending(msg, cb, msec_timeout), true); 00515 alarm(mintimeout_msec()); 00516 } 00517 00518 00519 bool WvDBusConn::_registered(WvDBusMsg &msg) 00520 { 00521 WvDBusMsg::Iter i(msg); 00522 _uniquename = i.getnext().get_str(); 00523 set_uniquename(_uniquename); 00524 return true; 00525 } 00526