00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <errno.h>
00022 #include <sys/types.h>
00023 #include <sys/stat.h>
00024 #include <oasys/debug/DebugUtils.h>
00025 #include <oasys/io/FileUtils.h>
00026 #include <oasys/thread/SpinLock.h>
00027 #include <oasys/util/ScratchBuffer.h>
00028 #include <oasys/util/StringBuffer.h>
00029
00030 #include "BundlePayload.h"
00031 #include "storage/BundleStore.h"
00032
00033 namespace dtn {
00034
00035
00036 bool BundlePayload::test_no_remove_ = false;
00037
00038
00039 BundlePayload::BundlePayload(oasys::SpinLock* lock)
00040 : Logger("BundlePayload", "/dtn/bundle/payload"),
00041 location_(DISK), length_(0),
00042 cur_offset_(0), base_offset_(0), lock_(lock)
00043 {
00044 }
00045
00046
00047 void
00048 BundlePayload::init(int bundleid, location_t location)
00049 {
00050 location_ = location;
00051
00052 logpathf("/dtn/bundle/payload/%d", bundleid);
00053
00054
00055 if (location == MEMORY || location == NODATA) {
00056 return;
00057 }
00058
00059
00060
00061 BundleStore* bs = BundleStore::instance();
00062
00063
00064
00065
00066
00067
00068
00069 if (bs->payload_dir() == "NO_PAYLOAD_FILES") {
00070 location_ = MEMORY;
00071 return;
00072 }
00073
00074 oasys::StringBuffer path("%s/bundle_%d.dat",
00075 bs->payload_dir().c_str(), bundleid);
00076
00077
00078 file_.logpathf("%s/file", logpath_);
00079
00080 int open_errno = 0;
00081 int err = file_.open(path.c_str(), O_EXCL | O_CREAT | O_RDWR,
00082 S_IRUSR | S_IWUSR, &open_errno);
00083
00084 if (err < 0 && open_errno == EEXIST)
00085 {
00086 log_err("payload file %s already exists: overwriting and retrying",
00087 path.c_str());
00088
00089 err = file_.open(path.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
00090 }
00091
00092 if (err < 0)
00093 {
00094 log_crit("error opening payload file %s: %s",
00095 path.c_str(), strerror(errno));
00096 return;
00097 }
00098
00099 int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00100 if (fd != file_.fd()) {
00101 PANIC("duplicate entry in open fd cache");
00102 }
00103 unpin_file();
00104 }
00105
00106
00107 void
00108 BundlePayload::init_from_store(int bundleid)
00109 {
00110 location_ = DISK;
00111
00112 BundleStore* bs = BundleStore::instance();
00113 oasys::StringBuffer path("%s/bundle_%d.dat",
00114 bs->payload_dir().c_str(), bundleid);
00115
00116 file_.logpathf("%s/file", logpath_);
00117
00118 if (file_.open(path.c_str(), O_RDWR, S_IRUSR | S_IWUSR) < 0)
00119 {
00120
00121
00122
00123
00124 log_crit("error opening payload file %s: %s",
00125 path.c_str(), strerror(errno));
00126 location_ = NODATA;
00127 return;
00128 }
00129
00130 int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00131 if (fd != file_.fd()) {
00132 PANIC("duplicate entry in open fd cache");
00133 }
00134 unpin_file();
00135 }
00136
00137
00138 BundlePayload::~BundlePayload()
00139 {
00140 if (location_ == DISK && file_.is_open()) {
00141 BundleStore::instance()->payload_fdcache()->close(file_.path());
00142 file_.set_fd(-1);
00143
00144 if (!test_no_remove_)
00145 {
00146 file_.unlink();
00147 }
00148 }
00149 }
00150
00151
00152 void
00153 BundlePayload::serialize(oasys::SerializeAction* a)
00154 {
00155 a->process("length", (u_int32_t*)&length_);
00156 a->process("base_offset", (u_int32_t*)&base_offset_);
00157 }
00158
00159
00160 void
00161 BundlePayload::set_length(size_t length)
00162 {
00163 oasys::ScopeLock l(lock_, "BundlePayload::set_length");
00164 length_ = length;
00165 if (location_ == MEMORY) {
00166 data_.reserve(length);
00167 data_.set_len(length);
00168 }
00169 }
00170
00171
00172
00173 void
00174 BundlePayload::pin_file() const
00175 {
00176 if (location_ != DISK) {
00177 return;
00178 }
00179
00180 BundleStore* bs = BundleStore::instance();
00181 int fd = bs->payload_fdcache()->get_and_pin(file_.path());
00182
00183 if (fd == -1) {
00184 if (file_.reopen(O_RDWR) < 0) {
00185 log_err("error reopening file %s: %s",
00186 file_.path(), strerror(errno));
00187 return;
00188 }
00189
00190 cur_offset_ = 0;
00191
00192 int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00193 if (fd != file_.fd()) {
00194 PANIC("duplicate entry in open fd cache");
00195 }
00196
00197 } else {
00198 ASSERT(fd == file_.fd());
00199 }
00200 }
00201
00202
00203 void
00204 BundlePayload::unpin_file() const
00205 {
00206 if (location_ != DISK) {
00207 return;
00208 }
00209
00210 BundleStore::instance()->payload_fdcache()->unpin(file_.path());
00211 }
00212
00213
00214 void
00215 BundlePayload::truncate(size_t length)
00216 {
00217 oasys::ScopeLock l(lock_, "BundlePayload::truncate");
00218
00219 ASSERT(length <= length_);
00220 length_ = length;
00221 cur_offset_ = length;
00222
00223 switch (location_) {
00224 case MEMORY:
00225 data_.set_len(length);
00226 break;
00227 case DISK:
00228 pin_file();
00229 file_.truncate(length);
00230 unpin_file();
00231 break;
00232 case NODATA:
00233 NOTREACHED;
00234 }
00235 }
00236
00237
00238 void
00239 BundlePayload::copy_file(oasys::FileIOClient* dst) const
00240 {
00241 ASSERT(location_ == DISK);
00242 pin_file();
00243 file_.lseek(0, SEEK_SET);
00244 file_.copy_contents(dst, length());
00245 unpin_file();
00246 }
00247
00248
00249 bool
00250 BundlePayload::replace_with_file(const char* path)
00251 {
00252 oasys::ScopeLock l(lock_, "BundlePayload::replace_with_file");
00253
00254 ASSERT(location_ == DISK);
00255 std::string payload_path = file_.path();
00256
00257
00258 BundleStore* bs = BundleStore::instance();
00259 bs->payload_fdcache()->close(file_.path());
00260 file_.unlink();
00261
00262
00263 int err = ::link(path, payload_path.c_str());
00264 if (err == 0) {
00265 log_debug("replace_with_file: successfully created link to %s", path);
00266
00267
00268
00269 file_.set_path(payload_path);
00270 if (file_.reopen(O_RDWR) < 0) {
00271 log_err("replace_with_file: error reopening file: %s",
00272 strerror(errno));
00273 return false;
00274 }
00275
00276 } else {
00277
00278 err = errno;
00279 if (err != EXDEV) {
00280 log_err("error linking to path '%s': %s", path, strerror(err));
00281 return false;
00282 }
00283
00284
00285 log_debug("replace_with_file: link failed: %s", strerror(err));
00286
00287 oasys::FileIOClient src;
00288 int fd = src.open(path, O_RDONLY, &err);
00289 if (fd < 0) {
00290 log_err("error opening path '%s' for reading: %s",
00291 path, strerror(err));
00292 return false;
00293 }
00294
00295 file_.set_path(payload_path);
00296 if (file_.reopen(O_RDWR | O_CREAT, S_IRUSR | S_IWUSR) < 0) {
00297 log_err("replace_with_file: error reopening file: %s",
00298 strerror(err));
00299 return false;
00300 }
00301
00302 src.copy_contents(&file_);
00303 src.close();
00304 }
00305
00306 set_length(oasys::FileUtils::size(file_.path()));
00307
00308
00309 ASSERT(file_.fd() != -1);
00310 int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00311 if (fd != file_.fd()) {
00312 PANIC("duplicate entry in open fd cache");
00313 }
00314 unpin_file();
00315 return true;
00316 }
00317
00318
00319 void
00320 BundlePayload::internal_write(const u_char* bp, size_t offset, size_t len)
00321 {
00322
00323 if (location_ == DISK) {
00324 ASSERT(file_.is_open());
00325 }
00326 ASSERT(lock_->is_locked_by_me());
00327 ASSERT(length_ >= (offset + len));
00328
00329 switch (location_) {
00330 case MEMORY:
00331 memcpy(data_.buf() + offset, bp, len);
00332 break;
00333 case DISK:
00334
00335 if (cur_offset_ != offset) {
00336 file_.lseek(offset, SEEK_SET);
00337 cur_offset_ = offset;
00338 }
00339 file_.writeall((char*)bp, len);
00340 cur_offset_ += len;
00341 break;
00342 case NODATA:
00343 NOTREACHED;
00344 }
00345 }
00346
00347
00348 void
00349 BundlePayload::set_data(const u_char* bp, size_t len)
00350 {
00351 set_length(len);
00352 write_data(bp, 0, len);
00353 }
00354
00355
00356 void
00357 BundlePayload::set_data(const std::string& data)
00358 {
00359 set_data((const u_char*)(data.data()), data.length());
00360 }
00361
00362
00363 void
00364 BundlePayload::append_data(const u_char* bp, size_t len)
00365 {
00366 oasys::ScopeLock l(lock_, "BundlePayload::append_data");
00367
00368 size_t old_length = length_;
00369 set_length(length_ + len);
00370
00371 pin_file();
00372 internal_write(bp, old_length, len);
00373 unpin_file();
00374 }
00375
00376
00377 void
00378 BundlePayload::write_data(const u_char* bp, size_t offset, size_t len)
00379 {
00380 oasys::ScopeLock l(lock_, "BundlePayload::write_data");
00381
00382 ASSERT(length_ >= (len + offset));
00383 pin_file();
00384 internal_write(bp, offset, len);
00385 unpin_file();
00386 }
00387
00388
00389 void
00390 BundlePayload::write_data(const BundlePayload& src, size_t src_offset,
00391 size_t len, size_t dst_offset)
00392 {
00393 oasys::ScopeLock l(lock_, "BundlePayload::write_data");
00394
00395 log_debug("write_data: file=%s length_=%zu src_offset=%zu "
00396 "dst_offset=%zu len %zu",
00397 file_.path(),
00398 length_, src_offset, dst_offset, len);
00399
00400 ASSERT(length_ >= dst_offset + len);
00401 ASSERT(src.length() >= src_offset + len);
00402
00403
00404
00405
00406
00407
00408
00409 oasys::ScratchBuffer<u_char*, 1024> buf(len);
00410 const u_char* bp = src.read_data(src_offset, len, buf.buf());
00411
00412 pin_file();
00413 internal_write(bp, dst_offset, len);
00414 unpin_file();
00415 }
00416
00417
00418 const u_char*
00419 BundlePayload::read_data(size_t offset, size_t len, u_char* buf)
00420 {
00421 oasys::ScopeLock l(lock_, "BundlePayload::read_data");
00422
00423 ASSERTF(length_ >= (offset + len),
00424 "length=%zu offset=%zu len=%zu",
00425 length_, offset, len);
00426
00427 ASSERT(buf != NULL);
00428
00429 switch(location_) {
00430 case MEMORY:
00431 memcpy(buf, data_.buf() + offset, len);
00432 break;
00433
00434 case DISK:
00435 pin_file();
00436
00437
00438 if (offset != cur_offset_) {
00439 file_.lseek(offset, SEEK_SET);
00440 }
00441
00442 file_.readall((char*)buf, len);
00443 cur_offset_ = offset + len;
00444
00445 unpin_file();
00446 break;
00447
00448 case NODATA:
00449 NOTREACHED;
00450 }
00451
00452 return buf;
00453 }
00454
00455
00456 }