1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import errno
23 import os
24 import stat
25 import tempfile
26 import threading
27 import time
28
29 from twisted.internet import defer, reactor, threads, abstract
30
31 from flumotion.common import log, format, common, python
32 from flumotion.component.misc.httpserver import cachestats
33 from flumotion.component.misc.httpserver import cachemanager
34 from flumotion.component.misc.httpserver import fileprovider
35 from flumotion.component.misc.httpserver import localpath
36 from flumotion.component.misc.httpserver.fileprovider import FileClosedError
37 from flumotion.component.misc.httpserver.fileprovider import FileError
38 from flumotion.component.misc.httpserver.fileprovider import NotFoundError
39
40
41 SEEK_SET = 0
42 FILE_COPY_BUFFER_SIZE = abstract.FileDescriptor.bufferSize
43 MAX_LOGNAME_SIZE = 30
44
45
46 LOG_CATEGORY = "fileprovider-localcached"
47
48
49 errnoLookup = {errno.ENOENT: fileprovider.NotFoundError,
50 errno.EISDIR: fileprovider.CannotOpenError,
51 errno.EACCES: fileprovider.AccessError}
52
53
71
72
75 """
76
77 WARNING: Currently does not work properly in combination with rate-control.
78
79 I'm caching the files taken from a mounted
80 network file system to a shared local directory.
81 Multiple instances can share the same cache directory,
82 but it's recommended to use slightly different values
83 for the property cleanup-high-watermark.
84 I'm using the directory access time to know when
85 the cache usage changed and keep an estimation
86 of the cache usage for statistics.
87
88 I'm creating a single thread to do the file copying block by block,
89 for all files to be copied to the cache.
90 Using a thread instead of a reactor.callLater 'loop' allows for
91 higher copy throughput and does not slow down the main loop when
92 lots of files are copied at the same time.
93 Simulations with real request logs show that using a thread
94 gives better results than the equivalent asynchronous implementation.
95 """
96
97 logCategory = LOG_CATEGORY
98
100 props = args['properties']
101 self._sourceDir = props.get('path')
102 cacheDir = props.get('cache-dir')
103 cacheSizeInMB = props.get('cache-size')
104 if cacheSizeInMB is not None:
105 cacheSize = cacheSizeInMB * 10 ** 6
106 else:
107 cacheSize = None
108 cleanupEnabled = props.get('cleanup-enabled')
109 cleanupHighWatermark = props.get('cleanup-high-watermark')
110 cleanupLowWatermark = props.get('cleanup-low-watermark')
111
112 self._sessions = {}
113 self._index = {}
114
115 self.stats = cachestats.CacheStatistics()
116
117 self.cache = cachemanager.CacheManager(self.stats,
118 cacheDir, cacheSize,
119 cleanupEnabled,
120 cleanupHighWatermark,
121 cleanupLowWatermark)
122
123 common.ensureDir(self._sourceDir, "source")
124
125
126 self._thread = CopyThread(self)
127
def start(self, component):
    """
    Start the plug: set up the cache manager, then launch the copy thread.

    @param component: the component this plug belongs to (only logged).
    @return: the Deferred returned by the cache manager's setUp(), with
             the copy-thread start chained onto it.
    """
    self.debug('Starting cachedprovider plug for component %r', component)

    def launchCopyThread(_setupResult):
        # Start the copy thread only once the cache set-up Deferred fires.
        self._thread.start()

    setupDone = self.cache.setUp()
    setupDone.addCallback(launchCopyThread)
    return setupDone
133
def stop(self, component):
    """
    Stop the plug: stop the copy thread and close every indexed session.

    @param component: the component this plug belongs to (only logged).
    @return: a DeferredList over the truthy results of the sessions'
             close() calls, or None when no session returned one.
    """
    self.debug('Stopping cachedprovider plug for component %r', component)
    self._thread.stop()
    # Close every session first, then keep only the results to wait on.
    closeResults = [session.close() for session in self._index.values()]
    pending = [result for result in closeResults if result]
    if pending:
        return defer.DeferredList(pending)
    return None
