Package flumotion :: Package component :: Package consumers :: Package disker :: Module disker
[hide private]

Source Code for Module flumotion.component.consumers.disker.disker

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import errno 
 23  import os 
 24  import time 
 25  import tempfile 
 26   
 27  import gobject 
 28  import gst 
 29   
 30  from twisted.internet import reactor 
 31   
 32  from flumotion.component import feedcomponent 
 33  from flumotion.common import log, gstreamer, pygobject, messages, errors 
 34  from flumotion.common import documentation, format 
 35  from flumotion.common import eventcalendar, poller 
 36  from flumotion.common.i18n import N_, gettexter 
 37  from flumotion.common.mimetypes import mimeTypeToExtention 
 38  from flumotion.common.pygobject import gsignal 
 39   
 40  #   the flumotion.twisted.flavors is not bundled, and as we only need it for 
 41  #   the interface, we can skip doing the import and thus not create 
 42  #   incompatibilities with workers running old versions of flavors that will be 
 43  #   asked to create diskers importing the IStateCacheableListener from that 
 44  #   module 
 45  # from flumotion.twisted.flavors import IStateCacheableListener 
 46   
 47  # proxy import 
 48  from flumotion.component.component import moods 
 49   
 50  __all__ = ['Disker'] 
 51  __version__ = "$Rev: 8644 $" 
 52  T_ = gettexter() 
 53   
 54  # Disk Usage polling frequency 
 55  DISKPOLL_FREQ = 60 
 56   
 57  # Maximum number of information to store in the filelist 
 58  FILELIST_SIZE = 100 
 59   
 60  """ 
 61  Disker has a property 'ical-schedule'. This allows an ical file to be 
 62  specified in the config and have recordings scheduled based on events. 
 63  This file will be monitored for changes and events reloaded if this 
 64  happens. 
 65   
 66  The filename of a recording started from an ical file will be produced 
 67  via passing the ical event summary through strftime, so that an archive 
 68  can encode the date and time that it was begun. 
 69   
 70  The time that will be given to strftime will be given in the timezone of 
 71  the ical event. In practice this will either be UTC or the local time of 
 72  the machine running the disker, as the ical scheduler does not 
 73  understand arbitrary timezones. 
 74  """ 
 75   
 76   
