StreamConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <oasys/util/OptParser.h>
00019 #include "StreamConvergenceLayer.h"
00020 #include "bundling/AnnounceBundle.h"
00021 #include "bundling/BundleDaemon.h"
00022 #include "bundling/SDNV.h"
00023 #include "bundling/TempBundle.h"
00024 #include "contacts/ContactManager.h"
00025 
00026 namespace dtn {
00027 
00028 //----------------------------------------------------------------------
00029 StreamConvergenceLayer::StreamLinkParams::StreamLinkParams(bool init_defaults)
00030     : LinkParams(init_defaults),
00031       segment_ack_enabled_(true),
00032       negative_ack_enabled_(true),
00033       keepalive_interval_(10),
00034       segment_length_(4096)
00035 {
00036 }
00037 
00038 //----------------------------------------------------------------------
00039 StreamConvergenceLayer::StreamConvergenceLayer(const char* logpath,
00040                                                const char* cl_name,
00041                                                u_int8_t    cl_version)
00042     : ConnectionConvergenceLayer(logpath, cl_name),
00043       cl_version_(cl_version)
00044 {
00045 }
00046 
00047 //----------------------------------------------------------------------
00048 bool
00049 StreamConvergenceLayer::parse_link_params(LinkParams* lparams,
00050                                           int argc, const char** argv,
00051                                           const char** invalidp)
00052 {
00053     // all subclasses should create a params structure that derives
00054     // from StreamLinkParams
00055     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00056     ASSERT(params != NULL);
00057                                
00058     oasys::OptParser p;
00059 
00060     p.addopt(new oasys::BoolOpt("segment_ack_enabled",
00061                                 &params->segment_ack_enabled_));
00062     
00063     p.addopt(new oasys::BoolOpt("negative_ack_enabled",
00064                                 &params->negative_ack_enabled_));
00065     
00066     p.addopt(new oasys::UIntOpt("keepalive_interval",
00067                                 &params->keepalive_interval_));
00068     
00069     p.addopt(new oasys::UIntOpt("segment_length",
00070                                 &params->segment_length_));
00071     
00072     p.addopt(new oasys::UInt8Opt("cl_version",
00073                                  &cl_version_));
00074     
00075     int count = p.parse_and_shift(argc, argv, invalidp);
00076     if (count == -1) {
00077         return false;
00078     }
00079     argc -= count;
00080 
00081     return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv,
00082                                                          invalidp);
00083 }
00084 
00085 //----------------------------------------------------------------------
00086 bool
00087 StreamConvergenceLayer::finish_init_link(Link* link, LinkParams* lparams)
00088 {
00089     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00090     ASSERT(params != NULL);
00091 
00092     // make sure to set the reliability bit in the link structure
00093     if (params->segment_ack_enabled_) {
00094         link->set_reliable(true);
00095     }
00096     
00097     return true;
00098 }
00099 
00100 //----------------------------------------------------------------------
00101 void
00102 StreamConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00103 {
00104     ConnectionConvergenceLayer::dump_link(link, buf);
00105     
00106     StreamLinkParams* params =
00107         dynamic_cast<StreamLinkParams*>(link->cl_info());
00108     ASSERT(params != NULL);
00109     
00110     buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_);
00111     buf->appendf("negative_ack_enabled: %u\n", params->negative_ack_enabled_);
00112     buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_);
00113     buf->appendf("segment_length: %u\n", params->segment_length_);
00114 }
00115 
00116 //----------------------------------------------------------------------
00117 StreamConvergenceLayer::Connection::Connection(const char* classname,
00118                                                const char* logpath,
00119                                                StreamConvergenceLayer* cl,
00120                                                StreamLinkParams* params,
00121                                                bool active_connector)
00122     : CLConnection(classname, logpath, cl, params, active_connector),
00123       current_inflight_(NULL),
00124       send_segment_todo_(0),
00125       recv_segment_todo_(0),
00126       breaking_contact_(false)
00127 {
00128 }
00129 
00130 //----------------------------------------------------------------------
00131 void
00132 StreamConvergenceLayer::Connection::initiate_contact()
00133 {
00134     log_debug("initiate_contact called");
00135 
00136     // format the contact header
00137     ContactHeader contacthdr;
00138     contacthdr.magic   = htonl(MAGIC);
00139     contacthdr.version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00140     
00141     contacthdr.flags = 0;
00142 
00143     StreamLinkParams* params = stream_lparams();
00144     
00145     if (params->segment_ack_enabled_)
00146         contacthdr.flags |= SEGMENT_ACK_ENABLED;
00147     
00148     if (params->reactive_frag_enabled_)
00149         contacthdr.flags |= REACTIVE_FRAG_ENABLED;
00150     
00151     contacthdr.keepalive_interval = htons(params->keepalive_interval_);
00152 
00153     // copy the contact header into the send buffer
00154     ASSERT(sendbuf_.fullbytes() == 0);
00155     if (sendbuf_.tailbytes() < sizeof(ContactHeader)) {
00156         log_warn("send buffer too short: %zu < needed %zu",
00157                  sendbuf_.tailbytes(), sizeof(ContactHeader));
00158         sendbuf_.reserve(sizeof(ContactHeader));
00159     }
00160     
00161     memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader));
00162     sendbuf_.fill(sizeof(ContactHeader));
00163     
00164     // follow up with the local endpoint id length + data
00165     BundleDaemon* bd = BundleDaemon::instance();
00166     size_t local_eid_len = bd->local_eid().length();
00167     size_t sdnv_len = SDNV::encoding_len(local_eid_len);
00168     
00169     if (sendbuf_.tailbytes() < sdnv_len + local_eid_len) {
00170         log_warn("send buffer too short: %zu < needed %zu",
00171                  sendbuf_.tailbytes(), sdnv_len + local_eid_len);
00172         sendbuf_.reserve(sdnv_len + local_eid_len);
00173     }
00174     
00175     sdnv_len = SDNV::encode(local_eid_len,
00176                             (u_char*)sendbuf_.end(),
00177                             sendbuf_.tailbytes());
00178     sendbuf_.fill(sdnv_len);
00179     
00180     memcpy(sendbuf_.end(), bd->local_eid().data(), local_eid_len);
00181     sendbuf_.fill(local_eid_len);
00182 
00183     // drain the send buffer
00184     note_data_sent();
00185     send_data();
00186 
00187     /*
00188      * Now we initialize the various timers that are used for
00189      * keepalives / idle timeouts to make sure they're not used
00190      * uninitialized.
00191      */
00192     ::gettimeofday(&data_rcvd_, 0);
00193     ::gettimeofday(&data_sent_, 0);
00194     ::gettimeofday(&keepalive_sent_, 0);
00195 
00196 
00197     // XXX/demmer need to add a test for nothing coming back
00198 }
00199 
00200 //----------------------------------------------------------------------
00201 void
00202 StreamConvergenceLayer::Connection::handle_contact_initiation()
00203 {
00204     ASSERT(! contact_up_);
00205 
00206     /*
00207      * First check that we have enough data for the contact header
00208      */
00209     size_t len_needed = sizeof(ContactHeader);
00210     if (recvbuf_.fullbytes() < len_needed) {
00211  tooshort:
00212         log_debug("handle_contact_initiation: not enough data received "
00213                   "(need > %zu, got %zu)",
00214                   len_needed, recvbuf_.fullbytes());
00215         return;
00216     }
00217 
00218     /*
00219      * Now check for enough data for the peer's eid
00220      */
00221     u_int64_t peer_eid_len;
00222     int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() +
00223                                   sizeof(ContactHeader),
00224                                 recvbuf_.fullbytes() -
00225                                   sizeof(ContactHeader),
00226                                 &peer_eid_len);
00227     if (sdnv_len < 0) {
00228         goto tooshort;
00229     }
00230     
00231     len_needed = sizeof(ContactHeader) + sdnv_len + peer_eid_len;
00232     if (recvbuf_.fullbytes() < len_needed) {
00233         goto tooshort;
00234     }
00235     
00236     /*
00237      * Ok, we have enough data, parse the contact header.
00238      */
00239     ContactHeader contacthdr;
00240     memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader));
00241 
00242     contacthdr.magic              = ntohl(contacthdr.magic);
00243     contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval);
00244 
00245     recvbuf_.consume(sizeof(ContactHeader));
00246     
00247     /*
00248      * Check for valid magic number and version.
00249      */
00250     if (contacthdr.magic != MAGIC) {
00251         log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
00252                  "-- disconnecting.", contacthdr.magic, MAGIC);
00253         break_contact(ContactEvent::CL_ERROR);
00254         return;
00255     }
00256 
00257     /*
00258      * In this implementation, we can't handle other versions than our
00259      * own, but if the other side presents a higher version, we allow
00260      * it to go through and thereby allow them to downgrade to this
00261      * version.
00262      */
00263     u_int8_t cl_version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00264     if (contacthdr.version < cl_version) {
00265         log_warn("remote sent version %d, expected version %d "
00266                  "-- disconnecting.", contacthdr.version, cl_version);
00267         break_contact(ContactEvent::CL_VERSION);
00268         return;
00269     }
00270 
00271     /*
00272      * Now do parameter negotiation.
00273      */
00274     StreamLinkParams* params = stream_lparams();
00275     
00276     params->keepalive_interval_ =
00277         std::min(params->keepalive_interval_,
00278                  (u_int)contacthdr.keepalive_interval);
00279 
00280     params->segment_ack_enabled_ = params->segment_ack_enabled_ &&
00281                                    (contacthdr.flags & SEGMENT_ACK_ENABLED);
00282     
00283     params->reactive_frag_enabled_ = params->reactive_frag_enabled_ &&
00284                                      (contacthdr.flags & REACTIVE_FRAG_ENABLED);
00285 
00286     params->negative_ack_enabled_ = params->negative_ack_enabled_ &&
00287                                      (contacthdr.flags & NEGATIVE_ACK_ENABLED);
00288 
00289     /*
00290      * Make sure to readjust poll_timeout in case we have a smaller
00291      * keepalive interval than data timeout
00292      */
00293     if (params->keepalive_interval_ != 0 &&
00294         (params->keepalive_interval_ * 1000) < params->data_timeout_)
00295     {
00296         poll_timeout_ = params->keepalive_interval_ * 1000;
00297     }
00298      
00299     /*
00300      * Now skip the sdnv that encodes the peer's eid length since we
00301      * parsed it above.
00302      */
00303     recvbuf_.consume(sdnv_len);
00304 
00305     /*
00306      * Finally, parse the peer node's eid and give it to the base
00307      * class to handle (i.e. by linking us to a Contact if we don't
00308      * have one).
00309      */
00310     EndpointID peer_eid;
00311     if (! peer_eid.assign(recvbuf_.start(), peer_eid_len)) {
00312         log_err("protocol error: invalid endpoint id '%s' (len %llu)",
00313                 peer_eid.c_str(), U64FMT(peer_eid_len));
00314         break_contact(ContactEvent::CL_ERROR);
00315         return;
00316     }
00317 
00318     find_contact(peer_eid);
00319     recvbuf_.consume(peer_eid_len);
00320 
00321     /*
00322      * Finally, we note that the contact is now up.
00323      */
00324     contact_up();
00325 }
00326 
00327 
00328 //----------------------------------------------------------------------
00329 void
00330 StreamConvergenceLayer::Connection::handle_send_bundle(Bundle* bundle)
00331 {
00332     // push the bundle onto the inflight queue. we'll handle sending
00333     // the bundle out in the callback for transmit_bundle_data
00334     InFlightBundle* inflight = new InFlightBundle(bundle);
00335     inflight->blocks_ = bundle->xmit_blocks_.find_blocks(contact_->link());
00336     ASSERT(inflight->blocks_ != NULL);
00337     inflight->total_length_ = BundleProtocol::total_length(inflight->blocks_);
00338     inflight_.push_back(inflight);
00339 }
00340 
00341 //----------------------------------------------------------------------
00342 bool
00343 StreamConvergenceLayer::Connection::send_pending_data()
00344 {
00345     // if the outgoing data buffer is full, we can't do anything until
00346     // we poll()
00347     if (sendbuf_.tailbytes() == 0) {
00348         return false;
00349     }
00350 
00351     // if we're in the middle of sending a segment, we need to continue
00352     // sending it. only if we completely send the segment do we fall
00353     // through to send acks, otherwise we return to try to finish it
00354     // again later.
00355     if (send_segment_todo_ != 0) {
00356         ASSERT(current_inflight_ != NULL);        
00357         send_data_todo(current_inflight_);
00358     }
00359     
00360     // see if we're broken or write blocked
00361     if (contact_broken_ || (send_segment_todo_ != 0)) {
00362         return false;
00363     }
00364     
00365     // now check if there are acks we need to send -- even if it
00366     // returns true (i.e. we sent an ack), we continue on and try to
00367     // send some real payload data, otherwise we could get starved by
00368     // arriving data and never send anything out.
00369     bool sent_ack = send_pending_acks();
00370     
00371     // if the connection failed during ack transmission, stop
00372     if (contact_broken_)
00373     {
00374         return sent_ack;
00375     }
00376 
00377     // check if we need to start a new bundle. if we do, then
00378     // start_next_bundle handles the correct return code
00379     bool sent_data;
00380     if (current_inflight_ == NULL) {
00381         sent_data = start_next_bundle();
00382     } else {
00383         // otherwise send the next segment of the current bundle
00384         sent_data = send_next_segment(current_inflight_);
00385     }
00386 
00387     return sent_ack || sent_data;
00388 }
00389 
00390 //----------------------------------------------------------------------
00391 bool
00392 StreamConvergenceLayer::Connection::send_pending_acks()
00393 {
00394     if (contact_broken_ || incoming_.empty()) {
00395         return false; // nothing to do
00396     }
00397     IncomingBundle* incoming = incoming_.front();
00398     DataBitmap::iterator iter = incoming->ack_data_.begin();
00399     bool generated_ack = false;
00400     
00401     // when data segment headers are received, the last bit of the
00402     // segment is marked in ack_data, thus if there's nothing in
00403     // there, we don't need to send out an ack.
00404     if (iter == incoming->ack_data_.end()) {
00405         goto check_done;
00406     }
00407 
00408     // however, we have to be careful to check the recv_data as well
00409     // to make sure we've actually gotten the segment, since the bit
00410     // in ack_data is marked when the segment is begun, not when it's
00411     // completed
00412     while (1) {
00413         size_t rcvd_bytes  = incoming->rcvd_data_.num_contiguous();
00414         size_t ack_len     = *iter + 1;
00415         size_t segment_len = ack_len - incoming->acked_length_;
00416         (void)segment_len;
00417         
00418         if (ack_len > rcvd_bytes) {
00419             log_debug("send_pending_acks: "
00420                       "waiting to send ack length %zu for %zu byte segment "
00421                       "since only received %zu",
00422                       ack_len, segment_len, rcvd_bytes);
00423             break;
00424         }
00425 
00426         // make sure we have space in the send buffer
00427         size_t encoding_len = 1 + SDNV::encoding_len(ack_len);
00428         if (encoding_len > sendbuf_.tailbytes()) {
00429             log_debug("send_pending_acks: "
00430                       "no space for ack in buffer (need %zu, have %zu)",
00431                       encoding_len, sendbuf_.tailbytes());
00432             break;
00433         }
00434         
00435         log_debug("send_pending_acks: "
00436                   "sending ack length %zu for %zu byte segment "
00437                   "[range %u..%u] ack_data *%p",
00438                   ack_len, segment_len, incoming->acked_length_, *iter,
00439                   &incoming->ack_data_);
00440         
00441         *sendbuf_.end() = ACK_SEGMENT;
00442         int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1,
00443                                sendbuf_.tailbytes() - 1);
00444         ASSERT(encoding_len = len + 1);
00445         sendbuf_.fill(encoding_len);
00446 
00447         generated_ack = true;
00448         incoming->acked_length_ = ack_len;
00449         incoming->ack_data_.clear(*iter);
00450         iter = incoming->ack_data_.begin();
00451         
00452         if (iter == incoming->ack_data_.end()) {
00453             // XXX/demmer this should check if there's another bundle
00454             // with acks we could send
00455             break;
00456         }
00457         
00458         log_debug("send_pending_acks: "
00459                   "found another segment (%u)", *iter);
00460     }
00461     
00462     if (generated_ack) {
00463         send_data();
00464         note_data_sent();
00465     }
00466 
00467     // now, check if a) we've gotten everything we're supposed to
00468     // (i.e. total_length_ isn't zero), and b) we're done with all the
00469     // acks we need to send
00470  check_done:
00471     if ((incoming->total_length_ != 0) &&
00472         (incoming->total_length_ == incoming->acked_length_))
00473     {
00474         log_debug("send_pending_acks: acked all %u bytes of bundle %d",
00475                   incoming->total_length_, incoming->bundle_->bundleid_);
00476         
00477         incoming_.pop_front();
00478         delete incoming;
00479     }
00480     else
00481     {
00482         log_debug("send_pending_acks: "
00483                   "still need to send acks -- acked_range %u",
00484                   incoming->ack_data_.num_contiguous());
00485     }
00486 
00487     // return true if we've sent something
00488     return generated_ack;
00489 }
00490 
00491          
00492 //----------------------------------------------------------------------
00493 bool
00494 StreamConvergenceLayer::Connection::start_next_bundle()
00495 {
00496     ASSERT(current_inflight_ == NULL);
00497 
00498     // find the bundle to start (identified by having nothing yet in
00499     // sent_data) and store it in current_inflight_
00500     InFlightList::iterator iter;
00501     for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) {
00502         InFlightBundle* inflight = *iter;
00503         
00504         if (contact_broken_)
00505             return false;
00506 
00507         // skip entries that we've sent completely
00508         if (inflight->send_complete_)
00509         {
00510             ASSERT(inflight->sent_data_.num_contiguous() ==
00511                    inflight->total_length_);
00512             
00513             log_debug("start_next_bundle: "
00514                       "transmission of bundle %d already complete, skipping",
00515                       inflight->bundle_->bundleid_);
00516             continue;
00517         }
00518 
00519         // otherwise, we must not have sent anything for this bundle
00520         // at all, so assert that sent_data is empty
00521         ASSERT(inflight->sent_data_.empty());
00522         current_inflight_ = inflight;
00523         break;
00524     }
00525 
00526     // there might not be anything to send, in which case we return
00527     // false to indicate as such
00528     if (current_inflight_ == NULL) {
00529         return false;
00530     }
00531 
00532     // now send the first segment for the bundle
00533     return send_next_segment(current_inflight_);
00534 }
00535 
00536 //----------------------------------------------------------------------
00537 bool
00538 StreamConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight)
00539 {
00540     if (sendbuf_.tailbytes() == 0) {
00541         return false;
00542     }
00543 
00544     ASSERT(send_segment_todo_ == 0);
00545 
00546     StreamLinkParams* params = stream_lparams();
00547 
00548     size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
00549                         inflight->sent_data_.last() + 1;
00550     
00551     if (bytes_sent == inflight->total_length_) {
00552         log_debug("send_next_segment: "
00553                   "already sent all %zu bytes, finishing bundle",
00554                   bytes_sent);
00555         ASSERT(inflight->send_complete_);
00556         return finish_bundle(inflight);
00557     }
00558 
00559     u_int8_t flags = 0;
00560     size_t segment_len;
00561 
00562     if (bytes_sent == 0) {
00563         flags |= BUNDLE_START;
00564     }
00565     
00566     if (params->segment_length_ >= inflight->total_length_ - bytes_sent) {
00567         flags |= BUNDLE_END;
00568         segment_len = inflight->total_length_ - bytes_sent;
00569     } else {
00570         segment_len = params->segment_length_;
00571     }
00572     
00573     size_t sdnv_len = SDNV::encoding_len(segment_len);
00574     
00575     if (sendbuf_.tailbytes() < 1 + sdnv_len) {
00576         log_debug("send_next_segment: "
00577                   "not enough space for segment header [need %zu, have %zu]",
00578                   1 + sdnv_len, sendbuf_.tailbytes());
00579         return false;
00580     }
00581     
00582     log_debug("send_next_segment: "
00583               "starting %zu byte segment [block byte range %zu..%zu]",
00584               segment_len, bytes_sent, bytes_sent + segment_len);
00585 
00586     u_char* bp = (u_char*)sendbuf_.end();
00587     *bp++ = DATA_SEGMENT | flags;
00588     int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1);
00589     ASSERT(cc == (int)sdnv_len);
00590     bp += sdnv_len;
00591     
00592     sendbuf_.fill(1 + sdnv_len);
00593     send_segment_todo_ = segment_len;
00594 
00595     // send_data_todo actually does the deed
00596     return send_data_todo(inflight);
00597 }
00598 
00599 //----------------------------------------------------------------------
00600 bool
00601 StreamConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight)
00602 {
00603     ASSERT(send_segment_todo_ != 0);
00604 
00605     // loop since it may take multiple calls to send on the socket
00606     // before we can actually drain the todo amount
00607     while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) {
00608         size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
00609                             inflight->sent_data_.last() + 1;
00610         size_t send_len   = std::min(send_segment_todo_, sendbuf_.tailbytes());
00611     
00612         Bundle* bundle       = inflight->bundle_.object();
00613         BlockInfoVec* blocks = inflight->blocks_;
00614 
00615         size_t ret =
00616             BundleProtocol::produce(bundle, blocks, (u_char*)sendbuf_.end(),
00617                                     bytes_sent, send_len,
00618                                     &inflight->send_complete_);
00619         ASSERT(ret == send_len);
00620         sendbuf_.fill(send_len);
00621         inflight->sent_data_.set(bytes_sent, send_len);
00622     
00623         log_debug("send_data_todo: "
00624                   "sent %zu/%zu of current segment from block offset %zu "
00625                   "(%zu todo), updated sent_data *%p",
00626                   send_len, send_segment_todo_, bytes_sent,
00627                   send_segment_todo_ - send_len, &inflight->sent_data_);
00628         
00629         send_segment_todo_ -= send_len;
00630 
00631         note_data_sent();
00632         send_data();
00633         
00634         if (contact_broken_)
00635             return true;
00636     }
00637 
00638     return (send_segment_todo_ == 0);
00639 }
00640 
00641 //----------------------------------------------------------------------
00642 bool
00643 StreamConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight)
00644 {
00645     ASSERT(inflight->send_complete_);
00646     
00647     ASSERT(current_inflight_ == inflight);
00648     current_inflight_ = NULL;
00649     
00650     check_completed(inflight);
00651     check_unblock_link();
00652     
00653     return true;
00654 }
00655 
00656 //----------------------------------------------------------------------
00657 void
00658 StreamConvergenceLayer::Connection::check_completed(InFlightBundle* inflight)
00659 {
00660     // we can pop the inflight bundle off of the queue and clean it up
00661     // only when both finish_bundle is called (so current_inflight_ no
00662     // longer points to the inflight bundle), and after the final ack
00663     // for the bundle has been received (determined by looking at
00664     // inflight->ack_data_)
00665 
00666     if (current_inflight_ == inflight) {
00667         log_debug("check_completed: bundle %d still waiting for finish_bundle",
00668                   inflight->bundle_->bundleid_);
00669         return;
00670     }
00671 
00672     u_int32_t acked_len = inflight->ack_data_.num_contiguous();
00673     if (acked_len < inflight->total_length_) {
00674         log_debug("check_completed: bundle %d only acked %u/%u",
00675                   inflight->bundle_->bundleid_,
00676                   acked_len, inflight->total_length_);
00677         return;
00678     }
00679 
00680     log_debug("check_completed: bundle %d transmission complete",
00681               inflight->bundle_->bundleid_);
00682     ASSERT(inflight == inflight_.front());
00683     inflight_.pop_front();
00684     delete inflight;
00685 }
00686 
00687 //----------------------------------------------------------------------
00688 void
00689 StreamConvergenceLayer::Connection::send_keepalive()
00690 {
00691     // there's no point in putting another byte in the buffer if
00692     // there's already data waiting to go out, since the arrival of
00693     // that data on the other end will do the same job as the
00694     // keepalive byte
00695     if (sendbuf_.fullbytes() != 0) {
00696         log_debug("send_keepalive: "
00697                   "send buffer has %zu bytes queued, suppressing keepalive",
00698                   sendbuf_.fullbytes());
00699         return;
00700     }
00701     ASSERT(sendbuf_.tailbytes() > 0);
00702 
00703     ::gettimeofday(&keepalive_sent_, 0);
00704 
00705     *(sendbuf_.end()) = KEEPALIVE;
00706     sendbuf_.fill(1);
00707 
00708     // don't note_data_sent() here since keepalive messages shouldn't
00709     // be counted for keeping an idle link open
00710     send_data();
00711 }
00712 //----------------------------------------------------------------------
00713 void
00714 StreamConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle)
00715 {
00716     (void)bundle;
00717 }
00718 
00719 //----------------------------------------------------------------------
00720 void
00721 StreamConvergenceLayer::Connection::handle_poll_timeout()
00722 {
00723     // Allow the BundleDaemon to call for a close of the connection if
00724     // a shutdown is in progress. This must be done to avoid a
00725     // deadlock caused by simultaneous poll_timeout and close_contact
00726     // activities.
00727     //
00728     // Before we return, sleep a bit to avoid continuous
00729     // handle_poll_timeout calls
00730     if (BundleDaemon::shutting_down())
00731     {
00732         sleep(1);
00733         return;
00734     }
00735 
00736     struct timeval now;
00737     u_int elapsed, elapsed2;
00738 
00739     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
00740     ASSERT(params != NULL);
00741     
00742     ::gettimeofday(&now, 0);
00743     
00744     // check that it hasn't been too long since we got some data from
00745     // the other side
00746     elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00747     if (elapsed > params->data_timeout_) {
00748         log_info("handle_poll_timeout: no data heard for %d msecs "
00749                  "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u, poll_timeout %d) "
00750                  "-- closing contact",
00751                  elapsed,
00752                  (u_int)keepalive_sent_.tv_sec,
00753                  (u_int)keepalive_sent_.tv_usec,
00754                  (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
00755                  (u_int)now.tv_sec, (u_int)now.tv_usec,
00756                  poll_timeout_);
00757             
00758         break_contact(ContactEvent::BROKEN);
00759         return;
00760     }
00761     
00762     //make sure the contact still exists
00763     ContactManager* cm = BundleDaemon::instance()->contactmgr();
00764     oasys::ScopeLock l(cm->lock(),"StreamConvergenceLayer::Connection::handle_poll_timeout");
00765     if(contact_ == NULL)
00766     {
00767         return;
00768     }
00769 
00770     // check if the connection has been idle for too long
00771     // (on demand links only)
00772     if (contact_->link()->type() == Link::ONDEMAND) {
00773         u_int idle_close_time = contact_->link()->params().idle_close_time_;
00774         
00775         elapsed  = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00776         elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_);
00777         
00778         if (idle_close_time != 0 &&
00779             (elapsed > idle_close_time * 1000) &&
00780             (elapsed2 > idle_close_time * 1000))
00781         {
00782             log_info("closing idle connection "
00783                      "(no data received for %d msecs or sent for %d msecs)",
00784                      elapsed, elapsed2);
00785             break_contact(ContactEvent::IDLE);
00786             return;
00787         } else {
00788             log_debug("connection not idle: recvd %d / sent %d <= timeout %d",
00789                       elapsed, elapsed2, idle_close_time * 1000);
00790         }
00791     }
00792 
00793     // check if it's time for us to send a keepalive (i.e. that we
00794     // haven't sent some data or another keepalive in at least the
00795     // configured keepalive_interval)
00796     check_keepalive();
00797 }
00798 
00799 //----------------------------------------------------------------------
00800 void
00801 StreamConvergenceLayer::Connection::check_keepalive()
00802 {
00803     struct timeval now;
00804     u_int elapsed, elapsed2;
00805 
00806     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
00807     ASSERT(params != NULL);
00808 
00809     ::gettimeofday(&now, 0);
00810     
00811     if (params->keepalive_interval_ != 0) {
00812         elapsed  = TIMEVAL_DIFF_MSEC(now, data_sent_);
00813         elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_);
00814 
00815         // XXX/demmer this is bogus -- we should really adjust
00816         // poll_timeout to take into account the next time we should
00817         // send a keepalive
00818         // 
00819         // give a 500ms fudge to the keepalive interval to make sure
00820         // we send it when we should
00821         if (std::min(elapsed, elapsed2) > ((params->keepalive_interval_ * 1000) - 500))
00822         {
00823             send_keepalive();
00824         }
00825     }
00826 
00827     // XXX/demmer this is to fix a strange and not yet understood race
00828     // condition
00829     if (contact_ != NULL &&
00830         contact_->link()->state() == Link::BUSY &&
00831         num_pending_.value == 0)
00832     {
00833         elapsed = TIMEVAL_DIFF_MSEC(now, data_sent_);
00834         if (elapsed > 5000) {
00835             log_warn("0 bundles pending and %d msecs since last xmit, "
00836                      "clearing BUSY state",
00837                      elapsed);
00838 
00839             // see comment in CLConnection.cc why we post at head
00840             BundleDaemon::post_at_head(
00841                 new LinkStateChangeRequest(contact_->link(),
00842                                            Link::AVAILABLE,
00843                                            ContactEvent::UNBLOCKED));
00844         }
00845     }
00846 }
00847 
00848 //----------------------------------------------------------------------
00849 void
00850 StreamConvergenceLayer::Connection::process_data()
00851 {
00852     if (recvbuf_.fullbytes() == 0) {
00853         return;
00854     }
00855 
00856     log_debug("processing up to %zu bytes from receive buffer",
00857               recvbuf_.fullbytes());
00858 
00859     // all data (keepalives included) should be noted since the last
00860     // reception time is used to determine when to generate new
00861     // keepalives
00862     note_data_rcvd();
00863 
00864     // the first thing we need to do is handle the contact initiation
00865     // sequence, i.e. the contact header and the announce bundle. we
00866     // know we need to do this if we haven't yet called contact_up()
00867     if (! contact_up_) {
00868         handle_contact_initiation();
00869         return;
00870     }
00871 
00872     // if a data segment is bigger than the receive buffer. when
00873     // processing a data segment, we mark the unread amount in the
00874     // recv_segment_todo__ field, so if that's not zero, we need to
00875     // drain it, then fall through to handle the rest of the buffer
00876     if (recv_segment_todo_ != 0) {
00877         bool ok = handle_data_todo();
00878         
00879         if (!ok) {
00880             return;
00881         }
00882     }
00883     
00884     // now, drain cl messages from the receive buffer. we peek at the
00885     // first byte and dispatch to the correct handler routine
00886     // depending on the type of the CL message. we don't consume the
00887     // byte yet since there's a possibility that we need to read more
00888     // from the remote side to handle the whole message
00889     while (recvbuf_.fullbytes() != 0) {
00890         if (contact_broken_) return;
00891         
00892         u_int8_t type  = *recvbuf_.start() & 0xf0;
00893         u_int8_t flags = *recvbuf_.start() & 0x0f;
00894 
00895         log_debug("recvbuf has %zu full bytes, dispatching to handler routine",
00896                   recvbuf_.fullbytes());
00897         bool ok;
00898         switch (type) {
00899         case DATA_SEGMENT:
00900             ok = handle_data_segment(flags);
00901             break;
00902         case ACK_SEGMENT:
00903             ok = handle_ack_segment(flags);
00904             break;
00905         case REFUSE_BUNDLE:
00906             ok = handle_refuse_bundle(flags);
00907             break;
00908         case KEEPALIVE:
00909             ok = handle_keepalive(flags);
00910             break;
00911         case SHUTDOWN:
00912             ok = handle_shutdown(flags);
00913             break;
00914         default:
00915             log_err("invalid CL message type code 0x%x (flags 0x%x)",
00916                     type >> 4, flags);
00917             break_contact(ContactEvent::CL_ERROR);
00918             return;
00919         }
00920 
00921         // if there's not enough data in the buffer to handle the
00922         // message, make sure there's space to receive more
00923         if (! ok) {
00924             if (recvbuf_.fullbytes() == recvbuf_.size()) {
00925                 log_warn("process_data: "
00926                          "%zu byte recv buffer full but too small for msg %u... "
00927                          "doubling buffer size",
00928                          recvbuf_.size(), type);
00929                 
00930                 recvbuf_.reserve(recvbuf_.size() * 2);
00931 
00932             } else if (recvbuf_.tailbytes() == 0) {
00933                 // force it to move the full bytes up to the front
00934                 recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes());
00935                 ASSERT(recvbuf_.tailbytes() != 0);
00936             }
00937             
00938             return;
00939         }
00940     }
00941 }
00942 
00943 //----------------------------------------------------------------------
00944 void
00945 StreamConvergenceLayer::Connection::note_data_rcvd()
00946 {
00947     log_debug("noting data_rcvd");
00948     ::gettimeofday(&data_rcvd_, 0);
00949 }
00950 
00951 //----------------------------------------------------------------------
00952 void
00953 StreamConvergenceLayer::Connection::note_data_sent()
00954 {
00955     log_debug("noting data_sent");
00956     ::gettimeofday(&data_sent_, 0);
00957 }
00958 
00959 //----------------------------------------------------------------------
00960 bool
00961 StreamConvergenceLayer::Connection::handle_data_segment(u_int8_t flags)
00962 {
00963     if (flags & BUNDLE_START)
00964     {
00965         // make sure we're done with the last bundle if we got a new
00966         // BUNDLE_START flag
00967         if (!incoming_.empty())
00968         {
00969             IncomingBundle* incoming = incoming_.back();
00970             if (incoming->total_length_ == 0) {
00971                 log_err("protocol error: "
00972                         "got BUNDLE_START before bundle completed");
00973                 break_contact(ContactEvent::CL_ERROR);
00974                 return false;
00975             }
00976         }
00977         
00978         log_debug("got BUNDLE_START segment, creating new IncomingBundle");
00979         IncomingBundle* incoming = new IncomingBundle(new Bundle());
00980         incoming_.push_back(incoming);
00981     }
00982     else if (incoming_.empty())
00983     {
00984         log_err("protocol error: "
00985                 "first data segment doesn't have BUNDLE_START flag set");
00986         break_contact(ContactEvent::CL_ERROR);
00987         return false;
00988     }
00989 
00990     // Note that there may be more than one incoming bundle on the
00991     // IncomingList, but it's the one at the back that we're reading
00992     // in data for. Others are waiting for acks to be sent.
00993     IncomingBundle* incoming = incoming_.back();
00994     u_char* bp = (u_char*)recvbuf_.start();
00995 
00996     // Decode the segment length and then call handle_data_todo
00997     u_int32_t segment_len;
00998     int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &segment_len);
00999 
01000     if (sdnv_len < 0) {
01001         log_debug("handle_data_segment: too few bytes in buffer for sdnv (%zu)",
01002                   recvbuf_.fullbytes());
01003         return false;
01004     }
01005 
01006     recvbuf_.consume(1 + sdnv_len);
01007     
01008     if (segment_len == 0) {
01009         log_err("protocol error -- zero length segment");
01010         break_contact(ContactEvent::CL_ERROR);
01011         return false;
01012     }
01013 
01014     size_t segment_offset = incoming->rcvd_data_.num_contiguous();
01015     log_debug("handle_data_segment: "
01016               "got segment of length %u at offset %zu ",
01017               segment_len, segment_offset);
01018     
01019     incoming->ack_data_.set(segment_offset + segment_len - 1);
01020 
01021     log_debug("handle_data_segment: "
01022               "updated ack_data (segment_offset %zu) *%p ack_data *%p",
01023               segment_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01024 
01025 
01026     // if this is the last segment for the bundle, we calculate and
01027     // store the total length in the IncomingBundle structure so
01028     // send_pending_acks knows when we're done.
01029     if (flags & BUNDLE_END)
01030     {
01031         incoming->total_length_ = incoming->rcvd_data_.num_contiguous() +
01032                                   segment_len;
01033         
01034         log_debug("got BUNDLE_END: total length %u",
01035                   incoming->total_length_);
01036     }
01037     
01038     recv_segment_todo_ = segment_len;
01039     return handle_data_todo();
01040 }
01041 
01042 //----------------------------------------------------------------------
01043 bool
01044 StreamConvergenceLayer::Connection::handle_data_todo()
01045 {
01046     // We shouldn't get ourselves here unless there's something
01047     // incoming and there's something left to read
01048     ASSERT(!incoming_.empty());
01049     ASSERT(recv_segment_todo_ != 0);
01050     
01051     // Note that there may be more than one incoming bundle on the
01052     // IncomingList. There's always only one (at the back) that we're
01053     // reading in data for, the rest are waiting for acks to go out
01054     IncomingBundle* incoming = incoming_.back();
01055     size_t rcvd_offset    = incoming->rcvd_data_.num_contiguous();
01056     size_t rcvd_len       = recvbuf_.fullbytes();
01057     size_t chunk_len      = std::min(rcvd_len, recv_segment_todo_);
01058 
01059     if (rcvd_len == 0) {
01060         return false; // nothing to do
01061     }
01062     
01063     log_debug("handle_data_todo: "
01064               "reading todo segment %zu/%zu at offset %zu",
01065               chunk_len, recv_segment_todo_, rcvd_offset);
01066 
01067     bool last;
01068     int cc = BundleProtocol::consume(incoming->bundle_.object(),
01069                                      (u_char*)recvbuf_.start(),
01070                                      chunk_len, &last);
01071     if (cc < 0) {
01072         log_err("protocol error parsing bundle data segment");
01073         break_contact(ContactEvent::CL_ERROR);
01074         return false;
01075     }
01076 
01077     ASSERT(cc == (int)chunk_len);
01078 
01079     recv_segment_todo_ -= chunk_len;
01080     recvbuf_.consume(chunk_len);
01081 
01082     incoming->rcvd_data_.set(rcvd_offset, chunk_len);
01083     
01084     log_debug("handle_data_todo: "
01085               "updated recv_data (rcvd_offset %zu) *%p ack_data *%p",
01086               rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01087     
01088     if (recv_segment_todo_ == 0) {
01089         check_completed(incoming);
01090         return true; // completed segment
01091     }
01092 
01093     return false;
01094 }
01095 
01096 //----------------------------------------------------------------------
01097 void
01098 StreamConvergenceLayer::Connection::check_completed(IncomingBundle* incoming)
01099 {
01100     u_int32_t rcvd_len = incoming->rcvd_data_.num_contiguous();
01101 
01102     // if we don't know the total length yet, we haven't seen the
01103     // BUNDLE_END message
01104     if (incoming->total_length_ == 0) {
01105         return;
01106     }
01107     
01108     u_int32_t formatted_len =
01109         BundleProtocol::total_length(&incoming->bundle_->recv_blocks_);
01110     
01111     log_debug("check_completed: rcvd %u / %u (formatted length %u)",
01112               rcvd_len, incoming->total_length_, formatted_len);
01113 
01114     if (rcvd_len < incoming->total_length_) {
01115         return;
01116     }
01117     
01118     if (rcvd_len > incoming->total_length_) {
01119         log_err("protocol error: received too much data -- "
01120                 "got %u, total length %u",
01121                 rcvd_len, incoming->total_length_);
01122 
01123         // we pretend that we got nothing so the cleanup code in
01124         // CLConnection::close_contact doesn't try to post a received
01125         // event for the bundle
01126 protocol_err:
01127         incoming->rcvd_data_.clear();
01128         break_contact(ContactEvent::CL_ERROR);
01129         return;
01130     }
01131 
01132     // validate that the total length as conveyed by the convergence
01133     // layer matches the length according to the bundle protocol
01134     if (incoming->total_length_ != formatted_len) {
01135         log_err("protocol error: CL total length %u "
01136                 "doesn't match bundle protocol total %u",
01137                 incoming->total_length_, formatted_len);
01138         goto protocol_err;
01139         
01140     }
01141     
01142     BundleDaemon::post(
01143         new BundleReceivedEvent(incoming->bundle_.object(),
01144                                 EVENTSRC_PEER,
01145                                 incoming->total_length_,
01146                                 contact_.object()));
01147 }
01148 
01149 //----------------------------------------------------------------------
01150 bool
01151 StreamConvergenceLayer::Connection::handle_ack_segment(u_int8_t flags)
01152 {
01153     (void)flags;
01154     u_char* bp = (u_char*)recvbuf_.start();
01155     u_int32_t acked_len;
01156     int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len);
01157     
01158     if (sdnv_len < 0) {
01159         log_debug("handle_ack_segment: too few bytes for sdnv (%zu)",
01160                   recvbuf_.fullbytes());
01161         return false;
01162     }
01163 
01164     recvbuf_.consume(1 + sdnv_len);
01165 
01166     if (inflight_.empty()) {
01167         log_err("protocol error: got ack segment with no inflight bundle");
01168         break_contact(ContactEvent::CL_ERROR);
01169         return false;
01170     }
01171 
01172     InFlightBundle* inflight = inflight_.front();
01173 
01174     size_t ack_begin;
01175     DataBitmap::iterator i = inflight->ack_data_.begin();
01176     if (i == inflight->ack_data_.end()) {
01177         ack_begin = 0;
01178     } else {
01179         i.skip_contiguous();
01180         ack_begin = *i + 1;
01181     }
01182 
01183     if (acked_len < ack_begin) {
01184         log_err("protocol error: got ack for length %u but already acked up to %zu",
01185                 acked_len, ack_begin);
01186         break_contact(ContactEvent::CL_ERROR);
01187         return false;
01188     }
01189     
01190     inflight->ack_data_.set(0, acked_len);
01191 
01192     // now check if this was the last ack for the bundle, in which
01193     // case we can pop it off the list and post a
01194     // BundleTransmittedEvent
01195     if (acked_len == inflight->total_length_) {
01196         log_debug("handle_ack_segment: got final ack for %zu byte range -- "
01197                   "acked_len %u, ack_data *%p",
01198                   (size_t)acked_len - ack_begin,
01199                   acked_len, &inflight->ack_data_);
01200 
01201         BundleDaemon::post(
01202             new BundleTransmittedEvent(inflight->bundle_.object(),
01203                                        contact_,
01204                                        contact_->link(),
01205                                        inflight->sent_data_.num_contiguous(),
01206                                        inflight->ack_data_.num_contiguous()));
01207 
01208         // might delete inflight
01209         check_completed(inflight);
01210         
01211     } else {
01212         log_debug("handle_ack_segment: "
01213                   "got acked_len %u (%zu byte range) -- ack_data *%p",
01214                   acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_);
01215     }
01216 
01217     return true;
01218 }
01219 
01220 //----------------------------------------------------------------------
01221 bool
01222 StreamConvergenceLayer::Connection::handle_refuse_bundle(u_int8_t flags)
01223 {
01224     (void)flags;
01225     log_debug("got refuse_bundle message");
01226     log_err("REFUSE_BUNDLE not implemented");
01227     break_contact(ContactEvent::CL_ERROR);
01228     return true;
01229 }
01230 //----------------------------------------------------------------------
01231 bool
01232 StreamConvergenceLayer::Connection::handle_keepalive(u_int8_t flags)
01233 {
01234     (void)flags;
01235     log_debug("got keepalive message");
01236     recvbuf_.consume(1);
01237     return true;
01238 }
01239 
01240 //----------------------------------------------------------------------
01241 void
01242 StreamConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason)
01243 {
01244     // it's possible that we can end up calling break_contact multiple
01245     // times, if for example we have an error when sending out the
01246     // shutdown message below. we simply ignore the multiple calls
01247     if (breaking_contact_) {
01248         return;
01249     }
01250     breaking_contact_ = true;
01251     
01252     // we can only send a shutdown byte if we're not in the middle
01253     // of sending a segment, otherwise the shutdown byte could be
01254     // interpreted as a part of the payload
01255     bool send_shutdown = false;
01256     shutdown_reason_t shutdown_reason = SHUTDOWN_NO_REASON;
01257 
01258     switch (reason) {
01259     case ContactEvent::USER:
01260         // if the user is closing this link, we say that we're busy
01261         send_shutdown = true;
01262         shutdown_reason = SHUTDOWN_BUSY;
01263         break;
01264         
01265     case ContactEvent::IDLE:
01266         // if we're idle, indicate as such
01267         send_shutdown = true;
01268         shutdown_reason = SHUTDOWN_IDLE_TIMEOUT;
01269         break;
01270         
01271     case ContactEvent::SHUTDOWN:
01272         // if the other side shuts down first, we send the
01273         // corresponding SHUTDOWN byte for a clean handshake, but
01274         // don't give any more reason
01275         send_shutdown = true;
01276         break;
01277         
01278     case ContactEvent::BROKEN:
01279     case ContactEvent::CL_ERROR:
01280         // no shutdown 
01281         send_shutdown = false;
01282         break;
01283 
01284     case ContactEvent::CL_VERSION:
01285         // version mismatch
01286         send_shutdown = true;
01287         shutdown_reason = SHUTDOWN_VERSION_MISMATCH;
01288         break;
01289         
01290     case ContactEvent::INVALID:
01291     case ContactEvent::NO_INFO:
01292     case ContactEvent::RECONNECT:
01293     case ContactEvent::TIMEOUT:
01294     case ContactEvent::UNBLOCKED:
01295         NOTREACHED;
01296         break;
01297     }
01298 
01299     // of course, we can't send anything if we were interrupted in the
01300     // middle of sending a block.
01301     //
01302     // XXX/demmer if we receive a SHUTDOWN byte from the other side,
01303     // we don't have any way of continuing to transmit our own blocks
01304     // and then shut down afterwards
01305     if (send_shutdown && 
01306         sendbuf_.fullbytes() == 0 &&
01307         send_segment_todo_ == 0)
01308     {
01309         log_debug("break_contact: sending shutdown");
01310         char typecode = SHUTDOWN;
01311         if (shutdown_reason != SHUTDOWN_NO_REASON) {
01312             typecode |= SHUTDOWN_HAS_REASON;
01313         }
01314 
01315         // XXX/demmer should we send a reconnect delay??
01316 
01317         *sendbuf_.end() = typecode;
01318         sendbuf_.fill(1);
01319 
01320         if (shutdown_reason != SHUTDOWN_NO_REASON) {
01321             *sendbuf_.end() = shutdown_reason;
01322             sendbuf_.fill(1);
01323         }
01324 
01325         send_data();
01326     }
01327         
01328     CLConnection::break_contact(reason);
01329 }
01330 
01331 //----------------------------------------------------------------------
01332 bool
01333 StreamConvergenceLayer::Connection::handle_shutdown(u_int8_t flags)
01334 {
01335     log_debug("got SHUTDOWN byte");
01336     size_t shutdown_len = 1;
01337 
01338     if (flags & SHUTDOWN_HAS_REASON)
01339     {
01340         shutdown_len += 1;
01341     }
01342 
01343     if (flags & SHUTDOWN_HAS_DELAY)
01344     {
01345         shutdown_len += 2;
01346     }
01347 
01348     if (recvbuf_.tailbytes() < shutdown_len)
01349     {
01350         // rare case where there's not enough data in the buffer
01351         // to handle the shutdown message data
01352         return false; 
01353     }
01354 
01355     // now handle the message, first skipping the typecode byte
01356     recvbuf_.consume(1);
01357 
01358     shutdown_reason_t reason = SHUTDOWN_NO_REASON;
01359     if (flags & SHUTDOWN_HAS_REASON)
01360     {
01361         switch (*recvbuf_.start()) {
01362         case SHUTDOWN_NO_REASON:
01363             reason = SHUTDOWN_NO_REASON;
01364             break;
01365         case SHUTDOWN_IDLE_TIMEOUT:
01366             reason = SHUTDOWN_IDLE_TIMEOUT;
01367             break;
01368         case SHUTDOWN_VERSION_MISMATCH:
01369             reason = SHUTDOWN_VERSION_MISMATCH;
01370             break;
01371         case SHUTDOWN_BUSY:
01372             reason = SHUTDOWN_BUSY;
01373             break;
01374         default:
01375             log_err("invalid shutdown reason code 0x%x", *recvbuf_.start());
01376         }
01377 
01378         recvbuf_.consume(1);
01379     }
01380 
01381     u_int16_t delay = 0;
01382     if (flags & SHUTDOWN_HAS_DELAY)
01383     {
01384         memcpy(&delay, recvbuf_.start(), 2);
01385         delay = ntohs(delay);
01386         recvbuf_.consume(2);
01387     }
01388 
01389     log_info("got SHUTDOWN (%s) [reconnect delay %u]",
01390              shutdown_reason_to_str(reason), delay);
01391 
01392     break_contact(ContactEvent::SHUTDOWN);
01393     
01394     return false;
01395 }
01396 
01397 } // namespace dtn

Generated on Sat Sep 8 08:43:34 2007 for DTN Reference Implementation by  doxygen 1.5.3