00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <list>
00022
00023 #include "Bundle.h"
00024 #include "BundleEvent.h"
00025 #include "BundleDaemon.h"
00026 #include "BundleList.h"
00027 #include "BundleRef.h"
00028 #include "FragmentManager.h"
00029 #include "FragmentState.h"
00030 #include "BlockInfo.h"
00031 #include "BundleProtocol.h"
00032
00033 namespace dtn {
00034
00035 class BlockInfoPointerList : public std::list<BlockInfo*> { };
00036
00037
00038 FragmentManager::FragmentManager()
00039 : Logger("FragmentManager", "/dtn/bundle/fragmentation")
00040 {
00041 }
00042
00043
00044 Bundle*
00045 FragmentManager::create_fragment(Bundle* bundle,
00046 BlockInfoVec *blocks,
00047 size_t offset,
00048 size_t length)
00049 {
00050 Bundle* fragment = new Bundle();
00051
00052
00053 bundle->copy_metadata(fragment);
00054 fragment->set_is_fragment(true);
00055 fragment->set_do_not_fragment(false);
00056
00057
00058
00059 if (! bundle->is_fragment()) {
00060 fragment->set_orig_length(bundle->payload().length());
00061 fragment->set_frag_offset(offset);
00062 } else {
00063 fragment->set_orig_length(bundle->orig_length());
00064 fragment->set_frag_offset(bundle->frag_offset() + offset);
00065 }
00066
00067
00068 if ((offset + length) > fragment->orig_length()) {
00069 PANIC("fragment length overrun: "
00070 "orig_length %u frag_offset %u requested offset %zu length %zu",
00071 fragment->orig_length(), fragment->frag_offset(),
00072 offset, length);
00073 }
00074
00075
00076 fragment->mutable_payload()->set_length(length);
00077 fragment->mutable_payload()->write_data(bundle->payload(), offset, length, 0);
00078
00079
00080
00081
00082 BlockInfoVec::iterator iter;
00083 bool found_payload = false;
00084 for (iter = blocks->begin(); iter != blocks->end(); iter++) {
00085 int type = iter->type();
00086 if (type == BundleProtocol::PRIMARY_BLOCK
00087 || type == BundleProtocol::PAYLOAD_BLOCK
00088 || found_payload
00089 || iter->flags() & BundleProtocol::BLOCK_FLAG_REPLICATE) {
00090
00091
00092
00093 fragment->mutable_recv_blocks()->push_back(*iter);
00094 if (type == BundleProtocol::PAYLOAD_BLOCK) {
00095 found_payload = true;
00096 }
00097 }
00098 }
00099
00100 return fragment;
00101 }
00102
00103
00104 Bundle*
00105 FragmentManager::create_fragment(Bundle* bundle,
00106 const LinkRef& link,
00107 const BlockInfoPointerList& blocks_to_copy,
00108 size_t offset,
00109 size_t max_length)
00110 {
00111 size_t block_length = 0;
00112 BlockInfoPointerList::const_iterator block_i;
00113
00114 for (block_i = blocks_to_copy.begin();
00115 block_i != blocks_to_copy.end();
00116 ++block_i) {
00117 block_length += (*block_i)->contents().len();
00118 }
00119
00120 if (block_length > max_length) {
00121 log_err("unable to create a fragment of length %zu; minimum length "
00122 "required is %zu", max_length, block_length);
00123 return NULL;
00124 }
00125
00126 Bundle* fragment = new Bundle();
00127
00128
00129 bundle->copy_metadata(fragment);
00130 fragment->set_is_fragment(true);
00131 fragment->set_do_not_fragment(false);
00132
00133
00134
00135 if (! bundle->is_fragment()) {
00136 fragment->set_orig_length(bundle->payload().length());
00137 fragment->set_frag_offset(offset);
00138 } else {
00139 fragment->set_orig_length(bundle->orig_length());
00140 fragment->set_frag_offset(bundle->frag_offset() + offset);
00141 }
00142
00143
00144 size_t to_copy = std::min(max_length - block_length,
00145 bundle->payload().length() - offset);
00146 fragment->mutable_payload()->set_length(to_copy);
00147 fragment->mutable_payload()->write_data(bundle->payload(), offset, to_copy, 0);
00148 BlockInfoVec* xmit_blocks = fragment->xmit_blocks()->create_blocks(link);
00149
00150 for (block_i = blocks_to_copy.begin();
00151 block_i != blocks_to_copy.end();
00152 ++block_i) {
00153 xmit_blocks->push_back(BlockInfo(*(*block_i)));
00154 }
00155
00156 log_debug("created %zu byte fragment bundle with %zu bytes of payload",
00157 to_copy + block_length, to_copy);
00158
00159 return fragment;
00160 }
00161
00162
00163 bool
00164 FragmentManager::try_to_convert_to_fragment(Bundle* bundle)
00165 {
00166 const BlockInfo *payload_block
00167 = bundle->recv_blocks().find_block(BundleProtocol::PAYLOAD_BLOCK);
00168 if (!payload_block) {
00169 return false;
00170 }
00171 if (payload_block->data_offset() == 0) {
00172 return false;
00173 }
00174
00175 if (bundle->do_not_fragment()) {
00176 return false;
00177 }
00178
00179
00180 size_t payload_len = payload_block->data_length();
00181 size_t payload_rcvd = bundle->payload().length();
00182
00183
00184
00185 if (payload_len <= 1) {
00186 return false;
00187 }
00188
00189 if (payload_rcvd >= payload_len) {
00190 ASSERT(payload_block->complete() || payload_len == 0);
00191
00192 if (payload_block->last_block()) {
00193 return false;
00194 }
00195
00196
00197
00198 BlockInfoVec::const_iterator last_block =
00199 bundle->recv_blocks().end() - 1;
00200
00201 if (last_block->data_offset() != 0 && last_block->complete()
00202 && last_block->last_block()) {
00203 return false;
00204 }
00205
00206
00207
00208 payload_rcvd--;
00209 bundle->mutable_payload()->truncate(payload_rcvd);
00210 }
00211
00212 log_debug("partial bundle *%p, making reactive fragment of %zu bytes",
00213 bundle, payload_rcvd);
00214
00215 if (! bundle->is_fragment()) {
00216 bundle->set_is_fragment(true);
00217 bundle->set_orig_length(payload_len);
00218 bundle->set_frag_offset(0);
00219 } else {
00220
00221
00222 }
00223 bundle->set_fragmented_incoming(true);
00224
00225 return true;
00226 }
00227
00228
00229 void
00230 FragmentManager::get_hash_key(const Bundle* bundle, std::string* key)
00231 {
00232 char buf[128];
00233 snprintf(buf, 128, "%u.%u",
00234 bundle->creation_ts().seconds_,
00235 bundle->creation_ts().seqno_);
00236
00237 key->append(buf);
00238 key->append(bundle->source().c_str());
00239 key->append(bundle->dest().c_str());
00240 }
00241
00242
00243 FragmentState*
00244 FragmentManager::proactively_fragment(Bundle* bundle,
00245 const LinkRef& link,
00246 size_t max_length)
00247 {
00248 size_t payload_len = bundle->payload().length();
00249
00250 Bundle* fragment;
00251 FragmentState* state = new FragmentState(bundle);
00252
00253 size_t todo = payload_len;
00254 size_t offset = 0;
00255 size_t count = 0;
00256
00257 BlockInfoPointerList first_frag_blocks;
00258 BlockInfoPointerList all_frag_blocks;
00259 BlockInfoPointerList& this_frag_blocks = first_frag_blocks;
00260 BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
00261
00262 BlockInfoVec::iterator block_i;
00263 for (block_i = blocks->begin(); block_i != blocks->end(); ++block_i) {
00264 BlockInfo* block_info = &(*block_i);
00265
00266 if (block_info->type() == BundleProtocol::PRIMARY_BLOCK ||
00267 block_info->type() == BundleProtocol::PAYLOAD_BLOCK) {
00268 all_frag_blocks.push_back(block_info);
00269 first_frag_blocks.push_back(block_info);
00270 }
00271
00272 else if (block_info->flags() & BundleProtocol::BLOCK_FLAG_REPLICATE)
00273 all_frag_blocks.push_back(block_info);
00274 else
00275 first_frag_blocks.push_back(block_info);
00276 }
00277
00278 do {
00279 fragment = create_fragment(bundle, link, this_frag_blocks,
00280 offset, max_length);
00281 ASSERT(fragment);
00282
00283 state->add_fragment(fragment);
00284 offset += fragment->payload().length();
00285 todo -= fragment->payload().length();
00286 this_frag_blocks = all_frag_blocks;
00287 ++count;
00288
00289 } while (todo > 0);
00290
00291 log_info("proactively fragmenting "
00292 "%zu byte payload into %zu %zu byte fragments",
00293 payload_len, count, max_length);
00294
00295 std::string hash_key;
00296 get_hash_key(fragment, &hash_key);
00297 fragment_table_[hash_key] = state;
00298
00299 return state;
00300 }
00301
00302 FragmentState*
00303 FragmentManager::get_fragment_state(Bundle* bundle)
00304 {
00305 std::string hash_key;
00306 get_hash_key(bundle, &hash_key);
00307 FragmentTable::iterator iter = fragment_table_.find(hash_key);
00308
00309 if (iter == fragment_table_.end()) {
00310 return NULL;
00311 } else {
00312 return iter->second;
00313 }
00314 }
00315
00316
00317 void
00318 FragmentManager::erase_fragment_state(FragmentState* state)
00319 {
00320 std::string hash_key;
00321 get_hash_key(state->bundle().object(), &hash_key);
00322 fragment_table_.erase(hash_key);
00323 }
00324
00325
00326 bool
00327 FragmentManager::try_to_reactively_fragment(Bundle* bundle,
00328 BlockInfoVec *blocks,
00329 size_t bytes_sent)
00330 {
00331 if (bundle->do_not_fragment()) {
00332 return false;
00333 }
00334
00335 size_t payload_offset = BundleProtocol::payload_offset(blocks);
00336 size_t total_length = BundleProtocol::total_length(blocks);
00337
00338 if (bytes_sent <= payload_offset) {
00339 return false;
00340 }
00341
00342 if (bytes_sent >= total_length) {
00343 return false;
00344 }
00345
00346 const BlockInfo *payload_block
00347 = blocks->find_block(BundleProtocol::PAYLOAD_BLOCK);
00348
00349 size_t payload_len = bundle->payload().length();
00350 size_t payload_sent = std::min(payload_len, bytes_sent - payload_offset);
00351
00352
00353
00354 if (payload_len <= 1) {
00355 return false;
00356 }
00357
00358 size_t frag_off, frag_len;
00359
00360 if (payload_sent >= payload_len) {
00361
00362 ASSERT(! payload_block->last_block());
00363
00364
00365 frag_off = payload_len - 1;
00366 frag_len = 1;
00367 }
00368 else {
00369 frag_off = payload_sent;
00370 frag_len = payload_len - payload_sent;
00371 }
00372
00373 log_debug("creating reactive fragment (offset %zu len %zu/%zu)",
00374 frag_off, frag_len, payload_len);
00375
00376 Bundle* tail = create_fragment(bundle, blocks, frag_off, frag_len);
00377
00378
00379 BundleDaemon::post_at_head(
00380 new BundleReceivedEvent(tail, EVENTSRC_FRAGMENTATION));
00381
00382 return true;
00383 }
00384
00385
00386 void
00387 FragmentManager::process_for_reassembly(Bundle* fragment)
00388 {
00389 FragmentState* state;
00390 FragmentTable::iterator iter;
00391
00392 ASSERT(fragment->is_fragment());
00393
00394
00395 std::string hash_key;
00396 get_hash_key(fragment, &hash_key);
00397 iter = fragment_table_.find(hash_key);
00398
00399 log_debug("processing bundle fragment id=%u hash=%s %d",
00400 fragment->bundleid(), hash_key.c_str(),
00401 fragment->is_fragment());
00402
00403 if (iter == fragment_table_.end()) {
00404 log_debug("no reassembly state for key %s -- creating new state",
00405 hash_key.c_str());
00406 state = new FragmentState();
00407
00408
00409
00410
00411 fragment->copy_metadata(state->bundle().object());
00412 state->bundle()->set_is_fragment(false);
00413 state->bundle()->mutable_payload()->
00414 set_length(fragment->orig_length());
00415 fragment_table_[hash_key] = state;
00416 } else {
00417 state = iter->second;
00418 log_debug("found reassembly state for key %s (%zu fragments)",
00419 hash_key.c_str(), state->fragment_list().size());
00420 }
00421
00422
00423 state->add_fragment(fragment);
00424
00425
00426 size_t fraglen = fragment->payload().length();
00427
00428 log_debug("write_data: length_=%zu src_offset=%u dst_offset=%u len %zu",
00429 state->bundle()->payload().length(),
00430 0, fragment->frag_offset(), fraglen);
00431
00432 state->bundle()->mutable_payload()->
00433 write_data(fragment->payload(), 0, fraglen,
00434 fragment->frag_offset());
00435
00436
00437
00438 if (fragment->frag_offset() == 0 &&
00439 !state->bundle()->recv_blocks().empty())
00440 {
00441 BlockInfoVec::const_iterator block_i;
00442 for (block_i = fragment->recv_blocks().begin();
00443 block_i != fragment->recv_blocks().end(); ++block_i)
00444 {
00445 state->bundle()->mutable_recv_blocks()->
00446 push_back(BlockInfo(*block_i));
00447 }
00448 }
00449
00450
00451 if (! state->check_completed()) {
00452 return;
00453 }
00454
00455 BundleDaemon::post_at_head
00456 (new ReassemblyCompletedEvent(state->bundle().object(),
00457 &state->fragment_list()));
00458 ASSERT(state->fragment_list().size() == 0);
00459 fragment_table_.erase(hash_key);
00460 delete state;
00461 }
00462
00463
00464 void
00465 FragmentManager::delete_obsoleted_fragments(Bundle* bundle)
00466 {
00467 FragmentState* state;
00468 FragmentTable::iterator iter;
00469
00470
00471 std::string hash_key;
00472 get_hash_key(bundle, &hash_key);
00473 iter = fragment_table_.find(hash_key);
00474
00475 log_debug("checking for obsolete fragments id=%u hash=%s...",
00476 bundle->bundleid(), hash_key.c_str());
00477
00478 if (iter == fragment_table_.end()) {
00479 log_debug("no reassembly state for key %s",
00480 hash_key.c_str());
00481 return;
00482 }
00483
00484 state = iter->second;
00485 log_debug("found reassembly state... deleting %zu fragments",
00486 state->num_fragments());
00487
00488 BundleRef fragment("FragmentManager::delete_obsoleted_fragments");
00489 BundleList::iterator i;
00490 oasys::ScopeLock l(state->fragment_list().lock(),
00491 "FragmentManager::delete_obsoleted_fragments");
00492 while (! state->fragment_list().empty()) {
00493 BundleDaemon::post(new BundleDeleteRequest(state->fragment_list().pop_back(),
00494 BundleProtocol::REASON_NO_ADDTL_INFO));
00495 }
00496
00497 ASSERT(state->fragment_list().size() == 0);
00498 l.unlock();
00499 fragment_table_.erase(hash_key);
00500 delete state;
00501 }
00502
00503
00504 void
00505 FragmentManager::delete_fragment(Bundle* fragment)
00506 {
00507 FragmentState* state;
00508 FragmentTable::iterator iter;
00509
00510 ASSERT(fragment->is_fragment());
00511
00512
00513 std::string hash_key;
00514 get_hash_key(fragment, &hash_key);
00515 iter = fragment_table_.find(hash_key);
00516
00517
00518 if (iter == fragment_table_.end()) {
00519 return;
00520 }
00521
00522 state = iter->second;
00523
00524
00525 bool erased = state->erase_fragment(fragment);
00526
00527
00528 if (!erased) {
00529 return;
00530 }
00531
00532
00533
00534
00535
00536
00537 if (state->num_fragments() == 0) {
00538 fragment_table_.erase(hash_key);
00539 delete state;
00540 }
00541 }
00542
00543 }