00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <oasys/debug/DebugUtils.h>
00022 #include <oasys/thread/SpinLock.h>
00023
00024 #include "Bundle.h"
00025 #include "BundleDaemon.h"
00026 #include "BundleList.h"
00027 #include "ExpirationTimer.h"
00028
00029 #include "storage/GlobalStore.h"
00030
00031 namespace dtn {
00032
00033
00034 void
00035 Bundle::init(u_int32_t id)
00036 {
00037 bundleid_ = id;
00038 is_fragment_ = false;
00039 is_admin_ = false;
00040 do_not_fragment_ = false;
00041 in_datastore_ = false;
00042 custody_requested_ = false;
00043 local_custody_ = false;
00044 singleton_dest_ = true;
00045 priority_ = COS_NORMAL;
00046 receive_rcpt_ = false;
00047 custody_rcpt_ = false;
00048 forward_rcpt_ = false;
00049 delivery_rcpt_ = false;
00050 deletion_rcpt_ = false;
00051 app_acked_rcpt_ = false;
00052 orig_length_ = 0;
00053 frag_offset_ = 0;
00054 expiration_ = 0;
00055 owner_ = "";
00056 fragmented_incoming_= false;
00057 session_flags_ = 0;
00058
00059
00060
00061
00062 creation_ts_.seconds_ = BundleTimestamp::get_current_time();
00063 creation_ts_.seqno_ = bundleid_;
00064
00065
00066
00067
00068
00069
00070
00071 extended_id_ = creation_ts_;
00072
00073 log_debug_p("/dtn/bundle", "Bundle::init bundle id %d", id);
00074 }
00075
00076
00077 Bundle::Bundle(BundlePayload::location_t location)
00078 : payload_(&lock_), fwdlog_(&lock_), xmit_blocks_(&lock_),
00079 recv_metadata_("recv_metadata")
00080 {
00081 u_int32_t id = GlobalStore::instance()->next_bundleid();
00082 init(id);
00083 payload_.init(id, location);
00084 refcount_ = 0;
00085 expiration_timer_ = NULL;
00086 freed_ = false;
00087 }
00088
00089
00090 Bundle::Bundle(const oasys::Builder&)
00091 : payload_(&lock_), fwdlog_(&lock_), xmit_blocks_(&lock_),
00092 recv_metadata_("recv_metadata")
00093 {
00094
00095
00096
00097
00098 init(0xffffffff);
00099 refcount_ = 0;
00100 expiration_timer_ = NULL;
00101 freed_ = false;
00102 }
00103
00104
00105 Bundle::~Bundle()
00106 {
00107 log_debug_p("/dtn/bundle/free", "destroying bundle id %d", bundleid_);
00108
00109 ASSERT(mappings_.size() == 0);
00110 bundleid_ = 0xdeadf00d;
00111
00112 ASSERTF(expiration_timer_ == NULL,
00113 "bundle deleted with pending expiration timer");
00114
00115 }
00116
00117
00118 int
00119 Bundle::format(char* buf, size_t sz) const
00120 {
00121 if (is_admin()) {
00122 return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload, is_admin]",
00123 bundleid_, source_.c_str(), dest_.c_str(),
00124 payload_.length());
00125 } else if (is_fragment()) {
00126 return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload, fragment @%u/%u]",
00127 bundleid_, source_.c_str(), dest_.c_str(),
00128 payload_.length(), frag_offset_, orig_length_);
00129 } else {
00130 return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload]",
00131 bundleid_, source_.c_str(), dest_.c_str(),
00132 payload_.length());
00133 }
00134 }
00135
00136
00137 void
00138 Bundle::format_verbose(oasys::StringBuffer* buf)
00139 {
00140
00141 #define bool_to_str(x) ((x) ? "true" : "false")
00142
00143 buf->appendf("bundle id %d:\n", bundleid_);
00144 buf->appendf(" source: %s\n", source_.c_str());
00145 buf->appendf(" dest: %s\n", dest_.c_str());
00146 buf->appendf(" custodian: %s\n", custodian_.c_str());
00147 buf->appendf(" replyto: %s\n", replyto_.c_str());
00148 buf->appendf(" prevhop: %s\n", prevhop_.c_str());
00149 buf->appendf(" payload_length: %zu\n", payload_.length());
00150 buf->appendf(" priority: %d\n", priority_);
00151 buf->appendf(" custody_requested: %s\n", bool_to_str(custody_requested_));
00152 buf->appendf(" local_custody: %s\n", bool_to_str(local_custody_));
00153 buf->appendf(" singleton_dest: %s\n", bool_to_str(singleton_dest_));
00154 buf->appendf(" receive_rcpt: %s\n", bool_to_str(receive_rcpt_));
00155 buf->appendf(" custody_rcpt: %s\n", bool_to_str(custody_rcpt_));
00156 buf->appendf(" forward_rcpt: %s\n", bool_to_str(forward_rcpt_));
00157 buf->appendf(" delivery_rcpt: %s\n", bool_to_str(delivery_rcpt_));
00158 buf->appendf(" deletion_rcpt: %s\n", bool_to_str(deletion_rcpt_));
00159 buf->appendf(" app_acked_rcpt: %s\n", bool_to_str(app_acked_rcpt_));
00160 buf->appendf(" creation_ts: %u.%u\n",
00161 creation_ts_.seconds_, creation_ts_.seqno_);
00162 buf->appendf(" expiration: %d\n", expiration_);
00163 buf->appendf(" is_fragment: %s\n", bool_to_str(is_fragment_));
00164 buf->appendf(" is_admin: %s\n", bool_to_str(is_admin_));
00165 buf->appendf(" do_not_fragment: %s\n", bool_to_str(do_not_fragment_));
00166 buf->appendf(" orig_length: %d\n", orig_length_);
00167 buf->appendf(" frag_offset: %d\n", frag_offset_);
00168 buf->appendf(" sequence_id: %s\n", sequence_id_.to_str().c_str());
00169 buf->appendf(" obsoletes_id: %s\n", obsoletes_id_.to_str().c_str());
00170 buf->appendf(" session_eid: %s\n", session_eid_.c_str());
00171 buf->appendf(" session_flags: 0x%x\n", session_flags_);
00172 buf->append("\n");
00173
00174 buf->appendf("forwarding log:\n");
00175 fwdlog_.dump(buf);
00176 buf->append("\n");
00177
00178 oasys::ScopeLock l(&lock_, "Bundle::format_verbose");
00179 buf->appendf("queued on %zu lists:\n", mappings_.size());
00180 for (BundleMappings::iterator i = mappings_.begin();
00181 i != mappings_.end(); ++i) {
00182 buf->appendf("\t%s\n", i->list()->name().c_str());
00183 }
00184
00185 buf->append("\nblocks:");
00186 for (BlockInfoVec::iterator iter = recv_blocks_.begin();
00187 iter != recv_blocks_.end();
00188 ++iter)
00189 {
00190 buf->appendf("\n type: 0x%02x ", iter->type());
00191 if (iter->data_offset() == 0)
00192 buf->append("(runt)");
00193 else {
00194 if (!iter->complete())
00195 buf->append("(incomplete) ");
00196 buf->appendf("data length: %d", iter->full_length());
00197 }
00198 }
00199 if (api_blocks_.size() > 0) {
00200 buf->append("\napi_blocks:");
00201 for (BlockInfoVec::iterator iter = api_blocks_.begin();
00202 iter != api_blocks_.end();
00203 ++iter)
00204 {
00205 buf->appendf("\n type: 0x%02x data length: %d",
00206 iter->type(), iter->full_length());
00207 }
00208 }
00209 buf->append("\n");
00210 }
00211
00212
00213 void
00214 Bundle::serialize(oasys::SerializeAction* a)
00215 {
00216 a->process("bundleid", &bundleid_);
00217 a->process("is_fragment", &is_fragment_);
00218 a->process("is_admin", &is_admin_);
00219 a->process("do_not_fragment", &do_not_fragment_);
00220 a->process("source", &source_);
00221 a->process("dest", &dest_);
00222 a->process("custodian", &custodian_);
00223 a->process("replyto", &replyto_);
00224 a->process("prevhop", &prevhop_);
00225 a->process("priority", &priority_);
00226 a->process("custody_requested", &custody_requested_);
00227 a->process("local_custody", &local_custody_);
00228 a->process("singleton_dest", &singleton_dest_);
00229 a->process("custody_rcpt", &custody_rcpt_);
00230 a->process("receive_rcpt", &receive_rcpt_);
00231 a->process("forward_rcpt", &forward_rcpt_);
00232 a->process("delivery_rcpt", &delivery_rcpt_);
00233 a->process("deletion_rcpt", &deletion_rcpt_);
00234 a->process("app_acked_rcpt", &app_acked_rcpt_);
00235 a->process("creation_ts_seconds", &creation_ts_.seconds_);
00236 a->process("creation_ts_seqno", &creation_ts_.seqno_);
00237 a->process("expiration", &expiration_);
00238 a->process("payload", &payload_);
00239 a->process("orig_length", &orig_length_);
00240 a->process("frag_offset", &frag_offset_);
00241 a->process("owner", &owner_);
00242 a->process("session_eid", &session_eid_);
00243 a->process("session_flags", &session_flags_);
00244 a->process("extended_id_seconds", &extended_id_.seconds_);
00245 a->process("extended_id_seqno", &extended_id_.seqno_);
00246 a->process("recv_blocks", &recv_blocks_);
00247 a->process("api_blocks", &api_blocks_);
00248
00249
00250
00251
00252
00253 if (a->action_code() == oasys::Serialize::UNMARSHAL) {
00254 in_datastore_ = true;
00255 payload_.init_from_store(bundleid_);
00256 }
00257 }
00258
00259
00260 void
00261 Bundle::copy_metadata(Bundle* new_bundle) const
00262 {
00263 new_bundle->is_admin_ = is_admin_;
00264 new_bundle->is_fragment_ = is_fragment_;
00265 new_bundle->do_not_fragment_ = do_not_fragment_;
00266 new_bundle->source_ = source_;
00267 new_bundle->dest_ = dest_;
00268 new_bundle->custodian_ = custodian_;
00269 new_bundle->replyto_ = replyto_;
00270 new_bundle->priority_ = priority_;
00271 new_bundle->custody_requested_ = custody_requested_;
00272 new_bundle->local_custody_ = false;
00273 new_bundle->singleton_dest_ = singleton_dest_;
00274 new_bundle->custody_rcpt_ = custody_rcpt_;
00275 new_bundle->receive_rcpt_ = receive_rcpt_;
00276 new_bundle->forward_rcpt_ = forward_rcpt_;
00277 new_bundle->delivery_rcpt_ = delivery_rcpt_;
00278 new_bundle->deletion_rcpt_ = deletion_rcpt_;
00279 new_bundle->app_acked_rcpt_ = app_acked_rcpt_;
00280 new_bundle->creation_ts_ = creation_ts_;
00281 new_bundle->expiration_ = expiration_;
00282 }
00283
00284
00285 int
00286 Bundle::add_ref(const char* what1, const char* what2)
00287 {
00288 (void)what1;
00289 (void)what2;
00290
00291 oasys::ScopeLock l(&lock_, "Bundle::add_ref");
00292
00293 ASSERTF(freed_ == false, "Bundle::add_ref on bundle %d (%p)"
00294 "called when bundle is already being freed!", bundleid_, this);
00295
00296 ASSERT(refcount_ >= 0);
00297 int ret = ++refcount_;
00298 log_debug_p("/dtn/bundle/refs",
00299 "bundle id %d (%p): refcount %d -> %d (%zu mappings) add %s %s",
00300 bundleid_, this, refcount_ - 1, refcount_,
00301 mappings_.size(), what1, what2);
00302 return ret;
00303 }
00304
00305
00306 int
00307 Bundle::del_ref(const char* what1, const char* what2)
00308 {
00309 (void)what1;
00310 (void)what2;
00311
00312 oasys::ScopeLock l(&lock_, "Bundle::del_ref");
00313
00314 ASSERTF(freed_ == false, "Bundle::del_ref on bundle %d (%p)"
00315 "called when bundle is already being freed!", bundleid_, this);
00316
00317 int ret = --refcount_;
00318 log_debug_p("/dtn/bundle/refs",
00319 "bundle id %d (%p): refcount %d -> %d (%zu mappings) del %s %s",
00320 bundleid_, this, refcount_ + 1, refcount_,
00321 mappings_.size(), what1, what2);
00322
00323 if (refcount_ != 0) {
00324 return ret;
00325 }
00326
00327 freed_ = true;
00328
00329 log_debug_p("/dtn/bundle",
00330 "bundle id %d (%p): no more references, posting free event",
00331 bundleid_, this);
00332
00333 BundleDaemon::instance()->post(new BundleFreeEvent(this));
00334
00335 return 0;
00336 }
00337
00338
00339 size_t
00340 Bundle::num_mappings()
00341 {
00342 oasys::ScopeLock l(&lock_, "Bundle::num_mappings");
00343 return mappings_.size();
00344 }
00345
00346
00347 BundleMappings*
00348 Bundle::mappings()
00349 {
00350 ASSERTF(lock_.is_locked_by_me(),
00351 "Must lock Bundle before using mappings iterator");
00352
00353 return &mappings_;
00354 }
00355
00356
00357 bool
00358 Bundle::is_queued_on(const BundleList* bundle_list)
00359 {
00360 oasys::ScopeLock l(&lock_, "Bundle::is_queued_on");
00361 return mappings_.contains(bundle_list);
00362 }
00363
00364
00365 bool
00366 Bundle::validate(oasys::StringBuffer* errbuf)
00367 {
00368 if (!source_.valid()) {
00369 errbuf->appendf("invalid source eid [%s]", source_.c_str());
00370 return false;
00371 }
00372
00373 if (!dest_.valid()) {
00374 errbuf->appendf("invalid dest eid [%s]", dest_.c_str());
00375 return false;
00376 }
00377
00378 if (!replyto_.valid()) {
00379 errbuf->appendf("invalid replyto eid [%s]", replyto_.c_str());
00380 return false;
00381 }
00382
00383 if (!custodian_.valid()) {
00384 errbuf->appendf("invalid custodian eid [%s]", custodian_.c_str());
00385 return false;
00386 }
00387
00388 return true;
00389
00390 }
00391
00392 }