144
150
153
155 if self._sourceDir is None:
156 return None
157 return LocalPath(self, self._sourceDir)
158
159
160
161
163 """
164 Returns a log name for a path, shortened to a maximum size
165 specified by the global variable MAX_LOGNAME_SIZE.
166 The log name will be the filename part of the path postfixed
167 by the id in brackets if id is not None.
168 """
169 filename = os.path.basename(path)
170 basename, postfix = os.path.splitext(filename)
171 if id is not None:
172 postfix += "[%s]" % id
173 prefixMaxLen = MAX_LOGNAME_SIZE - len(postfix)
174 if len(basename) > prefixMaxLen:
175 basename = basename[:prefixMaxLen-1] + "*"
176 return basename + postfix
177
179 return self._index.get(path, None)
180
188
193
199
207
215
218
221
222
223 -class LocalPath(localpath.LocalPath, log.Loggable):
251
252
254
255 logCategory = LOG_CATEGORY
256
258 threading.Thread.__init__(self)
259 self.plug = plug
260 self._running = True
261 self._event = threading.Event()
262
264 self._running = False
265 self._event.set()
266 self.join()
267
270
273
289
290
293
294
296 """
297 I'm serving a file at the same time I'm copying it
298 from the network file system to the cache.
299 If the client asks for data not yet copied, the source file
300 read operation is delegated to the copy thread as an asynchronous
301 operation because file seeking/reading is not thread safe.
302
303 The copy session has to open the temporary file twice,
304 once read-only and once write-only,
305 because closing a read/write file changes the modification time.
306 We want the modification time to be set to a known value
307 when the copy is finished while keeping read access to the file.
308
309 The session manages a reference counter to know how many TempFileDelegate
310 instances are using the session to delegate read operations.
311 This is done for two reasons:
312 - To avoid the circular references that would result from having
313 the session manage a list of delegate instances.
314 - If not cancelled, sessions should not be deleted
315 when no delegates reference them anymore. So weakref cannot be used.
316 """
317
318 logCategory = LOG_CATEGORY
319
def __init__(self, plug, sourcePath, sourceFile, sourceInfo):
    """
    Create a copy session for C{sourcePath} and start copying it.

    @param plug:       the owning plug; provides getLogName() and the
                       cache manager used to compute temp/cache paths.
    @param sourcePath: path of the source file being cached.
    @param sourceFile: already-open source file object.
    @param sourceInfo: stat result of the source file; only its mtime
                       and size are used.
    """
    self.plug = plug
    self.logName = plug.getLogName(sourcePath, sourceFile.fileno())
    self.copying = None
    self.sourcePath = sourcePath
    cache = plug.cache
    self.tempPath = cache.getTempPath(sourcePath)
    self.cachePath = cache.getCachePath(sourcePath)

    # The mtime is later stamped onto the finished copy; the size is used
    # to pre-size the temporary file and for statistics.
    self.mtime, self.size = sourceInfo[stat.ST_MTIME], sourceInfo[stat.ST_SIZE]

    self._sourceFile = sourceFile
    self._cancelled = False
    # Temp-file handles, cache-space reservation tag and pending cancel
    # request; all are filled in later.
    self._wTempFile = self._rTempFile = None
    self._allocTag = self._waitCancel = None

    # Reads queued while copying, number of delegates using this session,
    # bytes copied so far (None once finished) and statistics correction.
    self._pending = []
    self._refCount = 0
    self._copied = 0
    self._correction = 0

    # Kept last, presumably because _startCopying() relies on all the
    # attributes initialized above.
    self._startCopyingDefer = self._startCopying()
