00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <sys/poll.h>
00022 #include <stdlib.h>
00023
00024 #include <oasys/io/NetUtils.h>
00025 #include <oasys/util/OptParser.h>
00026 #include <oasys/util/HexDumpBuffer.h>
00027
00028 #include "TCPConvergenceLayer.h"
00029 #include "IPConvergenceLayerUtils.h"
00030 #include "bundling/BundleDaemon.h"
00031 #include "contacts/ContactManager.h"
00032
00033 namespace dtn {
00034
00035 TCPConvergenceLayer::TCPLinkParams
00036 TCPConvergenceLayer::default_link_params_(true);
00037
00038
00039 TCPConvergenceLayer::TCPLinkParams::TCPLinkParams(bool init_defaults)
00040 : StreamLinkParams(init_defaults),
00041 hexdump_(false),
00042 local_addr_(INADDR_ANY),
00043 remote_addr_(INADDR_NONE),
00044 remote_port_(TCPCL_DEFAULT_PORT)
00045 {
00046 }
00047
00048
00049 TCPConvergenceLayer::TCPConvergenceLayer()
00050 : StreamConvergenceLayer("TCPConvergenceLayer", "tcp", TCPCL_VERSION)
00051 {
00052 }
00053
00054
00055 ConnectionConvergenceLayer::LinkParams*
00056 TCPConvergenceLayer::new_link_params()
00057 {
00058 return new TCPLinkParams(default_link_params_);
00059 }
00060
00061
00062 bool
00063 TCPConvergenceLayer::parse_link_params(LinkParams* lparams,
00064 int argc, const char** argv,
00065 const char** invalidp)
00066 {
00067 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00068 ASSERT(params != NULL);
00069
00070 oasys::OptParser p;
00071
00072 p.addopt(new oasys::BoolOpt("hexdump", ¶ms->hexdump_));
00073 p.addopt(new oasys::InAddrOpt("local_addr", ¶ms->local_addr_));
00074
00075 int count = p.parse_and_shift(argc, argv, invalidp);
00076 if (count == -1) {
00077 return false;
00078 }
00079 argc -= count;
00080
00081 if (params->local_addr_ == INADDR_NONE) {
00082 log_err("invalid local address setting of INADDR_NONE");
00083 return false;
00084 }
00085
00086
00087 return StreamConvergenceLayer::parse_link_params(lparams, argc, argv,
00088 invalidp);
00089 }
00090
00091
00092 void
00093 TCPConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
00094 {
00095 ASSERT(link != NULL);
00096 ASSERT(!link->isdeleted());
00097 ASSERT(link->cl_info() != NULL);
00098
00099 StreamConvergenceLayer::dump_link(link, buf);
00100
00101 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(link->cl_info());
00102 ASSERT(params != NULL);
00103
00104 buf->appendf("local_addr: %s\n", intoa(params->local_addr_));
00105 buf->appendf("remote_addr: %s\n", intoa(params->remote_addr_));
00106 buf->appendf("remote_port: %d\n", params->remote_port_);
00107 }
00108
00109
00110 bool
00111 TCPConvergenceLayer::set_link_defaults(int argc, const char* argv[],
00112 const char** invalidp)
00113 {
00114 return parse_link_params(&default_link_params_, argc, argv, invalidp);
00115 }
00116
00117
00118 bool
00119 TCPConvergenceLayer::parse_nexthop(const LinkRef& link, LinkParams* lparams)
00120 {
00121 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00122 ASSERT(params != NULL);
00123
00124 if (params->remote_addr_ == INADDR_NONE || params->remote_port_ == 0)
00125 {
00126 if (! IPConvergenceLayerUtils::parse_nexthop(logpath_, link->nexthop(),
00127 ¶ms->remote_addr_,
00128 ¶ms->remote_port_)) {
00129 return false;
00130 }
00131 }
00132
00133 if (params->remote_addr_ == INADDR_ANY ||
00134 params->remote_addr_ == INADDR_NONE)
00135 {
00136 log_warn("can't lookup hostname in next hop address '%s'",
00137 link->nexthop());
00138 return false;
00139 }
00140
00141
00142 if (params->remote_port_ == 0) {
00143 params->remote_port_ = TCPCL_DEFAULT_PORT;
00144 }
00145
00146 return true;
00147 }
00148
00149
00150 CLConnection*
00151 TCPConvergenceLayer::new_connection(const LinkRef& link, LinkParams* p)
00152 {
00153 (void)link;
00154 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(p);
00155 ASSERT(params != NULL);
00156 return new Connection(this, params);
00157 }
00158
00159
00160 bool
00161 TCPConvergenceLayer::interface_up(Interface* iface,
00162 int argc, const char* argv[])
00163 {
00164 log_debug("adding interface %s", iface->name().c_str());
00165 in_addr_t local_addr = INADDR_ANY;
00166 u_int16_t local_port = TCPCL_DEFAULT_PORT;
00167
00168 oasys::OptParser p;
00169 p.addopt(new oasys::InAddrOpt("local_addr", &local_addr));
00170 p.addopt(new oasys::UInt16Opt("local_port", &local_port));
00171
00172 const char* invalid = NULL;
00173 if (! p.parse(argc, argv, &invalid)) {
00174 log_err("error parsing interface options: invalid option '%s'",
00175 invalid);
00176 return false;
00177 }
00178
00179
00180 if (local_addr == INADDR_NONE) {
00181 log_err("invalid local address setting of INADDR_NONE");
00182 return false;
00183 }
00184
00185 if (local_port == 0) {
00186 log_err("invalid local port setting of 0");
00187 return false;
00188 }
00189
00190
00191 Listener* listener = new Listener(this);
00192 listener->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00193
00194 int ret = listener->bind(local_addr, local_port);
00195
00196
00197
00198 if (ret != 0 && errno == EADDRINUSE) {
00199 listener->logf(oasys::LOG_WARN,
00200 "WARNING: error binding to requested socket: %s",
00201 strerror(errno));
00202 listener->logf(oasys::LOG_WARN,
00203 "waiting for 10 seconds then trying again");
00204 sleep(10);
00205
00206 ret = listener->bind(local_addr, local_port);
00207 }
00208
00209 if (ret != 0) {
00210 return false;
00211 }
00212
00213
00214 listener->listen();
00215 listener->start();
00216
00217
00218
00219 iface->set_cl_info(listener);
00220
00221 return true;
00222 }
00223
00224
00225 bool
00226 TCPConvergenceLayer::interface_down(Interface* iface)
00227 {
00228 Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00229 ASSERT(listener != NULL);
00230 listener->stop();
00231 delete listener;
00232 return true;
00233 }
00234
00235
00236 void
00237 TCPConvergenceLayer::dump_interface(Interface* iface,
00238 oasys::StringBuffer* buf)
00239 {
00240 Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00241 ASSERT(listener != NULL);
00242
00243 buf->appendf("\tlocal_addr: %s local_port: %d\n",
00244 intoa(listener->local_addr()), listener->local_port());
00245 }
00246
00247
00248 TCPConvergenceLayer::Listener::Listener(TCPConvergenceLayer* cl)
00249 : TCPServerThread("TCPConvergenceLayer::Listener",
00250 "/dtn/cl/tcp/listener"),
00251 cl_(cl)
00252 {
00253 logfd_ = false;
00254 }
00255
00256
00257 void
00258 TCPConvergenceLayer::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00259 {
00260 log_debug("new connection from %s:%d", intoa(addr), port);
00261
00262 Connection* conn =
00263 new Connection(cl_, &TCPConvergenceLayer::default_link_params_,
00264 fd, addr, port);
00265 conn->start();
00266 }
00267
00268
00269 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00270 TCPLinkParams* params)
00271 : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00272 cl->logpath(), cl, params,
00273 true )
00274 {
00275 logpathf("%s/conn/%p", cl->logpath(), this);
00276
00277
00278 oasys::StringBuffer nexthop("%s:%d",
00279 intoa(params->remote_addr_),
00280 params->remote_port_);
00281 set_nexthop(nexthop.c_str());
00282
00283
00284 sock_ = new oasys::TCPClient(logpath_);
00285
00286
00287
00288
00289
00290
00291 sock_->logpathf("%s/sock", logpath_);
00292 sock_->set_logfd(false);
00293
00294 sock_->init_socket();
00295 sock_->set_nonblocking(true);
00296
00297
00298
00299
00300 if (params->local_addr_ != INADDR_ANY)
00301 {
00302 if (sock_->bind(params->local_addr_, 0) != 0) {
00303 log_err("error binding to %s: %s",
00304 intoa(params->local_addr_),
00305 strerror(errno));
00306 }
00307 }
00308 }
00309
00310
00311 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00312 TCPLinkParams* params,
00313 int fd,
00314 in_addr_t remote_addr,
00315 u_int16_t remote_port)
00316 : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00317 cl->logpath(), cl, params,
00318 false )
00319 {
00320 logpathf("%s/conn/%p", cl->logpath(), this);
00321
00322
00323 oasys::StringBuffer nexthop("%s:%d", intoa(remote_addr), remote_port);
00324 set_nexthop(nexthop.c_str());
00325
00326 sock_ = new oasys::TCPClient(fd, remote_addr, remote_port, logpath_);
00327 sock_->set_logfd(false);
00328 sock_->set_nonblocking(true);
00329 }
00330
00331
00332 TCPConvergenceLayer::Connection::~Connection()
00333 {
00334 delete sock_;
00335 }
00336
00337
00338 void
00339 TCPConvergenceLayer::Connection::serialize(oasys::SerializeAction *a)
00340 {
00341 TCPLinkParams *params = tcp_lparams();
00342 if (! params) return;
00343
00344 a->process("hexdump", ¶ms->hexdump_);
00345 a->process("local_addr", oasys::InAddrPtr(¶ms->local_addr_));
00346 a->process("remote_addr", oasys::InAddrPtr(¶ms->remote_addr_));
00347 a->process("remote_port", ¶ms->remote_port_);
00348
00349
00350 a->process("segment_ack_enabled", ¶ms->segment_ack_enabled_);
00351 a->process("negative_ack_enabled", ¶ms->negative_ack_enabled_);
00352 a->process("keepalive_interval", ¶ms->keepalive_interval_);
00353 a->process("segment_length", ¶ms->segment_length_);
00354
00355
00356 a->process("reactive_frag_enabled", ¶ms->reactive_frag_enabled_);
00357 a->process("sendbuf_length", ¶ms->sendbuf_len_);
00358 a->process("recvbuf_length", ¶ms->recvbuf_len_);
00359 a->process("data_timeout", ¶ms->data_timeout_);
00360 }
00361
00362
00363 void
00364 TCPConvergenceLayer::Connection::initialize_pollfds()
00365 {
00366 sock_pollfd_ = &pollfds_[0];
00367 num_pollfds_ = 1;
00368
00369 sock_pollfd_->fd = sock_->fd();
00370 sock_pollfd_->events = POLLIN;
00371
00372 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(params_);
00373 ASSERT(params != NULL);
00374
00375 poll_timeout_ = params->data_timeout_;
00376
00377 if (params->keepalive_interval_ != 0 &&
00378 (params->keepalive_interval_ * 1000) < params->data_timeout_)
00379 {
00380 poll_timeout_ = params->keepalive_interval_ * 1000;
00381 }
00382 }
00383
00384
00385 void
00386 TCPConvergenceLayer::Connection::connect()
00387 {
00388
00389
00390 if (! cl_->parse_nexthop(contact_->link(), params_)) {
00391 log_info("can't resolve nexthop address '%s'",
00392 contact_->link()->nexthop());
00393 break_contact(ContactEvent::BROKEN);
00394 return;
00395 }
00396
00397
00398 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(params_);
00399 ASSERT(params != NULL);
00400 sock_->set_remote_addr(params->remote_addr_);
00401 sock_->set_remote_port(params->remote_port_);
00402
00403
00404
00405
00406 log_debug("connect: connecting to %s:%d...",
00407 intoa(sock_->remote_addr()), sock_->remote_port());
00408 ASSERT(contact_ == NULL || contact_->link()->isopening());
00409 ASSERT(sock_->state() != oasys::IPSocket::ESTABLISHED);
00410 int ret = sock_->connect(sock_->remote_addr(), sock_->remote_port());
00411
00412 if (ret == 0) {
00413 log_debug("connect: succeeded immediately");
00414 ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00415
00416 initiate_contact();
00417
00418 } else if (ret == -1 && errno == EINPROGRESS) {
00419 log_debug("connect: EINPROGRESS returned, waiting for write ready");
00420 sock_pollfd_->events |= POLLOUT;
00421
00422 } else {
00423 log_info("connection attempt to %s:%d failed... %s",
00424 intoa(sock_->remote_addr()), sock_->remote_port(),
00425 strerror(errno));
00426 break_contact(ContactEvent::BROKEN);
00427 }
00428 }
00429
00430
00431 void
00432 TCPConvergenceLayer::Connection::accept()
00433 {
00434 ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00435
00436 log_debug("accept: got connection from %s:%d...",
00437 intoa(sock_->remote_addr()), sock_->remote_port());
00438 initiate_contact();
00439 }
00440
00441
00442 void
00443 TCPConvergenceLayer::Connection::disconnect()
00444 {
00445 if (sock_->state() != oasys::IPSocket::CLOSED) {
00446 sock_->close();
00447 }
00448 }
00449
00450
00451 void
00452 TCPConvergenceLayer::Connection::handle_poll_activity()
00453 {
00454 if (sock_pollfd_->revents & POLLHUP) {
00455 log_info("remote socket closed connection -- returned POLLHUP");
00456 break_contact(ContactEvent::BROKEN);
00457 return;
00458 }
00459
00460 if (sock_pollfd_->revents & POLLERR) {
00461 log_info("error condition on remote socket -- returned POLLERR");
00462 break_contact(ContactEvent::BROKEN);
00463 return;
00464 }
00465
00466
00467
00468
00469 if (sock_pollfd_->revents & POLLOUT)
00470 {
00471 log_debug("poll returned write ready, clearing POLLOUT bit");
00472 sock_pollfd_->events &= ~POLLOUT;
00473
00474 if (sock_->state() == oasys::IPSocket::CONNECTING) {
00475 int result = sock_->async_connect_result();
00476 if (result == 0 && sendbuf_.fullbytes() == 0) {
00477 log_debug("delayed_connect to %s:%d succeeded",
00478 intoa(sock_->remote_addr()), sock_->remote_port());
00479 initiate_contact();
00480
00481 } else {
00482 log_info("connection attempt to %s:%d failed... %s",
00483 intoa(sock_->remote_addr()), sock_->remote_port(),
00484 strerror(errno));
00485 break_contact(ContactEvent::BROKEN);
00486 }
00487
00488 return;
00489 }
00490
00491 send_data();
00492 }
00493
00494
00495 if (contact_broken_)
00496 {
00497 return;
00498 }
00499
00500
00501 if (sock_pollfd_->revents & POLLIN) {
00502 recv_data();
00503 process_data();
00504
00505
00506
00507 if (recvbuf_.tailbytes() == 0) {
00508 log_err("process_data left no space in recvbuf!!");
00509 }
00510
00511 if (contact_up_ && ! contact_broken_) {
00512 check_keepalive();
00513 }
00514
00515 }
00516
00517 }
00518
00519
00520 void
00521 TCPConvergenceLayer::Connection::send_data()
00522 {
00523
00524
00525
00526 ASSERT(! contact_broken_);
00527
00528 u_int towrite = sendbuf_.fullbytes();
00529 if (params_->test_write_limit_ != 0) {
00530 towrite = std::min(towrite, params_->test_write_limit_);
00531 }
00532
00533 log_debug("send_data: trying to drain %u bytes from send buffer...",
00534 towrite);
00535 ASSERT(towrite > 0);
00536
00537 int cc = sock_->write(sendbuf_.start(), towrite);
00538 if (cc > 0) {
00539 log_debug("send_data: wrote %d/%zu bytes from send buffer",
00540 cc, sendbuf_.fullbytes());
00541 if (tcp_lparams()->hexdump_) {
00542 oasys::HexDumpBuffer hex;
00543 hex.append((u_char*)sendbuf_.start(), cc);
00544 log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str());
00545 }
00546
00547 sendbuf_.consume(cc);
00548
00549 if (sendbuf_.fullbytes() != 0) {
00550 log_debug("send_data: incomplete write, setting POLLOUT bit");
00551 sock_pollfd_->events |= POLLOUT;
00552
00553 } else {
00554 if (sock_pollfd_->events & POLLOUT) {
00555 log_debug("send_data: drained buffer, clearing POLLOUT bit");
00556 sock_pollfd_->events &= ~POLLOUT;
00557 }
00558 }
00559 } else if (errno == EWOULDBLOCK) {
00560 log_debug("send_data: write returned EWOULDBLOCK, setting POLLOUT bit");
00561 sock_pollfd_->events |= POLLOUT;
00562
00563 } else {
00564 log_info("send_data: remote connection unexpectedly closed: %s",
00565 strerror(errno));
00566 break_contact(ContactEvent::BROKEN);
00567 }
00568 }
00569
00570
00571 void
00572 TCPConvergenceLayer::Connection::recv_data()
00573 {
00574
00575
00576
00577 ASSERT(! contact_broken_);
00578
00579
00580 if (recvbuf_.tailbytes() == 0) {
00581 log_err("no space in receive buffer to accept data!!!");
00582 return;
00583 }
00584
00585 if (params_->test_read_delay_ != 0) {
00586 log_debug("recv_data: sleeping for test_read_delay msecs %u",
00587 params_->test_read_delay_);
00588
00589 usleep(params_->test_read_delay_ * 1000);
00590 }
00591
00592 u_int toread = recvbuf_.tailbytes();
00593 if (params_->test_read_limit_ != 0) {
00594 toread = std::min(toread, params_->test_read_limit_);
00595 }
00596
00597 log_debug("recv_data: draining up to %u bytes into recv buffer...", toread);
00598 int cc = sock_->read(recvbuf_.end(), toread);
00599 if (cc < 1) {
00600 log_info("remote connection unexpectedly closed");
00601 break_contact(ContactEvent::BROKEN);
00602 return;
00603 }
00604
00605 log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes",
00606 cc, recvbuf_.fullbytes());
00607 if (tcp_lparams()->hexdump_) {
00608 oasys::HexDumpBuffer hex;
00609 hex.append((u_char*)recvbuf_.end(), cc);
00610 log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str());
00611 }
00612 recvbuf_.fill(cc);
00613 }
00614
00615 }