1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 parsing of manager configuration files
24 """
25
26 import operator
27 import warnings
28
29 from flumotion.common import log, errors, common, registry
30 from flumotion.common import config as fluconfig
31 from flumotion.common.xmlwriter import cmpComponentType, XMLWriter
32 from flumotion.configure import configure
33
34 __version__ = "$Rev: 8776 $"
35
36
39
40
42
def upgradeEaters(conf):
    """Upgrade a component config that still uses the old 'source' list.

    When the config has 'source' entries but no 'eater' entry, an 'eater'
    dict is built from the sources, using the eaters the registry declares
    for the component type.  In every case the 'source' entries are
    rewritten so that each one names a feed explicitly.

    @param conf: component configuration; modified in place.
    @type  conf: dict
    """

    def withFeedName(feedId):
        # A feed id without ':' gets the 'default' feed name appended.
        if feedId.find(':') != -1:
            return feedId
        return "%s:default" % feedId

    oldEaters = conf.get('eater', {})
    oldSources = conf.get('source', [])

    if oldEaters == {} and oldSources != []:
        componentType = conf.get('type')
        eaterDefs = registry.getRegistry().getComponent(
            componentType).getEaters()
        # An eater name of None lets buildEatersDict choose the eater.
        unnamed = [(None, withFeedName(s)) for s in oldSources]
        conf['eater'] = buildEatersDict(unnamed, eaterDefs)

    if oldSources:
        conf['source'] = [withFeedName(s) for s in oldSources]
64
65
def upgradeAliases(conf):
    """Upgrade the 'eater' entry of a component config so that every
    feeder is a (feedId, alias) tuple.

    Feeders that are still bare feed ids get the eater name as alias,
    suffixed with '-bis' as often as needed to avoid an alias already
    seen on the same eater.  The alias, not the raw feed id, is what is
    stored as the second tuple element.

    @param conf: component configuration; modified in place.
    @type  conf: dict
    """
    # Shallow copy of the mapping; the feeder lists themselves are
    # shared with conf and are updated in place.
    eaters = dict(conf.get('eater', {}))

    # Only touch the config when at least one feeder is not yet a tuple.
    pending = [val for feeders in eaters.values() for val in feeders
               if not isinstance(val, tuple)]
    if not pending:
        return

    for eater in eaters:
        aliases = []
        feeders = eaters[eater]
        for i, val in enumerate(feeders):
            if isinstance(val, tuple):
                # Already upgraded: just remember its alias.
                aliases.append(val[1])
                continue
            feedId = val
            alias = eater
            while alias in aliases:
                log.warning('config', "Duplicate alias %s for "
                            "eater %s, uniquifying", alias, eater)
                alias += '-bis'
            aliases.append(alias)
            feeders[i] = (feedId, alias)
    conf['eater'] = eaters
90
91 UPGRADERS = [upgradeEaters, upgradeAliases]
92 CURRENT_VERSION = len(UPGRADERS)
93
def buildEatersDict(eatersList, eaterDefs):
    """Build a eaters dict suitable for forming part of a component
    config.

    @param eatersList: List of eaters. For example,
                       [('default', 'othercomp:feeder', 'foo')] says
                       that our eater 'default' will be fed by the feed
                       identified by the feedId 'othercomp:feeder', and
                       that it has the alias 'foo'. Alias is optional.
                       An eaterName of None selects the first eater in
                       eaterDefs.
    @type  eatersList: List of (eaterName, feedId, eaterAlias?)
    @param eaterDefs:  The set of allowed and required eaters
    @type  eaterDefs:  List of
                       L{flumotion.common.registry.RegistryEntryEater}
    @returns: Dict of eaterName => [(feedId, eaterAlias)]
    @raises errors.ConfigError: if a feed has no eater to connect to, if
                       the same feedId is given twice for one eater, if a
                       required eater has no feeders, if a non-multiple
                       eater has several feeders, or if an alias is used
                       by more than one feeder.
    """

    def parseEaterTuple(eaterName, feedId, eaterAlias=None):
        # Pads 2-tuples to 3-tuples; the alias defaults to the eater name
        # (which may itself still be None at this point).
        if eaterAlias is None:
            eaterAlias = eaterName
        return (eaterName, feedId, eaterAlias)

    eaters = {}
    for eater, feedId, alias in [parseEaterTuple(*t) for t in eatersList]:
        if eater is None:
            if not eaterDefs:
                raise errors.ConfigError(
                    "Feed %r cannot be connected, component has no eaters" %
                    (feedId, ))
            # No eater named: connect to the first declared eater.
            eater = eaterDefs[0].getName()
        if alias is None:
            alias = eater
        feeders = eaters.get(eater, [])
        # feeders holds (feedId, alias) tuples, so compare against the
        # feed ids only.
        if feedId in [f for f, a in feeders]:
            raise errors.ConfigError(
                "Already have a feedId %s eating from %s" %
                (feedId, eater))
        while alias in [a for f, a in feeders]:
            log.debug('config', "Duplicate alias %s for eater %s, "
                      "uniquifying", alias, eater)
            alias += '-bis'

        feeders.append((feedId, alias))
        eaters[eater] = feeders

    for e in eaterDefs:
        eater = e.getName()
        if e.getRequired() and eater not in eaters:
            raise errors.ConfigError("Component wants to eat on %s,"
                                     " but no feeders specified."
                                     % (e.getName(), ))
        if not e.getMultiple() and len(eaters.get(eater, [])) > 1:
            raise errors.ConfigError("Component does not support multiple "
                                     "sources feeding %s (%r)"
                                     % (eater, eaters[eater]))

    # Aliases must be unique across all eaters, not just within one.
    aliases = [a for tups in eaters.values() for f, a in tups]
    while aliases:
        alias = aliases.pop()
        if alias in aliases:
            raise errors.ConfigError("Duplicate alias: %s" % (alias, ))

    return eaters
163
def buildVirtualFeeds(feedPairs, feeders):
    """Build a virtual feeds dict suitable for forming part of a
    component config.

    @param feedPairs: List of virtual feeds, as name-feederName pairs. For
                      example, [('bar:baz', 'qux')] defines one
                      virtual feed 'bar:baz', which is provided by
                      the component's 'qux' feed.
    @type  feedPairs: List of (feedId, feedName) -- both strings.
    @param feeders:   The feeders exported by this component, from the
                      registry.
    @type  feeders:   List of str.
    @returns: Dict of virtual feedId => real feeder name
    @raises errors.ConfigError: if a pair names a feeder that is not in
                      feeders, or if common.parseFeedId rejects the
                      virtual feed name.
    """
    ret = {}
    for virtual, real in feedPairs:
        if real not in feeders:
            raise errors.ConfigError('virtual feed maps to unknown feeder: '
                                     '%s -> %s' % (virtual, real))
        try:
            # Called only for validation; the parsed result is not used.
            common.parseFeedId(virtual)
        except Exception:
            raise errors.ConfigError('virtual feed name not a valid feedId: %s'
                                     % (virtual, ))
        ret[virtual] = real
    return ret
190
191
def dictDiff(old, new, onlyOld=None, onlyNew=None, diff=None,
             keyBase=None):
    """Compute the difference between two config dicts.

    Nested dicts present under the same key on both sides are compared
    recursively; any other differing pair of values is reported as a
    single mismatch.

    @param old:     the first dict to compare
    @param new:     the second dict to compare
    @param onlyOld: optional list to append "only in old" results to
    @param onlyNew: optional list to append "only in new" results to
    @param diff:    optional list to append value mismatches to
    @param keyBase: optional tuple of keys prefixed to every reported key;
                    used by the recursion to record the path to a value

    @returns: 3 tuple: (onlyOld, onlyNew, diff) where:
              onlyOld is a list of (key, value), representing key-value
              pairs that are only in old;
              onlyNew is a list of (key, value), representing key-value
              pairs that are only in new;
              diff is a list of (key, oldValue, newValue), representing
              keys with different values in old and new; and
              key is a tuple of strings representing the recursive key
              to get to a value. For example, ('foo', 'bar') represents
              the value d['foo']['bar'] on a dict d.
    """
    # Each accumulator defaults on its own, so a caller may pass any
    # subset of them.
    if onlyOld is None:
        onlyOld = []
    if onlyNew is None:
        onlyNew = []
    if diff is None:
        diff = []
    if keyBase is None:
        keyBase = ()

    for k in old:
        key = keyBase + (k, )
        if k not in new:
            onlyOld.append((key, old[k]))
        elif old[k] != new[k]:
            if isinstance(old[k], dict) and isinstance(new[k], dict):
                # Descend, sharing the accumulators and extending the path.
                dictDiff(old[k], new[k], onlyOld, onlyNew, diff, key)
            else:
                diff.append((key, old[k], new[k]))

    for k in new:
        if k not in old:
            onlyNew.append((keyBase + (k, ), new[k]))

    return onlyOld, onlyNew, diff
231
232
235
236 def ref(label, k):
237 return "%s%s: '%s'" % (label,
238 ''.join(["[%r]" % (subk, )
239 for subk in k[:-1]]),
240 k[-1])
241
242 out = []
243 for k, v in old:
244 out.append('Only in %s = %r' % (ref(oldLabel, k), v))
245 for k, v in new:
246 out.append('Only in %s = %r' % (ref(newLabel, k), v))
247 for k, oldv, newv in diff:
248 out.append('Value mismatch:')
249 out.append(' %s = %r' % (ref(oldLabel, k), oldv))
250 out.append(' %s = %r' % (ref(newLabel, k), newv))
251 return '\n'.join(out)
252
253
254 -class ConfigEntryComponent(log.Loggable):
255 "I represent a <component> entry in a planet config file"
256 nice = 0
257 logCategory = 'config'
258
259 __pychecker__ = 'maxargs=13'
260
261 - def __init__(self, name, parent, type, label, propertyList, plugList,
262 worker, eatersList, isClockMaster, project, version,
263 virtualFeeds=None):
264 self.name = name
265 self.parent = parent
266 self.type = type
267 self.label = label
268 self.worker = worker
269 self.defs = registry.getRegistry().getComponent(self.type)
270 try:
271 self.config = self._buildConfig(propertyList, plugList,
272 eatersList, isClockMaster,
273 project, version,
274 virtualFeeds)
275 except errors.ConfigError, e:
276
277 e.args = ("While parsing component %s: %s"
278 % (name, log.getExceptionMessage(e)), )
279 raise
280
281 - def _buildVersionTuple(self, version):
282 if version is None:
283 return configure.versionTuple
284 elif isinstance(version, tuple):
285 assert len(version) == 4
286 return version
287 elif isinstance(version, str):
288 try:
289 return common.versionStringToTuple(version)
290 except:
291 raise errors.ConfigError(
292 "<component> version %r not parseable" % version)
293 raise errors.ConfigError(
294 "<component> version %r not parseable" % version)
295
def _buildConfig(self, propertyList, plugsList, eatersList,
                 isClockMaster, project, version, virtualFeeds):
    """
    Build a component configuration dictionary.

    The 'label' key is only present when the component has a label, and
    the 'source' key only when eatersList is non-empty.

    @rtype: dict
    """
    defs = self.defs

    # Evaluated in the same order as the keys are listed below, so the
    # first failing step is the one that raises.
    avatarId = common.componentId(self.parent, self.name)
    projectName = project or configure.PACKAGE
    versionTuple = self._buildVersionTuple(version)
    feeds = defs.getFeeders()
    properties = fluconfig.buildPropertyDict(propertyList,
                                             defs.getProperties())
    plugs = fluconfig.buildPlugsSet(plugsList, defs.getSockets())
    eaters = buildEatersDict(eatersList, defs.getEaters())
    sources = [tup[1] for tup in eatersList]
    virtual = buildVirtualFeeds(virtualFeeds or [], defs.getFeeders())

    config = {
        'name': self.name,
        'parent': self.parent,
        'type': self.type,
        'config-version': CURRENT_VERSION,
        'avatarId': avatarId,
        'project': projectName,
        'version': versionTuple,
        'clock-master': isClockMaster or None,
        'feed': feeds,
        'properties': properties,
        'plugs': plugs,
        'eater': eaters,
        'virtual-feeds': virtual,
    }

    if sources:
        config['source'] = sources
    if self.label:
        config['label'] = self.label

    return config
333
336
337 - def getLabel(self):
339
342
343 - def getParent(self):
345
346 - def getConfigDict(self):
348
349 - def getWorker(self):
351
352
354 """
355 I represent a <flow> entry in a planet config file.
356
357 @ivar name: name of flow
358 @type name: str
359 @ivar components: dict of name -> component config
360 @type components: dict of str -> L{ConfigEntryComponent}
361 """
362
def __init__(self, name, components):
    """Index the given components by name.

    @param name:       name of flow
    @param components: iterable of objects with a 'name' attribute
    @raises errors.ConfigError: if two components share a name
    """
    self.name = name
    byName = self.components = {}
    for component in components:
        key = component.name
        if key in byName:
            raise errors.ConfigError(
                'flow %s already has component named %s' % (name, key))
        byName[key] = component
371
372
374 "I represent a <manager> entry in a planet config file"
375
def __init__(self, name, host, port, transport, certificate, bouncer,
             fludebug, plugs):
    """Record every constructor argument as an attribute of the same
    name."""
    self.name = name
    self.host, self.port = host, port
    self.transport, self.certificate = transport, certificate
    self.bouncer = bouncer
    self.fludebug = fludebug
    self.plugs = plugs
386
387
389 "I represent a <atmosphere> entry in a planet config file"
390
391 - def __init__(self):
393
395 return len(self.components)
396
397
399 """
400 This is a base class for parsing planet configuration files (both manager
401 and flow files).
402 """
403 logCategory = 'config'
404
406 if feedId.find(':') == -1:
407 return "%s:default" % feedId
408 else:
409 return feedId
410
417
def parseComponent(self, node, parent, isFeedComponent,
                   needsWorker):
    """
    Parse a <component></component> block.

    @param node:            the <component> node to parse
    @param parent:          passed on as the parent of the resulting
                            entry
    @param isFeedComponent: when true, <eater>, <clock-master>, <source>
                            and <virtual-feed> children are parsed too
    @param needsWorker:     whether a 'worker' attribute must (true) or
                            must not (false) be present

    @raises errors.ConfigError: on a missing or unexpected worker, or on
                                more than one <clock-master> child
    @rtype: L{ConfigEntryComponent}
    """
    name, componentType, label, worker, project, version = \
        self.parseAttributes(node, ('name', 'type'),
                             ('label', 'worker', 'project', 'version', ))

    if needsWorker:
        if not worker:
            raise errors.ConfigError(
                'component %s does not specify the worker '
                'that it is to run on' % (name, ))
    elif worker:
        raise errors.ConfigError('component %s specifies a worker to run '
                                 'on, but does not need a worker' % (name, ))

    # Collectors filled through the parser table below.
    properties, plugs, eaters = [], [], []
    clockmasters, sources, virtual_feeds = [], [], []

    def parseBoolNode(textNode):
        return self.parseTextNode(textNode, common.strToBool)

    # Tag name -> pair of callables; presumably (parser, result handler),
    # see parseFromTable.
    parsers = {}
    parsers['property'] = (self._parseProperty, properties.append)
    parsers['compound-property'] = (self._parseCompoundProperty,
                                    properties.append)
    parsers['plugs'] = (self.parsePlugs, plugs.extend)

    if isFeedComponent:
        parsers['eater'] = (self._parseEater, eaters.extend)
        parsers['clock-master'] = (parseBoolNode, clockmasters.append)
        parsers['source'] = (self._parseSource, sources.append)
        parsers['virtual-feed'] = (self._parseVirtualFeed,
                                   virtual_feeds.append)

    self.parseFromTable(node, parsers)

    if len(clockmasters) > 1:
        raise errors.ConfigError("Only one <clock-master> node allowed")
    isClockMaster = None
    if clockmasters:
        isClockMaster = clockmasters[0]

    if sources:
        msg = ('"source" tag has been deprecated in favor of "eater",'
               ' please update your configuration file (found in'
               ' component %r)' % name)
        warnings.warn(msg, DeprecationWarning)

    # Sources carry no eater name; None leaves the choice of eater to
    # whoever consumes the eaters list.
    eaters.extend([(None, feedId) for feedId in sources])

    return ConfigEntryComponent(name, parent, componentType, label,
                                properties, plugs, worker, eaters,
                                isClockMaster, project, version,
                                virtual_feeds)
492
495
500
502
503
504
505 name, = self.parseAttributes(node, ('name', ))
506 feeds = []
507 parsers = {'feed': (self._parseFeed, feeds.append)}
508 self.parseFromTable(node, parsers)
509 if len(feeds) == 0:
510
511 raise errors.ConfigError(
512 "Eater node %s with no <feed> nodes, is not allowed" % (
513 name, ))
514 return [(name, feedId, alias) for feedId, alias in feeds]
515
516
518 """
519 I represent a planet configuration file for Flumotion.
520
521 @ivar atmosphere: A L{ConfigEntryAtmosphere}, filled in when parse() is
522 called.
523 @ivar flows: A list of L{ConfigEntryFlow}, filled in when parse() is
524 called.
525 """
526 logCategory = 'config'
527
533
553
555
556
557
558
559 ret = {}
560
561 def parseComponent(node):
562 return self.parseComponent(node, 'atmosphere', False, True)
563
564 def gotComponent(comp):
565 ret[comp.name] = comp
566 parsers = {'component': (parseComponent, gotComponent)}
567 self.parseFromTable(node, parsers)
568 return ret
569
586 parsers = {'component': (parseComponent, components.append)}
587 self.parseFromTable(node, parsers)
588
589
590
591
592 masters = [x for x in components if x.config['clock-master']]
593 if len(masters) > 1:
594 raise errors.ConfigError("Multiple clock masters in flow %s: %s"
595 % (name, ', '.join([m.name for m in masters])))
596
597 need_sync = [(x.defs.getClockPriority(), x) for x in components
598 if x.defs.getNeedsSynchronization()]
599 need_sync.sort()
600 need_sync = [x[1] for x in need_sync]
601
602 if need_sync:
603 if masters:
604 master = masters[0]
605 else:
606 master = need_sync[-1]
607
608 masterAvatarId = master.config['avatarId']
609 self.info("Setting %s as clock master" % masterAvatarId)
610
611 for c in need_sync:
612 c.config['clock-master'] = masterAvatarId
613 elif masters:
614 self.info('master clock specified, but no synchronization '
615 'necessary -- ignoring')
616 masters[0].config['clock-master'] = None
617
618 return ConfigEntryFlow(name, components)
619
620
621
623 """
624 Get all component entries from both atmosphere and all flows
625 from the configuration.
626
627 @rtype: dictionary of /parent/name -> L{ConfigEntryComponent}
628 """
629 entries = {}
630 if self.atmosphere and self.atmosphere.components:
631 for c in self.atmosphere.components.values():
632 path = common.componentId('atmosphere', c.name)
633 entries[path] = c
634
635 for flowEntry in self.flows:
636 for c in flowEntry.components.values():
637 path = common.componentId(c.parent, c.name)
638 entries[path] = c
639
640 return entries
641
642
643
644
645
646
648 """
649 I parse manager configuration out of a planet configuration file.
650
651 @ivar manager: A L{ConfigEntryManager} containing options for
652 the manager section, filled in at construction time.
653 """
654 logCategory = 'config'
655
656 MANAGER_SOCKETS = \
657 ['flumotion.component.plugs.adminaction.AdminActionPlug',
658 'flumotion.component.plugs.base.ManagerPlug',
659 'flumotion.component.plugs.identity.IdentityProviderPlug']
660
675
687
689
690
691 name, = self.parseAttributes(node, (), ('name', ))
692 ret = ConfigEntryManager(name, None, None, None, None, None,
693 None, self.plugs)
694
695 def simpleparse(proc):
696 return lambda node: self.parseTextNode(node, proc)
697
698 def recordval(k):
699
700 def record(v):
701 if getattr(ret, k):
702 raise errors.ConfigError('duplicate %s: %s'
703 % (k, getattr(ret, k)))
704 setattr(ret, k, v)
705 return record
706
707 def enum(*allowed):
708
709 def eparse(v):
710 v = str(v)
711 if v not in allowed:
712 raise errors.ConfigError('unknown value %s (should be '
713 'one of %r)' % (v, allowed))
714 return v
715 return eparse
716
717 parsers = {'host': (simpleparse(str), recordval('host')),
718 'port': (simpleparse(int), recordval('port')),
719 'transport': (simpleparse(enum('tcp', 'ssl')),
720 recordval('transport')),
721 'certificate': (simpleparse(str), recordval('certificate')),
722 'component': (_ignore, _ignore),
723 'plugs': (_ignore, _ignore),
724 'debug': (simpleparse(str), recordval('fludebug'))}
725 self.parseFromTable(node, parsers)
726 return ret
727
729
730 def parsecomponent(node):
731 return self.parseComponent(node, 'manager', False, False)
732
733 def gotcomponent(val):
734 if self.bouncer is not None:
735 raise errors.ConfigError('can only have one bouncer '
736 '(%s is superfluous)' % (val.name, ))
737
738 self.bouncer = val
739
740 def parseplugs(node):
741 return fluconfig.buildPlugsSet(self.parsePlugs(node),
742 self.MANAGER_SOCKETS)
743
744 def gotplugs(newplugs):
745 for socket in self.plugs:
746 self.plugs[socket].extend(newplugs[socket])
747
748 parsers = {'host': (_ignore, _ignore),
749 'port': (_ignore, _ignore),
750 'transport': (_ignore, _ignore),
751 'certificate': (_ignore, _ignore),
752 'component': (parsecomponent, gotcomponent),
753 'plugs': (parseplugs, gotplugs),
754 'debug': (_ignore, _ignore)}
755 self.parseFromTable(node, parsers)
756
772
774 self.doc.unlink()
775 self.doc = None
776
777
779
783
794
800
811
813 config = component.get('config')
814 attrs = [('name', component.get('name')),
815 ('type', component.get('type')),
816 ('label', config.get('label', component.get('name'))),
817 ('worker', component.get('workerRequested')),
818 ('project', config['project']),
819 ('version', common.versionTupleToString(config['version']))]
820 self.pushTag('component', attrs)
821 for name, feeders in config['eater'].items():
822 self._writeEater(name, feeders)
823 self._writeProperties(config['properties'].items())
824 if isFeedComponent:
825 if config['clock-master'] == config['avatarId']:
826 value = 'true'
827 else:
828 value = 'false'
829 self.writeTag('clock-master', data=value)
830 self._writePlugs(config['plugs'].items())
831 self._writeVirtualFeeds(config['virtual-feeds'].items())
832 self.popTag()
833 self.writeLine()
834
836 attrs = [('name', name)]
837 self.pushTag('eater', attrs)
838 for feedId, alias in feeders:
839 attrs = [('alias', alias)]
840 self.writeTag('feed', attrs, feedId)
841 self.popTag()
842
844
845 def serialise(propVal):
846 if isinstance(propVal, tuple):
847 return ["%d/%d" % propVal]
848 elif isinstance(propVal, list):
849 return propVal
850 else:
851 return [propVal]
852 for name, value in properties:
853 attrs = [('name', name)]
854 for value in serialise(value):
855 self.writeTag('property', attrs, value)
856
865
871
873 for name, real in virtualfeeds:
874 attrs = [('name', name),
875 ('real', real)]
876 self.writeTag('virtual-feed', attrs)
877
878
882