342
346
347 - def read(self, position, size, stats):
348 # Read `size` bytes at `position`: from the temp file if already copied, else from the source.
349 if self._rTempFile:
350 # Fast path: the temporary file is open for reading...
351 # ...and the range is copied (_copied becomes None once the copy finished).
352 if (self._copied is None) or ((position + size) <= self._copied):
353 try:
354 self._rTempFile.seek(position)
355 data = self._rTempFile.read(size)
356 # Use the real byte count; a read near EOF may return fewer bytes.
357 size = len(data)
358 # Report the bytes as served from the cache, consuming up to `size`
359 # of the pending statistics correction (self._correction).
360 # NOTE(review): self._correction is presumably increased by the copy
361 # thread (not visible here) and only decreased here, which is why no
362 # lock is taken - confirm against the copy loop.
363 diff = min(self._correction, size)
364 self._correction -= diff
365 stats.onBytesRead(0, size, diff)
366 return data
367 except Exception, e:
368 self.warning("Failed to read from temporary file: %s",
369 log.getExceptionMessage(e))
370 self._cancelSession()
371 # Reached when the range is not cached yet or the temp-file read failed.
372 if self._sourceFile is None:
373 raise FileError("File caching error, cannot proceed")
374 # Otherwise read the data from the source file.
375 try:
376 # No lock is taken here. While a copy is in progress the copy thread
377 # owns the source file position (seek/read are not thread safe), so
378 # the read is queued; the copy thread serves it and fires `d` through
379 # reactor.callFromThread, and leftovers are served when the copy ends.
380 if self.copying:
381 # Queue the read for the copy thread; stats are updated on arrival.
382
383
384 d = defer.Deferred()
385
386 def updateStats(data):
387 stats.onBytesRead(len(data), 0, 0)
388 return data
389
390 d.addCallback(updateStats)
391 self._pending.append((position, size, d))
392 return d
393 # Not copying: safe to read the source directly from this thread.
394 self._sourceFile.seek(position)
395 data = self._sourceFile.read(size)
396 stats.onBytesRead(len(data), 0, 0)
397 return data
398 except IOError, e:
399 cls = errnoLookup.get(e.errno, FileError)
400 raise cls("Failed to read source file: %s" % str(e))
401
404
406 self._refCount -= 1
407
408
409 if (self._refCount == 1) and self._cancelled:
410
411 self._cancelCopy(False, True)
412
413 if (self._refCount == 0) and (self._copied is None):
414 self.close()
415
423
425 if self._startCopyingDefer:
426 d = self._startCopyingDefer
427 self._startCopyingDefer = None
428 d.addCallback(lambda _: self._close())
429 return d
430
432 if not (self.copying and self._pending):
433
434 return False
435
436 position, size, d = self._pending.pop(0)
437 self._sourceFile.seek(position)
438 data = self._sourceFile.read(size)
439
440 reactor.callFromThread(d.callback, data)
441 return len(self._pending) > 0
442
485
486
487
488
492
494 if not (self._cancelled or self._allocTag is None):
495 self.plug.cache.releaseCacheSpace(self._allocTag)
496 self._allocTag = None
497
499 if not self._cancelled:
500 self.log("Canceling copy session")
501
502 self._cancelled = True
503
504
505 if self._refCount <= 1:
506
507 self._cancelCopy(False, True)
508
510 self._allocTag = tag
511
512 if not tag:
513
514 self._cancelSession()
515 return
516 self.plug.stats.onCopyStarted()
517
518 try:
519 fd, transientPath = tempfile.mkstemp(".tmp", LOG_CATEGORY)
520 self.log("Created transient file '%s'", transientPath)
521 self._wTempFile = os.fdopen(fd, "wb")
522 self.log("Opened temporary file for writing [fd %d]",
523 self._wTempFile.fileno())
524 self._rTempFile = file(transientPath, "rb")
525 self.log("Opened temporary file for reading [fd %d]",
526 self._rTempFile.fileno())
527 except IOError, e:
528 self.warning("Failed to open temporary file: %s",
529 log.getExceptionMessage(e))
530 self._cancelSession()
531 return
532
533 try:
534 self.log("Truncating temporary file to size %d", self.size)
535 self._wTempFile.truncate(self.size)
536 except IOError, e:
537 self.warning("Failed to truncate temporary file: %s",
538 log.getExceptionMessage(e))
539 self._cancelSession()
540 return
541
542 try:
543 self.log("Renaming transient file to '%s'", self.tempPath)
544 os.rename(transientPath, self.tempPath)
545 except IOError, e:
546 self.warning("Failed to rename transient temporary file: %s",
547 log.getExceptionMessage(e))
548
549 self.debug("Start caching '%s' [fd %d]",
550 self.sourcePath, self._sourceFile.fileno())
551
552 self.copying = True
553 self.plug.activateSession(self)
554
563
565 if self.copying:
566 self.log("Canceling file copy")
567 if self._waitCancel:
568
569 return
570 self.debug("Cancel caching '%s' [fd %d]",
571 self.sourcePath, self._sourceFile.fileno())
572
573
574
575 self._waitCancel = (closeSource, closeTempWrite)
576 return
577
578 if closeSource:
579 self._closeSourceFile()
580 if closeTempWrite:
581 self._closeWriteTempFile()
582
607
609 if self._sourceFile is None:
610 return
611
612 self.debug("Finished caching '%s' [fd %d]",
613 self.sourcePath, self._sourceFile.fileno())
614 self.plug.stats.onCopyFinished(self.size)
615
616
617 self._copied = None
618
619 self._closeSourceFile()
620 self._closeWriteTempFile()
621
622 try:
623 mtime = self.mtime
624 atime = int(time.time())
625 self.log("Setting temporary file modification time to %d", mtime)
626
627 os.utime(self.tempPath, (atime, mtime))
628 except OSError, e:
629 if e.errno == errno.ENOENT:
630
631 self._releaseCacheSpace()
632 else:
633 self.warning("Failed to update modification time of temporary "
634 "file: %s", log.getExceptionMessage(e))
635 self._cancelSession()
636 try:
637 self.log("Renaming temporary file to '%s'", self.cachePath)
638 os.rename(self.tempPath, self.cachePath)
639 except OSError, e:
640 if e.errno == errno.ENOENT:
641 self._releaseCacheSpace()
642 else:
643 self.warning("Failed to rename temporary file: %s",
644 log.getExceptionMessage(e))
645 self._cancelSession()
646
647 for position, size, d in self._pending:
648 try:
649 self._rTempFile.seek(position)
650 data = self._rTempFile.read(size)
651 d.callback(data)
652 except Exception, e:
653 self.warning("Failed to read from temporary file: %s",
654 log.getExceptionMessage(e))
655 d.errback(e)
656 self._pending = []
657 if self._refCount == 0:
658
659 self.close()
660
675
677 if self._sourceFile is not None:
678 self.log("Closing source file [fd %d]", self._sourceFile.fileno())
679 try:
680 try:
681 self._sourceFile.close()
682 finally:
683 self._sourceFile = None
684 except IOError, e:
685 self.warning("Failed to close source file: %s",
686 log.getExceptionMessage(e))
687
689 if self._rTempFile is not None:
690 self.log("Closing temporary file for reading [fd %d]",
691 self._rTempFile.fileno())
692 try:
693 try:
694 self._rTempFile.close()
695 finally:
696 self._rTempFile = None
697 except IOError, e:
698 self.warning("Failed to close temporary file for reading: %s",
699 log.getExceptionMessage(e))
700
702 if self._wTempFile is not None:
703
704 if not self._cancelled and self._copied is not None:
705 self._removeTempFile()
706 self.log("Closing temporary file for writing [fd %d]",
707 self._wTempFile.fileno())
708 try:
709 try:
710 self._wTempFile.close()
711 finally:
712 self._wTempFile = None
713 except Exception, e:
714 self.warning("Failed to close temporary file for writing: %s",
715 log.getExceptionMessage(e))
716
717
719
720 logCategory = LOG_CATEGORY
721
730
732 return self._position
733
def seek(self, offset):
    """Set the position used by the next read() to C{offset}."""
    self._position = offset
