FileConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <sys/types.h>
00019 #include <sys/stat.h>
00020 #include <dirent.h>
00021 #include <errno.h>
00022 #include <fcntl.h>
00023 #include <unistd.h>
00024 #include <netinet/in.h>
00025 
00026 #include <oasys/io/IO.h>
00027 #include <oasys/util/StringBuffer.h>
00028 #include <oasys/util/URL.h>
00029 
00030 #include "FileConvergenceLayer.h"
00031 #include "bundling/Bundle.h"
00032 #include "bundling/BundleEvent.h"
00033 #include "bundling/BundleList.h"
00034 #include "bundling/BundleProtocol.h"
00035 #include "bundling/BundleDaemon.h"
00036 
00037 namespace dtn {
00038 
00039 /******************************************************************************
00040  *
00041  * FileConvergenceLayer
00042  *
00043  *****************************************************************************/
00044 FileConvergenceLayer::FileConvergenceLayer()
00045     : ConvergenceLayer("FileConvergenceLayer", "file")
00046 {
00047 }
00048 
00052 bool
00053 FileConvergenceLayer::extract_dir(const char* nexthop, std::string* dirp)
00054 {
00055 
00056     PANIC("XXX/demmer fix this implementation");
00057     
00058     oasys::URL url(nexthop);
00059     
00060     if (! url.valid()) {
00061         log_err("extract_dir: next hop ssp '%s' not a valid url", nexthop);
00062         return false;
00063     }
00064 
00065     // the ssp part of the URL should be of the form:
00066     // /path1/path2
00067 
00068     // validate that the "host" part of the url is empty, i.e. that
00069     // the filesystem path is absolute
00070     if (url.host_.length() != 0) {
00071         log_err("interface eid '%s' specifies a non-absolute path",
00072                 nexthop);
00073         return false;
00074     }
00075 
00076     // and make sure there wasn't a port that was parsed out
00077     if (url.port_ != 0) {
00078         log_err("interface eid '%s' specifies a port", nexthop);
00079         return false;
00080     }
00081 
00082     dirp->assign("/");
00083     dirp->append(url.path_);
00084     return true;
00085 }
00086 
00091 bool
00092 FileConvergenceLayer::validate_dir(const std::string& dir)
00093 {
00094     struct stat st;
00095     if (stat(dir.c_str(), &st) != 0) {
00096         log_err("error running stat on %s: %s", dir.c_str(), strerror(errno));
00097         return false;
00098     }
00099 
00100     if (!S_ISDIR(st.st_mode)) {
00101         log_err("error: %s not a directory", dir.c_str());
00102         return false;
00103     }
00104 
00105     // XXX/demmer check permissions
00106 
00107     return true;
00108 }
00109 
00113 bool
00114 FileConvergenceLayer::interface_up(Interface* iface,
00115                                    int argc, const char* argv[])
00116 {
00117     (void)iface;
00118     (void)argc;
00119     (void)argv;
00120     
00121     NOTIMPLEMENTED;
00122     
00123 //     // parse out the directory from the interface
00124 //     std::string dir;
00125 //     if (!extract_dir(iface->eid().c_str(), &dir)) {
00126 //         return false;
00127 //     }
00128     
00129 //     // make sure the directory exists and is readable / executable
00130 //     if (!validate_dir(dir)) {
00131 //         return false;
00132 //     }
00133     
00134 //     // XXX/demmer parse argv for frequency
00135 //     int secs_per_scan = 5;
00136 
00137 //     // create a new thread to scan for new bundle files
00138 //     Scanner* scanner = new Scanner(secs_per_scan, dir);
00139 //     scanner->start();
00140 
00141 //     // store the new scanner in the cl specific part of the interface
00142 //     iface->set_cl_info(scanner);
00143 
00144     
00145     return true;
00146 }
00147 
00151 bool
00152 FileConvergenceLayer::interface_down(Interface* iface)
00153 {
00154     CLInfo *cli = iface->cl_info();
00155     Scanner *scanner = (Scanner *)cli;
00156     scanner->stop();
00157 
00158     // We cannot "delete scanner;" because it is still running
00159     // right now. oasys::Thread::thread_run deletes the Scanner object
00160     // when Scanner::run() returns 
00161 
00162     return true;
00163 }
00164  
00168 bool
00169 FileConvergenceLayer::open_contact(const ContactRef& contact)
00170 {
00171     (void)contact;
00172     // XXX/demmer fixme
00173     
00174     // parse out the directory from the contact
00175 //     std::string dir;
00176 //     if (!extract_dir(contact->nexthop(), &dir)) {
00177 //         return false;
00178 //     }
00179     
00180 //     // make sure the directory exists and is readable / executable
00181 //     if (!validate_dir(dir)) {
00182 //         return false;
00183 //     }
00184 
00185     return true;
00186 }
00187 
00191 bool
00192 FileConvergenceLayer::close_contact(const ContactRef& contact)
00193 {
00194     (void)contact;
00195     // nothing to do
00196     return true;
00197 }
00198     
00202 void
00203 FileConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
00204 {
00205     (void)contact;
00206     (void)bundle;
00207     
00208     // XXX/demmer fix this at some point
00209     NOTIMPLEMENTED;
00210 
00211 #ifdef notimplemented
00212     std::string dir;
00213     if (!extract_dir(contact->nexthop(), &dir)) {
00214         PANIC("contact should have already been validated");
00215     }
00216 
00217     FileHeader filehdr;
00218     int iovcnt = BundleProtocol::MAX_IOVCNT + 2;
00219     struct iovec iov[iovcnt];
00220 
00221     filehdr.version = CURRENT_VERSION;
00222     
00223     oasys::StringBuffer fname("%s/bundle-XXXXXX", dir.c_str());
00224     
00225     iov[0].iov_base = (char*)&filehdr;
00226     iov[0].iov_len  = sizeof(FileHeader);
00227 
00228     // fill in the bundle header portion
00229     u_int16_t header_len =
00230         BundleProtocol::format_header_blocks(bundle, &iov[1], &iovcnt);
00231 
00232     // fill in the file header
00233     size_t payload_len = bundle->payload_.length();
00234     filehdr.header_length = htons(header_len);
00235     filehdr.bundle_length = htonl(header_len + payload_len);
00236 
00237     // and tack on the payload (adding one to iovcnt for the
00238     // FileHeader, then one for the payload)
00239     iovcnt++;
00240     PANIC("XXX/demmer fix me");
00241     //iov[iovcnt].iov_base = (void*)bundle->payload_.data();
00242     iov[iovcnt].iov_len  = payload_len;
00243     iovcnt++;
00244 
00245     // open the bundle file 
00246     int fd = mkstemp(fname.c_str());
00247     if (fd == -1) {
00248         log_err("error opening temp file in %s: %s",
00249                 fname.c_str(), strerror(errno));
00250         // XXX/demmer report error here?
00251         return;
00252     }
00253 
00254     log_debug("opened temp file %s for bundle id %d "
00255               "fd %d header_length %zu payload_length %zu",
00256               fname.c_str(), bundle->bundleid_, fd,
00257               header_len, payload_len);
00258 
00259     // now write everything out
00260     int total = sizeof(FileHeader) + header_len + payload_len;
00261     int cc = oasys::IO::writevall(fd, iov, iovcnt, logpath_);
00262     if (cc != total) {
00263         log_err("error writing out bundle (wrote %d/%d): %s",
00264                 cc, total, strerror(errno));
00265     }
00266 
00267     // free up the iovec data
00268     BundleProtocol::free_header_iovmem(bundle, &iov[1], iovcnt - 2);
00269         
00270     // close the file descriptor
00271     close(fd);
00272 
00273     // cons up a transmission event and pass it to the router
00274     bool acked = false;
00275     // XXX/demmer total_len
00276     BundleDaemon::post(
00277         new BundleTransmittedEvent(bundle, contact, total_len, acked));
00278         
00279     log_debug("bundle id %d successfully transmitted", bundle->bundleid_);
00280 #endif // notimplemented
00281 }
00282 
00283 /******************************************************************************
00284  *
00285  * FileConvergenceLayer::Scanner
00286  *
00287  *****************************************************************************/
00288 FileConvergenceLayer::Scanner::Scanner(int secs_per_scan, 
00289                                        const std::string& dir)
00290     : Logger("FileConvergenceLayer::Scanner",
00291              "/dtn/cl/file/scanner"), 
00292       Thread("FileConvergenceLayer::Scanner"), 
00293       secs_per_scan_(secs_per_scan), 
00294       dir_(dir), 
00295       run_(true)
00296 {
00297     set_flag(DELETE_ON_EXIT);
00298 }
00299 
00303 void
00304 FileConvergenceLayer::Scanner::run()
00305 {
00306     // XXX/demmer fix me 
00307     NOTIMPLEMENTED;
00308     
00309     /*
00310     FileHeader filehdr;
00311     DIR* dir = opendir(dir_.c_str());
00312     struct dirent* dirent;
00313     const char* fname;
00314     u_char* buf;
00315     int fd;
00316 
00317     if (!dir) {
00318         // XXX/demmer signal cl somehow?
00319         log_err("error in opendir");
00320         return;
00321     }
00322 
00323     while (run_) {
00324         seekdir(dir, 0);
00325 
00326         while ((dirent = readdir(dir)) != 0) {
00327             fname = dirent->d_name;
00328 
00329             if ((fname[0] == '.') &&
00330                 ((fname[1] == '\0') ||
00331                  (fname[1] == '.' && fname[2] == '\0')))
00332             {
00333                 continue;
00334             }
00335             
00336             log_debug("scan found file %s", fname);
00337 
00338             // cons up the full path
00339             oasys::StringBuffer path("%s/%s", dir_.c_str(), fname);
00340 
00341             // malloc a buffer for it, open a file descriptor, and
00342             // read in the header
00343             if ((fd = open(path.c_str(), 0)) == -1) {
00344                 log_err("error opening file %s: %s", path.c_str(), strerror(errno));
00345                 continue;
00346             }
00347 
00348             int cc = oasys::IO::readall(fd, (char*)&filehdr, sizeof(FileHeader));
00349             if (cc != sizeof(FileHeader)) {
00350                 log_warn("can't read in FileHeader (read %d/%zu): %s",
00351                          cc, sizeof(FileHeader), strerror(errno));
00352                 continue;
00353             }
00354 
00355             if (filehdr.version != CURRENT_VERSION) {
00356                 log_warn("framing protocol version mismatch: %d != current %d",
00357                          filehdr.version, CURRENT_VERSION);
00358                 continue;
00359             }
00360 
00361             u_int16_t header_len = ntohs(filehdr.header_length);
00362             size_t bundle_len = ntohl(filehdr.bundle_length);
00363             
00364             log_debug("found bundle file %s: header_length %u bundle_length %zu",
00365                       path.c_str(), header_len, bundle_len);
00366 
00367             // read in and parse the headers
00368             buf = (u_char*)malloc(header_len);
00369             cc = oasys::IO::readall(fd, (char*)buf, header_len);
00370             if (cc != header_len) {
00371                 log_err("error reading file %s header (read %d/%d): %s",
00372                         path.c_str(), cc, header_len, strerror(errno));
00373                 free(buf);
00374                 continue;
00375             }
00376 
00377             Bundle* bundle = new Bundle();
00378             if (! BundleProtocol::parse_header_blocks(bundle, buf, header_len)) {
00379                 log_err("error parsing bundle headers in file %s", path.c_str());
00380                 free(buf);
00381                 delete bundle;
00382                 continue;
00383             }
00384             free(buf);
00385 
00386             // Now validate the lengths
00387             size_t payload_len = bundle->payload_.length();
00388             if (bundle_len != header_len + payload_len) {
00389                 log_err("error in bundle lengths in file %s: "
00390                         "bundle_length %zu, header_length %u, payload_length %zu",
00391                         path.c_str(), bundle_len, header_len, payload_len);
00392                 delete bundle;
00393                 continue;
00394             }
00395 
00396             // Looks good, now read in and assign the data
00397             buf = (u_char*)malloc(payload_len);
00398             cc = oasys::IO::readall(fd, (char*)buf, payload_len);
00399             if (cc != (int)payload_len) {
00400                 log_err("error reading file %s payload (read %d/%zu): %s",
00401                         path.c_str(), cc, payload_len, strerror(errno));
00402                 delete bundle;
00403                 continue;
00404             }
00405             bundle->payload_.set_data(buf, payload_len);
00406             free(buf);
00407             
00408             // close the file descriptor and remove the file
00409             if (close(fd) != 0) {
00410                 log_err("error closing file %s: %s",
00411                         path.c_str(), strerror(errno));
00412             }
00413             
00414             if (unlink(path.c_str()) != 0) {
00415                 log_err("error removing file %s: %s",
00416                         path.c_str(), strerror(errno));
00417             }
00418 
00419             // all set, notify the router
00420             // XXX/demmer need length here
00421             BundleDaemon::post(
00422                 new BundleReceivedEvent(bundle, EVENTSRC_PEER));
00423         }
00424             
00425         sleep(secs_per_scan_);
00426     }
00427     */
00428     log_info("exiting");
00429 }
00430 
00434 void FileConvergenceLayer::Scanner::stop() {
00435     run_ = false;
00436 }
00437 
00438 } // namespace dtn

Generated on Sat Sep 8 08:43:27 2007 for DTN Reference Implementation by  doxygen 1.5.3