77 -class DiskerMedium(feedcomponent.FeedComponentMedium):
78 # called when admin ui wants to stop recording. call changeFilename to 79 # restart 80
81 - def remote_stopRecording(self):
82 self.comp.stopRecording()
83 84 # called when admin ui wants to change filename (this starts recording if 85 # the disker isn't currently writing to disk) 86
87 - def remote_changeFilename(self, filenameTemplate=None):
88 self.comp.changeFilename(filenameTemplate)
89
90 - def remote_scheduleRecordings(self, icalData):
91 icalFile = tempfile.TemporaryFile() 92 icalFile.write(icalData) 93 icalFile.seek(0) 94 95 self.comp.stopRecording() 96 97 self.comp.scheduleRecordings(icalFile) 98 icalFile.close()
99 100 # called when admin ui wants updated state (current filename info) 101
102 - def remote_notifyState(self):
103 self.comp.update_ui_state()
104 105
106 -class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
107 componentMediumClass = DiskerMedium 108 checkOffset = True 109 pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false' 110 file = None 111 directory = None 112 location = None 113 caps = None 114 last_tstamp = None 115 116 _startFilenameTemplate = None # template to use when starting off recording 117 _startTimeTuple = None # time tuple of event when starting 118 _rotateTimeDelayedCall = None 119 _pollDiskDC = None # _pollDisk delayed calls 120 _symlinkToLastRecording = None 121 _symlinkToCurrentRecording = None 122 123 # see the commented out import statement for IStateCacheableListener at 124 # the beginning of this file 125 # implements(IStateCacheableListener) 126 127 ### BaseComponent methods 128
129 - def init(self):
130 self._can_schedule = (eventcalendar.HAS_ICALENDAR and 131 eventcalendar.HAS_DATEUTIL) 132 self.uiState.addKey('filename', None) 133 self.uiState.addKey('recording', False) 134 self.uiState.addKey('can-schedule', self._can_schedule) 135 self.uiState.addKey('has-schedule', False) 136 self.uiState.addKey('rotate-type', None) 137 self.uiState.addKey('disk-free', None) 138 # list of (dt (in UTC, without tzinfo), which, content) 139 self.uiState.addListKey('next-points') 140 self.uiState.addListKey('filelist') 141 142 self._diskPoller = poller.Poller(self._pollDisk, 143 DISKPOLL_FREQ, 144 start=False)
145 146 ### uiState observer triggers 147
148 - def observerAppend(self, observer, num):
149 # PB may not have finished setting up its state and doing a 150 # remoteCall immediately here may cause some problems to the other 151 # side. For us to send the initial disk usage value with no 152 # noticeable delay, we will do it in a callLater with a timeout 153 # value of 0 154 self.debug("observer has started watching us, starting disk polling") 155 if not self._diskPoller.running and not self._pollDiskDC: 156 self._pollDiskDC = reactor.callLater(0, 157 self._diskPoller.start, 158 immediately=True) 159 # Start the BaseComponent pollers 160 feedcomponent.ParseLaunchComponent.observerAppend(self, observer, num)
161
162 - def observerRemove(self, observer, num):
163 if num == 0: 164 # cancel delayed _pollDisk calls if there's any 165 if self._pollDiskDC: 166 self._pollDiskDC.cancel() 167 self._pollDiskDC = None 168 169 self.debug("no more observers left, shutting down disk polling") 170 self._diskPoller.stop() 171 # Stop the BaseComponent pollers 172 feedcomponent.ParseLaunchComponent.observerRemove(self, observer, num)
173 174 ### ParseLaunchComponent methods 175
176 - def get_pipeline_string(self, properties):
177 directory = properties['directory'] 178 179 self.directory = directory 180 181 self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')]) 182 183 rotateType = properties.get('rotate-type', 'none') 184 185 # validate rotate-type and size/time properties first 186 if not rotateType in ['none', 'size', 'time']: 187 m = messages.Error(T_(N_( 188 "The configuration property 'rotate-type' should be set to " 189 "'size', time', or 'none', not '%s'. " 190 "Please fix the configuration."), 191 rotateType), mid='rotate-type') 192 self.addMessage(m) 193 raise errors.ComponentSetupHandledError() 194 195 # size and time types need the property specified 196 if rotateType in ['size', 'time']: 197 if rotateType not in properties.keys(): 198 m = messages.Error(T_(N_( 199 "The configuration property '%s' should be set. " 200 "Please fix the configuration."), 201 rotateType), mid='rotate-type') 202 self.addMessage(m) 203 raise errors.ComponentSetupHandledError() 204 205 # now act on the properties 206 if rotateType == 'size': 207 self.setSizeRotate(properties['size']) 208 self.uiState.set('rotate-type', 209 'every %sB' % \ 210 format.formatStorage(properties['size'])) 211 elif rotateType == 'time': 212 self.setTimeRotate(properties['time']) 213 self.uiState.set('rotate-type', 214 'every %s' % \ 215 format.formatTime(properties['time'])) 216 else: 217 self.uiState.set('rotate-type', 'disabled') 218 # FIXME: should add a way of saying "do first cycle at this time" 219 220 return self.pipe_template
221
222 - def configure_pipeline(self, pipeline, properties):
223 self.debug('configure_pipeline for disker') 224 self._symlinkToLastRecording = \ 225 properties.get('symlink-to-last-recording', None) 226 self._symlinkToCurrentRecording = \ 227 properties.get('symlink-to-current-recording', None) 228 self._recordAtStart = properties.get('start-recording', True) 229 self._defaultFilenameTemplate = properties.get('filename', 230 '%s.%%Y%%m%%d-%%H%%M%%S' % self.getName()) 231 self._startFilenameTemplate = self._defaultFilenameTemplate 232 icalfn = properties.get('ical-schedule') 233 if self._can_schedule and icalfn: 234 self.scheduleRecordings(open(icalfn, 'r')) 235 elif icalfn: 236 # ical schedule is set, but self._can_schedule is False 237 238 def missingModule(moduleName): 239 m = messages.Error(T_(N_( 240 "An iCal file has been specified for scheduling, " 241 "but the '%s' module is not installed.\n"), moduleName), 242 mid='error-python-%s' % moduleName) 243 documentation.messageAddPythonInstall(m, moduleName) 244 self.debug(m) 245 self.addMessage(m)
246 247 if not eventcalendar.HAS_ICALENDAR: 248 missingModule('icalendar') 249 if not eventcalendar.HAS_DATEUTIL: 250 missingModule('dateutil') 251 # self._can_schedule is False, so one of the above surely happened 252 raise errors.ComponentSetupHandledError() 253 254 sink = self.get_element('fdsink') 255 256 if gstreamer.element_factory_has_property('multifdsink', 257 'resend-streamheader'): 258 sink.set_property('resend-streamheader', False) 259 else: 260 self.debug("resend-streamheader property not available, " 261 "resending streamheader when it changes in the caps") 262 sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb) 263 # connect to client-removed so we can detect errors in file writing 264 sink.connect('client-removed', self._client_removed_cb) 265 266 # set event probe if we should react to video mark events 267 react_to_marks = properties.get('react-to-stream-markers', False) 268 if react_to_marks: 269 pfx = properties.get('stream-marker-filename-prefix', '%03d.') 270 self._markerPrefix = pfx 271 sink.get_pad('sink').add_event_probe(self._markers_event_probe)
272 273 ### our methods 274
275 - def _pollDisk(self):
276 # Figure out the remaining disk space where the disker is saving 277 # files to 278 self._pollDiskDC = None 279 s = None 280 try: 281 s = os.statvfs(self.directory) 282 except Exception, e: 283 self.debug('failed to figure out disk space: %s', 284 log.getExceptionMessage(e)) 285 286 if not s: 287 free = None 288 else: 289 free = format.formatStorage(s.f_frsize * s.f_bavail) 290 291 if self.uiState.get('disk-free') != free: 292 self.debug("disk usage changed, reporting to observers") 293 self.uiState.set('disk-free', free)
294
295 - def setTimeRotate(self, time):
296 """ 297 @param time: duration of file (in seconds) 298 """ 299 if self._rotateTimeDelayedCall: 300 self._rotateTimeDelayedCall.cancel() 301 self._rotateTimeDelayedCall = reactor.callLater( 302 time, self._rotateTimeCallLater, time)
303
304 - def setSizeRotate(self, size):
305 """ 306 @param size: size of file (in bytes) 307 """ 308 reactor.callLater(5, self._rotateSizeCallLater, size)
309
310 - def _rotateTimeCallLater(self, time):
311 self.changeFilename() 312 313 # reschedule ourselves indefinitely 314 self._rotateTimeDelayedCall = reactor.callLater( 315 time, self._rotateTimeCallLater, time)
316
317 - def _rotateSizeCallLater(self, size):
318 if not self.location: 319 self.warning('Cannot rotate file, no file location set') 320 else: 321 if os.stat(self.location).st_size > size: 322 self.changeFilename() 323 324 # Add a new one 325 reactor.callLater(5, self._rotateSizeCallLater, size)
326
327 - def getMime(self):
328 if self.caps: 329 return self.caps.get_structure(0).get_name()
330 331 # FIXME: is this method used anywhere ? 332
333 - def get_content_type(self):
334 mime = self.getMime() 335 if mime == 'multipart/x-mixed-replace': 336 mime += ";boundary=ThisRandomString" 337 return mime
338
339 - def scheduleRecordings(self, icalFile):
340 self.uiState.set('has-schedule', True) 341 self.debug('Parsing iCalendar file %s' % icalFile) 342 from flumotion.component.base import scheduler 343 try: 344 self.icalScheduler = scheduler.ICalScheduler(icalFile) 345 self.icalScheduler.subscribe(self.eventInstanceStarted, 346 self.eventInstanceEnded) 347 # FIXME: this should be handled through the subscription 348 # handlers; for that, we should subscribe before the calendar 349 # gets added 350 cal = self.icalScheduler.getCalendar() 351 eventInstances = cal.getActiveEventInstances() 352 if eventInstances: 353 instance = eventInstances[0] 354 content = instance.event.content 355 self.info('Event %s is in progress, start recording' % 356 content) 357 self._startFilenameTemplate = content 358 self._startTimeTuple = instance.start.utctimetuple() 359 self._recordAtStart = True 360 else: 361 self.info('No events in progress') 362 self._recordAtStart = False 363 self._updateNextPoints() 364 except (ValueError, IndexError, KeyError), e: 365 m = messages.Warning(T_(N_( 366 "Error parsing ical file %s, so not scheduling any" 367 " events." % icalFile)), 368 debug=log.getExceptionMessage(e), mid="error-parsing-ical") 369 self.addMessage(m)
370
371 - def changeFilename(self, filenameTemplate=None, timeTuple=None):
372 """ 373 @param filenameTemplate: strftime format string to decide filename 374 @param timeTuple: a valid time tuple to pass to strftime, 375 defaulting to time.localtime(). 376 """ 377 mime = self.getMime() 378 ext = mimeTypeToExtention(mime) 379 380 self.stopRecording() 381 382 sink = self.get_element('fdsink') 383 if sink.get_state() == gst.STATE_NULL: 384 sink.set_state(gst.STATE_READY) 385 386 filename = "" 387 if not filenameTemplate: 388 filenameTemplate = self._defaultFilenameTemplate 389 filename = "%s.%s" % (format.strftime(filenameTemplate, 390 timeTuple or time.localtime()), ext) 391 self.location = os.path.join(self.directory, filename) 392 393 # only overwrite existing files if it was last changed before the 394 # start of this event; ie. if it is a recording of a previous event 395 location = self.location 396 i = 1 397 while os.path.exists(location): 398 mtimeTuple = time.gmtime(os.stat(location).st_mtime) 399 if mtimeTuple <= timeTuple: 400 self.info( 401 "Existing recording %s from previous event, overwriting", 402 location) 403 break 404 405 self.info( 406 "Existing recording %s from current event, changing name", 407 location) 408 location = self.location + '.' + str(i) 409 i += 1 410 self.location = location 411 412 self.info("Changing filename to %s", self.location) 413 try: 414 self.file = open(self.location, 'wb') 415 except IOError, e: 416 self.warning("Failed to open output file %s: %s", 417 self.location, log.getExceptionMessage(e)) 418 m = messages.Error(T_(N_( 419 "Failed to open output file '%s' for writing. " 420 "Check permissions on the file."), self.location)) 421 self.addMessage(m) 422 return 423 self._recordingStarted(self.file, self.location) 424 sink.emit('add', self.file.fileno()) 425 self.last_tstamp = time.time() 426 self.uiState.set('filename', self.location) 427 self.uiState.set('recording', True) 428 429 if self._symlinkToCurrentRecording: 430 self._updateSymlink(self.location, 431 self._symlinkToCurrentRecording)
432 456
457 - def stopRecording(self):
458 sink = self.get_element('fdsink') 459 if sink.get_state() == gst.STATE_NULL: 460 sink.set_state(gst.STATE_READY) 461 462 if self.file: 463 self.file.flush() 464 sink.emit('remove', self.file.fileno()) 465 self._recordingStopped(self.file, self.location) 466 self.file = None 467 self.uiState.set('filename', None) 468 self.uiState.set('recording', False) 469 try: 470 size = format.formatStorage(os.stat(self.location).st_size) 471 except EnvironmentError, e: 472 # catch File not found, permission denied, disk problems 473 size = "unknown" 474 475 # Limit number of entries on filelist, remove the oldest entry 476 fl = self.uiState.get('filelist', otherwise=[]) 477 if FILELIST_SIZE == len(fl): 478 self.uiState.remove('filelist', fl[0]) 479 480 self.uiState.append('filelist', (self.last_tstamp, 481 self.location, 482 size)) 483 484 if self._symlinkToLastRecording: 485 self._updateSymlink(self.location, 486 self._symlinkToLastRecording)
487
488 - def _notify_caps_cb(self, pad, param):
489 caps = pad.get_negotiated_caps() 490 if caps == None: 491 return 492 493 caps_str = gstreamer.caps_repr(caps) 494 self.debug('Got caps: %s' % caps_str) 495 496 new = True 497 if not self.caps == None: 498 self.warning('Already had caps: %s, replacing' % caps_str) 499 new = False 500 501 self.debug('Storing caps: %s' % caps_str) 502 self.caps = caps 503 504 if new and self._recordAtStart: 505 reactor.callLater(0, self.changeFilename, 506 self._startFilenameTemplate, self._startTimeTuple)
507 508 # multifdsink::client-removed 509
510 - def _client_removed_cb(self, element, arg0, client_status):
511 # treat as error if we were removed because of GST_CLIENT_STATUS_ERROR 512 # FIXME: can we use the symbol instead of a numeric constant ? 513 if client_status == 4: 514 # since we get called from the streaming thread, hand off handling 515 # to the reactor's thread 516 reactor.callFromThread(self._client_error_cb)
517
518 - def _client_error_cb(self):
519 self.file.close() 520 self.file = None 521 522 self.setMood(moods.sad) 523 messageId = "error-writing-%s" % self.location 524 m = messages.Error(T_(N_( 525 "Error writing to file '%s'."), self.location), 526 mid=messageId, priority=40) 527 self.addMessage(m)
528
529 - def eventInstanceStarted(self, eventInstance):
530 self.debug('starting recording of %s', eventInstance.event.content) 531 self.changeFilename(eventInstance.event.content, 532 eventInstance.start.timetuple()) 533 self._updateNextPoints()
534
535 - def eventInstanceEnded(self, eventInstance):
536 self.debug('ending recording of %s', eventInstance.event.content) 537 self.stopRecording() 538 self._updateNextPoints()
539
540 - def _updateNextPoints(self):
541 # query the scheduler for what the next points are in its window 542 # and set it on the UI state 543 544 current = self.uiState.get('next-points')[:] 545 points = self.icalScheduler.getPoints() 546 new = [] 547 548 # twisted says 'Currently can't jelly datetime objects with tzinfo', 549 # so convert all to UTC then remove tzinfo. 550 551 def _utcAndStripTZ(dt): 552 from flumotion.common import eventcalendar 553 return dt.astimezone(eventcalendar.UTC).replace(tzinfo=None)
554 555 for p in points: 556 dtUTC = _utcAndStripTZ(p.dt) 557 dtStart = p.eventInstance.start.replace(tzinfo=None) 558 new.append((dtUTC, p.which, 559 format.strftime(p.eventInstance.event.content, 560 dtStart.timetuple()))) 561 562 for t in current: 563 if t not in new: 564 self.debug('removing tuple %r from next-points', t) 565 self.uiState.remove('next-points', t) 566 567 for t in new: 568 if t not in current: 569 self.debug('appending tuple %r to next-points', t) 570 self.uiState.append('next-points', t) 571
572 - def _recordingStarted(self, file, location):
573 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug' 574 # make sure plugs are configured with our socket, see #732 575 if socket not in self.plugs: 576 return 577 for plug in self.plugs[socket]: 578 self.debug('invoking recordingStarted on ' 579 'plug %r on socket %s', plug, socket) 580 plug.recordingStarted(file, location)
581
582 - def _recordingStopped(self, file, location):
583 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug' 584 # make sure plugs are configured with our socket, see #732 585 if socket not in self.plugs: 586 return 587 for plug in self.plugs[socket]: 588 self.debug('invoking recordingStopped on ' 589 'plug %r on socket %s', plug, socket) 590 plug.recordingStopped(file, location)
591 592 ### marker methods 593
594 - def _markers_event_probe(self, element, event):
595 if event.type == gst.EVENT_CUSTOM_DOWNSTREAM: 596 evt_struct = event.get_structure() 597 if evt_struct.get_name() == 'FluStreamMark': 598 if evt_struct['action'] == 'start': 599 self._onMarkerStart(evt_struct['prog_id']) 600 elif evt_struct['action'] == 'stop': 601 self._onMarkerStop() 602 return True
603
604 - def _onMarkerStop(self):
605 self.stopRecording()
606
607 - def _onMarkerStart(self, data):
608 tmpl = self._defaultFilenameTemplate 609 if self._markerPrefix: 610 try: 611 tmpl = '%s%s' % (self._markerPrefix % data, 612 self._defaultFilenameTemplate) 613 except TypeError, err: 614 m = messages.Warning(T_(N_('Failed expanding filename prefix: ' 615 '%r <-- %r.'), 616 self._markerPrefix, data), 617 mid='expand-marker-prefix') 618 self.addMessage(m) 619 self.warning('Failed expanding filename prefix: ' 620 '%r <-- %r; %r' % 621 (self._markerPrefix, data, err)) 622 self.changeFilename(tmpl)
623
624 - def do_stop(self):
625 if self._pollDiskDC: 626 self._pollDiskDC.cancel() 627 self._pollDiskDC = None 628 self._diskPoller.stop()
629