00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include "TableBasedRouter.h"
00022 #include "RouteTable.h"
00023 #include "bundling/BundleActions.h"
00024 #include "bundling/BundleDaemon.h"
00025 #include "bundling/TempBundle.h"
00026 #include "contacts/Contact.h"
00027 #include "contacts/ContactManager.h"
00028 #include "contacts/Link.h"
00029 #include "reg/Registration.h"
00030 #include "session/Session.h"
00031
00032 namespace dtn {
00033
00034
00035 TableBasedRouter::TableBasedRouter(const char* classname,
00036 const std::string& name)
00037 : BundleRouter(classname, name),
00038 reception_cache_(std::string(logpath()) + "/reception_cache",
00039 1024)
00040 {
00041 route_table_ = new RouteTable(name);
00042 }
00043
00044
00045 TableBasedRouter::~TableBasedRouter()
00046 {
00047 delete route_table_;
00048 }
00049
00050
00051 void
00052 TableBasedRouter::add_route(RouteEntry *entry)
00053 {
00054 route_table_->add_entry(entry);
00055 handle_changed_routes();
00056 }
00057
00058
00059 void
00060 TableBasedRouter::del_route(const EndpointIDPattern& dest)
00061 {
00062 route_table_->del_entries(dest);
00063
00064
00065
00066 reception_cache_.evict_all();
00067
00068
00069 }
00070
00071
00072 void
00073 TableBasedRouter::handle_changed_routes()
00074 {
00075
00076
00077 reception_cache_.evict_all();
00078 reroute_all_bundles();
00079 reroute_all_sessions();
00080 }
00081
00082
00083 void
00084 TableBasedRouter::handle_event(BundleEvent* event)
00085 {
00086 dispatch_event(event);
00087 }
00088
00089
00090 Session*
00091 TableBasedRouter::get_session_for_bundle(Bundle* bundle)
00092 {
00093 if (bundle->session_flags() != 0)
00094 {
00095 log_debug("get_session_for_bundle: bundle id %d is a subscription msg",
00096 bundle->bundleid());
00097 return NULL;
00098 }
00099
00100 if (bundle->sequence_id().empty() &&
00101 bundle->obsoletes_id().empty() &&
00102 bundle->session_eid().length() == 0)
00103 {
00104 log_debug("get_session_for_bundle: bundle id %u not a session bundle",
00105 bundle->bundleid());
00106 return NULL;
00107 }
00108
00109 EndpointID session_eid = bundle->session_eid();
00110 if (session_eid.length() == 0)
00111 {
00112 session_eid.assign(std::string("dtn-unicast-session:") +
00113 bundle->source().str() +
00114 "," +
00115 bundle->dest().str());
00116 ASSERT(session_eid.valid());
00117 }
00118
00119 Session* session = sessions_.get_session(session_eid);
00120 log_debug("get_session_for_bundle: *%p *%p", bundle, session);
00121 return session;
00122 }
00123
00124
00125 bool
00126 TableBasedRouter::add_bundle_to_session(Bundle* bundle, Session* session)
00127 {
00128
00129 static BundleProtocol::status_report_reason_t deletion_reason =
00130 BundleProtocol::REASON_DEPLETED_STORAGE;
00131
00132 log_debug("adding *%p to *%p", bundle, session);
00133
00134 if (! bundle->sequence_id().empty())
00135 {
00136 oasys::ScopeLock l(session->bundles()->lock(),
00137 "TableBasedRouter::add_subscriber");
00138 BundleList::iterator iter = session->bundles()->begin();
00139 while (iter != session->bundles()->end())
00140 {
00141 Bundle* old_bundle = *iter;
00142 ++iter;
00143
00144
00145 if (old_bundle->sequence_id().empty()) {
00146 continue;
00147 }
00148
00149
00150
00151 if (bundle->obsoletes_id() >= old_bundle->sequence_id())
00152 {
00153 log_debug("*%p obsoletes *%p... removing old bundle",
00154 bundle, old_bundle);
00155
00156 bool ok = session->bundles()->erase(old_bundle);
00157 ASSERT(ok);
00158 BundleDaemon::post_at_head(
00159 new BundleDeleteRequest(old_bundle, deletion_reason));
00160 continue;
00161 }
00162
00163
00164 if (old_bundle->obsoletes_id() >= bundle->sequence_id())
00165 {
00166 log_debug("*%p obsoletes *%p... ignoring new arrival",
00167 old_bundle, bundle);
00168 BundleDaemon::post_at_head(
00169 new BundleDeleteRequest(bundle, deletion_reason));
00170 return false;
00171 }
00172
00173
00174
00175
00176 if (bundle->sequence_id() == old_bundle->sequence_id())
00177 {
00178 log_debug("*%p and *%p have same sequence id... "
00179 "ignoring new arrival",
00180 old_bundle, bundle);
00181 BundleDaemon::post_at_head(
00182 new BundleDeleteRequest(bundle, deletion_reason));
00183 return false;
00184 }
00185
00186 log_debug("compared *%p and *%p, nothing is obsoleted",
00187 old_bundle, bundle);
00188 }
00189 }
00190
00191 session->bundles()->push_back(bundle);
00192 session->sequence_id()->update(bundle->sequence_id());
00193
00194 return true;
00195 }
00196
00197
00198 void
00199 TableBasedRouter::handle_bundle_received(BundleReceivedEvent* event)
00200 {
00201 bool should_route = true;
00202
00203 Bundle* bundle = event->bundleref_.object();
00204 log_debug("handle bundle received: *%p", bundle);
00205
00206 EndpointID remote_eid(EndpointID::NULL_EID());
00207
00208 if (event->link_ != NULL) {
00209 remote_eid = event->link_->remote_eid();
00210 }
00211
00212 if (! reception_cache_.add_entry(bundle, remote_eid))
00213 {
00214 log_info("ignoring duplicate bundle: *%p", bundle);
00215 BundleDaemon::post_at_head(
00216 new BundleDeleteRequest(bundle, BundleProtocol::REASON_NO_ADDTL_INFO));
00217 return;
00218 }
00219
00220
00221
00222
00223
00224 Session* session = get_session_for_bundle(bundle);
00225 if (session != NULL)
00226 {
00227
00228
00229
00230 should_route = add_bundle_to_session(bundle, session);
00231 if (! should_route) {
00232 log_debug("session bundle %u is DOA", bundle->bundleid());
00233 return;
00234 }
00235 }
00236
00237
00238
00239 if (bundle->session_flags() != 0) {
00240 should_route = handle_session_bundle(event);
00241 }
00242
00243 if (should_route) {
00244 route_bundle(bundle);
00245 } else {
00246 BundleDaemon::post_at_head(
00247 new BundleDeleteRequest(bundle, BundleProtocol::REASON_NO_ADDTL_INFO));
00248 }
00249 }
00250
00251
00252 void
00253 TableBasedRouter::remove_from_deferred(const BundleRef& bundle, int actions)
00254 {
00255 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00256 oasys::ScopeLock l(cm->lock(), "TableBasedRouter::remove_from_deferred");
00257
00258 const LinkSet* links = cm->links();
00259 LinkSet::const_iterator iter;
00260 for (iter = links->begin(); iter != links->end(); ++iter) {
00261 const LinkRef& link = *iter;
00262
00263
00264
00265
00266
00267
00268 if (link->router_info() == NULL) {
00269 continue;
00270 }
00271
00272 DeferredList* deferred = deferred_list(link);
00273 ForwardingInfo info;
00274 if (deferred->find(bundle, &info))
00275 {
00276 if (info.action() & actions) {
00277 log_debug("removing bundle *%p from link *%p deferred list",
00278 bundle.object(), (*iter).object());
00279 deferred->del(bundle);
00280 }
00281 }
00282 }
00283 }
00284
00285
00286 void
00287 TableBasedRouter::handle_bundle_transmitted(BundleTransmittedEvent* event)
00288 {
00289 const BundleRef& bundle = event->bundleref_;
00290 log_debug("handle bundle transmitted: *%p", bundle.object());
00291
00292
00293
00294 remove_from_deferred(bundle, ForwardingInfo::FORWARD_ACTION);
00295
00296
00297
00298 const LinkRef& link = event->contact_->link();
00299 check_next_hop(link);
00300 }
00301
00302
00303 bool
00304 TableBasedRouter::can_delete_bundle(const BundleRef& bundle)
00305 {
00306 log_debug("TableBasedRouter::can_delete_bundle: checking if we can delete *%p",
00307 bundle.object());
00308
00309
00310 if (bundle->fwdlog()->get_count(ForwardingInfo::TRANSMITTED |
00311 ForwardingInfo::DELIVERED) == 0)
00312 {
00313 log_debug("TableBasedRouter::can_delete_bundle(%u): "
00314 "not yet transmitted or delivered",
00315 bundle->bundleid());
00316 return false;
00317 }
00318
00319
00320 if (bundle->local_custody()) {
00321 log_debug("TableBasedRouter::can_delete_bundle(%u): "
00322 "not deleting because we have custody",
00323 bundle->bundleid());
00324 return false;
00325 }
00326
00327
00328 Session* session = get_session_for_bundle(bundle.object());
00329 if (session && !session->subscribers().empty())
00330 {
00331 log_debug("TableBasedRouter::can_delete_bundle(%u): "
00332 "session has subscribers",
00333 bundle->bundleid());
00334 return false;
00335 }
00336
00337 return true;
00338 }
00339
00340
00341 void
00342 TableBasedRouter::delete_bundle(const BundleRef& bundle)
00343 {
00344 log_debug("delete *%p", bundle.object());
00345
00346 remove_from_deferred(bundle, ForwardingInfo::ANY_ACTION);
00347
00348 Session* session = get_session_for_bundle(bundle.object());
00349 if (session)
00350 {
00351 bool ok = session->bundles()->erase(bundle);
00352 (void)ok;
00353
00354 log_debug("delete_bundle: removing *%p from *%p: %s",
00355 bundle.object(), session, ok ? "success" : "not in session list");
00356
00357
00358 }
00359
00360
00361
00362 }
00363
00364
00365 void
00366 TableBasedRouter::handle_bundle_cancelled(BundleSendCancelledEvent* event)
00367 {
00368 Bundle* bundle = event->bundleref_.object();
00369 log_debug("handle bundle cancelled: *%p", bundle);
00370
00371
00372
00373 if (!bundle->expired()) {
00374 route_bundle(bundle);
00375 }
00376 }
00377
00378
00379 void
00380 TableBasedRouter::handle_route_add(RouteAddEvent* event)
00381 {
00382 add_route(event->entry_);
00383 }
00384
00385
00386 void
00387 TableBasedRouter::handle_route_del(RouteDelEvent* event)
00388 {
00389 del_route(event->dest_);
00390 }
00391
00392
00393 void
00394 TableBasedRouter::add_nexthop_route(const LinkRef& link)
00395 {
00396
00397
00398
00399
00400 EndpointID eid = link->remote_eid();
00401 if (config_.add_nexthop_routes_ && eid != EndpointID::NULL_EID())
00402 {
00403 EndpointIDPattern eid_pattern(link->remote_eid());
00404
00405
00406 if (!eid_pattern.append_service_wildcard())
00407
00408 eid_pattern.assign(link->remote_eid());
00409
00410
00411
00412
00413 RouteEntryVec ignored;
00414 if (route_table_->get_matching(eid_pattern, link, &ignored) == 0) {
00415 RouteEntry *entry = new RouteEntry(eid_pattern, link);
00416 entry->set_action(ForwardingInfo::FORWARD_ACTION);
00417 add_route(entry);
00418 }
00419 }
00420 }
00421
00422
00423 bool
00424 TableBasedRouter::should_fwd(const Bundle* bundle, RouteEntry* route)
00425 {
00426 if (route == NULL)
00427 return false;
00428
00429
00430
00431
00432 EndpointID prevhop;
00433 if (reception_cache_.lookup(bundle, &prevhop))
00434 {
00435 if (prevhop == route->link()->remote_eid() &&
00436 prevhop != EndpointID::NULL_EID())
00437 {
00438 log_debug("should_fwd bundle %d: "
00439 "skip %s since bundle arrived from the same node",
00440 bundle->bundleid(), route->link()->name());
00441 return false;
00442 }
00443 }
00444
00445 return BundleRouter::should_fwd(bundle, route->link(), route->action());
00446 }
00447
00448
00449 void
00450 TableBasedRouter::handle_contact_up(ContactUpEvent* event)
00451 {
00452 LinkRef link = event->contact_->link();
00453 ASSERT(link != NULL);
00454 ASSERT(!link->isdeleted());
00455
00456 if (! link->isopen()) {
00457 log_err("contact up(*%p): event delivered but link not open",
00458 link.object());
00459 }
00460
00461 add_nexthop_route(link);
00462 check_next_hop(link);
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473 RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str());
00474 if (iter != reroute_timers_.end()) {
00475 log_debug("link %s reopened, cancelling reroute timer", link->name());
00476 RerouteTimer* t = iter->second;
00477 reroute_timers_.erase(iter);
00478 t->cancel();
00479 }
00480 }
00481
00482
00483 void
00484 TableBasedRouter::handle_contact_down(ContactDownEvent* event)
00485 {
00486 LinkRef link = event->contact_->link();
00487 ASSERT(link != NULL);
00488 ASSERT(!link->isdeleted());
00489
00490
00491
00492
00493
00494 size_t num_queued = link->queue()->size();
00495 if (num_queued != 0) {
00496 RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str());
00497 if (iter == reroute_timers_.end()) {
00498 log_debug("link %s went down with %zu bundles queued, "
00499 "scheduling reroute timer in %u seconds",
00500 link->name(), num_queued,
00501 link->params().potential_downtime_);
00502 RerouteTimer* t = new RerouteTimer(this, link);
00503 t->schedule_in(link->params().potential_downtime_ * 1000);
00504
00505 reroute_timers_[link->name_str()] = t;
00506 }
00507 }
00508 }
00509
00510
00511 void
00512 TableBasedRouter::RerouteTimer::timeout(const struct timeval& now)
00513 {
00514 (void)now;
00515 router_->reroute_bundles(link_);
00516 }
00517
00518
00519 void
00520 TableBasedRouter::reroute_bundles(const LinkRef& link)
00521 {
00522 ASSERT(!link->isdeleted());
00523
00524
00525
00526 if (link->state() != Link::UNAVAILABLE) {
00527 log_warn("reroute timer fired but link *%p state is %s, not UNAVAILABLE",
00528 link.object(), Link::state_to_str(link->state()));
00529 return;
00530 }
00531
00532 log_debug("reroute timer fired -- cancelling %zu bundles on link *%p",
00533 link->queue()->size(), link.object());
00534
00535
00536
00537
00538
00539
00540 oasys::ScopeLock l(link->queue()->lock(),
00541 "TableBasedRouter::reroute_bundles");
00542 BundleRef bundle("TableBasedRouter::reroute_bundles");
00543 while (! link->queue()->empty()) {
00544 bundle = link->queue()->front();
00545 actions_->cancel_bundle(bundle.object(), link);
00546 ASSERT(! bundle->is_queued_on(link->queue()));
00547 }
00548
00549
00550
00551 ASSERT(link->inflight()->empty());
00552 }
00553
00554
00555 void
00556 TableBasedRouter::handle_link_available(LinkAvailableEvent* event)
00557 {
00558 LinkRef link = event->link_;
00559 ASSERT(link != NULL);
00560 ASSERT(!link->isdeleted());
00561
00562
00563 if (config_.open_discovered_links_ &&
00564 !link->isopen() &&
00565 link->type() == Link::OPPORTUNISTIC &&
00566 event->reason_ == ContactEvent::DISCOVERY)
00567 {
00568 actions_->open_link(link);
00569 }
00570
00571
00572 check_next_hop(link);
00573 }
00574
00575
00576 void
00577 TableBasedRouter::handle_link_created(LinkCreatedEvent* event)
00578 {
00579 LinkRef link = event->link_;
00580 ASSERT(link != NULL);
00581 ASSERT(!link->isdeleted());
00582
00583 link->set_router_info(new DeferredList(logpath(), link));
00584
00585 add_nexthop_route(link);
00586 }
00587
00588
00589 void
00590 TableBasedRouter::handle_link_deleted(LinkDeletedEvent* event)
00591 {
00592 LinkRef link = event->link_;
00593 ASSERT(link != NULL);
00594
00595 route_table_->del_entries_for_nexthop(link);
00596
00597 RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str());
00598 if (iter != reroute_timers_.end()) {
00599 log_debug("link %s deleted, cancelling reroute timer", link->name());
00600 RerouteTimer* t = iter->second;
00601 reroute_timers_.erase(iter);
00602 t->cancel();
00603 }
00604 }
00605
00606
00607 void
00608 TableBasedRouter::handle_custody_timeout(CustodyTimeoutEvent* event)
00609 {
00610
00611
00612
00613
00614
00615
00616
00617 route_bundle(event->bundle_.object());
00618 }
00619
00620
00621 void
00622 TableBasedRouter::get_routing_state(oasys::StringBuffer* buf)
00623 {
00624 buf->appendf("Route table for %s router:\n\n", name_.c_str());
00625 route_table_->dump(buf);
00626
00627 if (!sessions_.empty())
00628 {
00629 buf->appendf("Session table (%zu sessions):\n", sessions_.size());
00630 sessions_.dump(buf);
00631 buf->appendf("\n");
00632 }
00633
00634 if (!session_custodians_.empty())
00635 {
00636 buf->appendf("Session custodians (%zu registrations):\n",
00637 session_custodians_.size());
00638
00639 for (RegistrationList::iterator iter = session_custodians_.begin();
00640 iter != session_custodians_.end(); ++iter)
00641 {
00642 buf->appendf(" *%p\n", *iter);
00643 }
00644 buf->appendf("\n");
00645 }
00646 }
00647
00648
00649 void
00650 TableBasedRouter::tcl_dump_state(oasys::StringBuffer* buf)
00651 {
00652 oasys::ScopeLock l(route_table_->lock(),
00653 "TableBasedRouter::tcl_dump_state");
00654
00655 RouteEntryVec::const_iterator iter;
00656 for (iter = route_table_->route_table()->begin();
00657 iter != route_table_->route_table()->end(); ++iter)
00658 {
00659 const RouteEntry* e = *iter;
00660 buf->appendf(" {%s %s source_eid %s priority %d} ",
00661 e->dest_pattern().c_str(),
00662 e->next_hop_str().c_str(),
00663 e->source_pattern().c_str(),
00664 e->priority());
00665 }
00666 }
00667
00668
00669 bool
00670 TableBasedRouter::fwd_to_nexthop(Bundle* bundle, RouteEntry* route)
00671 {
00672 const LinkRef& link = route->link();
00673
00674
00675 if (link->isavailable() && (!link->isopen()) && (!link->isopening())) {
00676 log_debug("opening *%p because a message is intended for it",
00677 link.object());
00678 actions_->open_link(link);
00679 }
00680
00681
00682
00683
00684
00685
00686 if (link->isopen() && !link->queue_is_full()) {
00687 log_debug("queuing *%p on *%p", bundle, link.object());
00688 actions_->queue_bundle(bundle, link, route->action(),
00689 route->custody_spec());
00690 return true;
00691 }
00692
00693
00694
00695 DeferredList* deferred = deferred_list(link);
00696 if (! bundle->is_queued_on(deferred->list())) {
00697 BundleRef bref(bundle, "TableBasedRouter::fwd_to_nexthop");
00698 ForwardingInfo info(ForwardingInfo::NONE,
00699 route->action(),
00700 link->name_str(),
00701 0xffffffff,
00702 link->remote_eid(),
00703 route->custody_spec());
00704 deferred->add(bref, info);
00705 } else {
00706 log_warn("bundle *%p already exists on deferred list of link *%p",
00707 bundle, link.object());
00708 }
00709
00710 if (!link->isavailable()) {
00711 log_debug("can't forward *%p to *%p because link not available",
00712 bundle, link.object());
00713 } else if (! link->isopen()) {
00714 log_debug("can't forward *%p to *%p because link not open",
00715 bundle, link.object());
00716 } else if (link->queue_is_full()) {
00717 log_debug("can't forward *%p to *%p because link queue is full",
00718 bundle, link.object());
00719 } else {
00720 log_debug("can't forward *%p to *%p", bundle, link.object());
00721 }
00722
00723 return false;
00724 }
00725
00726
00727 int
00728 TableBasedRouter::route_bundle(Bundle* bundle)
00729 {
00730 RouteEntryVec matches;
00731 RouteEntryVec::iterator iter;
00732
00733 log_debug("route_bundle: checking bundle %d", bundle->bundleid());
00734
00735
00736 if (bundle->fwdlog()->get_count(EndpointIDPattern::WILDCARD_EID(),
00737 ForwardingInfo::SUPPRESSED) > 0)
00738 {
00739 log_info("route_bundle: "
00740 "ignoring bundle %d since forwarding is suppressed",
00741 bundle->bundleid());
00742 return 0;
00743 }
00744
00745 LinkRef null_link("TableBasedRouter::route_bundle");
00746 route_table_->get_matching(bundle->dest(), null_link, &matches);
00747
00748
00749
00750 sort_routes(bundle, &matches);
00751
00752 log_debug("route_bundle bundle id %d: checking %zu route entry matches",
00753 bundle->bundleid(), matches.size());
00754
00755 unsigned int count = 0;
00756 for (iter = matches.begin(); iter != matches.end(); ++iter)
00757 {
00758 RouteEntry* route = *iter;
00759 log_debug("checking route entry %p link %s (%p)",
00760 *iter, route->link()->name(), route->link().object());
00761
00762 if (! should_fwd(bundle, *iter)) {
00763 continue;
00764 }
00765
00766 if (deferred_list(route->link())->list()->contains(bundle)) {
00767 log_debug("route_bundle bundle %d: "
00768 "ignoring link *%p since already deferred",
00769 bundle->bundleid(), route->link().object());
00770 continue;
00771 }
00772
00773
00774
00775
00776
00777
00778 check_next_hop(route->link());
00779
00780 if (!fwd_to_nexthop(bundle, *iter)) {
00781 continue;
00782 }
00783
00784 ++count;
00785 }
00786
00787 log_debug("route_bundle bundle id %d: forwarded on %u links",
00788 bundle->bundleid(), count);
00789 return count;
00790 }
00791
00792
00793 void
00794 TableBasedRouter::sort_routes(Bundle* bundle, RouteEntryVec* routes)
00795 {
00796 (void)bundle;
00797 std::sort(routes->begin(), routes->end(), RoutePrioritySort());
00798 }
00799
00800
00801 void
00802 TableBasedRouter::check_next_hop(const LinkRef& next_hop)
00803 {
00804
00805 if (! next_hop->isopen()) {
00806 log_debug("check_next_hop %s -> %s: link not open...",
00807 next_hop->name(), next_hop->nexthop());
00808 return;
00809 }
00810
00811
00812
00813 if (! next_hop->queue_has_space()) {
00814 log_debug("check_next_hop %s -> %s: no space in queue...",
00815 next_hop->name(), next_hop->nexthop());
00816 return;
00817 }
00818
00819 log_debug("check_next_hop %s -> %s: checking deferred bundle list...",
00820 next_hop->name(), next_hop->nexthop());
00821
00822
00823
00824
00825
00826 DeferredList* deferred = deferred_list(next_hop);
00827
00828 oasys::ScopeLock l(deferred->list()->lock(),
00829 "TableBasedRouter::check_next_hop");
00830 BundleList::iterator iter = deferred->list()->begin();
00831 while (iter != deferred->list()->end())
00832 {
00833 if (next_hop->queue_is_full()) {
00834 log_debug("check_next_hop %s: link queue is full, stopping loop",
00835 next_hop->name());
00836 break;
00837 }
00838
00839 BundleRef bundle("TableBasedRouter::check_next_hop");
00840 bundle = *iter;
00841 ++iter;
00842
00843 ForwardingInfo info = deferred->info(bundle);
00844
00845
00846
00847
00848
00849
00850 if (! BundleRouter::should_fwd(bundle.object(), next_hop,
00851 info.action()))
00852 {
00853 log_debug("check_next_hop: not forwarding to link %s",
00854 next_hop->name());
00855 continue;
00856 }
00857
00858
00859 if (next_hop->isavailable() &&
00860 (!next_hop->isopen()) && (!next_hop->isopening()))
00861 {
00862 log_debug("check_next_hop: "
00863 "opening *%p because a message is intended for it",
00864 next_hop.object());
00865 actions_->open_link(next_hop);
00866 }
00867
00868
00869 deferred->del(bundle);
00870
00871 log_debug("check_next_hop: sending *%p to *%p",
00872 bundle.object(), next_hop.object());
00873 actions_->queue_bundle(bundle.object() , next_hop,
00874 info.action(), info.custody_spec());
00875 }
00876 }
00877
00878
00879 void
00880 TableBasedRouter::reroute_all_bundles()
00881 {
00882 oasys::ScopeLock l(pending_bundles_->lock(),
00883 "TableBasedRouter::reroute_all_bundles");
00884
00885 log_debug("reroute_all_bundles... %zu bundles on pending list",
00886 pending_bundles_->size());
00887
00888
00889
00890
00891 BundleList::iterator iter;
00892 for (iter = pending_bundles_->begin();
00893 iter != pending_bundles_->end();
00894 ++iter)
00895 {
00896 route_bundle(*iter);
00897 }
00898 }
00899
00900
00901 void
00902 TableBasedRouter::recompute_routes()
00903 {
00904 reroute_all_bundles();
00905 }
00906
00907
00908 TableBasedRouter::DeferredList::DeferredList(const char* logpath,
00909 const LinkRef& link)
00910 : RouterInfo(),
00911 Logger("%s/deferred/%s", logpath, link->name()),
00912 list_(link->name_str() + ":deferred"),
00913 count_(0)
00914 {
00915 }
00916
00917
00918 void
00919 TableBasedRouter::DeferredList::dump_stats(oasys::StringBuffer* buf)
00920 {
00921 buf->appendf(" -- %zu bundles_deferred", count_);
00922 }
00923
00924
00925 bool
00926 TableBasedRouter::DeferredList::find(const BundleRef& bundle,
00927 ForwardingInfo* info)
00928 {
00929 InfoMap::const_iterator iter = info_.find(bundle->bundleid());
00930 if (iter == info_.end()) {
00931 return false;
00932 }
00933 *info = iter->second;
00934 return true;
00935 }
00936
00937
00938 const ForwardingInfo&
00939 TableBasedRouter::DeferredList::info(const BundleRef& bundle)
00940 {
00941 InfoMap::const_iterator iter = info_.find(bundle->bundleid());
00942 ASSERT(iter != info_.end());
00943 return iter->second;
00944 }
00945
00946
00947 bool
00948 TableBasedRouter::DeferredList::add(const BundleRef& bundle,
00949 const ForwardingInfo& info)
00950 {
00951 if (list_.contains(bundle)) {
00952 log_err("bundle *%p already in deferred list!",
00953 bundle.object());
00954 return false;
00955 }
00956
00957 log_debug("adding *%p to deferred (length %zu)",
00958 bundle.object(), count_);
00959
00960 count_++;
00961 list_.push_back(bundle);
00962
00963 info_.insert(InfoMap::value_type(bundle->bundleid(), info));
00964
00965 return true;
00966 }
00967
00968
00969 bool
00970 TableBasedRouter::DeferredList::del(const BundleRef& bundle)
00971 {
00972 if (! list_.erase(bundle)) {
00973 return false;
00974 }
00975
00976 ASSERT(count_ > 0);
00977 count_--;
00978
00979 log_debug("removed *%p from deferred (length %zu)",
00980 bundle.object(), count_);
00981
00982 size_t n = info_.erase(bundle->bundleid());
00983 ASSERT(n == 1);
00984
00985 return true;
00986 }
00987
00988
00989 TableBasedRouter::DeferredList*
00990 TableBasedRouter::deferred_list(const LinkRef& link)
00991 {
00992 DeferredList* dq = dynamic_cast<DeferredList*>(link->router_info());
00993 ASSERT(dq != NULL);
00994 return dq;
00995 }
00996
00997
00998
00999 void
01000 TableBasedRouter::handle_registration_added(RegistrationAddedEvent* event)
01001 {
01002 Registration* reg = event->registration_;
01003
01004 if (reg == NULL || reg->session_flags() == 0) {
01005 return;
01006 }
01007
01008 log_debug("got new session registration %u", reg->regid());
01009
01010 if (reg->session_flags() & Session::CUSTODY) {
01011 log_debug("session custodian registration %u", reg->regid());
01012 session_custodians_.push_back(reg);
01013 }
01014
01015 else if (reg->session_flags() & Session::SUBSCRIBE) {
01016 log_debug("session subscription registration %u", reg->regid());
01017 Session* session = sessions_.get_session(reg->endpoint());
01018 session->add_subscriber(Subscriber(reg));
01019 subscribe_to_session(Session::SUBSCRIBE, session);
01020 }
01021
01022 else if (reg->session_flags() & Session::PUBLISH) {
01023 log_debug("session publish registration %u", reg->regid());
01024
01025 Session* session = sessions_.get_session(reg->endpoint());
01026 if (session->upstream().is_null()) {
01027 log_debug("unknown upstream for publish registration... "
01028 "trying to find one");
01029 find_session_upstream(session);
01030 }
01031
01032
01033 }
01034 }
01035
01036
01037 bool
01038 TableBasedRouter::subscribe_to_session(int mode, Session* session)
01039 {
01040 if (! session->upstream().is_local()) {
01041
01042
01043
01044 Bundle* bundle = new TempBundle();
01045 bundle->set_do_not_fragment(1);
01046 bundle->mutable_source()->assign(BundleDaemon::instance()->local_eid());
01047 bundle->mutable_dest()->assign("dtn-session:" + session->eid().str());
01048 bundle->mutable_replyto()->assign(EndpointID::NULL_EID());
01049 bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
01050 bundle->set_expiration(config_.subscription_timeout_);
01051 bundle->set_singleton_dest(true);
01052 bundle->mutable_session_eid()->assign(session->eid());
01053 bundle->set_session_flags(mode);
01054 bundle->mutable_sequence_id()->assign(*session->sequence_id());
01055
01056 log_debug("sending subscribe bundle to session %s (timeout %u seconds)",
01057 session->eid().c_str(), config_.subscription_timeout_);
01058
01059 BundleDaemon::post_at_head(
01060 new BundleReceivedEvent(bundle, EVENTSRC_ROUTER));
01061
01062 if (session->resubscribe_timer() != NULL) {
01063 log_debug("cancelling old resubscribe timer");
01064 session->resubscribe_timer()->cancel();
01065 }
01066
01067 u_int resubscribe_timeout = config_.subscription_timeout_ * 1000 / 2;
01068 log_debug("scheduling resubscribe timer in %u msecs",
01069 resubscribe_timeout);
01070 ResubscribeTimer* timer = new ResubscribeTimer(this, session);
01071 timer->schedule_in(resubscribe_timeout);
01072 session->set_resubscribe_timer(timer);
01073
01074 } else {
01075
01076 log_debug("local upstream source: notifying registration");
01077 }
01078
01079 return true;
01080 }
01081
01082
01083 TableBasedRouter::ResubscribeTimer::ResubscribeTimer(TableBasedRouter* router,
01084 Session* session)
01085 : router_(router), session_(session)
01086 {
01087 }
01088
01089
01090 void
01091 TableBasedRouter::ResubscribeTimer::timeout(const struct timeval& now)
01092 {
01093 (void)now;
01094 router_->logf(oasys::LOG_DEBUG, "resubscribe timer fired for session *%p",
01095 session_);
01096 router_->subscribe_to_session(Session::RESUBSCRIBE, session_);
01097 session_->set_resubscribe_timer(NULL);
01098 delete this;
01099 }
01100
01101
01102 bool
01103 TableBasedRouter::handle_session_bundle(BundleReceivedEvent* event)
01104 {
01105 Bundle* bundle = event->bundleref_.object();
01106
01107 ASSERT(bundle->session_flags() != 0);
01108 ASSERT(bundle->session_eid() != EndpointID::NULL_EID());
01109
01110 Session* session = sessions_.get_session(bundle->session_eid());
01111
01112 log_debug("handle_session_bundle: got bundle *%p for session %d",
01113 bundle, session->id());
01114
01115
01116 if (event->source_ == EVENTSRC_STORE) {
01117 log_err("handle_session_bundle: can't handle reload from db yet");
01118 return false;
01119 }
01120
01121 bool should_route = true;
01122 switch (bundle->session_flags()) {
01123 case Session::SUBSCRIBE:
01124 case Session::RESUBSCRIBE:
01125 {
01126
01127
01128
01129 if (session->upstream().is_null()) {
01130 log_debug("handle_session_bundle: "
01131 "unknown upstream... trying to find one");
01132
01133 if (find_session_upstream(session))
01134 {
01135 ASSERT(!session->upstream().is_null());
01136
01137 const Subscriber& upstream = session->upstream();
01138 if (upstream.is_local())
01139 {
01140 log_debug("handle_session_bundle: "
01141 "forwarding %s bundle to upstream registration",
01142 Session::flag_str(bundle->session_flags()));
01143 upstream.reg()->session_notify(bundle);
01144 should_route = false;
01145 }
01146 else
01147 {
01148 log_debug("handle_session_bundle: "
01149 "found upstream *%p... routing bundle",
01150 &upstream);
01151 }
01152 }
01153 else
01154 {
01155
01156
01157
01158 log_info("can't find an upstream for session %s... "
01159 "waiting until route arrives",
01160 session->eid().c_str());
01161 }
01162 }
01163 else
01164 {
01165 const Subscriber& upstream = session->upstream();
01166 log_debug("handle_session_bundle: "
01167 "already subscribed to session through upstream *%p... "
01168 "suppressing subscription bundle %u",
01169 &upstream, bundle->bundleid());
01170
01171 bundle->fwdlog()->add_entry(EndpointIDPattern::WILDCARD_EID(),
01172 ForwardingInfo::FORWARD_ACTION,
01173 ForwardingInfo::SUPPRESSED);
01174 should_route = false;
01175 }
01176
01177
01178
01179
01180
01181 if (event->source_ == EVENTSRC_PEER)
01182 {
01183 if (bundle->prevhop().str() != "" &&
01184 bundle->prevhop() != EndpointID::NULL_EID())
01185 {
01186 log_debug("handle_session_bundle: "
01187 "adding downstream subscriber %s (seqid *%p)",
01188 bundle->prevhop().c_str(), &bundle->sequence_id());
01189
01190 add_subscriber(session, bundle->prevhop(), bundle->sequence_id());
01191 }
01192 else
01193 {
01194
01195 log_err("handle_session_bundle: "
01196 "downstream subscriber with no prevhop!!!!");
01197 }
01198 }
01199 break;
01200 }
01201
01202 default:
01203 {
01204 log_err("session flags %x not implemented", bundle->session_flags());
01205 }
01206 }
01207
01208 return should_route;
01209 }
01210
01211
01212 void
01213 TableBasedRouter::reroute_all_sessions()
01214 {
01215 log_debug("reroute_all_bundles... %zu sessions",
01216 sessions_.size());
01217
01218 for (SessionTable::iterator iter = sessions_.begin();
01219 iter != sessions_.end(); ++iter)
01220 {
01221 find_session_upstream(iter->second);
01222 }
01223 }
01224
01225
01226 bool
01227 TableBasedRouter::find_session_upstream(Session* session)
01228 {
01229
01230 for (RegistrationList::iterator iter = session_custodians_.begin();
01231 iter != session_custodians_.end(); ++iter)
01232 {
01233 Registration* reg = *iter;
01234 if (reg->endpoint().match(session->eid())) {
01235 Subscriber new_upstream(reg);
01236 if (session->upstream() == new_upstream) {
01237 log_debug("find_session_upstream: "
01238 "session %s upstream custody registration %d unchanged",
01239 session->eid().c_str(), reg->regid());
01240 } else {
01241 log_debug("find_session_upstream: "
01242 "session %s found new custody registration %d",
01243 session->eid().c_str(), reg->regid());
01244 session->set_upstream(new_upstream);
01245 }
01246 return true;
01247 }
01248 }
01249
01250
01251
01252
01253 RouteEntryVec matches;
01254 RouteEntryVec::iterator iter;
01255
01256 EndpointID subscribe_eid("dtn-session:" + session->eid().str());
01257 route_table_->get_matching(subscribe_eid, &matches);
01258
01259
01260
01261
01262 for (iter = matches.begin(); iter != matches.end(); ++iter)
01263 {
01264 const LinkRef& link = (*iter)->link();
01265 if (link->remote_eid().str() == "" ||
01266 link->remote_eid() == EndpointID::NULL_EID())
01267 {
01268 log_warn("find_session_upstream: "
01269 "got route match with no remote eid");
01270
01271 continue;
01272 }
01273
01274 Subscriber new_upstream(link->remote_eid());
01275 if (session->upstream() == new_upstream) {
01276 log_debug("find_session_upstream: "
01277 "session %s found existing upstream %s",
01278 session->eid().c_str(), link->remote_eid().c_str());
01279 } else {
01280 log_debug("find_session_upstream: session %s new upstream %s",
01281 session->eid().c_str(), link->remote_eid().c_str());
01282 session->set_upstream(Subscriber(link->remote_eid()));
01283 add_subscriber(session, link->remote_eid(), SequenceID());
01284 }
01285 return true;
01286 }
01287
01288 log_warn("find_session_upstream: can't find upstream for session %s",
01289 session->eid().c_str());
01290 return false;
01291 }
01292
01293
01294 void
01295 TableBasedRouter::add_subscriber(Session* session,
01296 const EndpointID& peer,
01297 const SequenceID& known_seqid)
01298 {
01299 log_debug("adding new subscriber for session %s -> %s",
01300 session->eid().c_str(), peer.c_str());
01301
01302 session->add_subscriber(Subscriber(peer));
01303
01304
01305
01306 RouteEntry *entry = new RouteEntry(session->eid(), peer);
01307 entry->set_action(ForwardingInfo::COPY_ACTION);
01308 route_table_->add_entry(entry);
01309
01310 log_debug("routing %zu session bundles", session->bundles()->size());
01311 oasys::ScopeLock l(session->bundles()->lock(),
01312 "TableBasedRouter::add_subscriber");
01313 for (BundleList::iterator iter = session->bundles()->begin();
01314 iter != session->bundles()->end(); ++iter)
01315 {
01316 Bundle* bundle = *iter;
01317 if (! bundle->sequence_id().empty() &&
01318 bundle->sequence_id() <= known_seqid)
01319 {
01320 log_debug("suppressing transmission of bundle %u (seqid *%p) "
01321 "to subscriber %s since covered by seqid *%p",
01322 bundle->bundleid(), &bundle->sequence_id(),
01323 peer.c_str(), &known_seqid);
01324 bundle->fwdlog()->add_entry(peer, ForwardingInfo::COPY_ACTION,
01325 ForwardingInfo::SUPPRESSED);
01326 continue;
01327 }
01328
01329 route_bundle(*iter);
01330 }
01331 }
01332
01333
01334 void
01335 TableBasedRouter::handle_registration_removed(RegistrationRemovedEvent* event)
01336 {
01337 (void)event;
01338 }
01339
01340
01341 void
01342 TableBasedRouter::handle_registration_expired(RegistrationExpiredEvent* event)
01343 {
01344
01345
01346 (void)event;
01347 }
01348
01349
01350 }