Package flumotion :: Package component :: Module feedcomponent
[hide private]

Source Code for Module flumotion.component.feedcomponent

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Feed components, participating in the stream 
 24  """ 
 25   
 26  import os 
 27   
 28  import gst 
 29  import gst.interfaces 
 30  import gobject 
 31   
 32  from twisted.internet import reactor, defer 
 33  from twisted.spread import pb 
 34  from zope.interface import implements 
 35   
 36  from flumotion.configure import configure 
 37  from flumotion.component import component as basecomponent 
 38  from flumotion.component import feed 
 39  from flumotion.common import common, interfaces, errors, log, pygobject, \ 
 40       messages 
 41  from flumotion.common import gstreamer 
 42  from flumotion.common.i18n import N_, gettexter 
 43  from flumotion.common.planet import moods 
 44  from flumotion.common.pygobject import gsignal 
 45   
 46  __version__ = "$Rev: 8808 $" 
 47  T_ = gettexter() 
 48   
 49   
50 -class FeedComponentMedium(basecomponent.BaseComponentMedium):
51 """ 52 I am a component-side medium for a FeedComponent to interface with 53 the manager-side ComponentAvatar. 54 """ 55 implements(interfaces.IComponentMedium) 56 logCategory = 'feedcompmed' 57 remoteLogName = 'feedserver' 58
59 - def __init__(self, component):
60 """ 61 @param component: L{flumotion.component.feedcomponent.FeedComponent} 62 """ 63 basecomponent.BaseComponentMedium.__init__(self, component) 64 65 self._feederFeedServer = {} # eaterAlias -> (fullFeedId, host, port) 66 # tuple for remote feeders 67 self._feederPendingConnections = {} # eaterAlias -> cancel thunk 68 self._eaterFeedServer = {} # fullFeedId -> (host, port) tuple 69 # for remote eaters 70 self._eaterPendingConnections = {} # feederName -> cancel thunk 71 self.logName = component.name
72 73 ### Referenceable remote methods which can be called from manager 74
75 - def remote_attachPadMonitorToFeeder(self, feederName):
76 self.comp.attachPadMonitorToFeeder(feederName)
77
78 - def remote_setGstDebug(self, debug):
79 """ 80 Sets the GStreamer debugging levels based on the passed debug string. 81 82 @since: 0.4.2 83 """ 84 self.debug('Setting GStreamer debug level to %s' % debug) 85 if not debug: 86 return 87 88 for part in debug.split(','): 89 glob = None 90 value = None 91 pair = part.split(':') 92 if len(pair) == 1: 93 # assume only the value 94 value = int(pair[0]) 95 elif len(pair) == 2: 96 glob, value = pair 97 value = int(value) 98 else: 99 self.warning("Cannot parse GStreamer debug setting '%s'." % 100 part) 101 continue 102 103 if glob: 104 try: 105 # value has to be an integer 106 gst.debug_set_threshold_for_name(glob, value) 107 except TypeError: 108 self.warning("Cannot set glob %s to value %s" % ( 109 glob, value)) 110 else: 111 gst.debug_set_default_threshold(value)
112
113 - def remote_eatFrom(self, eaterAlias, fullFeedId, host, port):
114 """ 115 Tell the component the host and port for the FeedServer through which 116 it can connect a local eater to a remote feeder to eat the given 117 fullFeedId. 118 119 Called on by the manager-side ComponentAvatar. 120 """ 121 if self._feederFeedServer.get(eaterAlias): 122 if self._feederFeedServer[eaterAlias] == (fullFeedId, host, port): 123 self.debug("Feed:%r is the same as the current one. "\ 124 "Request ignored.", (fullFeedId, host, port)) 125 return 126 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port) 127 return self.connectEater(eaterAlias)
128
129 - def _getAuthenticatorForFeed(self, eaterAliasOrFeedName):
130 # The avatarId on the keycards issued by the authenticator will 131 # identify us to the remote component. Attempt to use our 132 # fullFeedId, for debugging porpoises. 133 if hasattr(self.authenticator, 'copy'): 134 tup = common.parseComponentId(self.authenticator.avatarId) 135 flowName, componentName = tup 136 fullFeedId = common.fullFeedId(flowName, componentName, 137 eaterAliasOrFeedName) 138 return self.authenticator.copy(fullFeedId) 139 else: 140 return self.authenticator
141
142 - def connectEater(self, eaterAlias):
143 """ 144 Connect one of the medium's component's eaters to a remote feed. 145 Called by the component, both on initial connection and for 146 reconnecting. 147 148 @returns: deferred that will fire with a value of None 149 """ 150 # FIXME: There's no indication if the connection was made or not 151 152 def gotFeed((feedId, fd)): 153 self._feederPendingConnections.pop(eaterAlias, None) 154 self.comp.eatFromFD(eaterAlias, feedId, fd)
155 156 if eaterAlias not in self._feederFeedServer: 157 self.debug("eatFrom() hasn't been called yet for eater %s", 158 eaterAlias) 159 # unclear if this function should have a return value at 160 # all... 161 return defer.succeed(None) 162 163 (fullFeedId, host, port) = self._feederFeedServer[eaterAlias] 164 165 cancel = self._feederPendingConnections.pop(eaterAlias, None) 166 if cancel: 167 self.debug('cancelling previous connection attempt on %s', 168 eaterAlias) 169 cancel() 170 171 client = feed.FeedMedium(logName=self.comp.name) 172 173 d = client.requestFeed(host, port, 174 self._getAuthenticatorForFeed(eaterAlias), 175 fullFeedId) 176 self._feederPendingConnections[eaterAlias] = client.stopConnecting 177 d.addCallback(gotFeed) 178 return d
179
180 - def remote_feedTo(self, feederName, fullFeedId, host, port):
181 """ 182 Tell the component to feed the given feed to the receiving component 183 accessible through the FeedServer on the given host and port. 184 185 Called on by the manager-side ComponentAvatar. 186 """ 187 self._eaterFeedServer[fullFeedId] = (host, port) 188 self.connectFeeder(feederName, fullFeedId)
189
190 - def connectFeeder(self, feederName, fullFeedId):
191 """ 192 Tell the component to feed the given feed to the receiving component 193 accessible through the FeedServer on the given host and port. 194 195 Called on by the manager-side ComponentAvatar. 196 """ 197 198 def gotFeed((fullFeedId, fd)): 199 self._eaterPendingConnections.pop(feederName, None) 200 self.comp.feedToFD(feederName, fd, os.close, fullFeedId)
201 202 if fullFeedId not in self._eaterFeedServer: 203 self.debug("feedTo() hasn't been called yet for feeder %s", 204 feederName) 205 # unclear if this function should have a return value at 206 # all... 207 return defer.succeed(None) 208 209 host, port = self._eaterFeedServer[fullFeedId] 210 211 # probably should key on feederName as well 212 cancel = self._eaterPendingConnections.pop(fullFeedId, None) 213 if cancel: 214 self.debug('cancelling previous connection attempt on %s', 215 feederName) 216 cancel() 217 218 client = feed.FeedMedium(logName=self.comp.name) 219 220 d = client.sendFeed(host, port, 221 self._getAuthenticatorForFeed(feederName), 222 fullFeedId) 223 self._eaterPendingConnections[feederName] = client.stopConnecting 224 d.addCallback(gotFeed) 225 return d 226
227 - def remote_provideMasterClock(self, port):
228 """ 229 Tells the component to start providing a master clock on the given 230 UDP port. 231 Can only be called if setup() has been called on the component. 232 233 The IP address returned is the local IP the clock is listening on. 234 235 @returns: (ip, port, base_time) 236 @rtype: tuple of (str, int, long) 237 """ 238 self.debug('remote_provideMasterClock(port=%r)' % port) 239 return self.comp.provide_master_clock(port)
240
241 - def remote_getMasterClockInfo(self):
242 """ 243 Return the clock master info created by a previous call 244 to provideMasterClock. 245 246 @returns: (ip, port, base_time) 247 @rtype: tuple of (str, int, long) 248 """ 249 return self.comp.get_master_clock()
250
251 - def remote_setMasterClock(self, ip, port, base_time):
252 return self.comp.set_master_clock(ip, port, base_time)
253
254 - def remote_effect(self, effectName, methodName, *args, **kwargs):
255 """ 256 Invoke the given methodName on the given effectName in this component. 257 The effect should implement effect_(methodName) to receive the call. 258 """ 259 self.debug("calling %s on effect %s" % (methodName, effectName)) 260 if not effectName in self.comp.effects: 261 raise errors.UnknownEffectError(effectName) 262 effect = self.comp.effects[effectName] 263 if not hasattr(effect, "effect_%s" % methodName): 264 raise errors.NoMethodError("%s on effect %s" % (methodName, 265 effectName)) 266 method = getattr(effect, "effect_%s" % methodName) 267 try: 268 result = method(*args, **kwargs) 269 except TypeError: 270 msg = "effect method %s did not accept %s and %s" % ( 271 methodName, args, kwargs) 272 self.debug(msg) 273 raise errors.RemoteRunError(msg) 274 self.debug("effect: result: %r" % result) 275 return result
276
277 - def remote_dumpGstreamerDotFile(self, filename):
278 self.comp.dump_gstreamer_debug_dot_file(filename)
279 280 from feedcomponent010 import FeedComponent 281 282 FeedComponent.componentMediumClass = FeedComponentMedium 283 284
285 -class ParseLaunchComponent(FeedComponent):
286 """A component using gst-launch syntax 287 288 @cvar checkTimestamp: whether to check continuity of timestamps for eaters 289 @cvar checkOffset: whether to check continuity of offsets for 290 eaters 291 """ 292 293 DELIMITER = '@' 294 295 # can be set by subclasses 296 checkTimestamp = False 297 checkOffset = False 298 299 # keep these as class variables for the tests 300 FDSRC_TMPL = 'fdsrc name=%(name)s' 301 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay' 302 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\ 303 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\ 304 'recover-policy=1' 305 EATER_TMPL = None 306
307 - def init(self):
308 if not gstreamer.get_plugin_version('coreelements'): 309 raise errors.MissingElementError('identity') 310 if not gstreamer.element_factory_has_property('identity', 311 'check-imperfect-timestamp'): 312 self.checkTimestamp = False 313 self.checkOffset = False 314 self.addMessage( 315 messages.Info(T_(N_( 316 "You will get more debugging information " 317 "if you upgrade to GStreamer 0.10.13 or later.")))) 318 319 self.EATER_TMPL = self.FDSRC_TMPL + ' %(queue)s ' + self.DEPAY_TMPL 320 if self.checkTimestamp or self.checkOffset: 321 self.EATER_TMPL += " ! identity name=%(name)s-identity silent=TRUE" 322 if self.checkTimestamp: 323 self.EATER_TMPL += " check-imperfect-timestamp=1" 324 if self.checkOffset: 325 self.EATER_TMPL += " check-imperfect-offset=1"
326 327 ### FeedComponent interface implementations 328
329 - def create_pipeline(self):
330 try: 331 unparsed = self.get_pipeline_string(self.config['properties']) 332 except errors.MissingElementError, e: 333 self.warning('Missing %s element' % e.args[0]) 334 m = messages.Error(T_(N_( 335 "The worker does not have the '%s' element installed.\n" 336 "Please install the necessary plug-in and restart " 337 "the component.\n"), e.args[0])) 338 self.addMessage(m) 339 raise errors.ComponentSetupHandledError(e) 340 341 self.pipeline_string = self.parse_pipeline(unparsed) 342 343 try: 344 pipeline = gst.parse_launch(self.pipeline_string) 345 except gobject.GError, e: 346 self.warning('Could not parse pipeline: %s' % e.message) 347 m = messages.Error(T_(N_( 348 "GStreamer error: could not parse component pipeline.")), 349 debug=e.message) 350 self.addMessage(m) 351 raise errors.PipelineParseError(e.message) 352 353 return pipeline
354
355 - def set_pipeline(self, pipeline):
356 FeedComponent.set_pipeline(self, pipeline) 357 if self.checkTimestamp or self.checkOffset: 358 watchElements = dict([ 359 (e.elementName + '-identity', e) 360 for e in self.eaters.values()]) 361 self.install_eater_continuity_watch(watchElements) 362 self.configure_pipeline(self.pipeline, self.config['properties'])
363 364 ### ParseLaunchComponent interface for subclasses 365
366 - def get_pipeline_string(self, properties):
367 """ 368 Method that must be implemented by subclasses to produce the 369 gstparse string for the component's pipeline. Subclasses should 370 not chain up; this method raises a NotImplemented error. 371 372 Returns: a new pipeline string representation. 373 """ 374 raise NotImplementedError('subclasses should implement ' 375 'get_pipeline_string')
376
377 - def configure_pipeline(self, pipeline, properties):
378 """ 379 Method that can be implemented by subclasses if they wish to 380 interact with the pipeline after it has been created and set 381 on the component. 382 383 This could include attaching signals and bus handlers. 384 """ 385 pass
386 387 ### private methods 388
389 - def add_default_eater_feeder(self, pipeline):
390 if len(self.eaters) == 1: 391 eater = 'eater:' + self.eaters.keys()[0] 392 if eater not in pipeline: 393 pipeline = '@' + eater + '@ ! ' + pipeline 394 if len(self.feeders) == 1: 395 feeder = 'feeder:' + self.feeders.keys()[0] 396 if feeder not in pipeline: 397 pipeline = pipeline + ' ! @' + feeder + '@' 398 return pipeline
399
400 - def parse_tmpl(self, pipeline, templatizers):
401 """ 402 Expand the given pipeline string representation by substituting 403 blocks between '@' with a filled-in template. 404 405 @param pipeline: a pipeline string representation with variables 406 @param templatizers: A dict of prefix => procedure. Template 407 blocks in the pipeline will be replaced 408 with the result of calling the procedure 409 with what is left of the template after 410 taking off the prefix. 411 @returns: a new pipeline string representation. 412 """ 413 assert pipeline != '' 414 415 # verify the template has an even number of delimiters 416 if pipeline.count(self.DELIMITER) % 2 != 0: 417 raise TypeError("'%s' contains an odd number of '%s'" 418 % (pipeline, self.DELIMITER)) 419 420 out = [] 421 for i, block in enumerate(pipeline.split(self.DELIMITER)): 422 # when splitting, the even-indexed members will remain, and 423 # the odd-indexed members are the blocks to be substituted 424 if i % 2 == 0: 425 out.append(block) 426 else: 427 block = block.strip() 428 try: 429 pos = block.index(':') 430 except ValueError: 431 raise TypeError("Template %r has no colon" % (block, )) 432 prefix = block[:pos+1] 433 if prefix not in templatizers: 434 raise TypeError("Template %r has invalid prefix %r" 435 % (block, prefix)) 436 out.append(templatizers[prefix](block[pos+1:])) 437 return ''.join(out)
438
439 - def parse_pipeline(self, pipeline):
440 pipeline = " ".join(pipeline.split()) 441 self.debug('Creating pipeline, template is %s', pipeline) 442 443 if pipeline == '' and not self.eaters: 444 raise TypeError("Need a pipeline or a eater") 445 446 if pipeline == '': 447 # code of dubious value 448 assert self.eaters 449 pipeline = 'fakesink signal-handoffs=1 silent=1 name=sink' 450 451 pipeline = self.add_default_eater_feeder(pipeline) 452 pipeline = self.parse_tmpl(pipeline, 453 {'eater:': self.get_eater_template, 454 'feeder:': self.get_feeder_template}) 455 456 self.debug('pipeline is %s', pipeline) 457 assert self.DELIMITER not in pipeline 458 459 return pipeline
460
461 - def get_eater_template(self, eaterAlias):
462 queue = self.get_queue_string(eaterAlias) 463 elementName = self.eaters[eaterAlias].elementName 464 465 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
466
467 - def get_feeder_template(self, feederName):
468 elementName = self.feeders[feederName].elementName 469 return self.FEEDER_TMPL % {'name': elementName}
470
471 - def get_queue_string(self, eaterAlias):
472 """ 473 Return a parse-launch string to join the fdsrc eater element and 474 the depayer, for example '!' or '! queue !'. The string may have 475 no format strings. 476 """ 477 return '!'
478
479 - def get_eater_srcpad(self, eaterAlias):
480 """ 481 Method that returns the source pad of the final element in an eater. 482 483 @returns: the GStreamer source pad of the final element in an eater 484 @rtype: L{gst.Pad} 485 """ 486 e = self.eaters[eaterAlias] 487 identity = self.get_element(e.elementName + '-identity') 488 depay = self.get_element(e.depayName) 489 srcpad = depay.get_pad("src") 490 if identity: 491 srcpad = identity.get_pad("src") 492 return srcpad
493 494
495 -class Effect(log.Loggable):
496 """ 497 I am a part of a feed component for a specific group 498 of functionality. 499 500 @ivar name: name of the effect 501 @type name: string 502 @ivar component: component owning the effect 503 @type component: L{FeedComponent} 504 """ 505 logCategory = "effect" 506
507 - def __init__(self, name):
508 """ 509 @param name: the name of the effect 510 """ 511 self.name = name 512 self.setComponent(None)
513
514 - def setComponent(self, component):
515 """ 516 Set the given component as the effect's owner. 517 518 @param component: the component to set as an owner of this effect 519 @type component: L{FeedComponent} 520 """ 521 self.component = component 522 self.setUIState(component and component.uiState or None)
523
524 - def setUIState(self, state):
525 """ 526 Set the given UI state on the effect. This method is ideal for 527 adding keys to the UI state. 528 529 @param state: the UI state for the component to use. 530 @type state: L{flumotion.common.componentui.WorkerComponentUIState} 531 """ 532 self.uiState = state
533
534 - def getComponent(self):
535 """ 536 Get the component owning this effect. 537 538 @rtype: L{FeedComponent} 539 """ 540 return self.component
541 542
543 -class PostProcEffect (Effect):
544 """ 545 I am an effect that is plugged in the pipeline to do a post processing 546 job and can be chained to other effect of the same class. 547 548 @ivar name: name of the effect 549 @type name: string 550 @ivar component: component owning the effect 551 @type component: L{FeedComponent} 552 @ivar sourcePad: pad of the source after which I'm plugged 553 @type sourcePad: L{GstPad} 554 @ivar effectBin: gstreamer bin doing the post processing effect 555 @type source: L{GstBin} 556 @ivar pipeline: pipeline holding the gstreamer elements 557 @type pipeline: L{GstPipeline} 558 559 """ 560 logCategory = "effect" 561
562 - def __init__(self, name, sourcePad, effectBin, pipeline):
563 """ 564 @param name: the name of the effect 565 @param sourcePad: pad of the source after which I'm plugged 566 @param effectBin: gstreamer bin doing the post processing effect 567 @param pipeline: pipeline holding the gstreamer elements 568 """ 569 Effect.__init__(self, name) 570 self.sourcePad = sourcePad 571 self.effectBin = effectBin 572 self.pipeline = pipeline 573 self.plugged = False
574
575 - def plug(self):
576 """ 577 Plug the effect in the pipeline unlinking the source element with it's 578 downstream peer 579 """ 580 if self.plugged == True: 581 return 582 # Unlink the source pad of the source element after which we need 583 # are going to be plugged 584 peerSinkPad = self.sourcePad 585 peerSrcPad = peerSinkPad.get_peer() 586 peerSinkPad.unlink(peerSrcPad) 587 588 # Add the deinterlacer bin to the pipeline 589 self.effectBin.set_state(gst.STATE_PLAYING) 590 self.pipeline.add(self.effectBin) 591 592 # link it with the element src pad and its peer's sink pad 593 peerSinkPad.link(self.effectBin.get_pad('sink')) 594 self.effectBin.get_pad('src').link(peerSrcPad) 595 self.plugged = True
596 597
598 -class MultiInputParseLaunchComponent(ParseLaunchComponent):
599 """ 600 This class provides for multi-input ParseLaunchComponents, such as muxers, 601 with a queue attached to each input. 602 """ 603 QUEUE_SIZE_BUFFERS = 16 604 LINK_MUXER = True 605
606 - def get_muxer_string(self, properties):
607 """ 608 Return a gst-parse description of the muxer, which 609 must be named 'muxer' 610 """ 611 raise errors.NotImplementedError("Implement in a subclass")
612
613 - def get_queue_string(self, eaterAlias):
614 name = self.eaters[eaterAlias].elementName 615 return ("! queue name=%s-queue max-size-buffers=%d !" 616 % (name, self.QUEUE_SIZE_BUFFERS))
617
618 - def get_pipeline_string(self, properties):
619 eaters = self.config.get('eater', {}) 620 sources = self.config.get('source', []) 621 if eaters == {} and sources != []: 622 # for upgrade without manager restart 623 feeds = [] 624 for feed in sources: 625 if not ':' in feed: 626 feed = '%s:default' % feed 627 feeds.append(feed) 628 eaters = {'default': [(x, 'default') for x in feeds]} 629 630 pipeline = '' 631 for e in eaters: 632 for feed, alias in eaters[e]: 633 pipeline += '@ eater:%s @ ' % alias 634 if self.LINK_MUXER: 635 pipeline += ' ! muxer. ' 636 637 pipeline += self.get_muxer_string(properties) + ' ' 638 639 return pipeline
640
641 - def unblock_eater(self, eaterAlias):
642 # Firstly, ensure that any push in progress is guaranteed to return, 643 # by temporarily enlarging the queue 644 queuename = self.eaters[eaterAlias].elementName + '-queue' 645 queue = self.pipeline.get_by_name(queuename) 646 647 size = queue.get_property("max-size-buffers") 648 queue.set_property("max-size-buffers", size + 1) 649 650 # So, now it's guaranteed to return. However, we want to return the 651 # queue size to its original value. Doing this in a thread-safe manner 652 # is rather tricky... 653 654 def _block_cb(pad, blocked): 655 # This is called from streaming threads, but we don't do anything 656 # here so it's safe. 657 pass
658 659 def _underrun_cb(element): 660 # Called from a streaming thread. The queue element does not hold 661 # the queue lock when this is called, so we block our sinkpad, 662 # then re-check the current level. 663 pad = element.get_pad("sink") 664 pad.set_blocked_async(True, _block_cb) 665 level = element.get_property("current-level-buffers") 666 if level < self.QUEUE_SIZE_BUFFERS: 667 element.set_property('max-size-buffers', 668 self.QUEUE_SIZE_BUFFERS) 669 element.disconnect(signalid) 670 pad.set_blocked_async(False, _block_cb)
671 672 signalid = queue.connect("underrun", _underrun_cb) 673 674
675 -class ReconfigurableComponent(ParseLaunchComponent):
676 677 disconnectedPads = False 678
680 """Should be overrided by subclasses to provide the pipeline the 681 component uses. 682 """ 683 return ""
684
685 - def init(self):
686 self.EATER_TMPL += ' ! queue name=input-%(name)s' 687 self._reset_count = 0 688 689 self.uiState.addKey('reset-count', 0)
690
691 - def setup_completed(self):
694 695 # Public methods 696
697 - def get_output_elements(self):
698 return [self.get_element(f.elementName + '-pay') 699 for f in self.feeders.values()]
700
701 - def get_input_elements(self):
702 return [self.get_element('input-' + f.elementName) 703 for f in self.eaters.values()]
704
705 - def get_base_pipeline_string(self):
706 raise NotImplementedError('Subclasses should implement ' 707 'get_base_pipeline_string')
708
709 - def get_eater_srcpad(self, eaterAlias):
710 e = self.eaters[eaterAlias] 711 inputq = self.get_element('input-' + e.elementName) 712 return inputq.get_pad('src')
713 714 # Private methods 715
716 - def _install_changes_probes(self):
717 """ 718 Add the event probes that will check for the flumotion-reset event. 719 720 Those will trigger the pipeline's blocking and posterior reload 721 """ 722 # FIXME: Add documentation 723 724 def input_reset_event(pad, event): 725 if event.type != gst.EVENT_CUSTOM_DOWNSTREAM: 726 return True 727 if event.get_structure().get_name() != 'flumotion-reset': 728 return True 729 if self.disconnectedPads: 730 return False 731 732 self.log('RESET: in reset event received on input pad %r', pad) 733 self._reset_count = len(self.feeders) 734 # Block all the eaters and send an eos downstream the pipeline to 735 # drain all the elements. It will also unlink the pipeline from the 736 # input queues. 737 self._block_eaters() 738 # Do not propagate the event. It is sent from the other side of 739 # the pipeline after it has been drained. 740 return False
741 742 def output_reset_event(pad, event): 743 if event.type != gst.EVENT_EOS: 744 return True 745 746 self.log('RESET: out reset event received on output pad %r', pad) 747 # TODO: Can we use EVENT_FLUSH_{START,STOP} for the same purpose? 748 # The component only waits for the first eos to come. After that 749 # all the elements inside the pipeline will be down and won't 750 # process any more events. 751 # Pads cannot be blocked from the streaming thread. They have to be 752 # manipulated from outside according gstreamer's documentation 753 self._reset_count -= 1 754 if self._reset_count > 0: 755 return False 756 757 self._send_reset_event() 758 reactor.callFromThread(self._on_pipeline_drained) 759 # Do not let the eos pass. 760 return False
761 762 self.log('RESET: installing event probes for detecting changes') 763 # Listen for incoming flumotion-reset events on eaters 764 for elem in self.get_input_elements(): 765 self.debug('RESET: adding event probe for %s', elem.get_name()) 766 elem.get_pad('sink').add_event_probe(input_reset_event) 767 768 for elem in self.get_output_elements(): 769 self.debug('RESET: adding event probe for %s', elem.get_name()) 770 elem.get_pad('sink').add_event_probe(output_reset_event) 771
772 - def _block_eaters(self):
773 """ 774 Function that blocks all the identities of the eaters 775 """ 776 for elem in self.get_input_elements(): 777 pad = elem.get_pad('src') 778 self.debug("RESET: Blocking pad %s", pad) 779 pad.set_blocked_async(True, self._on_eater_blocked)
780
781 - def _unblock_eaters(self):
782 for elem in self.get_input_elements(): 783 pad = elem.get_pad('src') 784 self.debug("RESET: Unblocking pad %s", pad) 785 pad.set_blocked_async(False, self._on_eater_blocked)
786
787 - def _send_reset_event(self):
788 event = gst.event_new_custom(gst.EVENT_CUSTOM_DOWNSTREAM, 789 gst.Structure('flumotion-reset')) 790 791 for elem in self.get_output_elements(): 792 pad = elem.get_pad('sink') 793 pad.send_event(event)
794 808
809 - def _remove_pipeline(self, pipeline, element, end, done=None):
810 if done is None: 811 done = [] 812 if not element: 813 return 814 if element in done: 815 return 816 if element in end: 817 return 818 819 for src in element.src_pads(): 820 self.log('going to start by pad %r', src) 821 if not src.get_peer(): 822 continue 823 peer = src.get_peer().get_parent() 824 self._remove_pipeline(pipeline, peer, end, done) 825 done.append(peer) 826 element.unlink(peer) 827 828 self.log("RESET: removing old element %s from pipeline", element) 829 element.set_state(gst.STATE_NULL) 830 pipeline.remove(element)
831
832 - def _rebuild_pipeline(self):
833 # TODO: Probably this would be easier and clearer if we used a bin to 834 # wrap the component's functionality.Then the component would only need 835 # to reset the bin and connect the resulting pads to the {eat,feed}ers. 836 837 self.log('RESET: Going to rebuild the pipeline') 838 839 base_pipe = self._get_base_pipeline_string() 840 841 # Place a fakesrc element so we can know from where to start 842 # rebuilding the pipeline. 843 fake_pipeline = 'fakesrc name=start ! %s' % base_pipe 844 pipeline = gst.parse_launch(fake_pipeline) 845 846 def move_element(element, orig, dest): 847 if not element: 848 return 849 if element in done: 850 return 851 852 to_link = [] 853 done.append(element) 854 self.log("RESET: going to remove %s", element) 855 for src in element.src_pads(): 856 self.log("RESET: got src pad element %s", src) 857 if not src.get_peer(): 858 continue 859 peer = src.get_peer().get_parent() 860 to_link.append(peer) 861 862 move_element(to_link[-1], orig, dest) 863 864 self._unlink_pads(element, [gst.PAD_SRC, gst.PAD_SINK]) 865 orig.remove(element) 866 dest.add(element) 867 868 self.log("RESET: new element %s added to the pipeline", element) 869 for peer in to_link: 870 self.log("RESET: linking peers %s -> %s", element, peer) 871 element.link(peer)
872 873 done = [] 874 start = pipeline.get_by_name('start').get_pad('src').get_peer() 875 move_element(start.get_parent(), pipeline, self.pipeline) 876 877 # Link eaters to the first element in the pipeline 878 # By now we there can only be two situations: 879 # 1. Encoders, where there is only one eater connected to the encoder 880 # 2. Muxers, where multiple eaters are connected directly to the muxer 881 # TODO: Probably we'd like the link process to check the caps 882 if len(self.get_input_elements()) == 1: 883 elem = self.get_input_elements()[0] 884 self.log("RESET: linking eater %r to %r", elem, done[0]) 885 elem.link(done[0]) 886 887 # Link the last element in the pipeline to the feeders. 888 if len(self.get_output_elements()) == 1: 889 elem = self.get_output_elements()[0] 890 self.log("RESET: linking %r to feeder %r", done[-1], elem) 891 done[-1].link(elem) 892 893 self.configure_pipeline(self.pipeline, self.config['properties']) 894 self.pipeline.set_state(gst.STATE_PLAYING) 895 self._unblock_eaters() 896 897 resets = self.uiState.get('reset-count') 898 self.uiState.set('reset-count', resets+1) 899 900 # Callbacks 901
902 - def _on_pad_blocked(self, pad, blocked):
903 self.log("RESET: Pad %s %s", pad, 904 (blocked and "blocked") or "unblocked")
905
906 - def _on_eater_blocked(self, pad, blocked):
907 self._on_pad_blocked(pad, blocked) 908 if blocked: 909 peer = pad.get_peer() 910 peer.send_event(gst.event_new_eos())
911 #self._unlink_pads(pad.get_parent(), [gst.PAD_SRC]) 912
913 - def _on_pipeline_drained(self):
914 self.debug('RESET: Proceed to unlink pipeline') 915 start = self.get_input_elements() 916 end = self.get_output_elements() 917 done = [] 918 for element in start: 919 element = element.get_pad('src').get_peer().get_parent() 920 self._remove_pipeline(self.pipeline, element, end, done) 921 self._rebuild_pipeline()
922 923
924 -class EncoderComponent(ParseLaunchComponent):
925 """ 926 Component that is reconfigured when new changes arrive through the 927 flumotion-reset event (sent by the fms producer). 928 """ 929 pass
930 931
932 -class MuxerComponent(MultiInputParseLaunchComponent):
933 """ 934 This class provides for multi-input ParseLaunchComponents, such as muxers, 935 that handle flumotion-reset events for reconfiguration. 936 """ 937 938 LINK_MUXER = False 939 942
943 - def configure_pipeline(self, pipeline, properties):
944 """ 945 Method not overridable by muxer subclasses. 946 """ 947 # link the muxers' sink pads when data comes in so we get compatible 948 # sink pads with input data 949 # gone are the days when we know we only have one pad template in 950 # muxers 951 self.fired_eaters = 0 952 self._probes = {} # depay element -> id 953 954 def buffer_probe_cb(a, b, depay, eaterAlias): 955 pad = depay.get_pad("src") 956 caps = pad.get_negotiated_caps() 957 if not caps: 958 return False 959 srcpad_to_link = self.get_eater_srcpad(eaterAlias) 960 muxer = self.pipeline.get_by_name("muxer") 961 self.debug("Trying to get compatible pad for pad %r with caps %s", 962 srcpad_to_link, caps) 963 linkpad = self.get_link_pad(muxer, srcpad_to_link, caps) 964 self.debug("Got link pad %r", linkpad) 965 if not linkpad: 966 m = messages.Error(T_(N_( 967 "The incoming data is not compatible with this muxer.")), 968 debug="Caps %s not compatible with this muxer." % ( 969 caps.to_string())) 970 self.addMessage(m) 971 # this is the streaming thread, cannot set state here 972 # so we do it in the mainloop 973 reactor.callLater(0, self.pipeline.set_state, gst.STATE_NULL) 974 return True 975 srcpad_to_link.link(linkpad) 976 depay.get_pad("src").remove_buffer_probe(self._probes[depay]) 977 if srcpad_to_link.is_blocked(): 978 self.is_blocked_cb(srcpad_to_link, True) 979 else: 980 srcpad_to_link.set_blocked_async(True, self.is_blocked_cb) 981 return True
982 983 for e in self.eaters: 984 depay = self.get_element(self.eaters[e].depayName) 985 self._probes[depay] = \ 986 depay.get_pad("src").add_buffer_probe( 987 buffer_probe_cb, depay, e)
988
989 - def is_blocked_cb(self, pad, is_blocked):
990 if is_blocked: 991 self.fired_eaters = self.fired_eaters + 1 992 if self.fired_eaters == len(self.eaters): 993 self.debug("All pads are now blocked") 994 self.disconnectedPads = False 995 for e in self.eaters: 996 srcpad = self.get_eater_srcpad(e) 997 srcpad.set_blocked_async(False, self.is_blocked_cb)
998