APIServer.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <sys/stat.h>
00019 #include <oasys/compat/inet_aton.h>
00020 #include <oasys/io/FileIOClient.h>
00021 #include <oasys/io/NetUtils.h>
00022 #include <oasys/util/Pointers.h>
00023 #include <oasys/util/ScratchBuffer.h>
00024 #include <oasys/util/XDRUtils.h>
00025 
00026 #include "APIServer.h"
00027 #include "bundling/APIBlockProcessor.h"
00028 #include "bundling/Bundle.h"
00029 #include "bundling/BundleEvent.h"
00030 #include "bundling/BundleDaemon.h"
00031 #include "bundling/BundleStatusReport.h"
00032 #include "bundling/SDNV.h"
00033 #include "cmd/APICommand.h"
00034 #include "reg/APIRegistration.h"
00035 #include "reg/RegistrationTable.h"
00036 #include "routing/BundleRouter.h"
00037 #include "storage/GlobalStore.h"
00038 
00039 #ifndef MIN
00040 #define MIN(x, y) ((x)<(y) ? (x) : (y))
00041 #endif
00042 
00043 
00044 #ifdef __CYGWIN__
00045 // Cygwin's xdr.h file is k&r, so we need to make the declarations
00046 // more specific here to avoid errors when compiling with g++ instead
00047 // of gcc.
00048 
00049 extern "C" {
00050     extern void xdrmem_create(XDR *__xdrs, __const caddr_t __addr,
00051                               u_int __size, enum xdr_op __xop);
00052 }
00053 
00054 // these defines add a cast to change the function pointer for a function
00055 // with no args (which we get from xdr.h) into a function pointer with
00056 // args (i.e. k&r to ansi c).
00057 
00058 typedef void (*xdr_setpos_t)(XDR *, int);
00059 #undef xdr_setpos
00060 #define xdr_setpos(xdrs, pos) ((xdr_setpos_t)(*(xdrs)->x_ops->x_setpostn))(xdrs, pos)
00061 
00062 typedef int (*xdr_getpos_t)(XDR *);
00063 #undef xdr_getpos
00064 #define xdr_getpos(xdrs) ((xdr_getpos_t)(*(xdrs)->x_ops->x_getpostn))(xdrs)
00065 
00066 typedef int (*xdr_putlong_t)(XDR *, long *);
00067 #undef xdr_putlong
00068 #define xdr_putlong(xdrs, ptr) ((xdr_putlong_t)(*(xdrs)->x_ops->x_putlong))(xdrs, ptr)
00069 
00070 #endif
00071 
00072 namespace dtn {
00073 
00074 //----------------------------------------------------------------------
00075 APIServer::APIServer()
00076     : TCPServerThread("APIServer", "/dtn/apiserver", DELETE_ON_EXIT)
00077 {
00078     local_addr_ = htonl(INADDR_LOOPBACK);
00079     local_port_ = DTN_IPC_PORT;
00080 
00081     // override the defaults via environment variables, if given
00082     char *env;
00083     if ((env = getenv("DTNAPI_ADDR")) != NULL) {
00084         if (inet_aton(env, (struct in_addr*)&local_addr_) == 0)
00085         {
00086             log_err("DTNAPI_ADDR environment variable (%s) "
00087                     "not a valid ip address, using default of localhost",
00088                     env);
00089             // in case inet_aton touched it
00090             local_addr_ = htonl(INADDR_LOOPBACK);
00091         } else {
00092             log_debug("local address set to %s by DTNAPI_ADDR "
00093                       "environment variable", env);
00094         }
00095     }
00096 
00097     if ((env = getenv("DTNAPI_PORT")) != NULL) {
00098         char *end;
00099         u_int port = strtoul(env, &end, 10);
00100         if (*end != '\0' || port > 0xffff)
00101         {
00102             log_err("DTNAPI_PORT environment variable (%s) "
00103                     "not a valid ip port, using default of %d",
00104                     env, DTN_IPC_PORT);
00105             port = DTN_IPC_PORT;
00106         } else {
00107             log_debug("api port set to %s by DTNAPI_PORT "
00108                       "environment variable", env);
00109         }
00110         local_port_ = (u_int16_t)port;
00111     }
00112 
00113     if (local_addr_ != INADDR_ANY || local_port_ != 0) {
00114         log_debug("APIServer init (evironment set addr %s port %d)",
00115                   intoa(local_addr_), local_port_);
00116     } else {
00117         log_debug("APIServer init");
00118     }
00119 
00120     oasys::TclCommandInterp::instance()->reg(new APICommand(this));
00121 }
00122 
00123 //----------------------------------------------------------------------
00124 void
00125 APIServer::accepted(int fd, in_addr_t addr, u_int16_t port)
00126 {
00127     APIClient* c = new APIClient(fd, addr, port);
00128     c->start();
00129 }
00130 
00131 //----------------------------------------------------------------------
00132 APIClient::APIClient(int fd, in_addr_t addr, u_int16_t port)
00133     : Thread("APIClient", DELETE_ON_EXIT),
00134       TCPClient(fd, addr, port, "/dtn/apiclient"),
00135       notifier_(logpath_)
00136 {
00137     // note that we skip space for the message length and code/status
00138     xdrmem_create(&xdr_encode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_ENCODE);
00139     xdrmem_create(&xdr_decode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_DECODE);
00140 
00141     bindings_ = new APIRegistrationList();
00142 }
00143 
00144 //----------------------------------------------------------------------
00145 APIClient::~APIClient()
00146 {
00147     log_debug("client destroyed");
00148     delete_z(bindings_);
00149 }
00150 
00151 //----------------------------------------------------------------------
00152 void
00153 APIClient::close_session()
00154 {
00155     TCPClient::close();
00156 
00157     APIRegistration* reg;
00158     while (! bindings_->empty()) {
00159         reg = bindings_->front();
00160         bindings_->pop_front();
00161         
00162         reg->set_active(false);
00163 
00164         if (reg->expired()) {
00165             log_debug("removing expired registration %d", reg->regid());
00166             BundleDaemon::post(new RegistrationExpiredEvent(reg->regid()));
00167         }
00168     }
00169 }
00170 
00171 //----------------------------------------------------------------------
00172 int
00173 APIClient::handle_handshake()
00174 {
00175     u_int32_t handshake;
00176     u_int16_t message_type, ipc_version;
00177     
00178     int ret = readall((char*)&handshake, sizeof(handshake));
00179     if (ret != sizeof(handshake)) {
00180         log_err("error reading handshake: (got %d/%zu), \"error\" %s",
00181                 ret, sizeof(handshake), strerror(errno));
00182         return -1;
00183     }
00184 
00185     message_type = ntohl(handshake) >> 16;
00186     ipc_version = (u_int16_t) (ntohl(handshake) & 0x0ffff);
00187 
00188     if (message_type != DTN_OPEN) {
00189         log_err("handshake (%d)'s message type %d != DTN_OPEN (%d)",
00190                 handshake, message_type, DTN_OPEN);
00191         return -1;
00192     }
00193     
00194     if (ipc_version != DTN_IPC_VERSION) {
00195         log_err("handshake (%d)'s version %d != DTN_IPC_VERSION (%d)",
00196                 handshake, ipc_version, DTN_IPC_VERSION);
00197         return -1;
00198     }
00199     
00200     ret = writeall((char*)&handshake, sizeof(handshake));
00201     if (ret != sizeof(handshake)) {
00202         log_err("error writing handshake: %s", strerror(errno));
00203         return -1;
00204     }
00205 
00206     return 0;
00207 }
00208 
00209 //----------------------------------------------------------------------
00210 void
00211 APIClient::run()
00212 {
00213     int ret;
00214     u_int8_t type;
00215     u_int32_t len;
00216     
00217     log_info("new session %s:%d -> %s:%d",
00218              intoa(local_addr()), local_port(),
00219              intoa(remote_addr()), remote_port());
00220 
00221     if (handle_handshake() != 0) {
00222         close_session();
00223         return;
00224     }
00225     
00226     while (true) {
00227         xdr_setpos(&xdr_encode_, 0);
00228         xdr_setpos(&xdr_decode_, 0);
00229 
00230         // read the incoming message into the fourth byte of the
00231         // buffer, since the typecode + message length is only five
00232         // bytes long, but the XDR engines are set to point at the
00233         // eighth byte of the buffer
00234         ret = read(&buf_[3], DTN_MAX_API_MSG);
00235             
00236         if (ret <= 0) {
00237             log_warn("client error or disconnection");
00238             close_session();
00239             return;
00240         }
00241         
00242         if (ret < 5) {
00243             log_err("ack!! can't handle really short read...");
00244             close_session();
00245             return;
00246         }
00247 
00248         // NOTE: this protocol is duplicated in the implementation of
00249         // handle_begin_poll to take care of a cancel_poll request
00250         // coming in while the thread is waiting for bundles so any
00251         // modifications must be propagated there
00252         type = buf_[3];
00253         memcpy(&len, &buf_[4], sizeof(len));
00254 
00255         len = ntohl(len);
00256 
00257         ret -= 5;
00258         log_debug("got %s (%d/%d bytes)", dtnipc_msgtoa(type), ret, len);
00259 
00260         // if we didn't get the whole message, loop to get the rest,
00261         // skipping the header bytes and the already-read amount
00262         if (ret < (int)len) {
00263             int toget = len - ret;
00264             if (readall(&buf_[8 + ret], toget) != toget) {
00265                 log_err("error reading message remainder: %s",
00266                         strerror(errno));
00267                 close_session();
00268                 return;
00269             }
00270         }
00271 
00272         // dispatch to the handler routine
00273         switch(type) {
00274 #define DISPATCH(_type, _fn)                    \
00275         case _type:                             \
00276             ret = _fn();                        \
00277             break;
00278             
00279             DISPATCH(DTN_LOCAL_EID,         handle_local_eid);
00280             DISPATCH(DTN_REGISTER,          handle_register);
00281             DISPATCH(DTN_UNREGISTER,        handle_unregister);
00282             DISPATCH(DTN_FIND_REGISTRATION, handle_find_registration);
00283             DISPATCH(DTN_SEND,              handle_send);
00284             DISPATCH(DTN_BIND,              handle_bind);
00285             DISPATCH(DTN_UNBIND,            handle_unbind);
00286             DISPATCH(DTN_RECV,              handle_recv);
00287             DISPATCH(DTN_BEGIN_POLL,        handle_begin_poll);
00288             DISPATCH(DTN_CANCEL_POLL,       handle_cancel_poll);
00289             DISPATCH(DTN_CLOSE,             handle_close);
00290 #undef DISPATCH
00291 
00292         default:
00293             log_err("unknown message type code 0x%x", type);
00294             ret = DTN_EMSGTYPE;
00295             break;
00296         }
00297 
00298         // if the handler returned -1, then the session should be
00299         // immediately terminated
00300         if (ret == -1) {
00301             close_session();
00302             return;
00303         }
00304         
00305         // send the response
00306         if (send_response(ret) != 0) {
00307             return;
00308         }
00309         // if there was an IPC communication error or unknown message
00310         // type, close terminate the session
00311         // XXX/matt we could potentially close on all errors, not just these 2
00312         if (ret == DTN_ECOMM || ret == DTN_EMSGTYPE) {
00313             close_session();
00314             return;
00315         }
00316         
00317     } // while(1)
00318 }
00319 
00320 //----------------------------------------------------------------------
00321 int
00322 APIClient::send_response(int ret)
00323 {
00324     u_int32_t len, msglen;
00325     
00326     // make sure the dispatched function returned a valid error
00327     // code
00328     ASSERT(ret == DTN_SUCCESS ||
00329            (DTN_ERRBASE <= ret && ret <= DTN_ERRMAX));
00330         
00331     // fill in the reply message with the status code and the
00332     // length of the reply. note that if there is no reply, then
00333     // the xdr position should still be zero
00334     len = xdr_getpos(&xdr_encode_);
00335     log_debug("building reply: status %s, length %d",
00336               dtnipc_msgtoa(ret), len);
00337 
00338     msglen = len + 8;
00339     ret = ntohl(ret);
00340     len = htonl(len);
00341 
00342     memcpy(buf_,     &ret, sizeof(ret));
00343     memcpy(&buf_[4], &len, sizeof(len));
00344 
00345     log_debug("sending %d byte reply message", msglen);
00346     if (writeall(buf_, msglen) != (int)msglen) {
00347         log_err("error sending reply: %s", strerror(errno));
00348         close_session();
00349         return -1;
00350     }
00351 
00352     return 0;
00353 }
00354         
00355 //----------------------------------------------------------------------
00356 bool
00357 APIClient::is_bound(u_int32_t regid)
00358 {
00359     APIRegistrationList::iterator iter;
00360     for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
00361         if ((*iter)->regid() == regid) {
00362             return true;
00363         }
00364     }
00365 
00366     return false;
00367 }
00368 
00369 //----------------------------------------------------------------------
00370 int
00371 APIClient::handle_local_eid()
00372 {
00373     dtn_service_tag_t service_tag;
00374     dtn_endpoint_id_t local_eid;
00375     
00376     // unpack the request
00377     if (!xdr_dtn_service_tag_t(&xdr_decode_, &service_tag))
00378     {
00379         log_err("error in xdr unpacking arguments");
00380         return DTN_EXDR;
00381     }
00382 
00383     // build up the response
00384     EndpointID eid(BundleDaemon::instance()->local_eid());
00385     if (eid.append_service_tag(service_tag.tag) == false) {
00386         log_err("error appending service tag");
00387         return DTN_EINVAL;
00388     }
00389 
00390     memset(&local_eid, 0, sizeof(local_eid));
00391     eid.copyto(&local_eid);
00392     
00393     // pack the response
00394     if (!xdr_dtn_endpoint_id_t(&xdr_encode_, &local_eid)) {
00395         log_err("internal error in xdr: xdr_dtn_endpoint_id_t");
00396         return DTN_EXDR;
00397     }
00398 
00399     log_debug("get_local_eid encoded %d byte response",
00400               xdr_getpos(&xdr_encode_));
00401     
00402     return DTN_SUCCESS;
00403 }
00404 
00405 //----------------------------------------------------------------------
00406 int
00407 APIClient::handle_register()
00408 {
00409     APIRegistration* reg;
00410     Registration::failure_action_t action;
00411     EndpointIDPattern endpoint;
00412     std::string script;
00413     
00414     dtn_reg_info_t reginfo;
00415 
00416     memset(&reginfo, 0, sizeof(reginfo));
00417     
00418     // unpack and parse the request
00419     if (!xdr_dtn_reg_info_t(&xdr_decode_, &reginfo))
00420     {
00421         log_err("error in xdr unpacking arguments");
00422         return DTN_EXDR;
00423     }
00424 
00425     // make sure we free any dynamically-allocated bits in the
00426     // incoming structure before we exit the proc
00427     oasys::ScopeXDRFree x((xdrproc_t)xdr_dtn_reg_info_t, (char*)&reginfo);
00428     
00429     endpoint.assign(&reginfo.endpoint);
00430 
00431     if (!endpoint.valid()) {
00432         log_err("invalid endpoint id in register: '%s'",
00433                 reginfo.endpoint.uri);
00434         return DTN_EINVAL;
00435     }
00436     
00437     switch (reginfo.failure_action) {
00438     case DTN_REG_DEFER: action = Registration::DEFER; break;
00439     case DTN_REG_DROP:  action = Registration::DROP;  break;
00440     case DTN_REG_EXEC:  action = Registration::EXEC;  break;
00441     default: {
00442         log_err("invalid failure action code 0x%x", reginfo.failure_action);
00443         return DTN_EINVAL;
00444     }
00445     }
00446 
00447     if (action == Registration::EXEC) {
00448         script.assign(reginfo.script.script_val, reginfo.script.script_len);
00449     }
00450 
00451     u_int32_t regid = GlobalStore::instance()->next_regid();
00452     reg = new APIRegistration(regid, endpoint, action,
00453                               reginfo.expiration, script);
00454 
00455     if (! reginfo.init_passive) {
00456         // XXX/demmer fixme to allow multiple registrations
00457         if (! bindings_->empty()) {
00458             log_err("error: handle already bound to a registration");
00459             return DTN_EBUSY;
00460         }
00461         
00462         // store the registration in the list for this session
00463         bindings_->push_back(reg);
00464         reg->set_active(true);
00465     }
00466     
00467     BundleDaemon::post_and_wait(new RegistrationAddedEvent(reg, EVENTSRC_APP),
00468                                 &notifier_);
00469     
00470     // fill the response with the new registration id
00471     if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
00472         log_err("internal error in xdr: xdr_dtn_reg_id_t");
00473         return DTN_EXDR;
00474     }
00475     
00476     return DTN_SUCCESS;
00477 }
00478 
00479 //----------------------------------------------------------------------
00480 int
00481 APIClient::handle_unregister()
00482 {
00483     Registration* reg;
00484     dtn_reg_id_t regid;
00485     
00486     // unpack and parse the request
00487     if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid))
00488     {
00489         log_err("error in xdr unpacking arguments");
00490         return DTN_EXDR;
00491     }
00492 
00493     reg = BundleDaemon::instance()->reg_table()->get(regid);
00494     if (reg == NULL) {
00495         return DTN_ENOTFOUND;
00496     }
00497 
00498     // handle the special case in which we're unregistering a
00499     // currently bound registration, in which we actually leave it
00500     // around in the expired state, soit will be cleaned up when the
00501     // application either calls dtn_unbind() or closes the api socket
00502     if (is_bound(reg->regid()) && reg->active()) {
00503         if (reg->expired()) {
00504             return DTN_EINVAL;
00505         }
00506         
00507         reg->force_expire();
00508         ASSERT(reg->expired());
00509         return DTN_SUCCESS;
00510     }
00511 
00512     // otherwise it's an error to call unregister on a registration
00513     // that's in-use by someone else
00514     if (reg->active()) {
00515         return DTN_EBUSY;
00516     }
00517 
00518     BundleDaemon::post_and_wait(new RegistrationRemovedEvent(reg),
00519                                 &notifier_);
00520     
00521     return DTN_SUCCESS;
00522 }
00523 
00524 //----------------------------------------------------------------------
00525 int
00526 APIClient::handle_find_registration()
00527 {
00528     Registration* reg;
00529     EndpointIDPattern endpoint;
00530     dtn_endpoint_id_t app_eid;
00531 
00532     // unpack and parse the request
00533     if (!xdr_dtn_endpoint_id_t(&xdr_decode_, &app_eid))
00534     {
00535         log_err("error in xdr unpacking arguments");
00536         return DTN_EXDR;
00537     }
00538 
00539     endpoint.assign(&app_eid);
00540     if (!endpoint.valid()) {
00541         log_err("invalid endpoint id in find_registration: '%s'",
00542                 app_eid.uri);
00543         return DTN_EINVAL;
00544     }
00545 
00546     reg = BundleDaemon::instance()->reg_table()->get(endpoint);
00547     if (reg == NULL) {
00548         return DTN_ENOTFOUND;
00549     }
00550 
00551     u_int32_t regid = reg->regid();
00552     
00553     // fill the response with the new registration id
00554     if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
00555         log_err("internal error in xdr: xdr_dtn_reg_id_t");
00556         return DTN_EXDR;
00557     }
00558     
00559     return DTN_SUCCESS;
00560 }
00561 
00562 //----------------------------------------------------------------------
00563 int
00564 APIClient::handle_bind()
00565 {
00566     dtn_reg_id_t regid;
00567 
00568     // unpack the request
00569     if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
00570         log_err("error in xdr unpacking arguments");
00571         return DTN_EXDR;
00572     }
00573 
00574     // look up the registration
00575     const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
00576     Registration* reg = regtable->get(regid);
00577 
00578     if (!reg) {
00579         log_err("can't find registration %d", regid);
00580         return DTN_ENOTFOUND;
00581     }
00582 
00583     APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
00584     if (api_reg == NULL) {
00585         log_crit("registration %d is not an API registration!!",
00586                  regid);
00587         return DTN_ENOTFOUND;
00588     }
00589 
00590     if (api_reg->active()) {
00591         log_err("registration %d is already in active mode", regid);
00592         return DTN_EBUSY;
00593     }
00594 
00595     // XXX/demmer fixme to allow multiple registrations
00596     if (! bindings_->empty()) {
00597         log_err("error: handle already bound to a registration");
00598         return DTN_EBUSY;
00599     }
00600     
00601     // store the registration in the list for this session
00602     bindings_->push_back(api_reg);
00603     api_reg->set_active(true);
00604 
00605     log_info("DTN_BIND: bound to registration %d", reg->regid());
00606     
00607     return DTN_SUCCESS;
00608 }
00609     
00610 //----------------------------------------------------------------------
00611 int
00612 APIClient::handle_unbind()
00613 {
00614     dtn_reg_id_t regid;
00615 
00616     // unpack the request
00617     if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
00618         log_err("error in xdr unpacking arguments");
00619         return DTN_EXDR;
00620     }
00621 
00622     // look up the registration
00623     const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
00624     Registration* reg = regtable->get(regid);
00625 
00626     if (!reg) {
00627         log_err("can't find registration %d", regid);
00628         return DTN_ENOTFOUND;
00629     }
00630 
00631     APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
00632     if (api_reg == NULL) {
00633         log_crit("registration %d is not an API registration!!",
00634                  regid);
00635         return DTN_ENOTFOUND;
00636     }
00637 
00638     APIRegistrationList::iterator iter;
00639     for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
00640         if (*iter == api_reg) {
00641             bindings_->erase(iter);
00642             ASSERT(api_reg->active());
00643             api_reg->set_active(false);
00644 
00645             log_info("DTN_UNBIND: unbound from registration %d", regid);
00646             return DTN_SUCCESS;
00647         }
00648     }
00649 
00650     log_err("registration %d not bound to this api client", regid);
00651     return DTN_ENOTFOUND;
00652 }
00653     
00654 //----------------------------------------------------------------------
00655 int
00656 APIClient::handle_send()
00657 {
00658     dtn_bundle_spec_t spec;
00659     dtn_bundle_payload_t payload;
00660 
00661     memset(&spec, 0, sizeof(spec));
00662     memset(&payload, 0, sizeof(payload));
00663     
00664     /* Unpack the arguments */
00665     if (!xdr_dtn_bundle_spec_t(&xdr_decode_, &spec) ||
00666         !xdr_dtn_bundle_payload_t(&xdr_decode_, &payload))
00667     {
00668         log_err("error in xdr unpacking arguments");
00669         return DTN_EXDR;
00670     }
00671 
00672     BundleRef b("APIClient::handle_send");
00673     b = new Bundle();
00674     
00675     // make sure any xdr calls to malloc are cleaned up
00676     oasys::ScopeXDRFree f1((xdrproc_t)xdr_dtn_bundle_spec_t,
00677                            (char*)&spec);
00678     oasys::ScopeXDRFree f2((xdrproc_t)xdr_dtn_bundle_payload_t,
00679                            (char*)&payload);
00680     
00681     // assign the addressing fields
00682     b->source_.assign(&spec.source);
00683     b->dest_.assign(&spec.dest);
00684     if (spec.replyto.uri[0] == '\0') {
00685         b->replyto_.assign(EndpointID::NULL_EID());
00686     } else {
00687         b->replyto_.assign(&spec.replyto);
00688     }
00689     b->custodian_.assign(EndpointID::NULL_EID());
00690      
00691     oasys::StringBuffer error;
00692     if (!b->validate(&error)) {
00693         log_err("bundle validation failed: %s", error.data());
00694         return DTN_EINVAL;
00695     }
00696     
00697     // the priority code
00698     switch (spec.priority) {
00699 #define COS(_cos) case _cos: b->priority_ = Bundle::_cos; break;
00700         COS(COS_BULK);
00701         COS(COS_NORMAL);
00702         COS(COS_EXPEDITED);
00703         COS(COS_RESERVED);
00704 #undef COS
00705     default:
00706         log_err("invalid priority level %d", (int)spec.priority);
00707         return DTN_EINVAL;
00708     };
00709 
00710     // delivery options
00711     if (spec.dopts & DOPTS_CUSTODY)
00712         b->custody_requested_ = true;
00713     
00714     if (spec.dopts & DOPTS_DELIVERY_RCPT)
00715         b->delivery_rcpt_ = true;
00716 
00717     if (spec.dopts & DOPTS_RECEIVE_RCPT)
00718         b->receive_rcpt_ = true;
00719 
00720     if (spec.dopts & DOPTS_FORWARD_RCPT)
00721         b->forward_rcpt_ = true;
00722 
00723     if (spec.dopts & DOPTS_CUSTODY_RCPT)
00724         b->custody_rcpt_ = true;
00725 
00726     if (spec.dopts & DOPTS_DELETE_RCPT)
00727         b->deletion_rcpt_ = true;
00728 
00729     // expiration time
00730     b->expiration_ = spec.expiration;
00731 
00732     for (u_int i = 0; i < spec.blocks.blocks_len; i++) {
00733         dtn_extension_block_t* block = &spec.blocks.blocks_val[i];
00734 
00735         b->api_blocks_.push_back(BlockInfo(APIBlockProcessor::instance()));
00736         BlockInfo* info = &b->api_blocks_.back();
00737         APIBlockProcessor::instance()->
00738             init_block(info, block->type, block->flags,
00739                        (u_char*)block->data.data_val,
00740                        block->data.data_len);
00741     }
00742     
00743     // set up the payload, including calculating its length, but don't
00744     // copy it in yet
00745     size_t payload_len;
00746     char filename[PATH_MAX];
00747 
00748     switch (payload.location) {
00749     case DTN_PAYLOAD_MEM:
00750         payload_len = payload.buf.buf_len;
00751         break;
00752         
00753     case DTN_PAYLOAD_FILE:
00754     case DTN_PAYLOAD_TEMP_FILE:
00755         struct stat finfo;
00756         sprintf(filename, "%.*s", 
00757                 (int)payload.filename.filename_len,
00758                 payload.filename.filename_val);
00759 
00760         if (stat(filename, &finfo) != 0)
00761         {
00762             log_err("payload file %s does not exist!", filename);
00763             return DTN_EINVAL;
00764         }
00765         
00766         payload_len = finfo.st_size;
00767         break;
00768 
00769     default:
00770         log_err("payload.location of %d unknown", payload.location);
00771         return DTN_EINVAL;
00772     }
00773     
00774     b->payload_.set_length(payload_len);
00775 
00776     // before filling in the payload, we first probe the router to
00777     // determine if there's sufficient storage for the bundle
00778     bool result;
00779     int  reason;
00780     BundleDaemon::post_and_wait(
00781         new BundleAcceptRequest(b, EVENTSRC_APP, &result, &reason),
00782         &notifier_);
00783 
00784     if (!result) {
00785         log_info("DTN_SEND bundle not accepted: reason %s",
00786                  BundleStatusReport::reason_to_str(reason));
00787 
00788         switch (reason) {
00789         case BundleProtocol::REASON_DEPLETED_STORAGE:
00790             return DTN_ENOSPACE;
00791         default:
00792             return DTN_EINTERNAL;
00793         }
00794     }
00795 
00796     switch (payload.location) {
00797     case DTN_PAYLOAD_MEM:
00798         b->payload_.set_data((u_char*)payload.buf.buf_val,
00799                              payload.buf.buf_len);
00800         break;
00801         
00802     case DTN_PAYLOAD_FILE:
00803         FILE* file;
00804         int r, left;
00805         u_char buffer[4096];
00806 
00807         if ((file = fopen(filename, "r")) == NULL)
00808         {
00809             log_err("payload file %s can't be opened!", filename);
00810             return DTN_EINVAL;
00811         }
00812         
00813         left = payload_len;
00814         r = 0;
00815         while (left > 0)
00816         {
00817             r = fread(buffer, 1, (left>4096)?4096:left, file);
00818             
00819             if (r)
00820             {
00821                 b->payload_.append_data(buffer, r);
00822                 left -= r;
00823             }
00824             else
00825             {
00826                 sleep(1); // pause before re-reading
00827             }
00828         }
00829 
00830         fclose(file);
00831         break;
00832         
00833     case DTN_PAYLOAD_TEMP_FILE:
00834         if (! b->payload_.replace_with_file(filename)) {
00835             log_err("payload file %s can't be linked or copied",
00836                     filename);
00837             return DTN_EINVAL;
00838         }
00839         
00840         if (::unlink(filename) != 0) {
00841             log_err("error unlinking payload temp file: %s",
00842                     strerror(errno));
00843             // continue on since this is non-fatal
00844         }
00845     }
00846 
00847     //  before posting the received event, fill in the bundle id struct
00848     dtn_bundle_id_t id;
00849     memcpy(&id.source, &spec.source, sizeof(dtn_endpoint_id_t));
00850     id.creation_ts.secs  = b->creation_ts_.seconds_;
00851     id.creation_ts.seqno = b->creation_ts_.seqno_;
00852     
00853     log_info("DTN_SEND bundle *%p", b.object());
00854 
00855     // deliver the bundle
00856     // Note: the bundle state may change once it has been posted
00857     BundleDaemon::post_and_wait(new BundleReceivedEvent(b.object(), EVENTSRC_APP),
00858                                 &notifier_);
00859 
00860     // return the bundle id struct
00861     if (!xdr_dtn_bundle_id_t(&xdr_encode_, &id)) {
00862         log_err("internal error in xdr: xdr_dtn_bundle_id_t");
00863         return DTN_EXDR;
00864     }
00865     
00866     return DTN_SUCCESS;
00867 }
00868 
00869 // Size for temporary memory buffer used when delivering bundles
00870 // via files.
00871 #define DTN_FILE_DELIVERY_BUF_SIZE 1000
00872 
00873 //----------------------------------------------------------------------
00874 int
00875 APIClient::handle_recv()
00876 {
00877     dtn_bundle_spec_t             spec;
00878     dtn_bundle_payload_t          payload;
00879     dtn_bundle_payload_location_t location;
00880     dtn_bundle_status_report_t    status_report;
00881     dtn_timeval_t                 timeout;
00882     oasys::ScratchBuffer<u_char*> buf;
00883     APIRegistration*              reg = NULL;
00884     bool                          sock_ready = false;
00885     oasys::FileIOClient           tmpfile;
00886 
00887     // unpack the arguments
00888     if ((!xdr_dtn_bundle_payload_location_t(&xdr_decode_, &location)) ||
00889         (!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
00890     {
00891         log_err("error in xdr unpacking arguments");
00892         return DTN_EXDR;
00893     }
00894     
00895     int err = wait_for_bundle("recv", timeout, &reg, &sock_ready);
00896     if (err != 0) {
00897         return err;
00898     }
00899     
00900     // if there's data on the socket, that either means the socket was
00901     // closed by an exiting application or the app is violating the
00902     // protocol...
00903     if (sock_ready) {
00904         log_debug("handle_recv: api socket ready -- trying to read one byte");
00905         char b;
00906         if (read(&b, 1) != 0) {
00907             log_err("handle_recv: protocol error -- "
00908                     "data arrived or error while blocked in recv");
00909             return DTN_ECOMM;
00910         }
00911 
00912         log_info("IPC socket closed while blocked in read... "
00913                  "application must have exited");
00914         return -1;
00915     }
00916     
00917     BundleRef bref("APIClient::handle_recv");
00918     bref = reg->bundle_list()->pop_front();
00919     Bundle* b = bref.object();
00920     ASSERT(b != NULL);
00921     
00922     log_debug("handle_recv: popped bundle %d for registration %d (timeout %d)",
00923               b->bundleid_, reg->regid(), timeout);
00924     
00925     memset(&spec, 0, sizeof(spec));
00926     memset(&payload, 0, sizeof(payload));
00927     memset(&status_report, 0, sizeof(status_report));
00928 
00929     // copyto will malloc string buffer space that needs to be freed
00930     // at the end of the fn
00931     b->source_.copyto(&spec.source);
00932     b->dest_.copyto(&spec.dest);
00933     b->replyto_.copyto(&spec.replyto);
00934 
00935     spec.dopts = 0;
00936     if (b->custody_requested_) spec.dopts |= DOPTS_CUSTODY;
00937     if (b->delivery_rcpt_)     spec.dopts |= DOPTS_DELIVERY_RCPT;
00938     if (b->receive_rcpt_)      spec.dopts |= DOPTS_RECEIVE_RCPT;
00939     if (b->forward_rcpt_)      spec.dopts |= DOPTS_FORWARD_RCPT;
00940     if (b->custody_rcpt_)      spec.dopts |= DOPTS_CUSTODY_RCPT;
00941     if (b->deletion_rcpt_)     spec.dopts |= DOPTS_DELETE_RCPT;
00942 
00943     spec.expiration = b->expiration_;
00944 
00945     // XXX copy extension blocks
00946     
00947     // XXX/demmer verify bundle size constraints
00948     payload.location = location;
00949     
00950     if (location == DTN_PAYLOAD_MEM) {
00951         // the app wants the payload in memory
00952         // XXX/demmer verify bounds
00953 
00954         size_t payload_len = b->payload_.length();
00955         payload.buf.buf_len = payload_len;
00956         if (payload_len != 0) {
00957             buf.reserve(payload_len);
00958             payload.buf.buf_val =
00959                 (char*)b->payload_.read_data(0, payload_len, buf.buf());
00960         } else {
00961             payload.buf.buf_val = 0;
00962         }
00963         
00964     } else if (location == DTN_PAYLOAD_FILE) {
00965         char *tdir, templ[64];
00966 
00967         // XXX/demmer do this with a hard link
00968 
00969         tdir = getenv("TMP");
00970         if (tdir == NULL) {
00971             tdir = getenv("TEMP");
00972         }
00973         if (tdir == NULL) {
00974             tdir = "/tmp";
00975         }
00976 
00977         snprintf(templ, sizeof(templ), "%s/bundlePayload_XXXXXX", tdir);
00978 
00979         if (tmpfile.mkstemp(templ) == -1) {
00980             log_err("can't open temporary file to deliver bundle");
00981             return DTN_EINTERNAL;
00982         }
00983         
00984         if (b->payload_.location() == BundlePayload::MEMORY) {
00985             tmpfile.writeall((char*)b->payload_.memory_data(),
00986                              b->payload_.length());
00987             
00988         } else {
00989             b->payload_.copy_file(&tmpfile);
00990         }
00991 
00992         payload.filename.filename_val = (char*)tmpfile.path();
00993         payload.filename.filename_len = tmpfile.path_len() + 1;
00994         tmpfile.close();
00995         
00996     } else {
00997         log_err("payload location %d not understood", location);
00998         return DTN_EINVAL;
00999     }
01000     
01001     /*
01002      * If the bundle is a status report, parse it and copy out the
01003      * data into the status report.
01004      */
01005     BundleStatusReport::data_t sr_data;
01006     if (BundleStatusReport::parse_status_report(&sr_data, b))
01007     {
01008         payload.status_report = &status_report;
01009         sr_data.orig_source_eid_.copyto(&status_report.bundle_id.source);
01010         status_report.bundle_id.creation_ts.secs =
01011             sr_data.orig_creation_tv_.seconds_;
01012         status_report.bundle_id.creation_ts.seqno =
01013             sr_data.orig_creation_tv_.seqno_;
01014         status_report.bundle_id.frag_offset = sr_data.orig_frag_offset_;
01015         status_report.bundle_id.orig_length = sr_data.orig_frag_length_;
01016 
01017         status_report.reason = (dtn_status_report_reason_t)sr_data.reason_code_;
01018         status_report.flags =  (dtn_status_report_flags_t)sr_data.status_flags_;
01019     }
01020     
01021     if (!xdr_dtn_bundle_spec_t(&xdr_encode_, &spec))
01022     {
01023         log_err("internal error in xdr: xdr_dtn_bundle_spec_t");
01024         return DTN_EXDR;
01025     }
01026     
01027     if (!xdr_dtn_bundle_payload_t(&xdr_encode_, &payload))
01028     {
01029         log_err("internal error in xdr: xdr_dtn_bundle_payload_t");
01030         return DTN_EXDR;
01031     }
01032 
01033     // prevent xdr_free of non-malloc'd pointer
01034     payload.status_report = NULL;
01035     
01036     log_info("DTN_RECV: "
01037              "successfully delivered bundle %d to registration %d",
01038              b->bundleid_, reg->regid());
01039     
01040     BundleDaemon::post(new BundleDeliveredEvent(b, reg));
01041 
01042     return DTN_SUCCESS;
01043 }
01044 
01045 //----------------------------------------------------------------------
01046 int
01047 APIClient::handle_begin_poll()
01048 {
01049     dtn_timeval_t    timeout;
01050     APIRegistration* reg = NULL;
01051     bool             sock_ready = false;
01052     
01053     // unpack the arguments
01054     if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
01055     {
01056         log_err("error in xdr unpacking arguments");
01057         return DTN_EXDR;
01058     }
01059 
01060     int err = wait_for_bundle("poll", timeout, &reg, &sock_ready);
01061     if (err != 0) {
01062         return err;
01063     }
01064 
01065     // if there's data on the socket, then the application either quit
01066     // and closed the socket, or called dtn_poll_cancel
01067     if (sock_ready) {
01068         log_debug("handle_begin_poll: "
01069                   "api socket ready -- trying to read one byte");
01070         char type;
01071         
01072         int ret = read(&type, 1);
01073         if (ret == 0) {
01074             log_info("IPC socket closed while blocked in read... "
01075                      "application must have exited");
01076             return -1;
01077         }
01078 
01079         if (ret == -1) {
01080             log_err("handle_begin_poll: protocol error -- "
01081                     "error while blocked in poll");
01082             return DTN_ECOMM;
01083         }
01084 
01085         if (type != DTN_CANCEL_POLL) {
01086             log_err("handle_poll: error got unexpected message '%s' "
01087                     "while blocked in poll", dtnipc_msgtoa(type));
01088             return DTN_ECOMM;
01089         }
01090 
01091         // read in the length which must be zero
01092         u_int32_t len;
01093         ret = read((char*)&len, 4);
01094         if (ret != 4 || len != 0) {
01095             log_err("handle_begin_poll: protocol error -- "
01096                     "error getting cancel poll length");
01097             return DTN_ECOMM;
01098         }
01099 
01100         // immediately send the response to the poll cancel, then
01101         // we return from the handler which will follow it with the
01102         // response code to the original poll request
01103         send_response(DTN_SUCCESS);
01104     }
01105     
01106     return DTN_SUCCESS;
01107 }
01108 
01109 //----------------------------------------------------------------------
01110 int
01111 APIClient::handle_cancel_poll()
01112 {
01113     // the only reason we should get in here is if the call to
01114     // dtn_begin_poll() returned but the app still called cancel_poll
01115     // and so the messages crossed. but, since there's nothing wrong
01116     // with this, we just return success in both cases
01117     
01118     return DTN_SUCCESS;
01119 }
01120         
01121 //----------------------------------------------------------------------
01122 int
01123 APIClient::wait_for_bundle(const char* operation, dtn_timeval_t dtn_timeout,
01124                            APIRegistration** regp, bool* sock_ready)
01125 {
01126     APIRegistration* reg;
01127     
01128     // XXX/demmer implement this for multiple registrations by
01129     // building up a poll vector here. for now we assert in bind that
01130     // there's only one binding.
01131     
01132     if (bindings_->empty()) {
01133         log_err("wait_for_bundle(%s): no bound registration", 
01134                 operation);
01135         return DTN_EINVAL;
01136     }
01137     
01138     reg = bindings_->front();
01139 
01140     // short-circuit the poll
01141     if (reg->bundle_list()->size() != 0) {
01142         log_debug("wait_for_bundle(%s): "
01143                   "immediately returning bundle for reg %d",
01144                   operation, reg->regid());
01145         *regp = reg;
01146         return 0;
01147     }
01148 
01149     int timeout = (int)dtn_timeout;
01150     if (timeout < -1) {
01151         log_err("wait_for_bundle(%s): "
01152                 "invalid timeout value %d", operation, timeout);
01153         return DTN_EINVAL;
01154     }
01155 
01156     struct pollfd pollfds[2];
01157 
01158     struct pollfd* bundle_poll = &pollfds[0];
01159     bundle_poll->fd            = reg->bundle_list()->notifier()->read_fd();
01160     bundle_poll->events        = POLLIN;
01161     bundle_poll->revents       = 0;
01162     
01163     struct pollfd* sock_poll   = &pollfds[1];
01164     sock_poll->fd              = TCPClient::fd_;
01165     sock_poll->events          = POLLIN | POLLERR;
01166     sock_poll->revents         = 0;
01167 
01168     log_debug("wait_for_bundle(%s): "
01169               "blocking to get bundle for registration %d (timeout %d)",
01170               operation, reg->regid(), timeout);
01171     int nready = oasys::IO::poll_multiple(&pollfds[0], 2, timeout,
01172                                           NULL, logpath_);
01173 
01174     if (nready == oasys::IOTIMEOUT) {
01175         log_debug("wait_for_bundle(%s): timeout waiting for bundle",
01176                   operation);
01177         return DTN_ETIMEOUT;
01178 
01179     } else if (nready <= 0) {
01180         log_err("wait_for_bundle(%s): unexpected error polling for bundle",
01181                 operation);
01182         return DTN_EINTERNAL;
01183     }
01184 
01185     ASSERT(nready == 1);
01186     
01187     // if there's data on the socket, that either means the socket was
01188     // closed by an exiting application or the app is violating the
01189     // protocol...
01190     if (sock_poll->revents != 0) {
01191         *sock_ready = true;
01192         return 0;
01193     }
01194 
01195     // otherwise, there should be data on the bundle list
01196     if (bundle_poll->revents == 0) {
01197         log_crit("wait_for_bundle(%s): unexpected error polling for bundle: "
01198                  "neither file descriptor is ready", operation);
01199         return DTN_EINTERNAL;
01200     }
01201 
01202     if (reg->bundle_list()->size() == 0) {
01203         log_err("wait_for_bundle(%s): "
01204                 "bundle list returned ready but no bundle on queue!!",
01205                 operation);
01206         return DTN_EINTERNAL;
01207     }
01208 
01209     *regp = reg;
01210     return 0;
01211 }
01212 
01213 //----------------------------------------------------------------------
01214 int
01215 APIClient::handle_close()
01216 {
01217     log_info("received DTN_CLOSE message; closing API handle");
01218     // return -1 to force the session to close:
01219     return -1;
01220 }
01221 
01222 } // namespace dtn

Generated on Sat Sep 8 08:43:23 2007 for DTN Reference Implementation by  doxygen 1.5.3