Source Code for Module flumotion.component.misc.httpserver.httpcached.strategy_base

# -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

import stat
from cStringIO import StringIO
import time

from twisted.internet import defer, reactor, abstract

from flumotion.common import log

from flumotion.component.misc.httpserver import fileprovider
from flumotion.component.misc.httpserver import mimetypes
from flumotion.component.misc.httpserver import cachestats
from flumotion.component.misc.httpserver.httpcached import common
from flumotion.component.misc.httpserver.httpcached import resource_manager

EXP_TABLE_CLEANUP_PERIOD = 30
MAX_RESUME_COUNT = 20

# A RemoteProducer will not be able to
# produce faster than 6.25 Mibit/s (6.55 Mbit/s)
PRODUCING_PERIOD = 0.08
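# A quick check of the rate bound above: RemoteProducer._produce() (below)
# reads at most one abstract.FileDescriptor.bufferSize block per period.
# Assuming Twisted's default bufferSize of 65536 bytes:
#     65536 B / 0.08 s = 819200 B/s = 6553600 bit/s
#                      = 6.25 Mibit/s (~ 6.55 Mbit/s)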
  43   
class ConditionError(Exception):
    """
    Raised when a request used by a caching session
    performed a conditional retrieval and the condition failed.
    """

    def __init__(self, *args, **kwargs):
        self.code = kwargs.pop("code", None)
        Exception.__init__(self, *args, **kwargs)

    def __str__(self):
        return "<%s: %s>" % (type(self).__name__, repr(self.code))

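# A minimal sketch of how the optional "code" keyword surfaces (the values
# here are illustrative only; ConditionError instances are created by
# CachingSession.conditionFail() below and delivered through errbacks):
#
#     err = ConditionError("precondition failed", code=412)
#     str(err)   # -> "<ConditionError: 412>"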
class CachingStrategy(log.Loggable):
    """
    Base class for all caching strategies.

    Handles the cache lookup, cache expiration checks,
    statistics gathering and caching session management.
    """

    logCategory = "base-caching"

    def __init__(self, cachemgr, reqmgr, ttl):
        self.cachemgr = cachemgr
        self.reqmgr = reqmgr
        self.ttl = ttl

        self._identifiers = {}  # {IDENTIFIER: CachingSession}
        self._etimes = {}  # {IDENTIFIER: EXPIRATION_TIME}

        self._cleanupCall = None

    def setup(self):
        self._startCleanupLoop()
        return self.reqmgr.setup()

    def cleanup(self):
        self._stopCleanupLoop()
        self.reqmgr.cleanup()
        for session in self._identifiers.values():
            session.cancel()
        return self

    def getSourceFor(self, url, stats):
        identifier = self.cachemgr.getIdentifier(url.path)
        session = self._identifiers.get(identifier, None)
        if session is not None:
            self.debug("Caching session found for '%s'", url)

            if (session.getState() in
                (CachingSession.DETACHED, CachingSession.CACHED)):
                stats.onStarted(session.size, cachestats.CACHE_HIT)
            elif (session.getState() in
                  (CachingSession.REQUESTING, CachingSession.BUFFERING,
                   CachingSession.CACHING)):
                stats.onStarted(session.size, cachestats.TEMP_HIT)
            else:
                stats.onStarted(session.size, cachestats.CACHE_MISS)

            # Wait to know session info like mtime and size
            d = session.waitInfo()
            d.addCallback(RemoteSource, stats)
            return d

        self.log("Looking for cached file for '%s'", url)
        d = defer.Deferred()
        d.addCallback(self.cachemgr.openCacheFile)
        d.addErrback(self._cachedFileError, url)
        d.addCallback(self._gotCachedFile, url, identifier, stats)

        d.callback(url.path)

        return d

    def requestData(self, url, offset=None, size=None, mtime=None):
        requester = BlockRequester(self.reqmgr, url, mtime)
        return requester.retrieve(offset, size)

    def getSessions(self):
        return self._identifiers.values()

    def keepCacheAlive(self, identifier, ttl=None):
        self._etimes[identifier] = time.time() + (ttl or self.ttl)

    ### To Be Overridden ###

    def _onCacheMiss(self, url, stats):
        raise NotImplementedError()

    def _onCacheOutdated(self, url, identifier, cachedFile, stats):
        raise NotImplementedError()

    ### Protected Methods ###

    def _startCleanupLoop(self):
        assert self._cleanupCall is None, "Already started"
        self._cleanupCall = reactor.callLater(EXP_TABLE_CLEANUP_PERIOD,
                                              self._cleanupLoop)

    def _stopCleanupLoop(self):
        if self._cleanupCall:
            self._cleanupCall.cancel()
            self._cleanupCall = None

    def _cleanupLoop(self):
        self._cleanupCall = None
        self._cleanupExpirationTable()
        self._startCleanupLoop()

    def _cleanupExpirationTable(self):
        now = time.time()
        expired = [i for i, e in self._etimes.items() if e < now]
        for ident in expired:
            del self._etimes[ident]

    def _onNewSession(self, session):
        identifier = session.identifier
        old = self._identifiers.get(identifier, None)
        if old is not None:
            old.cancel()
        self._identifiers[session.identifier] = session

    def _onSessionCanceled(self, session):
        if self._identifiers[session.identifier] == session:
            del self._identifiers[session.identifier]

    def _onResourceCached(self, session):
        self.keepCacheAlive(session.identifier)
        del self._identifiers[session.identifier]

    def _onResourceError(self, session, error):
        del self._identifiers[session.identifier]

    def _cachedFileError(self, failure, url):
        if failure.check(fileprovider.FileError):
            self.debug("Error looking for cached file for '%s'", url)
            return None
        return failure

    def _gotCachedFile(self, cachedFile, url, identifier, stats):
        if cachedFile is not None:
            self.log("Opened cached file '%s'", cachedFile.name)
            etime = self._etimes.get(identifier, None)
            if etime and (etime > time.time()):
                stats.onStarted(cachedFile.stat[stat.ST_SIZE],
                                cachestats.CACHE_HIT)
                return CachedSource(identifier, url, cachedFile, stats)
            self.debug("Cached file may have expired '%s'", cachedFile.name)
            return self._onCacheOutdated(url, identifier, cachedFile, stats)
        self.debug("Resource not cached '%s'", url)
        return self._onCacheMiss(url, stats)

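# A minimal sketch of a concrete strategy, assuming it simply starts a
# CachingSession on a miss and treats an outdated cache entry as a miss.
# The class name is hypothetical; the real strategies live in sibling
# modules and are more elaborate. CachingSession and RemoteSource are
# defined later in this module; the references resolve at call time.


class _ExampleCachingStrategy(CachingStrategy):

    def _onCacheMiss(self, url, stats):
        # Start caching remotely and hand out a RemoteSource as soon as
        # the session knows the stream info (size, mtime, mime type).
        session = CachingSession(self, url, stats)
        session.cache()
        d = session.waitInfo()
        d.addCallback(RemoteSource, stats)
        return d

    def _onCacheOutdated(self, url, identifier, cachedFile, stats):
        # The cached copy may be stale: discard it and proceed as a miss.
        cachedFile.close()
        return self._onCacheMiss(url, stats)
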
class CachedSource(resource_manager.DataSource):
    """
    Data source that reads data directly from a locally cached file.
    """

    mimetypes = mimetypes.MimeTypes()

    def __init__(self, ident, url, cachedFile, stats):
        self.identifier = ident
        self.url = url
        self._file = cachedFile
        self.stats = stats

        self.mimeType = self.mimetypes.fromPath(url.path)
        self.mtime = cachedFile.stat[stat.ST_MTIME]
        self.size = cachedFile.stat[stat.ST_SIZE]

        self._current = cachedFile.tell()

    def produce(self, consumer, offset):
        # A producer for a cached file is not really convenient;
        # such a file is better read by pulling than by pushing.
        return None

    def read(self, offset, size):
        if offset != self._current:
            self._file.seek(offset)
        data = self._file.read(size)
        size = len(data)
        self.stats.onBytesRead(0, size, 0)
        self._current = offset + size
        return data

    def close(self):
        self.stats.onClosed()
        self._file.close()
        self._file = None

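# Sequential reads avoid the seek() in read() above, since _current tracks
# the file position (a sketch; "source" stands for an already opened
# CachedSource):
#
#     data = source.read(0, 4096)      # seeks only if offset != _current
#     data = source.read(4096, 4096)   # sequential, no seek needed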
class BaseRemoteSource(resource_manager.DataSource):
    """
    Base class for resources not yet cached.
    It offers a push producer, delegates read operations
    to the session, and starts block pipelining if the session
    cannot serve the requested data.
    Updates the cache statistics.
    """

    strategy = None
    session = None
    stats = None

    def produce(self, consumer, offset):
        return RemoteProducer(consumer, self.session, offset, self.stats)

    def read(self, offset, size):
        if offset >= self.size:
            return ""  # EOF
        data = self.session.read(offset, size)
        if data is not None:
            # Adjust the cache/source values to take the copy into account
            # FIXME: ask sebastien if he is on crack or LSD
            size = len(data)
            diff = min(self.session._correction, size)
            self.session._correction -= diff
            self.stats.onBytesRead(0, size, diff)  # from cache
            return data
        d = self.strategy.requestData(self.url, offset, size, self.mtime)
        d.addCallback(self._requestDataCb)
        d.addErrback(self._requestDataFailed)
        return d

    def _requestDataFailed(self, failure):
        if failure.check(fileprovider.FileOutOfDate):
            self.session.cancel()
        return failure

    def _requestDataCb(self, data):
        self.stats.onBytesRead(len(data), 0, 0)  # from remote source
        return data

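# read() above returns data synchronously when the session already has the
# requested bytes, and a Deferred when it falls back to a block request.
# A sketch of a caller normalizing both cases (the helper name is
# hypothetical):


def _example_read_any(source, offset, size):
    result = source.read(offset, size)
    if isinstance(result, defer.Deferred):
        return result                   # data will arrive asynchronously
    return defer.succeed(result)        # served directly from the session
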
class RemoteSource(BaseRemoteSource):
    """
    Simple remote source.
    """

    def __init__(self, session, stats):
        self.session = session
        self.stats = stats

        self.strategy = session.strategy
        self.identifier = session.identifier
        self.url = session.url
        self.mimeType = session.mimeType
        self.mtime = session.mtime
        self.size = session.size

        session.addref()

    def close(self):
        self.stats.onClosed()
        self.session.delref()
        self.session = None

class BaseCachingSession(object):
    """
    Base class of caching sessions.
    Just an interface to be implemented or inherited
    by all caching sessions.
    """

    strategy = None
    url = None
    size = 0
    mtime = None
    mimeType = None

    def read(self, offset, size):
        return None

    def cancel(self):
        raise NotImplementedError()

    def addref(self):
        raise NotImplementedError()

    def delref(self):
        raise NotImplementedError()

class CachingSession(BaseCachingSession, log.Loggable):
    """
    Caches a stream locally in a temporary file.
    The already cached data can be read from the session.

    Can be canceled, meaning the session is not valid anymore.

    Can be aborted, meaning the session will stop caching locally
    but is still valid.

    The caching operation can be started at any moment, but the
    session has to receive the stream info before it can be used
    with a RemoteSource instance.

    It can recover from request failures up to MAX_RESUME_COUNT times.
    """

    logCategory = "caching-session"

    (PIPELINING,
     REQUESTING,
     BUFFERING,
     CACHING,
     CACHED,
     DETACHED,
     CLOSED,
     CANCELED,
     ABORTED,
     ERROR) = range(10)

    mimetypes = mimetypes.MimeTypes()

    def __init__(self, strategy, url, cache_stats, ifModifiedSince=None):
        self.strategy = strategy
        self.url = url
        self.identifier = strategy.cachemgr.getIdentifier(url.path)

        self.ifModifiedSince = ifModifiedSince
        self.cache_stats = cache_stats

        self._refcount = 0
        self._state = self.PIPELINING
        self._request = None

        self._infoDefers = []
        self._startedDefers = []
        self._finishedDefers = []
        self._errorValue = None

        self._file = None
        self._bytes = 0
        self._correction = 0

        self._resumes = MAX_RESUME_COUNT

        self.logName = common.log_id(self)  # To be able to track the instance

        self.strategy._onNewSession(self)

        self.log("Caching session created for %s", url)

    def isActive(self):
        return (self._state < self.CLOSED) or (self._state == self.ABORTED)

    def getState(self):
        return self._state

    def cache(self):
        """
        Starts caching the remote resource locally.
        """
        if self._state != self.PIPELINING:
            return

        self._state = self.REQUESTING

        self.debug("Caching requested for %s", self.url)
        self.cache_stats.onCopyStarted()

        self._firstRetrieve()

    def waitInfo(self):
        if self._state < self.BUFFERING:
            d = defer.Deferred()
            self._infoDefers.append(d)
            return d
        if self._state <= self.CLOSED:
            return defer.succeed(self)
        return defer.fail(self._errorValue)

    def waitStarted(self):
        if self._state <= self.REQUESTING:
            d = defer.Deferred()
            self._startedDefers.append(d)
            return d
        if self._state <= self.CLOSED:
            return defer.succeed(self)
        return defer.fail(self._errorValue)

    def waitFinished(self):
        if self._state < self.DETACHED:
            d = defer.Deferred()
            self._finishedDefers.append(d)
            return d
        if self._state <= self.CLOSED:
            return defer.succeed(self)
        return defer.fail(self._errorValue)

    def read(self, offset, size):
        if self._state == self.CANCELED:
            raise fileprovider.FileOutOfDate("File out of date")
        if self._state == self.ABORTED:
            return None
        if self._state >= self.CLOSED:
            raise fileprovider.FileClosedError("Session Closed")

        if self._file is None:
            return None

        if min(self.size, offset + size) > self._bytes:
            return None

        self._file.seek(offset)
        return self._file.read(size)

    def cancel(self):
        """
        After calling this method the session cannot be used anymore.
        """
        if self._state < self.REQUESTING or self._state >= self.CACHED:
            return

        self.log("Canceling caching session for %s", self.url)

        self.strategy._onSessionCanceled(self)
        self.cache_stats.onCopyCancelled(self.size, self._bytes)

        self._close()

        error = fileprovider.FileOutOfDate("File out of date")
        self._fireError(error)

        if self._request:
            self.debug("Caching canceled for %s (%d/%d Bytes ~ %d %%)",
                       self.url, self._bytes, self.size,
                       self.size and int(self._bytes * 100 / self.size))
            self._request.cancel()
            self._request = None
        else:
            self.debug("Caching canceled before starting to cache")

        self._state = self.CANCELED

    def abort(self):
        """
        After calling this method the session will just stop caching
        and return None when trying to read. Used when pipelining is wanted.
        """
        if self._state < self.REQUESTING or self._state >= self.CACHED:
            return

        self.log("Aborting caching session for %s", self.url)

        self.strategy._onSessionCanceled(self)
        self.cache_stats.onCopyCancelled(self.size, self._bytes)

        self._close()

        error = fileprovider.FileError("Caching aborted")
        self._fireError(error)

        if self._request:
            self.debug("Caching aborted for %s", self.url)
            self._request.cancel()
            self._request = None
        else:
            self.debug("Caching aborted before starting to cache")

        self._state = self.ABORTED

    def addref(self):
        self._refcount += 1

    def delref(self):
        self._refcount -= 1
        if self._refcount == 0:
            if self._state == self.DETACHED:
                # Not referenced anymore, so now we can close the file
                self.log("Detached session not referenced anymore")
                self._close()

    def isref(self):
        return self._refcount > 0

    ### StreamConsumer ###

    def serverError(self, getter, code, message):
        self.warning("Session request error %s (%s) for %s using %s:%s",
                     message, code, self.url, getter.host, getter.port)
        if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
            if self._resumes > 0:
                self._resumes -= 1
                if self._state > self.REQUESTING:
                    # We already have the request info
                    offset = self._bytes
                    size = self.size - self._bytes
                    self.debug("Resuming retrieval from offset %d with "
                               "size %d of %s (%d tries left)", offset, size,
                               self.url, self._resumes)

                    self._resumeRetrieve(offset, size)
                    return
                else:
                    # We don't have any info, we must retry from scratch
                    self.debug("Resuming retrieval from start of %s "
                               "(%d tries left)", self.url, self._resumes)
                    self._firstRetrieve()
                    return
        self.debug("Too many resume attempts, stopping "
                   "after %d of %s bytes of %s",
                   self._bytes, self.size, self.url)
        self._close()
        self._error(fileprovider.UnavailableError(message))

    def conditionFail(self, getter, code, message):
        if code == common.STREAM_MODIFIED:
            # Modified file detected during recovery
            self.log("Modifications detected during recovery of %s", self.url)
            self.cancel()
            return
        self.log("Unexpected HTTP condition failure: %s", message)
        self._close()
        self._error(ConditionError(message, code=code))

    def streamNotAvailable(self, getter, code, message):
        self.log("Stream to be cached is not available: %s", message)
        self._close()
        if code == common.STREAM_NOTFOUND:
            self._error(fileprovider.NotFoundError(message))
        elif code == common.STREAM_FORBIDDEN:
            self._error(fileprovider.AccessError(message))
        else:
            self._error(fileprovider.FileError(message))

    def onInfo(self, getter, info):
        if self._state == self.BUFFERING:
            # We are resuming while waiting for a temporary file,
            # so we still don't want to accumulate data
            self._request.pause()
            return

        if self._state != self.REQUESTING:
            # Already canceled, or recovering from disconnection
            return

        if info.size != (info.length - self._bytes):
            self.log("Unexpected stream size: %s / %s bytes "
                     "(Already got %s bytes)",
                     info.size, info.length, self._bytes)
            self._close()
            msg = "Unexpected resource size: %d" % info.size
            self._error(fileprovider.FileError(msg))
            return

        self._state = self.BUFFERING

        self.mimeType = self.mimetypes.fromPath(self.url.path)
        self.mtime = info.mtime
        self.size = info.size

        self.log("Caching session with type %s, size %s, mtime %s for %s",
                 self.mimeType, self.size, self.mtime, self.url)

        self._file = StringIO()  # Placeholder until we get the real file

        self.log("Requesting temporary file for %s", self.url)
        d = self.strategy.cachemgr.newTempFile(self.url.path, info.size,
                                               info.mtime)

        # We don't want to accumulate data in the meantime,
        # but a small amount of data may still arrive
        # even after calling pause(), so we need buffering.
        self._request.pause()

        # We have the metadata now, so fire the callbacks
        self._fireInfo(self)
        self._fireStarted(self)

        self.debug("Start buffering %s", self.url)
        d.addCallback(self._gotTempFile)

    def _gotTempFile(self, tempFile):
        if self._state not in (self.BUFFERING, self.CACHED):
            # Already canceled
            if tempFile:
                tempFile.close()
            return

        if tempFile is None:
            self.warning("Temporary file creation failed, "
                         "aborting caching of %s", self.url)
            self.abort()
            return

        self.log("Got temporary file for %s", self.url)

        self.debug("Start caching %s", self.url)

        data = self._file.getvalue()
        self._file = tempFile
        tempFile.write(data)

        if self._request is not None:
            # We still have a request, so we want more data from it
            self._request.resume()

        if self._state == self.CACHED:
            # Already got all the data
            self._real_complete()
        else:
            self._state = self.CACHING

    def onData(self, getter, data):
        assert self._state in (self.BUFFERING, self.CACHING), "Not caching"
        self._file.seek(self._bytes)
        size = len(data)
        try:
            self._file.write(data)
        except Exception, e:
            self.warning("Error writing in temporary file: %s", e)
            self.debug("Got %s / %s bytes, would be %s with %s more",
                       self._bytes, self.size, self._bytes + size, size)
            self.abort()
        else:
            self._bytes += size
            self._correction += size

    def streamDone(self, getter):
        assert self._state in (self.BUFFERING, self.CACHING), "Not caching"
        self._request = None
        self._complete()

    def _error(self, error):
        assert self._state < self.CANCELED, "Wrong state for errors"

        self.log("Caching error for %s: %s", self.url, error)

        self._state = self.ERROR

        self.strategy._onResourceError(self, error)
        self.strategy = None
        self._request = None

        self._fireError(error)

    def _fireInfo(self, value):
        defers = list(self._infoDefers)
        # Prevent multiple deferred firing due to reentrance
        self._infoDefers = []
        for d in defers:
            d.callback(value)

    def _fireStarted(self, value):
        defers = list(self._startedDefers)
        # Prevent multiple deferred firing due to reentrance
        self._startedDefers = []
        for d in defers:
            d.callback(value)

    def _fireFinished(self, value):
        defers = list(self._finishedDefers)
        # Prevent multiple deferred firing due to reentrance
        self._finishedDefers = []
        for d in defers:
            d.callback(value)

    def _fireError(self, error):
        self._errorValue = error
        defers = list(self._infoDefers)
        defers.extend(self._startedDefers)
        defers.extend(self._finishedDefers)
        # Prevent multiple deferred firing due to reentrance
        self._infoDefers = []
        self._startedDefers = []
        self._finishedDefers = []
        for d in defers:
            d.errback(error)

    def _close(self):
        if self._state >= self.CLOSED:
            return

        self.log("Closing caching session for %s", self.url)

        if self._state >= self.BUFFERING:
            self._file.close()
            self._file = None

        self._state = self.CLOSED

    def _complete(self):
        assert self._state in (self.CACHING, self.BUFFERING), "Not caching"
        self.debug("Finished caching %s (%d Bytes)", self.url, self.size)

        oldstate = self._state
        self._state = self.CACHED

        if oldstate != self.BUFFERING:
            self._real_complete()

    def _real_complete(self):
        assert self._state == self.CACHED, "Not cached"
        self._state = self.DETACHED
        self.log("Caching session detached for %s", self.url)

        self._file.complete()

        self.strategy._onResourceCached(self)
        self.strategy = None

        if not self.isref():
            # Not referenced anymore by sources, so close the session
            self.log("Caching session not referenced, it can be closed")
            self._close()

        self.cache_stats.onCopyFinished(self.size)
        self._fireFinished(self)

    def _firstRetrieve(self):
        since = self.ifModifiedSince
        self._request = self.strategy.reqmgr.retrieve(self, self.url,
                                                      ifModifiedSince=since)
        self.log("Retrieving data using %s", self._request.logName)

    def _resumeRetrieve(self, offset, size):
        reqmgr = self.strategy.reqmgr
        req = reqmgr.retrieve(self, self.url,
                              ifUnmodifiedSince=self.mtime,
                              start=offset, size=size)
        self._request = req
        self.log("Retrieving data using %s", self._request.logName)

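# A usage sketch of the wait* accessors (hypothetical driver code; the
# session is assumed to have been created and cache()'d by a strategy):


def _example_track_session(session):

    def _done(s):
        return s.size                   # full resource is now cached

    def _failed(failure):
        failure.trap(fileprovider.FileOutOfDate)
        return None                     # canceled: the file changed

    d = session.waitFinished()          # fires once DETACHED is reached
    d.addCallbacks(_done, _failed)
    return d
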
class RemoteProducer(common.StreamConsumer, log.Loggable):
    """
    Offers an IPushProducer interface to a caching session.
    It starts producing data from the specified point.

    If the data is already cached by the session,
    it produces data with a reactor loop, reading the data
    from the session block by block.

    If the data is not yet cached, it starts a request
    using the request manager and pipelines the data
    to the specified consumer.

    It can recover from request failures up to MAX_RESUME_COUNT times.

    It is not yet used in the context of the HTTP server.
    Until now, the simulations show that using a producer with
    long-lived HTTP requests instead of short-lived block requests
    is less efficient and produces bigger latency for the clients,
    at least when used with HTTP proxies.
    """

    logCategory = "pipe-producer"

    def __init__(self, consumer, session, offset, stats):
        self.consumer = consumer
        self.offset = offset
        self.session = session
        self.stats = stats
        self.reqmgr = session.strategy.reqmgr

        self.logName = common.log_id(self)  # To be able to track the instance

        self._pipelining = False
        self._paused = False
        self._request = None
        self._produced = 0
        self._resumes = MAX_RESUME_COUNT
        self._call = None

        session.addref()

        self.log("Starting producing data with session %s from %s",
                 self.session.logName, self.session.url)

        consumer.registerProducer(self, True)  # Push producer
        self._produce()

    ### IPushProducer Methods ###

    def resumeProducing(self):
        if self.consumer is None:
            # Already stopped
            return

        self._paused = False

        if self._pipelining:
            # Doing pipelining
            if self._request:
                # Just resume the current request
                self._request.resume()
            else:
                # Start a new one
                self._pipeline()
        else:
            # Producing from session
            self._produce()

    def pauseProducing(self):
        if self.consumer is None:
            # Already stopped
            return

        self._paused = True

        if self._pipelining:
            # Doing pipelining
            if self._request:
                self._request.pause()
        else:
            # Producing from session
            self._stop()

    def stopProducing(self):
        self.log("Asked to stop producing %s", self.session.url)
        self._terminate()

    ### common.StreamConsumer Methods ###

    def serverError(self, getter, code, message):
        if self._request is None:
            # Already terminated
            return
        self._request = None

        if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
            self.warning("Producer request error %s (%s) for %s "
                         "(%s tries left)", message, code,
                         self.session.url, self._resumes)

            if self._resumes > 0:
                self._resumes -= 1
                if self._paused:
                    self.log("Producer paused, waiting to recover pipelining "
                             "(%d tries left)", self._resumes)
                else:
                    self.log("Recovering pipelining (%d tries left)",
                             self._resumes)
                    self._pipeline()
                return

        self.debug("Too many resume attempts, stopping "
                   "after %d of %s", self._produced, self.session.size)

        self._terminate()

    def conditionFail(self, getter, code, message):
        if self._request is None:
            # Already terminated
            return
        self._request = None
        self.warning("Modifications detected while producing %s",
                     self.session.url)
        self._terminate()

    def streamNotAvailable(self, getter, code, message):
        if self._request is None:
            # Already terminated
            return
        self._request = None
        self.warning("%s detected while producing %s",
                     message, self.session.url)
        self._terminate()

    def onData(self, getter, data):
        if self._request is None:
            # Already terminated
            return
        self._write(data)

    def streamDone(self, getter):
        if self._request is None:
            # Already terminated
            return
        self.log("Pipelining finished")
        self._terminate()

    ### Private Methods ###

    def _produce(self):
        self._call = None
        if self.consumer is None:
            # Already terminated
            return

        data = self.session.read(self.offset + self._produced,
                                 abstract.FileDescriptor.bufferSize)

        if data is None:
            # The session can't serve the data, start pipelining
            self._pipeline()
            return

        if data == "":
            # No more data
            self.log("All data served from session")
            self._terminate()
            return

        self._write(data)

        self._call = reactor.callLater(PRODUCING_PERIOD, self._produce)

    def _write(self, data):
        size = len(data)
        self._produced += size
        self.consumer.write(data)

    def _stop(self):
        if self._call is not None:
            self._call.cancel()
            self._call = None

    def _pipeline(self):
        if not self.session.isActive():
            self.log("Session %s not active anymore (%s), "
                     "aborting production of %s",
                     self.session.logName,
                     self.session._state,
                     self.session.url)
            self._terminate()
            return

        self._pipelining = True

        offset = self.offset + self._produced
        size = self.session.size - offset
        mtime = self.session.mtime

        if size == 0:
            self.log("No more data to be retrieved, pipelining finished")
            self._terminate()
            return

        self.debug("Producing %s bytes from offset %d of %s",
                   size, offset, self.session.url)

        self._request = self.reqmgr.retrieve(self, self.session.url,
                                             start=offset, size=size,
                                             ifUnmodifiedSince=mtime)
        self.log("Retrieving data using %s", self._request.logName)

    def _terminate(self):
        if self._request:
            # Doing pipelining
            self._request.cancel()
            self._request = None

        self._stop()  # Stop producing from session

        expected = self.session.size - self.offset
        if self._produced != expected:
            self.warning("Only produced %s of the %s bytes "
                         "starting at %s of %s",
                         self._produced, expected,
                         self.offset, self.session.url)
        else:
            self.log("Finished producing %s bytes starting at %s of %s",
                     self._produced, self.offset, self.session.url)

        self.consumer.unregisterProducer()
        self.consumer.finish()
        self.consumer = None

        self.session.delref()
        self.session = None

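# RemoteProducer drives any consumer exposing the small interface used
# above: registerProducer()/write()/unregisterProducer()/finish(). A
# minimal sketch of such a consumer (hypothetical, for illustration):


class _ExampleConsumer(object):

    def __init__(self):
        self.producer = None
        self.chunks = []

    def registerProducer(self, producer, streaming):
        self.producer = producer        # streaming is True: push producer

    def write(self, data):
        self.chunks.append(data)

    def unregisterProducer(self):
        self.producer = None

    def finish(self):
        pass                            # all data produced, or terminated
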
class BlockRequester(common.StreamConsumer, log.Loggable):
    """
    Retrieves a block of data using a range request.
    A modification time can be specified for the retrieval to
    fail if the requested file's modification time changed.

    The data is returned as a single block through the deferred
    returned by the retrieve method.

    It can recover from request failures up to MAX_RESUME_COUNT times.
    """

    logCategory = "block-requester"

    def __init__(self, reqmgr, url, mtime=None):
        self.reqmgr = reqmgr
        self._url = url
        self._mtime = mtime
        self._data = None
        self._deferred = None
        self._offset = None
        self._size = None
        self._resumes = MAX_RESUME_COUNT

        self.logName = common.log_id(self)  # To be able to track the instance

    def retrieve(self, offset, size):
        assert self._deferred is None, "Already retrieving"
        self._deferred = defer.Deferred()
        self._data = []
        self._offset = offset
        self._size = size
        self._curr = 0

        self._retrieve()

        return self._deferred

    def serverError(self, getter, code, message):
        assert self._deferred is not None, "Not retrieving anything"
        if code == common.RANGE_NOT_SATISFIABLE:
            # Simulate EOF
            self._deferred.callback("")
            self._cleanup()
            return
        if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
            self.warning("Block request error: %s (%s)", message, code)
            if self._resumes > 0:
                self._resumes -= 1
                self.debug("Resuming block retrieval from offset %d "
                           "with size %d (%d tries left)",
                           self._offset, self._size, self._resumes)

                self._retrieve()
                return
        self.debug("Too many resume attempts, stopping "
                   "after %d of %d", self._offset, self._size)
        self._deferred.errback(fileprovider.FileError(message))
        self._cleanup()

    def conditionFail(self, getter, code, message):
        assert self._deferred is not None, "Not retrieving anything"
        self._deferred.errback(fileprovider.FileOutOfDate(message))
        self._cleanup()

    def streamNotAvailable(self, getter, code, message):
        assert self._deferred is not None, "Not retrieving anything"
        error = fileprovider.FileOutOfDate(message)
        self._deferred.errback(error)
        self._cleanup()

    def onData(self, getter, data):
        size = len(data)
        self._offset += size
        self._size -= size
        self._data.append(data)

    def streamDone(self, getter):
        data = "".join(self._data)
        self._deferred.callback(data)
        self._cleanup()

    def _retrieve(self):
        self.reqmgr.retrieve(self, self._url, start=self._offset,
                             size=self._size, ifUnmodifiedSince=self._mtime)

    def _cleanup(self):
        self._deferred = None
        self._data = None
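
# CachingStrategy.requestData() above is the public entry point to
# BlockRequester. A sketch of a ranged read with out-of-date handling
# (hypothetical function; "strategy" is a configured CachingStrategy):


def _example_block_read(strategy, url, mtime):
    d = strategy.requestData(url, offset=0, size=65536, mtime=mtime)

    def _outOfDate(failure):
        failure.trap(fileprovider.FileOutOfDate)
        return ""                       # the file changed upstream

    d.addErrback(_outOfDate)
    return d                            # fires with the requested bytes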