00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <algorithm>
00022 #include <stdlib.h>
00023 #include <oasys/thread/SpinLock.h>
00024
00025 #include "Bundle.h"
00026 #include "BundleList.h"
00027 #include "BundleMappings.h"
00028 #include "BundleTimestamp.h"
00029
00030 namespace dtn {
00031
00032
00033 BundleList::BundleList(const std::string& name, oasys::SpinLock* lock)
00034 : Logger("BundleList", "/dtn/bundle/list/%s", name.c_str()),
00035 name_(name), notifier_(NULL)
00036 {
00037 if (lock != NULL) {
00038 lock_ = lock;
00039 own_lock_ = false;
00040 } else {
00041 lock_ = new oasys::SpinLock();
00042 own_lock_ = true;
00043 }
00044 }
00045
00046
00047 void
00048 BundleList::set_name(const std::string& name)
00049 {
00050 name_ = name;
00051 logpathf("/dtn/bundle/list/%s", name.c_str());
00052 }
00053
00054
00055 BundleList::~BundleList()
00056 {
00057 clear();
00058 if (own_lock_) {
00059 delete lock_;
00060 }
00061 lock_ = NULL;
00062 }
00063
00064
00065 BundleRef
00066 BundleList::front() const
00067 {
00068 oasys::ScopeLock l(lock_, "BundleList::front");
00069
00070 BundleRef ret("BundleList::front() temporary");
00071 if (list_.empty())
00072 return ret;
00073
00074 ret = list_.front();
00075 return ret;
00076 }
00077
00078
00079 BundleRef
00080 BundleList::back() const
00081 {
00082 oasys::ScopeLock l(lock_, "BundleList::back");
00083
00084 BundleRef ret("BundleList::back() temporary");
00085 if (list_.empty())
00086 return ret;
00087
00088 ret = list_.back();
00089 return ret;
00090 }
00091
00092
00093 void
00094 BundleList::add_bundle(Bundle* b, const iterator& pos)
00095 {
00096 ASSERT(lock_->is_locked_by_me());
00097 ASSERT(b->lock()->is_locked_by_me());
00098
00099 if (b->is_queued_on(this)) {
00100 log_err("ERROR in add bundle: "
00101 "bundle id %d already on list [%s]",
00102 b->bundleid(), name_.c_str());
00103
00104 return;
00105 }
00106
00107 iterator new_pos = list_.insert(pos, b);
00108 b->mappings()->push_back(BundleMapping(this, new_pos));
00109 b->add_ref("bundle_list", name_.c_str());
00110
00111 if (notifier_ != 0) {
00112 notifier_->notify();
00113 }
00114
00115 log_debug("bundle id %d add mapping [%s] to list %p",
00116 b->bundleid(), name_.c_str(), this);
00117 }
00118
00119
00120 void
00121 BundleList::push_front(Bundle* b)
00122 {
00123 oasys::ScopeLock l(lock_, "BundleList::push_front");
00124 oasys::ScopeLock bl(b->lock(), "BundleList::push_front");
00125 add_bundle(b, list_.begin());
00126 }
00127
00128
00129 void
00130 BundleList::push_back(Bundle* b)
00131 {
00132 oasys::ScopeLock l(lock_, "BundleList::push_back");
00133 oasys::ScopeLock bl(b->lock(), "BundleList::push_back");
00134 add_bundle(b, list_.end());
00135 }
00136
00137
00138 void
00139 BundleList::insert_sorted(Bundle* b, sort_order_t sort_order)
00140 {
00141 iterator iter;
00142 oasys::ScopeLock l(lock_, "BundleList::insert_sorted");
00143 oasys::ScopeLock bl(b->lock(), "BundleList::insert_sorted");
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154 for (iter = list_.begin(); iter != list_.end(); ++iter)
00155 {
00156 if (sort_order == SORT_FRAG_OFFSET) {
00157 if ((*iter)->frag_offset() > b->frag_offset()) {
00158 break;
00159 }
00160
00161 } else if (sort_order == SORT_PRIORITY) {
00162 NOTIMPLEMENTED;
00163
00164 } else {
00165 PANIC("invalid value for sort order %d", sort_order);
00166 }
00167 }
00168
00169 add_bundle(b, iter);
00170 }
00171
00172
00173 void
00174 BundleList::insert_random(Bundle* b)
00175 {
00176 iterator iter;
00177 oasys::ScopeLock l(lock_, "BundleList::insert_random");
00178 oasys::ScopeLock bl(b->lock(), "BundleList::insert_random");
00179
00180 iter = begin();
00181 int location = 0;
00182 if (! empty()) {
00183 location = random() % size();
00184 }
00185
00186 log_info("insert_random at %d/%zu", location, size());
00187
00188 for (int i = 0; i < location; ++i) {
00189 ++iter;
00190 }
00191
00192 add_bundle(b, iter);
00193 }
00194
00195
00196 Bundle*
00197 BundleList::del_bundle(const iterator& pos, bool used_notifier)
00198 {
00199 Bundle* b = *pos;
00200 ASSERT(lock_->is_locked_by_me());
00201
00202
00203 oasys::ScopeLock l(b->lock(), "BundleList::del_bundle");
00204
00205
00206 log_debug("bundle id %d del_bundle: deleting mapping [%s]",
00207 b->bundleid(), name_.c_str());
00208 BundleMappings::iterator mapping = b->mappings()->find(this);
00209 if (mapping == b->mappings()->end()) {
00210 log_err("ERROR in del bundle: "
00211 "bundle id %d has no mapping for list [%s]",
00212 b->bundleid(), name_.c_str());
00213 } else {
00214 ASSERT(mapping->list() == this);
00215 ASSERT(mapping->position() == pos);
00216 b->mappings()->erase(mapping);
00217 }
00218
00219
00220 list_.erase(pos);
00221
00222
00223 if (notifier_ && !used_notifier) {
00224 notifier_->drain_pipe(1);
00225 }
00226
00227
00228
00229
00230 return b;
00231 }
00232
00233
00234 BundleRef
00235 BundleList::pop_front(bool used_notifier)
00236 {
00237 oasys::ScopeLock l(lock_, "pop_front");
00238
00239 BundleRef ret("BundleList::pop_front() temporary");
00240
00241 if (list_.empty()) {
00242 return ret;
00243 }
00244
00245 ASSERT(!empty());
00246
00247
00248
00249 ret = del_bundle(list_.begin(), used_notifier);
00250 ret.object()->del_ref("bundle_list", name_.c_str());
00251 return ret;
00252 }
00253
00254
00255 BundleRef
00256 BundleList::pop_back(bool used_notifier)
00257 {
00258 oasys::ScopeLock l(lock_, "BundleList::pop_back");
00259
00260 BundleRef ret("BundleList::pop_back() temporary");
00261
00262 if (list_.empty()) {
00263 return ret;
00264 }
00265
00266
00267
00268 ret = del_bundle(--list_.end(), used_notifier);
00269 ret->del_ref("bundle_list", name_.c_str());
00270 return ret;
00271 }
00272
00273
00274 bool
00275 BundleList::erase(Bundle* bundle, bool used_notifier)
00276 {
00277 if (bundle == NULL) {
00278 return false;
00279 }
00280
00281
00282
00283 ASSERTF(!bundle->lock()->is_locked_by_me(),
00284 "bundle cannot be locked before calling erase "
00285 "due to potential deadlock");
00286
00287 oasys::ScopeLock l(lock_, "BundleList::erase");
00288
00289
00290
00291 oasys::ScopeLock bl(bundle->lock(), "BundleList::erase");
00292
00293 BundleMappings::iterator mapping = bundle->mappings()->find(this);
00294 if (mapping == bundle->mappings()->end()) {
00295 return false;
00296 }
00297
00298 ASSERT(mapping->list() == this);
00299 ASSERT(*mapping->position() == bundle);
00300
00301
00302
00303 iterator pos = mapping->position();
00304 Bundle* b = del_bundle(pos, used_notifier);
00305 ASSERT(b == bundle);
00306
00307 bundle->del_ref("bundle_list", name_.c_str());
00308 return true;
00309 }
00310
00311
00312 void
00313 BundleList::erase(iterator& iter, bool used_notifier)
00314 {
00315 Bundle* bundle = *iter;
00316 ASSERTF(!bundle->lock()->is_locked_by_me(),
00317 "bundle cannot be locked in erase due to potential deadlock");
00318
00319 oasys::ScopeLock l(lock_, "BundleList::erase");
00320
00321 Bundle* b = del_bundle(iter, used_notifier);
00322 ASSERT(b == bundle);
00323
00324 bundle->del_ref("bundle_list", name_.c_str());
00325 }
00326
00327
00328 bool
00329 BundleList::contains(Bundle* bundle) const
00330 {
00331 oasys::ScopeLock l(lock_, "BundleList::contains");
00332
00333 if (bundle == NULL) {
00334 return false;
00335 }
00336
00337 bool ret = bundle->is_queued_on(this);
00338
00339 #define DEBUG_MAPPINGS
00340 #ifdef DEBUG_MAPPINGS
00341 bool ret2 = (std::find(begin(), end(), bundle) != end());
00342 ASSERT(ret == ret2);
00343 #endif
00344
00345 return ret;
00346 }
00347
00348
00349 BundleRef
00350 BundleList::find(u_int32_t bundle_id) const
00351 {
00352 oasys::ScopeLock l(lock_, "BundleList::find");
00353 BundleRef ret("BundleList::find() temporary");
00354 for (iterator iter = begin(); iter != end(); ++iter) {
00355 if ((*iter)->bundleid() == bundle_id) {
00356 ret = *iter;
00357 return ret;
00358 }
00359 }
00360
00361 return ret;
00362 }
00363
00364
00365 BundleRef
00366 BundleList::find(const EndpointID& source_eid,
00367 const BundleTimestamp& creation_ts) const
00368 {
00369 oasys::ScopeLock l(lock_, "BundleList::find");
00370 BundleRef ret("BundleList::find() temporary");
00371
00372 for (iterator iter = begin(); iter != end(); ++iter) {
00373 if ((*iter)->creation_ts().seconds_ == creation_ts.seconds_ &&
00374 (*iter)->creation_ts().seqno_ == creation_ts.seqno_ &&
00375 (*iter)->source().equals(source_eid))
00376 {
00377 ret = *iter;
00378 return ret;
00379 }
00380 }
00381
00382 return ret;
00383 }
00384
00385
00386 BundleRef
00387 BundleList::find(GbofId& gbof_id) const
00388 {
00389 oasys::ScopeLock l(lock_, "BundleList::find");
00390 BundleRef ret("BundleList::find() temporary");
00391
00392 for (iterator iter = begin(); iter != end(); ++iter) {
00393 if (gbof_id.equals((*iter)->source(),
00394 (*iter)->creation_ts(),
00395 (*iter)->is_fragment(),
00396 (*iter)->payload().length(),
00397 (*iter)->frag_offset()))
00398 {
00399 ret = *iter;
00400 return ret;
00401 }
00402 }
00403
00404 return ret;
00405 }
00406
00407
00408 BundleRef
00409 BundleList::find(const GbofId& gbof_id, const BundleTimestamp& extended_id) const
00410 {
00411 oasys::ScopeLock l(lock_, "BundleList::find");
00412 BundleRef ret("BundleList::find() temporary");
00413
00414 for (iterator iter = begin(); iter != end(); ++iter) {
00415 if (extended_id == (*iter)->extended_id() &&
00416 gbof_id.equals((*iter)->source(),
00417 (*iter)->creation_ts(),
00418 (*iter)->is_fragment(),
00419 (*iter)->payload().length(),
00420 (*iter)->frag_offset()))
00421 {
00422 ret = *iter;
00423 return ret;
00424 }
00425 }
00426
00427 return ret;
00428 }
00429
00430
00431 void
00432 BundleList::move_contents(BundleList* other)
00433 {
00434 oasys::ScopeLock l1(lock_, "BundleList::move_contents");
00435 oasys::ScopeLock l2(other->lock_, "BundleList::move_contents");
00436
00437 BundleRef b("BundleList::move_contents temporary");
00438 while (!list_.empty()) {
00439 b = pop_front();
00440 other->push_back(b.object());
00441 }
00442 }
00443
00444
00445 void
00446 BundleList::clear()
00447 {
00448 oasys::ScopeLock l(lock_, "BundleList::clear");
00449
00450 while (!list_.empty()) {
00451 BundleRef b("BundleList::clear temporary");
00452 b = pop_front();
00453 }
00454 }
00455
00456
00457
00458 size_t
00459 BundleList::size() const
00460 {
00461 oasys::ScopeLock l(lock_, "BundleList::size");
00462 return list_.size();
00463 }
00464
00465
00466 bool
00467 BundleList::empty() const
00468 {
00469 oasys::ScopeLock l(lock_, "BundleList::empty");
00470 return list_.empty();
00471 }
00472
00473
00474 BundleList::iterator
00475 BundleList::begin() const
00476 {
00477 if (!lock_->is_locked_by_me())
00478 PANIC("Must lock BundleList before using iterator");
00479
00480
00481
00482
00483 return const_cast<BundleList*>(this)->list_.begin();
00484 }
00485
00486
00487 BundleList::iterator
00488 BundleList::end() const
00489 {
00490 if (!lock_->is_locked_by_me())
00491 PANIC("Must lock BundleList before using iterator");
00492
00493
00494 return const_cast<BundleList*>(this)->list_.end();
00495 }
00496
00497
00498 BlockingBundleList::BlockingBundleList(const std::string& name)
00499 : BundleList(name)
00500 {
00501 notifier_ = new oasys::Notifier(logpath_);
00502 }
00503
00504
00505 BlockingBundleList::~BlockingBundleList()
00506 {
00507 delete notifier_;
00508 notifier_ = NULL;
00509 }
00510
00511
00512 BundleRef
00513 BlockingBundleList::pop_blocking(int timeout)
00514 {
00515 ASSERT(notifier_);
00516
00517 log_debug("pop_blocking on list %p [%s]", this, name().c_str());
00518
00519 lock_->lock("BlockingBundleList::pop_blocking");
00520
00521 bool used_wait;
00522 if (empty()) {
00523 used_wait = true;
00524 bool notified = notifier_->wait(lock_, timeout);
00525 ASSERT(lock_->is_locked_by_me());
00526
00527
00528
00529 if (!notified) {
00530 lock_->unlock();
00531 log_debug("pop_blocking timeout on list %p", this);
00532
00533 return BundleRef("BlockingBundleList::pop_blocking temporary");
00534 }
00535 } else {
00536 used_wait = false;
00537 }
00538
00539
00540
00541 ASSERT(!empty());
00542
00543 BundleRef ret("BlockingBundleList::pop_blocking temporary");
00544 ret = pop_front(used_wait);
00545
00546 lock_->unlock();
00547
00548 log_debug("pop_blocking got bundle %p from list %p",
00549 ret.object(), this);
00550
00551 return ret;
00552 }
00553
00554 }