00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <algorithm>
00022
00023 #include <sys/types.h>
00024 #include <sys/stat.h>
00025 #include <oasys/compat/inet_aton.h>
00026 #include <oasys/compat/rpc.h>
00027 #include <oasys/io/FileIOClient.h>
00028 #include <oasys/io/NetUtils.h>
00029 #include <oasys/util/Pointers.h>
00030 #include <oasys/util/ScratchBuffer.h>
00031 #include <oasys/util/XDRUtils.h>
00032
00033 #include "APIServer.h"
00034 #include "bundling/APIBlockProcessor.h"
00035 #include "bundling/Bundle.h"
00036 #include "bundling/BundleEvent.h"
00037 #include "bundling/BundleDaemon.h"
00038 #include "bundling/BundleStatusReport.h"
00039 #include "bundling/SDNV.h"
00040 #include "bundling/GbofId.h"
00041 #include "naming/EndpointID.h"
00042 #include "cmd/APICommand.h"
00043 #include "reg/APIRegistration.h"
00044 #include "reg/RegistrationTable.h"
00045 #include "routing/BundleRouter.h"
00046 #include "storage/GlobalStore.h"
00047 #include "session/Session.h"
00048
// Fallback two-argument minimum, defined only when no earlier header has
// already provided MIN. Each argument may be evaluated more than once.
#ifndef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
00052
00053 namespace dtn {
00054
00055
00056 APIServer::APIServer()
00057
00058 : TCPServerThread("APIServer", "/dtn/apiserver", 0)
00059 {
00060 enabled_ = true;
00061 local_addr_ = htonl(INADDR_LOOPBACK);
00062 local_port_ = DTN_IPC_PORT;
00063
00064
00065 char *env;
00066 if ((env = getenv("DTNAPI_ADDR")) != NULL) {
00067 if (inet_aton(env, (struct in_addr*)&local_addr_) == 0)
00068 {
00069 log_err("DTNAPI_ADDR environment variable (%s) "
00070 "not a valid ip address, using default of localhost",
00071 env);
00072
00073 local_addr_ = htonl(INADDR_LOOPBACK);
00074 } else {
00075 log_debug("local address set to %s by DTNAPI_ADDR "
00076 "environment variable", env);
00077 }
00078 }
00079
00080 if ((env = getenv("DTNAPI_PORT")) != NULL) {
00081 char *end;
00082 u_int port = strtoul(env, &end, 10);
00083 if (*end != '\0' || port > 0xffff)
00084 {
00085 log_err("DTNAPI_PORT environment variable (%s) "
00086 "not a valid ip port, using default of %d",
00087 env, DTN_IPC_PORT);
00088 port = DTN_IPC_PORT;
00089 } else {
00090 log_debug("api port set to %s by DTNAPI_PORT "
00091 "environment variable", env);
00092 }
00093 local_port_ = (u_int16_t)port;
00094 }
00095
00096 if (local_addr_ != INADDR_ANY || local_port_ != 0) {
00097 log_debug("APIServer init (evironment set addr %s port %d)",
00098 intoa(local_addr_), local_port_);
00099 } else {
00100 log_debug("APIServer init");
00101 }
00102
00103 oasys::TclCommandInterp::instance()->reg(new APICommand(this));
00104 }
00105
00106
00107 void
00108 APIServer::accepted(int fd, in_addr_t addr, u_int16_t port)
00109 {
00110 APIClient* c = new APIClient(fd, addr, port, this);
00111 register_client(c);
00112 c->start();
00113 }
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131 void
00132 APIServer::shutdown_hook()
00133 {
00134
00135 std::list<APIClient *>::iterator ci;
00136 client_list_lock.lock("APIServer::shutdown");
00137 for (ci = client_list.begin(); ci != client_list.end(); ++ci) {
00138 (*ci)->set_should_stop();
00139 }
00140 client_list_lock.unlock();
00141
00142 #define MAX_SPIN_TIME (5 * 1000000) // max sleep in usec
00143 #define EACH_SPIN_TIME 10000 // sleep 10ms each time
00144
00145
00146
00147
00148
00149
00150
00151 int count = 0;
00152 while (count++ < (MAX_SPIN_TIME / EACH_SPIN_TIME)) {
00153 client_list_lock.lock("APIServer::shutdown");
00154 bool empty = client_list.empty();
00155 client_list_lock.unlock();
00156 if (!empty)
00157 usleep(EACH_SPIN_TIME);
00158 else
00159 break;
00160 }
00161 return;
00162 }
00163
00164
00165
00166
00167
00168
00169 void
00170 APIServer::register_client(APIClient *c)
00171 {
00172 oasys::ScopeLock l(&client_list_lock, "APIServer::register_client");
00173 client_list.push_front(c);
00174 }
00175
00176 void
00177 APIServer::unregister_client(APIClient *c)
00178 {
00179
00180 oasys::ScopeLock l(&client_list_lock, "APIServer::unregister_client");
00181 client_list.remove(c);
00182 }
00183
00184
00185 APIClient::APIClient(int fd, in_addr_t addr, u_int16_t port, APIServer *parent)
00186 : Thread("APIClient", DELETE_ON_EXIT),
00187 TCPClient(fd, addr, port, "/dtn/apiclient"),
00188 notifier_(logpath_),
00189 parent_(parent),
00190 total_sent_(0),
00191 total_rcvd_(0)
00192 {
00193
00194 xdrmem_create(&xdr_encode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_ENCODE);
00195 xdrmem_create(&xdr_decode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_DECODE);
00196
00197 bindings_ = new APIRegistrationList();
00198 sessions_ = new APIRegistrationList();
00199 }
00200
00201
00202 APIClient::~APIClient()
00203 {
00204 log_debug("client destroyed");
00205 delete_z(bindings_);
00206 delete_z(sessions_);
00207 }
00208
00209
00210 void
00211 APIClient::close_client()
00212 {
00213 TCPClient::close();
00214
00215 APIRegistration* reg;
00216 while (! bindings_->empty()) {
00217 reg = bindings_->front();
00218 bindings_->pop_front();
00219
00220 reg->set_active(false);
00221
00222 if (reg->expired()) {
00223 log_debug("removing expired registration %d", reg->regid());
00224 BundleDaemon::post(new RegistrationExpiredEvent(reg));
00225 }
00226 }
00227
00228
00229 sessions_->clear();
00230
00231 parent_->unregister_client(this);
00232 }
00233
00234
00235 int
00236 APIClient::handle_handshake()
00237 {
00238 u_int32_t handshake;
00239 u_int16_t message_type, ipc_version;
00240
00241 int ret = readall((char*)&handshake, sizeof(handshake));
00242 if (ret != sizeof(handshake)) {
00243 log_err("error reading handshake: (got %d/%zu), \"error\" %s",
00244 ret, sizeof(handshake), strerror(errno));
00245 return -1;
00246 }
00247
00248 total_rcvd_ += ret;
00249
00250 message_type = ntohl(handshake) >> 16;
00251 ipc_version = (u_int16_t) (ntohl(handshake) & 0x0ffff);
00252
00253 if (message_type != DTN_OPEN) {
00254 log_err("handshake (0x%x)'s message type %d != DTN_OPEN (%d)",
00255 handshake, message_type, DTN_OPEN);
00256 return -1;
00257 }
00258
00259
00260
00261
00262 handshake = htonl(DTN_OPEN << 16 | DTN_IPC_VERSION);
00263
00264 ret = writeall((char*)&handshake, sizeof(handshake));
00265 if (ret != sizeof(handshake)) {
00266 log_err("error writing handshake: %s", strerror(errno));
00267 return -1;
00268 }
00269
00270 total_sent_ += ret;
00271
00272 if (ipc_version != DTN_IPC_VERSION) {
00273 log_err("handshake (0x%x)'s version %d != DTN_IPC_VERSION (%d)",
00274 handshake, ipc_version, DTN_IPC_VERSION);
00275 return -1;
00276 }
00277
00278 return 0;
00279 }
00280
00281
00282 void
00283 APIClient::run()
00284 {
00285 int ret;
00286 u_int8_t type;
00287 u_int32_t len;
00288
00289 log_info("new session %s:%d -> %s:%d",
00290 intoa(local_addr()), local_port(),
00291 intoa(remote_addr()), remote_port());
00292
00293 if (handle_handshake() != 0) {
00294 close_client();
00295 return;
00296 }
00297
00298 while (true) {
00299
00300
00301 if (should_stop()) {
00302 close_client();
00303 return;
00304 }
00305
00306 xdr_setpos(&xdr_encode_, 0);
00307 xdr_setpos(&xdr_decode_, 0);
00308
00309
00310
00311
00312
00313 log_debug("waiting for next message... total sent/rcvd: %zu/%zu",
00314 total_sent_, total_rcvd_);
00315
00316 ret = read(&buf_[3], 5);
00317 if (ret <= 0) {
00318 log_warn("client disconnected without calling dtn_close");
00319 close_client();
00320 return;
00321 }
00322 total_rcvd_ += ret;
00323
00324 if (ret < 5) {
00325 log_err("ack!! can't handle really short read...");
00326 close_client();
00327 return;
00328 }
00329
00330
00331
00332
00333
00334 type = buf_[3];
00335 memcpy(&len, &buf_[4], sizeof(len));
00336
00337 len = ntohl(len);
00338
00339 ret -= 5;
00340 log_debug("got %s (%d/%d bytes)", dtnipc_msgtoa(type), ret, len);
00341
00342
00343
00344 if (ret < (int)len) {
00345 int toget = len - ret;
00346 log_debug("reading remainder of message... total sent/rcvd: %zu/%zu",
00347 total_sent_, total_rcvd_);
00348 if (readall(&buf_[8 + ret], toget) != toget) {
00349 log_err("error reading message remainder: %s",
00350 strerror(errno));
00351 close_client();
00352 return;
00353 }
00354 total_rcvd_ += toget;
00355 }
00356
00357
00358
00359 if (should_stop()) {
00360 close_client();
00361 return;
00362 }
00363
00364
00365 switch(type) {
00366 #define DISPATCH(_type, _fn) \
00367 case _type: \
00368 ret = _fn(); \
00369 break;
00370
00371 DISPATCH(DTN_LOCAL_EID, handle_local_eid);
00372 DISPATCH(DTN_REGISTER, handle_register);
00373 DISPATCH(DTN_UNREGISTER, handle_unregister);
00374 DISPATCH(DTN_FIND_REGISTRATION, handle_find_registration);
00375 DISPATCH(DTN_SEND, handle_send);
00376 DISPATCH(DTN_CANCEL, handle_cancel);
00377 DISPATCH(DTN_BIND, handle_bind);
00378 DISPATCH(DTN_UNBIND, handle_unbind);
00379 DISPATCH(DTN_RECV, handle_recv);
00380 DISPATCH(DTN_BEGIN_POLL, handle_begin_poll);
00381 DISPATCH(DTN_CANCEL_POLL, handle_cancel_poll);
00382 DISPATCH(DTN_CLOSE, handle_close);
00383 DISPATCH(DTN_SESSION_UPDATE, handle_session_update);
00384 #undef DISPATCH
00385
00386 default:
00387 log_err("unknown message type code 0x%x", type);
00388 ret = DTN_EMSGTYPE;
00389 break;
00390 }
00391
00392
00393
00394 if (ret == -1) {
00395 close_client();
00396 return;
00397 }
00398
00399
00400 if (send_response(ret) != 0) {
00401 return;
00402 }
00403
00404
00405
00406
00407 if (ret == DTN_ECOMM || ret == DTN_EMSGTYPE) {
00408 close_client();
00409 return;
00410 }
00411
00412 }
00413 }
00414
00415
00416 int
00417 APIClient::send_response(int ret)
00418 {
00419 u_int32_t len, msglen;
00420
00421
00422
00423 ASSERT(ret == DTN_SUCCESS ||
00424 (DTN_ERRBASE <= ret && ret <= DTN_ERRMAX));
00425
00426
00427
00428
00429 len = xdr_getpos(&xdr_encode_);
00430 log_debug("building reply: status %s, length %d",
00431 dtn_strerror(ret), len);
00432
00433 msglen = len + 8;
00434 ret = ntohl(ret);
00435 len = htonl(len);
00436
00437 memcpy(buf_, &ret, sizeof(ret));
00438 memcpy(&buf_[4], &len, sizeof(len));
00439
00440 log_debug("sending %d byte reply message... total sent/rcvd: %zu/%zu",
00441 msglen, total_sent_, total_rcvd_);
00442
00443 if (writeall(buf_, msglen) != (int)msglen) {
00444 log_err("error sending reply: %s", strerror(errno));
00445 close_client();
00446 return -1;
00447 }
00448
00449 total_sent_ += msglen;
00450
00451 return 0;
00452 }
00453
00454
00455 bool
00456 APIClient::is_bound(u_int32_t regid)
00457 {
00458 APIRegistrationList::iterator iter;
00459 for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
00460 if ((*iter)->regid() == regid) {
00461 return true;
00462 }
00463 }
00464
00465 return false;
00466 }
00467
00468
00469 int
00470 APIClient::handle_local_eid()
00471 {
00472 dtn_service_tag_t service_tag;
00473 dtn_endpoint_id_t local_eid;
00474
00475
00476 if (!xdr_dtn_service_tag_t(&xdr_decode_, &service_tag))
00477 {
00478 log_err("error in xdr unpacking arguments");
00479 return DTN_EXDR;
00480 }
00481
00482
00483 EndpointID eid(BundleDaemon::instance()->local_eid());
00484 if (eid.append_service_tag(service_tag.tag) == false) {
00485 log_err("error appending service tag");
00486 return DTN_EINVAL;
00487 }
00488
00489 memset(&local_eid, 0, sizeof(local_eid));
00490 eid.copyto(&local_eid);
00491
00492
00493 if (!xdr_dtn_endpoint_id_t(&xdr_encode_, &local_eid)) {
00494 log_err("internal error in xdr: xdr_dtn_endpoint_id_t");
00495 return DTN_EXDR;
00496 }
00497
00498 log_debug("get_local_eid encoded %d byte response",
00499 xdr_getpos(&xdr_encode_));
00500
00501 return DTN_SUCCESS;
00502 }
00503
00504
00505 int
00506 APIClient::handle_register()
00507 {
00508 APIRegistration* reg;
00509 Registration::failure_action_t action;
00510 EndpointIDPattern endpoint;
00511 std::string script;
00512
00513 dtn_reg_info_t reginfo;
00514
00515 memset(®info, 0, sizeof(reginfo));
00516
00517
00518 if (!xdr_dtn_reg_info_t(&xdr_decode_, ®info))
00519 {
00520 log_err("error in xdr unpacking arguments");
00521 return DTN_EXDR;
00522 }
00523
00524
00525
00526 oasys::ScopeXDRFree x((xdrproc_t)xdr_dtn_reg_info_t, (char*)®info);
00527
00528 endpoint.assign(®info.endpoint);
00529
00530 if (!endpoint.valid()) {
00531 log_err("invalid endpoint id in register: '%s'",
00532 reginfo.endpoint.uri);
00533 return DTN_EINVAL;
00534 }
00535
00536
00537
00538
00539
00540 u_int failure_action = reginfo.flags & 0x3;
00541 switch (failure_action) {
00542 case DTN_REG_DEFER: action = Registration::DEFER; break;
00543 case DTN_REG_DROP: action = Registration::DROP; break;
00544 case DTN_REG_EXEC: action = Registration::EXEC; break;
00545 default: {
00546 log_err("invalid registration flags 0x%x", reginfo.flags);
00547 return DTN_EINVAL;
00548 }
00549 }
00550
00551
00552 u_int32_t session_flags = 0;
00553 if (reginfo.flags & DTN_SESSION_CUSTODY) {
00554 session_flags |= Session::CUSTODY;
00555 }
00556 if (reginfo.flags & DTN_SESSION_SUBSCRIBE) {
00557 session_flags |= Session::SUBSCRIBE;
00558 }
00559 if (reginfo.flags & DTN_SESSION_PUBLISH) {
00560 session_flags |= Session::PUBLISH;
00561 }
00562
00563 u_int other_flags = reginfo.flags & ~0x1f;
00564 if (other_flags != 0) {
00565 log_err("invalid registration flags 0x%x", reginfo.flags);
00566 return DTN_EINVAL;
00567 }
00568
00569 if (action == Registration::EXEC) {
00570 script.assign(reginfo.script.script_val, reginfo.script.script_len);
00571 }
00572
00573 u_int32_t regid = GlobalStore::instance()->next_regid();
00574 reg = new APIRegistration(regid, endpoint, action, session_flags,
00575 reginfo.expiration, script);
00576
00577 if (! reginfo.init_passive) {
00578
00579 bindings_->push_back(reg);
00580 reg->set_active(true);
00581 }
00582
00583 if (session_flags & Session::CUSTODY) {
00584 sessions_->push_back(reg);
00585 ASSERT(reg->session_notify_list() != NULL);
00586 }
00587
00588 BundleDaemon::post_and_wait(new RegistrationAddedEvent(reg, EVENTSRC_APP),
00589 ¬ifier_);
00590
00591
00592 if (!xdr_dtn_reg_id_t(&xdr_encode_, ®id)) {
00593 log_err("internal error in xdr: xdr_dtn_reg_id_t");
00594 return DTN_EXDR;
00595 }
00596
00597 return DTN_SUCCESS;
00598 }
00599
00600
// DTN_UNREGISTER: remove the registration with the given id.
//
// A registration that is bound to this client and active is only forced to
// expire (DTN_EINVAL if it already has). A registration that is active but
// not bound to this client yields DTN_EBUSY. Otherwise a
// RegistrationRemovedEvent is posted and waited for.
//
// Returns DTN_SUCCESS, DTN_EXDR, DTN_ENOTFOUND, DTN_EINVAL or DTN_EBUSY.
int
APIClient::handle_unregister()
{
    Registration* reg;
    dtn_reg_id_t regid;

    // unpack the request
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    reg = BundleDaemon::instance()->reg_table()->get(regid);
    if (reg == NULL) {
        return DTN_ENOTFOUND;
    }

    // bound to this client and active: force it to expire instead of
    // posting a removal event here
    if (is_bound(reg->regid()) && reg->active()) {
        if (reg->expired()) {
            return DTN_EINVAL;
        }

        reg->force_expire();
        ASSERT(reg->expired());
        return DTN_SUCCESS;
    }

    // active, but not one of this client's bindings
    if (reg->active()) {
        return DTN_EBUSY;
    }

    BundleDaemon::post_and_wait(new RegistrationRemovedEvent(reg),
                                &notifier_);

    return DTN_SUCCESS;
}
00644
00645
// DTN_FIND_REGISTRATION: look up a registration by endpoint id and return
// its registration id.
//
// Returns DTN_SUCCESS, DTN_EXDR on an XDR error, DTN_EINVAL for an invalid
// endpoint id, or DTN_ENOTFOUND if the registration table has no match.
int
APIClient::handle_find_registration()
{
    Registration* reg;
    EndpointIDPattern endpoint;
    dtn_endpoint_id_t app_eid;

    // unpack the request
    if (!xdr_dtn_endpoint_id_t(&xdr_decode_, &app_eid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    endpoint.assign(&app_eid);
    if (!endpoint.valid()) {
        log_err("invalid endpoint id in find_registration: '%s'",
                app_eid.uri);
        return DTN_EINVAL;
    }

    reg = BundleDaemon::instance()->reg_table()->get(endpoint);
    if (reg == NULL) {
        return DTN_ENOTFOUND;
    }

    u_int32_t regid = reg->regid();

    // pack the response
    if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
        log_err("internal error in xdr: xdr_dtn_reg_id_t");
        return DTN_EXDR;
    }

    return DTN_SUCCESS;
}
00682
00683
// DTN_BIND: bind this client to an existing, currently inactive API
// registration and mark it active.
//
// Returns DTN_SUCCESS, DTN_EXDR on an XDR error, DTN_ENOTFOUND if the id is
// unknown or not an API registration, or DTN_EBUSY if the registration is
// already active.
int
APIClient::handle_bind()
{
    dtn_reg_id_t regid;

    // unpack the request
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // look up the registration
    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
    Registration* reg = regtable->get(regid);

    if (!reg) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (api_reg == NULL) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    if (api_reg->active()) {
        log_err("registration %d is already in active mode", regid);
        return DTN_EBUSY;
    }

    // record the binding and activate the registration
    bindings_->push_back(api_reg);
    api_reg->set_active(true);

    log_info("DTN_BIND: bound to registration %d", reg->regid());

    return DTN_SUCCESS;
}
00724
00725
// DTN_UNBIND: remove one of this client's bindings and mark the
// registration inactive. If the registration has already expired, a
// RegistrationExpiredEvent is posted for it.
//
// Returns DTN_SUCCESS, DTN_EXDR on an XDR error, or DTN_ENOTFOUND if the id
// is unknown, not an API registration, or not bound to this client.
int
APIClient::handle_unbind()
{
    dtn_reg_id_t regid;

    // unpack the request
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // look up the registration
    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
    Registration* reg = regtable->get(regid);

    if (!reg) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (api_reg == NULL) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    APIRegistrationList::iterator iter;
    for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
        if (*iter != api_reg) {
            continue;
        }

        // the iterator is not used again after the erase
        bindings_->erase(iter);
        ASSERT(api_reg->active());
        api_reg->set_active(false);

        if (reg->expired()) {
            log_debug("removing expired registration %d", reg->regid());
            BundleDaemon::post(new RegistrationExpiredEvent(reg));
        }

        log_info("DTN_UNBIND: unbound from registration %d", regid);
        return DTN_SUCCESS;
    }

    log_err("registration %d not bound to this api client", regid);
    return DTN_ENOTFOUND;
}
00773
00774
00775 int
00776 APIClient::handle_send()
00777 {
00778 dtn_reg_id_t regid;
00779 dtn_bundle_spec_t spec;
00780 dtn_bundle_payload_t payload;
00781
00782 memset(&spec, 0, sizeof(spec));
00783 memset(&payload, 0, sizeof(payload));
00784
00785
00786 if (!xdr_dtn_reg_id_t(&xdr_decode_, ®id) ||
00787 !xdr_dtn_bundle_spec_t(&xdr_decode_, &spec) ||
00788 !xdr_dtn_bundle_payload_t(&xdr_decode_, &payload))
00789 {
00790 log_err("error in xdr unpacking arguments");
00791 return DTN_EXDR;
00792 }
00793
00794 BundleRef b("APIClient::handle_send");
00795 b = new Bundle();
00796
00797
00798 oasys::ScopeXDRFree f1((xdrproc_t)xdr_dtn_bundle_spec_t,
00799 (char*)&spec);
00800 oasys::ScopeXDRFree f2((xdrproc_t)xdr_dtn_bundle_payload_t,
00801 (char*)&payload);
00802
00803
00804
00805
00806 b->mutable_source()->assign(&spec.source);
00807 b->mutable_dest()->assign(&spec.dest);
00808
00809
00810 if (spec.replyto.uri[0] == '\0') {
00811 b->mutable_replyto()->assign(EndpointID::NULL_EID());
00812 } else {
00813 b->mutable_replyto()->assign(&spec.replyto);
00814 }
00815
00816
00817 b->mutable_custodian()->assign(EndpointID::NULL_EID());
00818
00819
00820
00821
00822
00823 if (spec.dopts & DOPTS_SINGLETON_DEST)
00824 {
00825 b->set_singleton_dest(true);
00826 }
00827 else if (spec.dopts & DOPTS_MULTINODE_DEST)
00828 {
00829 b->set_singleton_dest(false);
00830 }
00831 else
00832 {
00833 EndpointID::singleton_info_t info;
00834
00835 if (b->dest().known_scheme()) {
00836 info = b->dest().is_singleton();
00837
00838
00839 ASSERT(info != EndpointID::UNKNOWN);
00840 } else {
00841 info = EndpointID::is_singleton_default_;
00842 }
00843
00844 switch (info) {
00845 case EndpointID::UNKNOWN:
00846 log_err("bundle destination %s in unknown scheme and "
00847 "app did not assert singleton/multipoint",
00848 b->dest().c_str());
00849 return DTN_EINVAL;
00850
00851 case EndpointID::SINGLETON:
00852 b->set_singleton_dest(true);
00853 break;
00854
00855 case EndpointID::MULTINODE:
00856 b->set_singleton_dest(false);
00857 break;
00858 }
00859 }
00860
00861
00862 switch (spec.priority) {
00863 #define COS(_cos) case _cos: b->set_priority(Bundle::_cos); break;
00864 COS(COS_BULK);
00865 COS(COS_NORMAL);
00866 COS(COS_EXPEDITED);
00867 COS(COS_RESERVED);
00868 #undef COS
00869 default:
00870 log_err("invalid priority level %d", (int)spec.priority);
00871 return DTN_EINVAL;
00872 };
00873
00874
00875
00876 const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
00877 RegistrationList unused;
00878 if (b->source() == EndpointID::NULL_EID())
00879 {
00880
00881
00882 if (spec.dopts) {
00883 log_err("bundle with null source EID requested reports and/or "
00884 "custody transfer");
00885 return DTN_EINVAL;
00886 }
00887
00888 b->set_do_not_fragment(true);
00889 }
00890 else if (reg_table->get_matching(b->source(), &unused) != 0)
00891 {
00892
00893 }
00894 else if (b->source().subsume(BundleDaemon::instance()->local_eid()))
00895 {
00896
00897 }
00898 else
00899 {
00900 log_err("this node is not a member of the bundle's source EID (%s)",
00901 b->source().str().c_str());
00902 return DTN_EINVAL;
00903 }
00904
00905
00906
00907 Registration* reg = reg_table->get(regid);
00908 if (reg && reg->session_flags() != 0) {
00909 b->mutable_session_eid()->assign(reg->endpoint().str());
00910 }
00911
00912
00913 if (spec.dopts & DOPTS_CUSTODY)
00914 b->set_custody_requested(true);
00915
00916 if (spec.dopts & DOPTS_DELIVERY_RCPT)
00917 b->set_delivery_rcpt(true);
00918
00919 if (spec.dopts & DOPTS_RECEIVE_RCPT)
00920 b->set_receive_rcpt(true);
00921
00922 if (spec.dopts & DOPTS_FORWARD_RCPT)
00923 b->set_forward_rcpt(true);
00924
00925 if (spec.dopts & DOPTS_CUSTODY_RCPT)
00926 b->set_custody_rcpt(true);
00927
00928 if (spec.dopts & DOPTS_DELETE_RCPT)
00929 b->set_deletion_rcpt(true);
00930
00931 if (spec.dopts & DOPTS_DO_NOT_FRAGMENT)
00932 b->set_do_not_fragment(true);
00933
00934
00935 b->set_expiration(spec.expiration);
00936
00937
00938 if (spec.sequence_id.data.data_len != 0)
00939 {
00940 std::string str(spec.sequence_id.data.data_val,
00941 spec.sequence_id.data.data_len);
00942
00943 bool ok = b->mutable_sequence_id()->parse(str);
00944 if (! ok) {
00945 log_err("invalid sequence id '%s'", str.c_str());
00946 return DTN_EINVAL;
00947 }
00948 }
00949
00950 if (spec.obsoletes_id.data.data_len != 0)
00951 {
00952 std::string str(spec.obsoletes_id.data.data_val,
00953 spec.obsoletes_id.data.data_len);
00954
00955 bool ok = b->mutable_obsoletes_id()->parse(str);
00956 if (! ok) {
00957 log_err("invalid obsoletes id '%s'", str.c_str());
00958 return DTN_EINVAL;
00959 }
00960 }
00961
00962
00963 for (u_int i = 0; i < spec.blocks.blocks_len; i++) {
00964 dtn_extension_block_t* block = &spec.blocks.blocks_val[i];
00965
00966 BlockInfo* info =
00967 b->api_blocks()->append_block(APIBlockProcessor::instance());
00968 APIBlockProcessor::instance()->
00969 init_block(info, b->api_blocks(),
00970 block->type, block->flags,
00971 (u_char*)block->data.data_val,
00972 block->data.data_len);
00973 }
00974
00975
00976 for (unsigned int i = 0; i < spec.metadata.metadata_len; ++i) {
00977 dtn_extension_block_t* block = &spec.metadata.metadata_val[i];
00978
00979 LinkRef null_link("APIServer::handle_send");
00980 MetadataVec * vec = b->generated_metadata().find_blocks(null_link);
00981 if (vec == NULL) {
00982 vec = b->mutable_generated_metadata()->create_blocks(null_link);
00983 }
00984 ASSERT(vec != NULL);
00985
00986 MetadataBlock * meta_block = new MetadataBlock(
00987 (u_int64_t)block->type,
00988 (u_char *)block->data.data_val,
00989 (u_int32_t)block->data.data_len);
00990 meta_block->set_flags((u_int64_t)block->flags);
00991
00992
00993
00994
00995
00996
00997 vec->push_back(meta_block);
00998 b->mutable_recv_metadata()->push_back(meta_block);
00999 }
01000
01001
01002 oasys::StringBuffer error;
01003 if (!b->validate(&error)) {
01004 log_err("bundle validation failed: %s", error.data());
01005 return DTN_EINVAL;
01006 }
01007
01008
01009
01010 size_t payload_len;
01011 char filename[PATH_MAX];
01012
01013 switch (payload.location) {
01014 case DTN_PAYLOAD_MEM:
01015 payload_len = payload.buf.buf_len;
01016 break;
01017
01018 case DTN_PAYLOAD_FILE:
01019 case DTN_PAYLOAD_TEMP_FILE:
01020 struct stat finfo;
01021 sprintf(filename, "%.*s",
01022 (int)payload.filename.filename_len,
01023 payload.filename.filename_val);
01024
01025 if (stat(filename, &finfo) != 0)
01026 {
01027 log_err("payload file %s does not exist!", filename);
01028 return DTN_EINVAL;
01029 }
01030
01031 payload_len = finfo.st_size;
01032 break;
01033
01034 default:
01035 log_err("payload.location of %d unknown", payload.location);
01036 return DTN_EINVAL;
01037 }
01038
01039 b->mutable_payload()->set_length(payload_len);
01040
01041
01042
01043 bool result;
01044 int reason;
01045 BundleDaemon::post_and_wait(
01046 new BundleAcceptRequest(b, EVENTSRC_APP, &result, &reason),
01047 ¬ifier_);
01048
01049 if (!result) {
01050 log_info("DTN_SEND bundle not accepted: reason %s",
01051 BundleStatusReport::reason_to_str(reason));
01052
01053 switch (reason) {
01054 case BundleProtocol::REASON_DEPLETED_STORAGE:
01055 return DTN_ENOSPACE;
01056 default:
01057 return DTN_EINTERNAL;
01058 }
01059 }
01060
01061 switch (payload.location) {
01062 case DTN_PAYLOAD_MEM:
01063 b->mutable_payload()->set_data((u_char*)payload.buf.buf_val,
01064 payload.buf.buf_len);
01065 break;
01066
01067 case DTN_PAYLOAD_FILE:
01068 FILE* file;
01069 int r, left;
01070 u_char buffer[4096];
01071 size_t offset;
01072
01073 if ((file = fopen(filename, "r")) == NULL)
01074 {
01075 log_err("payload file %s can't be opened: %s",
01076 filename, strerror(errno));
01077 return DTN_EINVAL;
01078 }
01079
01080 left = payload_len;
01081 r = 0;
01082 offset = 0;
01083 while (left > 0)
01084 {
01085 r = fread(buffer, 1, (left>4096)?4096:left, file);
01086
01087 if (r)
01088 {
01089 b->mutable_payload()->write_data(buffer, offset, r);
01090 left -= r;
01091 offset += r;
01092 }
01093 else
01094 {
01095 sleep(1);
01096 }
01097 }
01098
01099 fclose(file);
01100 break;
01101
01102 case DTN_PAYLOAD_TEMP_FILE:
01103 if (! b->mutable_payload()->replace_with_file(filename)) {
01104 log_err("payload file %s can't be linked or copied",
01105 filename);
01106 return DTN_EINVAL;
01107 }
01108
01109 if (::unlink(filename) != 0) {
01110 log_err("error unlinking payload temp file: %s",
01111 strerror(errno));
01112
01113 }
01114 }
01115
01116
01117 dtn_bundle_id_t id;
01118 memcpy(&id.source, &spec.source, sizeof(dtn_endpoint_id_t));
01119 id.creation_ts.secs = b->creation_ts().seconds_;
01120 id.creation_ts.seqno = b->creation_ts().seqno_;
01121 id.frag_offset = 0;
01122 id.orig_length = 0;
01123
01124 log_info("DTN_SEND bundle *%p", b.object());
01125
01126
01127
01128 BundleDaemon::post_and_wait(
01129 new BundleReceivedEvent(b.object(), EVENTSRC_APP),
01130 ¬ifier_);
01131
01132
01133 if (!xdr_dtn_bundle_id_t(&xdr_encode_, &id)) {
01134 log_err("internal error in xdr: xdr_dtn_bundle_id_t");
01135 return DTN_EXDR;
01136 }
01137
01138 return DTN_SUCCESS;
01139 }
01140
01141
// DTN_CANCEL: find the pending bundle identified by the client's bundle id
// and post a cancel request for it.
//
// Returns DTN_SUCCESS, DTN_EXDR on an XDR error, or DTN_ENOTFOUND if no
// pending bundle matches.
int
APIClient::handle_cancel()
{
    dtn_bundle_id_t id;
    memset(&id, 0, sizeof(id));

    // unpack the request
    if (!xdr_dtn_bundle_id_t(&xdr_decode_, &id)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // translate the API bundle id into the daemon's GBOF id; a non-zero
    // original length marks the bundle as a fragment
    GbofId target;
    target.source_               = EndpointID( std::string(id.source.uri) );
    target.creation_ts_.seconds_ = id.creation_ts.secs;
    target.creation_ts_.seqno_   = id.creation_ts.seqno;
    target.frag_offset_          = id.frag_offset;
    target.frag_length_          = id.orig_length;
    target.is_fragment_          = (id.orig_length > 0);

    // the pending-bundles lock is held until this function returns
    BundleRef match;
    oasys::ScopeLock pending_lock(
        BundleDaemon::instance()->pending_bundles()->lock(), "handle_cancel");
    match = BundleDaemon::instance()->pending_bundles()->find(target);

    if (match.object()) {
        log_info("DTN_CANCEL bundle *%p", match.object());

        BundleDaemon::post(new BundleCancelRequest(match, std::string()));
        return DTN_SUCCESS;
    }

    log_warn("no bundle matching [%s]; cannot cancel",
             target.str().c_str());
    return DTN_ENOTFOUND;
}
01180
01181
01182
// Size constant (1000) for file delivery; its use is not visible in this
// part of the file. NOTE(review): by its name, presumably the chunk size
// used when a payload is delivered to a file — confirm against handle_recv.
01183 #define DTN_FILE_DELIVERY_BUF_SIZE 1000
01184
01185
01186 int
01187 APIClient::handle_recv()
01188 {
01189 dtn_bundle_spec_t spec;
01190 dtn_bundle_payload_t payload;
01191 dtn_bundle_payload_location_t location;
01192 dtn_bundle_status_report_t status_report;
01193 dtn_timeval_t timeout;
01194 oasys::ScratchBuffer<u_char*> buf;
01195 APIRegistration* reg = NULL;
01196 bool sock_ready = false;
01197 oasys::FileIOClient tmpfile;
01198
01199
01200 if ((!xdr_dtn_bundle_payload_location_t(&xdr_decode_, &location)) ||
01201 (!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
01202 {
01203 log_err("error in xdr unpacking arguments");
01204 return DTN_EXDR;
01205 }
01206
01207 int err = wait_for_notify("recv", timeout, ®, NULL, &sock_ready);
01208 if (err != 0) {
01209 return err;
01210 }
01211
01212
01213
01214
01215 if (sock_ready) {
01216 return handle_unexpected_data("handle_recv");
01217 }
01218
01219 ASSERT(reg != NULL);
01220
01221 BundleRef bref("APIClient::handle_recv");
01222 bref = reg->bundle_list()->pop_front();
01223 Bundle* b = bref.object();
01224 ASSERT(b != NULL);
01225
01226 log_debug("handle_recv: popped *%p for registration %d (timeout %d)",
01227 b, reg->regid(), timeout);
01228
01229 memset(&spec, 0, sizeof(spec));
01230 memset(&payload, 0, sizeof(payload));
01231 memset(&status_report, 0, sizeof(status_report));
01232
01233
01234
01235 b->source().copyto(&spec.source);
01236 b->dest().copyto(&spec.dest);
01237 b->replyto().copyto(&spec.replyto);
01238
01239 spec.dopts = 0;
01240 if (b->custody_requested()) spec.dopts |= DOPTS_CUSTODY;
01241 if (b->delivery_rcpt()) spec.dopts |= DOPTS_DELIVERY_RCPT;
01242 if (b->receive_rcpt()) spec.dopts |= DOPTS_RECEIVE_RCPT;
01243 if (b->forward_rcpt()) spec.dopts |= DOPTS_FORWARD_RCPT;
01244 if (b->custody_rcpt()) spec.dopts |= DOPTS_CUSTODY_RCPT;
01245 if (b->deletion_rcpt()) spec.dopts |= DOPTS_DELETE_RCPT;
01246
01247 spec.expiration = b->expiration();
01248 spec.creation_ts.secs = b->creation_ts().seconds_;
01249 spec.creation_ts.seqno = b->creation_ts().seqno_;
01250 spec.delivery_regid = reg->regid();
01251
01252
01253 std::string sequence_id_str, obsoletes_id_str;
01254 if (! b->sequence_id().empty()) {
01255 sequence_id_str = b->sequence_id().to_str();
01256 spec.sequence_id.data.data_val = const_cast<char*>(sequence_id_str.c_str());
01257 spec.sequence_id.data.data_len = sequence_id_str.length();
01258 }
01259
01260 if (! b->obsoletes_id().empty()) {
01261 obsoletes_id_str = b->obsoletes_id().to_str();
01262 spec.obsoletes_id.data.data_val = const_cast<char*>(obsoletes_id_str.c_str());
01263 spec.obsoletes_id.data.data_len = obsoletes_id_str.length();
01264 }
01265
01266
01267 unsigned int blocks_found = 0;
01268 unsigned int data_len = 0;
01269 for (unsigned int i = 0; i < b->recv_blocks().size(); ++i) {
01270 if ((b->recv_blocks()[i].type() == BundleProtocol::PRIMARY_BLOCK) ||
01271 (b->recv_blocks()[i].type() == BundleProtocol::PAYLOAD_BLOCK) ||
01272 (b->recv_blocks()[i].type() == BundleProtocol::METADATA_BLOCK)) {
01273 continue;
01274 }
01275 blocks_found++;
01276 data_len += b->recv_blocks()[i].data_length();
01277 }
01278
01279 if (blocks_found > 0) {
01280 unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) +
01281 data_len;
01282 void * buf = malloc(buf_len);
01283 memset(buf, 0, buf_len);
01284
01285 dtn_extension_block_t * bp = (dtn_extension_block_t *)buf;
01286 char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t));
01287 for (unsigned int i = 0; i < b->recv_blocks().size(); ++i) {
01288 if ((b->recv_blocks()[i].type() == BundleProtocol::PRIMARY_BLOCK) ||
01289 (b->recv_blocks()[i].type() == BundleProtocol::PAYLOAD_BLOCK) ||
01290 (b->recv_blocks()[i].type() == BundleProtocol::METADATA_BLOCK)) {
01291 continue;
01292 }
01293
01294 bp->type = b->recv_blocks()[i].type();
01295 bp->flags = b->recv_blocks()[i].flags();
01296 bp->data.data_len = b->recv_blocks()[i].data_length();
01297 bp->data.data_val = dp;
01298 memcpy(dp, b->recv_blocks()[i].data(), bp->data.data_len);
01299
01300 bp++;
01301 dp += bp->data.data_len;
01302 }
01303
01304 spec.blocks.blocks_len = blocks_found;
01305 spec.blocks.blocks_val = (dtn_extension_block_t *)buf;
01306 }
01307
01308
01309 blocks_found = 0;
01310 data_len = 0;
01311 for (unsigned int i = 0; i < b->recv_metadata().size(); ++i) {
01312 blocks_found++;
01313 data_len += b->recv_metadata()[i]->metadata_len();
01314 }
01315
01316 if (blocks_found > 0) {
01317 unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) +
01318 data_len;
01319 void * buf = (char *)malloc(buf_len);
01320 memset(buf, 0, buf_len);
01321
01322 dtn_extension_block_t * bp = (dtn_extension_block_t *)buf;
01323 char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t));
01324 for (unsigned int i = 0; i < b->recv_metadata().size(); ++i) {
01325 bp->type = b->recv_metadata()[i]->ontology();
01326 bp->flags = b->recv_metadata()[i]->flags();
01327 bp->data.data_len = b->recv_metadata()[i]->metadata_len();
01328 bp->data.data_val = dp;
01329 memcpy(dp, b->recv_metadata()[i]->metadata(), bp->data.data_len);
01330 dp += bp->data.data_len;
01331 bp++;
01332 }
01333
01334 spec.metadata.metadata_len = blocks_found;
01335 spec.metadata.metadata_val = (dtn_extension_block_t *)buf;
01336 }
01337
01338 size_t payload_len = b->payload().length();
01339
01340 if (location == DTN_PAYLOAD_MEM && payload_len > DTN_MAX_BUNDLE_MEM)
01341 {
01342 log_debug("app requested memory delivery but payload is too big (%zu bytes)... "
01343 "using files instead",
01344 payload_len);
01345 location = DTN_PAYLOAD_FILE;
01346 }
01347
01348 if (location == DTN_PAYLOAD_MEM) {
01349
01350 payload.buf.buf_len = payload_len;
01351 if (payload_len != 0) {
01352 buf.reserve(payload_len);
01353 payload.buf.buf_val =
01354 (char*)b->payload().read_data(0, payload_len, buf.buf());
01355 } else {
01356 payload.buf.buf_val = 0;
01357 }
01358
01359 } else if (location == DTN_PAYLOAD_FILE) {
01360 char *tdir, templ[64];
01361
01362 tdir = getenv("TMP");
01363 if (tdir == NULL) {
01364 tdir = getenv("TEMP");
01365 }
01366 if (tdir == NULL) {
01367 tdir = "/tmp";
01368 }
01369
01370 snprintf(templ, sizeof(templ), "%s/bundlePayload_XXXXXX", tdir);
01371
01372 if (tmpfile.mkstemp(templ) == -1) {
01373 log_err("can't open temporary file to deliver bundle");
01374 return DTN_EINTERNAL;
01375 }
01376
01377 if (chmod(tmpfile.path(), 0666) < 0) {
01378 log_warn("can't set the permission of temp file to 0666: %s",
01379 strerror(errno));
01380 }
01381
01382 b->payload().copy_file(&tmpfile);
01383
01384 payload.filename.filename_val = (char*)tmpfile.path();
01385 payload.filename.filename_len = tmpfile.path_len() + 1;
01386 tmpfile.close();
01387
01388 } else {
01389 log_err("payload location %d not understood", location);
01390 return DTN_EINVAL;
01391 }
01392
01393 payload.location = location;
01394
01395
01396
01397
01398
01399 BundleStatusReport::data_t sr_data;
01400 if (BundleStatusReport::parse_status_report(&sr_data, b))
01401 {
01402 payload.status_report = &status_report;
01403 sr_data.orig_source_eid_.copyto(&status_report.bundle_id.source);
01404 status_report.bundle_id.creation_ts.secs =
01405 sr_data.orig_creation_tv_.seconds_;
01406 status_report.bundle_id.creation_ts.seqno =
01407 sr_data.orig_creation_tv_.seqno_;
01408 status_report.bundle_id.frag_offset = sr_data.orig_frag_offset_;
01409 status_report.bundle_id.orig_length = sr_data.orig_frag_length_;
01410
01411 status_report.reason = (dtn_status_report_reason_t)sr_data.reason_code_;
01412 status_report.flags = (dtn_status_report_flags_t)sr_data.status_flags_;
01413
01414 status_report.receipt_ts.secs = sr_data.receipt_tv_.seconds_;
01415 status_report.receipt_ts.seqno = sr_data.receipt_tv_.seqno_;
01416 status_report.custody_ts.secs = sr_data.custody_tv_.seconds_;
01417 status_report.custody_ts.seqno = sr_data.custody_tv_.seqno_;
01418 status_report.forwarding_ts.secs = sr_data.forwarding_tv_.seconds_;
01419 status_report.forwarding_ts.seqno = sr_data.forwarding_tv_.seqno_;
01420 status_report.delivery_ts.secs = sr_data.delivery_tv_.seconds_;
01421 status_report.delivery_ts.seqno = sr_data.delivery_tv_.seqno_;
01422 status_report.deletion_ts.secs = sr_data.deletion_tv_.seconds_;
01423 status_report.deletion_ts.seqno = sr_data.deletion_tv_.seqno_;
01424 status_report.ack_by_app_ts.secs = sr_data.ack_by_app_tv_.seconds_;
01425 status_report.ack_by_app_ts.seqno = sr_data.ack_by_app_tv_.seqno_;
01426 }
01427
01428 if (!xdr_dtn_bundle_spec_t(&xdr_encode_, &spec))
01429 {
01430 log_err("internal error in xdr: xdr_dtn_bundle_spec_t");
01431 return DTN_EXDR;
01432 }
01433
01434 if (!xdr_dtn_bundle_payload_t(&xdr_encode_, &payload))
01435 {
01436 log_err("internal error in xdr: xdr_dtn_bundle_payload_t");
01437 return DTN_EXDR;
01438 }
01439
01440
01441 payload.status_report = NULL;
01442
01443 log_info("DTN_RECV: "
01444 "successfully delivered bundle %d to registration %d",
01445 b->bundleid(), reg->regid());
01446
01447 BundleDaemon::post(new BundleDeliveredEvent(b, reg));
01448
01449 return DTN_SUCCESS;
01450 }
01451
01452
01453 int
01454 APIClient::handle_begin_poll()
01455 {
01456 dtn_timeval_t timeout;
01457 APIRegistration* recv_reg = NULL;
01458 APIRegistration* notify_reg = NULL;
01459 bool sock_ready = false;
01460
01461
01462 if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
01463 {
01464 log_err("error in xdr unpacking arguments");
01465 return DTN_EXDR;
01466 }
01467
01468 int err = wait_for_notify("poll", timeout, &recv_reg, ¬ify_reg,
01469 &sock_ready);
01470 if (err != 0) {
01471 return err;
01472 }
01473
01474
01475
01476 if (sock_ready) {
01477 log_debug("handle_begin_poll: "
01478 "api socket ready -- trying to read one byte");
01479 char type;
01480
01481 int ret = read(&type, 1);
01482 if (ret == 0) {
01483 log_info("IPC socket closed while blocked in read... "
01484 "application must have exited");
01485 return -1;
01486 }
01487
01488 if (ret == -1) {
01489 log_err("handle_begin_poll: protocol error -- "
01490 "error while blocked in poll");
01491 return DTN_ECOMM;
01492 }
01493
01494 if (type != DTN_CANCEL_POLL) {
01495 log_err("handle_poll: error got unexpected message '%s' "
01496 "while blocked in poll", dtnipc_msgtoa(type));
01497 return DTN_ECOMM;
01498 }
01499
01500
01501 u_int32_t len;
01502 ret = read((char*)&len, 4);
01503 if (ret != 4 || len != 0) {
01504 log_err("handle_begin_poll: protocol error -- "
01505 "error getting cancel poll length");
01506 return DTN_ECOMM;
01507 }
01508
01509 total_rcvd_ += 5;
01510
01511 log_debug("got DTN_CANCEL_POLL while blocked in poll");
01512
01513
01514
01515 send_response(DTN_SUCCESS);
01516 } else if (recv_reg != NULL) {
01517 log_debug("handle_begin_poll: bundle arrived");
01518
01519 } else if (notify_reg != NULL) {
01520 log_debug("handle_begin_poll: subscriber notify arrived");
01521
01522 } else {
01523
01524 NOTREACHED;
01525 }
01526
01527 return DTN_SUCCESS;
01528 }
01529
01530
01531 int
01532 APIClient::handle_cancel_poll()
01533 {
01534
01535
01536
01537
01538
01539 return DTN_SUCCESS;
01540 }
01541
01542
01543 int
01544 APIClient::handle_close()
01545 {
01546 log_info("received DTN_CLOSE message; closing API handle");
01547
01548 return -1;
01549 }
01550
01551
01552 int
01553 APIClient::handle_session_update()
01554 {
01555 APIRegistration* reg = NULL;
01556 bool sock_ready = false;
01557 dtn_timeval_t timeout;
01558
01559
01560 if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
01561 {
01562 log_err("error in xdr unpacking arguments");
01563 return DTN_EXDR;
01564 }
01565
01566 int err = wait_for_notify("session_update", timeout, NULL, ®,
01567 &sock_ready);
01568 if (err != 0) {
01569 return err;
01570 }
01571
01572
01573
01574
01575 if (sock_ready) {
01576 return handle_unexpected_data("handle_session_update");
01577 }
01578
01579 ASSERT(reg != NULL);
01580
01581 BundleRef bref("APIClient::handle_session_update");
01582 bref = reg->session_notify_list()->pop_front();
01583 Bundle* b = bref.object();
01584 ASSERT(b != NULL);
01585
01586 log_debug("handle_session_update: "
01587 "popped *%p for registration %d (timeout %d)",
01588 b, reg->regid(), timeout);
01589
01590
01591 ASSERT(b->session_flags() != 0);
01592
01593 unsigned int session_flags = 0;
01594 if (b->session_flags() & Session::SUBSCRIBE) {
01595 session_flags |= DTN_SESSION_SUBSCRIBE;
01596 }
01597
01598
01599 dtn_endpoint_id_t session_eid;
01600 b->session_eid().copyto(&session_eid);
01601
01602 if (!xdr_u_int(&xdr_encode_, &session_flags) ||
01603 !xdr_dtn_endpoint_id_t(&xdr_encode_, &session_eid))
01604 {
01605 log_err("internal error in xdr");
01606 return DTN_EXDR;
01607 }
01608
01609 log_info("session_update: "
01610 "notification for session %s status %s",
01611 b->session_eid().c_str(), Session::flag_str(b->session_flags()));
01612
01613 BundleDaemon::post(new BundleDeliveredEvent(b, reg));
01614
01615 return DTN_SUCCESS;
01616 }
01617
01618
01619 int
01620 APIClient::wait_for_notify(const char* operation,
01621 dtn_timeval_t dtn_timeout,
01622 APIRegistration** recv_ready_reg,
01623 APIRegistration** session_ready_reg,
01624 bool* sock_ready)
01625 {
01626 APIRegistration* reg;
01627
01628 ASSERT(sock_ready != NULL);
01629 if (recv_ready_reg) *recv_ready_reg = NULL;
01630 if (session_ready_reg) *session_ready_reg = NULL;
01631
01632 if (bindings_->empty()) {
01633 log_err("wait_for_notify(%s): no bound registrations", operation);
01634 return DTN_EINVAL;
01635 }
01636
01637 int timeout = (int)dtn_timeout;
01638 if (timeout < -1) {
01639 log_err("wait_for_notify(%s): "
01640 "invalid timeout value %d", operation, timeout);
01641 return DTN_EINVAL;
01642 }
01643
01644
01645
01646
01647
01648
01649
01650 struct pollfd static_pollfds[64];
01651 struct pollfd* pollfds;
01652 oasys::ScopeMalloc pollfd_malloc;
01653 size_t npollfds = 1;
01654 if (recv_ready_reg) npollfds += bindings_->size();
01655 if (session_ready_reg) npollfds += sessions_->size();
01656
01657 if (npollfds <= 64) {
01658 pollfds = &static_pollfds[0];
01659 } else {
01660 pollfds = (struct pollfd*)malloc(npollfds * sizeof(struct pollfd));
01661 pollfd_malloc = pollfds;
01662 }
01663
01664 struct pollfd* sock_poll = &pollfds[0];
01665 sock_poll->fd = TCPClient::fd_;
01666 sock_poll->events = POLLIN | POLLERR;
01667 sock_poll->revents = 0;
01668
01669
01670
01671
01672 APIRegistrationList::iterator iter;
01673 unsigned int i = 1;
01674 if (recv_ready_reg) {
01675 log_debug("wait_for_notify(%s): checking %zu bindings",
01676 operation, bindings_->size());
01677
01678 for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
01679 reg = *iter;
01680
01681 if (! reg->bundle_list()->empty()) {
01682 log_debug("wait_for_notify(%s): "
01683 "immediately returning bundle for reg %d",
01684 operation, reg->regid());
01685 *recv_ready_reg = reg;
01686 return 0;
01687 }
01688
01689 pollfds[i].fd = reg->bundle_list()->notifier()->read_fd();
01690 pollfds[i].events = POLLIN;
01691 pollfds[i].revents = 0;
01692 ++i;
01693 ASSERT(i <= npollfds);
01694 }
01695 }
01696
01697
01698 if (session_ready_reg) {
01699 log_debug("wait_for_notify(%s): checking %zu sessions",
01700 operation, sessions_->size());
01701
01702 for (iter = sessions_->begin(); iter != sessions_->end(); ++iter)
01703 {
01704 reg = *iter;
01705 ASSERT(reg->session_notify_list() != NULL);
01706 if (! reg->session_notify_list()->empty()) {
01707 log_debug("wait_for_notify(%s): "
01708 "immediately returning notified reg %d",
01709 operation, reg->regid());
01710 *session_ready_reg = reg;
01711 return 0;
01712 }
01713
01714 pollfds[i].fd = reg->session_notify_list()->notifier()->read_fd();
01715 pollfds[i].events = POLLIN;
01716 pollfds[i].revents = 0;
01717 ++i;
01718 ASSERT(i <= npollfds);
01719 }
01720 }
01721
01722 if (timeout == 0) {
01723 log_debug("wait_for_notify(%s): "
01724 "no ready registrations and timeout=%d, returning immediately",
01725 operation, timeout);
01726 return DTN_ETIMEOUT;
01727 }
01728
01729 log_debug("wait_for_notify(%s): "
01730 "blocking to get events from %zu sources (timeout %d)",
01731 operation, npollfds, timeout);
01732 int nready = oasys::IO::poll_multiple(&pollfds[0], npollfds, timeout,
01733 NULL, logpath_);
01734
01735 if (nready == oasys::IOTIMEOUT) {
01736 log_debug("wait_for_notify(%s): timeout waiting for events",
01737 operation);
01738 return DTN_ETIMEOUT;
01739
01740 } else if (nready <= 0) {
01741 log_err("wait_for_notify(%s): unexpected error polling for events",
01742 operation);
01743 return DTN_EINTERNAL;
01744 }
01745
01746
01747
01748 if (sock_poll->revents != 0) {
01749 *sock_ready = true;
01750 return 0;
01751 }
01752
01753
01754
01755 if (recv_ready_reg) {
01756 for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
01757 reg = *iter;
01758 if (! reg->bundle_list()->empty()) {
01759 *recv_ready_reg = reg;
01760 break;
01761 }
01762 }
01763 }
01764
01765 if (session_ready_reg) {
01766 for (iter = sessions_->begin(); iter != sessions_->end(); ++iter)
01767 {
01768 reg = *iter;
01769 if (! reg->session_notify_list()->empty()) {
01770 *session_ready_reg = reg;
01771 break;
01772 }
01773 }
01774 }
01775
01776 if ((recv_ready_reg && *recv_ready_reg == NULL) &&
01777 (session_ready_reg && *session_ready_reg == NULL))
01778 {
01779 log_err("wait_for_notify(%s): error -- no lists have any events",
01780 operation);
01781 return DTN_EINTERNAL;
01782 }
01783
01784 return 0;
01785 }
01786
01787
01788 int
01789 APIClient::handle_unexpected_data(const char* operation)
01790 {
01791 log_debug("%s: api socket ready -- trying to read one byte",
01792 operation);
01793 char b;
01794 if (read(&b, 1) != 0) {
01795 log_err("%s: protocol error -- "
01796 "data arrived or error while blocked in recv",
01797 operation);
01798 return DTN_ECOMM;
01799 }
01800
01801 log_info("IPC socket closed while blocked in read... "
01802 "application must have exited");
01803 return -1;
01804 }
01805
01806 }