00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "BundleActions.h"
00019 #include "Bundle.h"
00020 #include "BundleDaemon.h"
00021 #include "BundleList.h"
00022 #include "conv_layers/ConvergenceLayer.h"
00023 #include "contacts/Link.h"
00024 #include "storage/BundleStore.h"
00025
00026 namespace dtn {
00027
00028
00029 void
00030 BundleActions::open_link(Link* link)
00031 {
00032 log_debug("opening link %s", link->name());
00033
00034 if (link->isopen() || link->contact() != NULL) {
00035 log_err("not opening link %s since already open", link->name());
00036 return;
00037 }
00038
00039 if (! link->isavailable()) {
00040 log_err("not opening link %s since not available", link->name());
00041 return;
00042 }
00043
00044 link->open();
00045 }
00046
00047
00048 void
00049 BundleActions::close_link(Link* link)
00050 {
00051 log_debug("closing link %s", link->name());
00052
00053 if (! link->isopen() && ! link->isopening()) {
00054 log_err("not closing link %s since not open", link->name());
00055 return;
00056 }
00057
00058 link->close();
00059 ASSERT(link->contact() == NULL);
00060 }
00061
00062
00063 bool
00064 BundleActions::send_bundle(Bundle* bundle, Link* link,
00065 ForwardingInfo::action_t action,
00066 const CustodyTimerSpec& custody_timer)
00067 {
00068 if(bundle->xmit_blocks_.find_blocks(link) != NULL)
00069 {
00070 log_err("link not ready to handle another bundle, dropping send request");
00071 return false;
00072 }
00073
00074
00075
00076
00077
00078 BlockInfoVec* blocks = BundleProtocol::prepare_blocks(bundle, link);
00079 size_t total_len = BundleProtocol::generate_blocks(bundle, blocks, link);
00080
00081 log_debug("send bundle *%p to %s link %s (%s) (total len %zu)",
00082 bundle, link->type_str(), link->name(), link->nexthop(),
00083 total_len);
00084
00085 if (link->state() != Link::OPEN) {
00086 log_err("send bundle *%p to %s link %s (%s): link not open!!",
00087 bundle, link->type_str(), link->name(), link->nexthop());
00088 return false;
00089 }
00090
00091 ForwardingInfo::state_t state = bundle->fwdlog_.get_latest_entry(link);
00092 if (state == ForwardingInfo::IN_FLIGHT) {
00093 log_err("send bundle *%p to %s link %s (%s): already in flight",
00094 bundle, link->type_str(), link->name(), link->nexthop());
00095 return false;
00096 }
00097
00098 if ((link->params().mtu_ != 0) && (total_len > link->params().mtu_)) {
00099 log_err("send bundle *%p to %s link %s (%s): length %zu > mtu %u",
00100 bundle, link->type_str(), link->name(), link->nexthop(),
00101 total_len, link->params().mtu_);
00102 return false;
00103 }
00104
00105 bundle->fwdlog_.add_entry(link, action, ForwardingInfo::IN_FLIGHT,
00106 custody_timer);
00107
00108 link->stats()->bundles_queued_++;
00109 link->stats()->bytes_queued_ += total_len;
00110
00111 ASSERT(link->contact() != NULL);
00112 link->clayer()->send_bundle(link->contact(), bundle);
00113 return true;
00114 }
00115
00116
00117 bool
00118 BundleActions::cancel_bundle(Bundle* bundle, Link* link)
00119 {
00120 log_debug("cancel bundle *%p on %s link %s (%s)",
00121 bundle, link->type_str(), link->name(), link->nexthop());
00122
00123 if (link->state() != Link::OPEN) {
00124 return false;
00125 }
00126
00127 ASSERT(link->contact() != NULL);
00128 bundle->fwdlog_.update(link, ForwardingInfo::CANCELLED);
00129
00130 return link->clayer()->cancel_bundle(link->contact(), bundle);
00131 }
00132
00133
00134 void
00135 BundleActions::inject_bundle(Bundle* bundle)
00136 {
00137 PANIC("XXX/demmer fix this");
00138
00139 log_debug("inject bundle *%p", bundle);
00140 BundleDaemon::instance()->pending_bundles()->push_back(bundle);
00141 store_add(bundle);
00142 }
00143
00144
00145 bool
00146 BundleActions::delete_bundle(Bundle* bundle,
00147 BundleProtocol::status_report_reason_t reason,
00148 bool log_on_error)
00149 {
00150 log_debug("attempting to delete bundle *%p from data store", bundle);
00151 bool del = BundleDaemon::instance()->delete_bundle(bundle, reason);
00152
00153 if (log_on_error && !del) {
00154 log_err("Failed to delete bundle *%p from data store", bundle);
00155 }
00156 return del;
00157 }
00158
00159
00160 void
00161 BundleActions::store_add(Bundle* bundle)
00162 {
00163 log_debug("adding bundle %d to data store", bundle->bundleid_);
00164 bool added = BundleStore::instance()->add(bundle);
00165 if (! added) {
00166 log_crit("error adding bundle %d to data store!!", bundle->bundleid_);
00167 }
00168 }
00169
00170
00171 void
00172 BundleActions::store_update(Bundle* bundle)
00173 {
00174 log_debug("updating bundle %d in data store", bundle->bundleid_);
00175 bool updated = BundleStore::instance()->update(bundle);
00176 if (! updated) {
00177 log_crit("error updating bundle %d in data store!!", bundle->bundleid_);
00178 }
00179 }
00180
00181
00182 void
00183 BundleActions::store_del(Bundle* bundle)
00184 {
00185 log_debug("removing bundle %d from data store", bundle->bundleid_);
00186 bool removed = BundleStore::instance()->del(bundle);
00187 if (! removed) {
00188 log_crit("error removing bundle %d from data store!!",
00189 bundle->bundleid_);
00190 }
00191 }
00192
00193 }