WvStreams
wvdbusconn.cc
00001 /* -*- Mode: C++ -*-
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 2004-2006 Net Integration Technologies, Inc.
00004  *
00005  * Pathfinder Software:
00006  *   Copyright (C) 2007, Carillon Information Security Inc.
00007  *
00008  * This library is licensed under the LGPL, please read LICENSE for details.
00009  *
00010  */
00011 #include "wvdbusconn.h"
00012 #include "wvmoniker.h"
00013 #include "wvstrutils.h"
00014 #undef interface // windows
00015 #include <dbus/dbus.h>
00016 
00017 
00018 static WvString translate(WvStringParm dbus_moniker)
00019 {
00020     WvStringList l;
00021     WvStringList::Iter i(l);
00022 
00023     if (!strncasecmp(dbus_moniker, "unix:", 5))
00024     {
00025         WvString path, tmpdir;
00026         l.split(dbus_moniker+5, ",");
00027         for (i.rewind(); i.next(); )
00028         {
00029             if (!strncasecmp(*i, "path=", 5))
00030                 path = *i + 5;
00031             else if (!strncasecmp(*i, "abstract=", 9))
00032                 path = WvString("@%s", *i + 9);
00033             else if (!strncasecmp(*i, "tmpdir=", 7))
00034                 tmpdir = *i + 7;
00035         }
00036         if (!!path)
00037             return WvString("unix:%s", path);
00038         else if (!!tmpdir)
00039             return WvString("unix:%s/dbus.sock", tmpdir);
00040     }
00041     else if (!strncasecmp(dbus_moniker, "tcp:", 4))
00042     {
00043         WvString host, port, family;
00044         l.split(dbus_moniker+4, ",");
00045         for (i.rewind(); i.next(); )
00046         {
00047             if (!strncasecmp(*i, "family=", 7))
00048                 family = *i + 7;
00049             else if (!strncasecmp(*i, "host=", 5))
00050                 host = *i + 5;
00051             else if (!strncasecmp(*i, "port=", 5))
00052                 port = *i + 5;
00053         }
00054         if (!!host && !!port)
00055             return WvString("tcp:%s:%s", host, port);
00056         else if (!!host)
00057             return WvString("tcp:%s", host);
00058         else if (!!port)
00059             return WvString("tcp:0.0.0.0:%s", port); // localhost
00060     }
00061 
00062     return dbus_moniker; // unrecognized
00063 }
00064 
00065 
00066 static IWvStream *stream_creator(WvStringParm _s, IObject *)
00067 {
00068     WvString s(_s);
00069 
00070     if (!strcasecmp(s, "starter"))
00071     {
00072         WvString startbus(getenv("DBUS_STARTER_ADDRESS"));
00073         if (!!startbus)
00074             return IWvStream::create(translate(startbus));
00075         else
00076         {
00077             WvString starttype(getenv("DBUS_STARTER_BUS_TYPE"));
00078             if (!!starttype && !strcasecmp(starttype, "system"))
00079                 s = "system";
00080             else if (!!starttype && !strcasecmp(starttype, "session"))
00081                 s = "session";
00082         }
00083     }
00084 
00085     if (!strcasecmp(s, "system"))
00086     {
00087         // NOTE: the environment variable for the address of the system
00088         // bus is very often not set-- in that case, look in your dbus 
00089         // system bus config file (e.g. /etc/dbus-1/system.conf) for the 
00090         // raw address and either set this environment variable to that, or 
00091         // pass in the address directly
00092         WvString bus(getenv("DBUS_SYSTEM_BUS_ADDRESS"));
00093         if (!!bus)
00094             return IWvStream::create(translate(bus));
00095     }
00096 
00097     if (!strcasecmp(s, "session"))
00098     {
00099         WvString bus(getenv("DBUS_SESSION_BUS_ADDRESS"));
00100         if (!!bus)
00101             return IWvStream::create(translate(bus));
00102     }
00103 
00104     return IWvStream::create(translate(s));
00105 }
00106 
// Static registration object: constructs a WvMoniker<IWvStream> with the
// prefix "dbus" and stream_creator as its factory function.
// NOTE(review): presumably this is what makes IWvStream::create("dbus:...")
// dispatch to stream_creator -- confirm against wvmoniker.h.
00107 static WvMoniker<IWvStream> reg("dbus", stream_creator);
00108 
00109 
// Number of WvDBusConn objects constructed so far; both constructors
// pre-increment it.  The value is used in each connection's log prefix
// and, for non-client connections, in the unique name set by init().
00110 static int conncount;
00111 
00112 WvDBusConn::WvDBusConn(IWvStream *_cloned, IWvDBusAuth *_auth, bool _client)
00113     : WvStreamClone(_cloned),
00114         log(WvString("DBus %s%s",
00115                      _client ? "" : "s",
00116                      ++conncount), WvLog::Debug5),
00117         pending(10)
00118 {
00119     init(_auth, _client);
00120 }
00121 
00122 
00123 WvDBusConn::WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth, bool _client)
00124     : WvStreamClone(IWvStream::create(moniker)),
00125         log(WvString("DBus %s%s",
00126                      _client ? "" : "s",
00127                      ++conncount), WvLog::Debug5),
00128         pending(10)
00129 {
00130     log("Connecting to '%s'\n", moniker);
00131     init(_auth, _client);
00132 }
00133 
00134 
00135 void WvDBusConn::init(IWvDBusAuth *_auth, bool _client)
00136 {
00137     log("Initializing.\n");
00138     client = _client;
00139     auth = _auth ? _auth : new WvDBusClientAuth;
00140     authorized = in_post_select = false;
00141     if (!client) set_uniquename(WvString(":%s.0", conncount));
00142 
00143     if (!isok()) return;
00144     
00145     delay_output(true);
00146 
00147     // this will get enqueued until later, but we want to make sure it
00148     // comes before anything the user tries to send - including anything
00149     // goofy they enqueue in the authorization part.
00150     if (client)
00151         send_hello();
00152 
00153     try_auth();
00154 }
00155 
00156 WvDBusConn::~WvDBusConn()
00157 {
00158     log("Shutting down.\n");
00159     if (geterr())
00160         log("Error was: %s\n", errstr());
00161 
00162     close();
00163 
00164     delete auth;
00165 }
00166 
00167 
00168 void WvDBusConn::close()
00169 {
00170     if (!closed)
00171         log("Closing.\n");
00172     WvStreamClone::close();
00173 }
00174 
00175 
00176 WvString WvDBusConn::uniquename() const
00177 {
00178     return _uniquename;
00179 }
00180 
00181 
00182 void WvDBusConn::request_name(WvStringParm name, const WvDBusCallback &onreply,
00183                               time_t msec_timeout)
00184 {
00185     uint32_t flags = (DBUS_NAME_FLAG_ALLOW_REPLACEMENT |
00186                       DBUS_NAME_FLAG_REPLACE_EXISTING);
00187     WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
00188                   "org.freedesktop.DBus", "RequestName");
00189     msg.append(name).append(flags);
00190     send(msg, onreply, msec_timeout);
00191 }
00192 
00193 
00194 uint32_t WvDBusConn::send(WvDBusMsg msg)
00195 {
00196     msg.marshal(out_queue);
00197     if (authorized)
00198     {
00199         log(" >> %s\n", msg);
00200         write(out_queue);
00201     }
00202     else
00203         log(" .> %s\n", msg);
00204     return msg.get_serial();
00205 }
00206 
00207 
00208 void WvDBusConn::send(WvDBusMsg msg, const WvDBusCallback &onreply,
00209                       time_t msec_timeout)
00210 {
00211     send(msg);
00212     if (onreply)
00213         add_pending(msg, onreply, msec_timeout);
00214 }
00215 
00216 
00217 class xxReplyWaiter
00218 {
00219 public:
00220     WvDBusMsg *reply;
00221     
00222     xxReplyWaiter()
00223         { reply = NULL; }
00224     ~xxReplyWaiter()
00225         { delete reply; }
00226     bool reply_wait(WvDBusMsg &msg)
00227         { reply = new WvDBusMsg(msg); return true; }
00228 };
00229 
00230 
00231 WvDBusMsg WvDBusConn::send_and_wait(WvDBusMsg msg, time_t msec_timeout,
00232                                 wv::function<void(uint32_t)> serial_cb)
00233 {
00234     xxReplyWaiter rw;
00235     
00236     send(msg, wv::bind(&xxReplyWaiter::reply_wait, &rw, _1),
00237          msec_timeout);
00238     if (serial_cb)
00239         serial_cb(msg.get_serial());
00240     while (!rw.reply && isok())
00241         runonce();
00242     if (!rw.reply)
00243         return WvDBusError(msg, DBUS_ERROR_FAILED,
00244                            WvString("Connection closed (%s) "
00245                                     "while waiting for reply.",
00246                                     errstr()));
00247     else
00248         return *rw.reply;
00249 }
00250 
00251 
00252 void WvDBusConn::out(WvStringParm s)
00253 {
00254     log(" >> %s", s);
00255     print(s);
00256 }
00257 
00258 
00259 const char *WvDBusConn::in()
00260 {
00261     const char *s = trim_string(getline(0));
00262     if (s)
00263         log("<<  %s\n", s);
00264     return s;
00265 }
00266 
00267 
00268 void WvDBusConn::send_hello()
00269 {
00270     WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
00271                   "org.freedesktop.DBus", "Hello");
00272     send(msg, wv::bind(&WvDBusConn::_registered, this, _1));
00273     WvDBusMsg msg2("org.freedesktop.DBus", "/org/freedesktop/DBus",
00274                    "org.freedesktop.DBus", "AddMatch");
00275     msg2.append("type='signal'");
00276     send(msg2); // don't need to monitor this for completion
00277 }
00278 
00279 
00280 void WvDBusConn::set_uniquename(WvStringParm s)
00281 {
00282     // we want to print the message before switching log.app, so that we
00283     // can trace which log.app turned into which
00284     log("Assigned name '%s'\n", s);
00285     _uniquename = s;
00286     log.app = WvString("DBus %s%s", client ? "" : "s", uniquename());
00287 }
00288 
00289 
00290 void WvDBusConn::try_auth()
00291 {
00292     bool done = auth->authorize(*this);
00293     if (done)
00294     {
00295         // ready to send messages!
00296         if (out_queue.used())
00297         {
00298             log(" >> (sending enqueued messages)\n");
00299             write(out_queue);
00300         }
00301 
00302         authorized = true;
00303     }
00304 }
00305 
00306 
00307 void WvDBusConn::add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie)
00308 {
00309     callbacks.append(new CallbackInfo(pri, cb, cookie), true);
00310 }
00311 
00312 
00313 void WvDBusConn::del_callback(void *cookie)
00314 {
00315     // remember, there might be more than one callback with the same cookie.
00316     CallbackInfoList::Iter i(callbacks);
00317     for (i.rewind(); i.next(); )
00318         if (i->cookie == cookie)
00319             i.xunlink();
00320 }
00321 
00322 
00323 int WvDBusConn::priority_order(const CallbackInfo *a, const CallbackInfo *b)
00324 {
00325     return a->pri - b->pri;
00326 }
00327 
00328 bool WvDBusConn::filter_func(WvDBusMsg &msg)
00329 {
00330     log("<<  %s\n", msg);
00331 
00332     // handle replies
00333     uint32_t rserial = msg.get_replyserial();
00334     if (rserial)
00335     {
00336         Pending *p = pending[rserial];
00337         if (p)
00338         {
00339             p->cb(msg);
00340             pending.remove(p);
00341             return true; // handled it
00342         }
00343     }
00344 
00345     // handle all the generic filters
00346     CallbackInfoList::Sorter i(callbacks, priority_order);
00347     for (i.rewind(); i.next(); )
00348     {
00349         bool handled = i->cb(msg);
00350         if (handled) return true;
00351     }
00352 
00353     return false; // couldn't handle the message, sorry
00354 }
00355 
00356 
00357 WvDBusClientAuth::WvDBusClientAuth()
00358 {
00359     sent_request = false;
00360 }
00361 
00362 
00363 wvuid_t WvDBusClientAuth::get_uid()
00364 {
00365     return wvgetuid();
00366 }
00367 
00368 
00369 bool WvDBusClientAuth::authorize(WvDBusConn &c)
00370 {
00371     if (!sent_request)
00372     {
00373         c.write("\0", 1);
00374         WvString uid = get_uid();
00375         c.out("AUTH EXTERNAL %s\r\n\0", WvHexEncoder().strflushstr(uid));
00376         sent_request = true;
00377     }
00378     else
00379     {
00380         const char *line = c.in();
00381         if (line)
00382         {
00383             if (!strncasecmp(line, "OK ", 3))
00384             {
00385                 c.out("BEGIN\r\n");
00386                 return true;
00387             }
00388             else if (!strncasecmp(line, "ERROR ", 6))
00389                 c.seterr("Auth failed: %s", line);
00390             else
00391                 c.seterr("Unknown AUTH response: '%s'", line);
00392         }
00393     }
00394 
00395     return false;
00396 }
00397 
00398 
00399 time_t WvDBusConn::mintimeout_msec()
00400 {
00401     WvTime when = 0;
00402     PendingDict::Iter i(pending);
00403     for (i.rewind(); i.next(); )
00404     {
00405         if (!when || when > i->valid_until)
00406             when = i->valid_until;
00407     }
00408     if (!when)
00409         return -1;
00410     else if (when <= wvstime())
00411         return 0;
00412     else
00413         return msecdiff(when, wvstime());
00414 }
00415 
00416 
00417 bool WvDBusConn::post_select(SelectInfo &si)
00418 {
00419     bool ready = WvStreamClone::post_select(si);
00420     if (si.inherit_request) return ready;
00421     
00422     if (in_post_select) return false;
00423     in_post_select = true;
00424 
00425     if (!authorized && ready)
00426         try_auth();
00427 
00428     if (!alarm_remaining())
00429     {
00430         WvTime now = wvstime();
00431         PendingDict::Iter i(pending);
00432         for (i.rewind(); i.next(); )
00433         {
00434             if (now > i->valid_until)
00435             {
00436                 log("Expiring %s\n", i->msg);
00437                 expire_pending(i.ptr());
00438                 i.rewind();
00439             }
00440         }
00441     }
00442 
00443     if (authorized && ready)
00444     {
00445         // put this in a loop so that wvdbusd can forward packets rapidly.
00446         // Otherwise TCP_NODELAY kicks in, because we do a select() loop
00447         // between packets, which causes delay_output() to flush.
00448         bool ran;
00449         do
00450         {
00451             ran = false;
00452             size_t needed = WvDBusMsg::demarshal_bytes_needed(in_queue);
00453             size_t amt = needed - in_queue.used();
00454             if (amt < 4096)
00455                 amt = 4096;
00456             read(in_queue, amt);
00457             WvDBusMsg *m;
00458             while ((m = WvDBusMsg::demarshal(in_queue)) != NULL)
00459             {
00460                 ran = true;
00461                 filter_func(*m);
00462                 delete m;
00463             }
00464         } while (ran);
00465     }
00466 
00467     alarm(mintimeout_msec());
00468     in_post_select = false;
00469     return false;
00470 }
00471 
00472 
00473 bool WvDBusConn::isidle()
00474 {
00475     return !out_queue.used() && pending.isempty();
00476 }
00477 
00478 
00479 void WvDBusConn::expire_pending(Pending *p)
00480 {
00481     if (p)
00482     {
00483         WvDBusCallback xcb(p->cb);
00484         pending.remove(p); // prevent accidental recursion
00485         WvDBusError e(p->msg, DBUS_ERROR_FAILED,
00486                       "Timed out while waiting for reply");
00487         xcb(e);
00488     }
00489 }
00490 
00491 
00492 void WvDBusConn::cancel_pending(uint32_t serial)
00493 {
00494     Pending *p = pending[serial];
00495     if (p)
00496     {
00497         WvDBusCallback xcb(p->cb);
00498         WvDBusMsg msg(p->msg);
00499         pending.remove(p); // prevent accidental recursion
00500         WvDBusError e(msg, DBUS_ERROR_FAILED,
00501                       "Canceled while waiting for reply");
00502         xcb(e);
00503     }
00504 }
00505 
00506 
00507 void WvDBusConn::add_pending(WvDBusMsg &msg, WvDBusCallback cb,
00508                  time_t msec_timeout)
00509 {
00510     uint32_t serial = msg.get_serial();
00511     assert(serial);
00512     if (pending[serial])
00513         cancel_pending(serial);
00514     pending.add(new Pending(msg, cb, msec_timeout), true);
00515     alarm(mintimeout_msec());
00516 }
00517 
00518 
00519 bool WvDBusConn::_registered(WvDBusMsg &msg)
00520 {
00521     WvDBusMsg::Iter i(msg);
00522     _uniquename = i.getnext().get_str();
00523     set_uniquename(_uniquename);
00524     return true;
00525 }
00526