736
def read(self, size, stats):
    """
    Read up to C{size} bytes at the current position through the session.

    When the session answers synchronously, the data is returned and the
    position is advanced immediately. When it returns a Deferred, further
    reads are refused until the callback _cbGotData runs. That callback is
    expected to clear the flag and advance the position; its definition is
    not shown here, so verify.
    """
    assert not self._reading, "Simultaneous read not supported"
    result = self._session.read(self._position, size, stats)
    if not isinstance(result, defer.Deferred):
        # Synchronous result: the data is already available.
        self._position += len(result)
        return result
    # Asynchronous result: block further reads until the data arrives.
    self._reading = True
    return result.addCallback(self._cbGotData)
745
747 if self._session is not None:
748 self._session.decRef()
749 self._session = None
750
751
752
753
755 self._reading = False
756 self._position += len(data)
757 return data
758
759
805
806
808
809 - def read(self, size, stats):
813
818
819
820 -class CachedFile(fileprovider.File, log.Loggable):
849
857
859 return "<CachedFile '%s'>" % self._path
860
865
870
875
876 - def seek(self, offset):
880
881 - def read(self, size):
894
900
903
906
907
908
909
917
919 sourcePath = self._path
920 self.log("Selecting delegate for source file %r [fd %d]",
921 sourcePath, sourceFile.fileno())
922
923 self.logName = self.plug.getLogName(self._path, sourceFile.fileno())
924
925 cachedPath = self.plug.cache.getCachePath(sourcePath)
926 try:
927 cachedFile, cachedInfo = open_stat(cachedPath)
928 self.log("Opened cached file [fd %d]", cachedFile.fileno())
929 except NotFoundError:
930 self.debug("Did not find cached file '%s'", cachedPath)
931 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
932 except FileError, e:
933 self.debug("Failed to open cached file: %s", str(e))
934 self._removeCachedFile(cachedPath)
935 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
936
937 self.debug("Found cached file '%s'", cachedPath)
938 sourceTime = sourceInfo[stat.ST_MTIME]
939 cacheTime = cachedInfo[stat.ST_MTIME]
940 if sourceTime != cacheTime:
941
942 self.debug("Cached file out-of-date (%d != %d)",
943 sourceTime, cacheTime)
944 self.stats.onCacheOutdated()
945 self.plug.outdateCopySession(sourcePath)
946 self._removeCachedFile(cachedPath)
947 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
948 self._closeSourceFile(sourceFile)
949
950 self.debug("Serving cached file '%s'", cachedPath)
951 delegate = CachedFileDelegate(self.plug, cachedPath,
952 cachedFile, cachedInfo)
953 self.stats.onStarted(delegate.size, cachestats.CACHE_HIT)
954 return delegate
955
957 try:
958 os.remove(cachePath)
959 self.debug("Deleted cached file '%s'", cachePath)
960 except OSError, e:
961 if e.errno != errno.ENOENT:
962 self.warning("Error deleting cached file: %s", str(e))
963
964 - def _tryTempFile(self, sourcePath, sourceFile, sourceInfo):
982
983 - def _cacheFile(self, sourcePath, sourceFile, sourceInfo):
990