1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import random
24 import socket
25 import string
26 import time
27 from urllib2 import urlparse
28
29 from twisted.cred import portal
30 from twisted.internet import protocol, reactor, address, error, defer
31 from twisted.spread import pb
32 from zope.interface import implements
33
34 from flumotion.common import medium, log, messages, errors
35 from flumotion.common.i18n import N_, gettexter
36 from flumotion.component import component
37 from flumotion.component.component import moods
38 from flumotion.twisted import credentials, fdserver, checkers
39 from flumotion.twisted import reflect
40
41 __version__ = "$Rev: 8681 $"
42 T_ = gettexter()
43
44
46 """
47 An Avatar in the porter representing a streamer
48 """
49
50 - def __init__(self, avatarId, porter, mind):
57
def isAttached(self):
    """
    Tell whether the streamer behind this avatar is still attached.

    @returns: True while a mind is set, False once it has been reset to
              None.
    @rtype:   bool
    """
    # Identity test: None is a singleton, and the mind object may define
    # its own comparison operators, which "!= None" would delegate to.
    return self.mind is not None
60
62 self.debug("porter client %s logging out", self.avatarId)
63 self.mind = None
64
68
72
76
80
81
83 """
84 A Realm within the Porter that creates Avatars for streamers logging into
85 the porter.
86 """
87 implements(portal.IRealm)
88
def __init__(self, porter):
    """
    @param porter: The porter that avatars created from here should use.
    @type  porter: L{Porter}
    """
    # Only stored here; nothing else is set up at construction time.
    self.porter = porter
95
104
105
107
109 """
110 Return the location, login username/password, and listening port
111 and interface for the porter as a tuple (path, username,
112 password, port, interface, external-interface).
113 """
114 return (self.comp._socketPath, self.comp._username,
115 self.comp._password, self.comp._iptablesPort,
116 self.comp._interface, self.comp._external_interface)
117
118
119 -class Porter(component.BaseComponent, log.Loggable):
120 """
121 The porter optionally sits in front of a set of streamer components.
122 The porter is what actually deals with incoming connections on a socket.
123 It decides which streamer to direct the connection to, then passes the FD
124 (along with some amount of already-read data) to the appropriate streamer.
125 """
126
127 componentMediumClass = PorterMedium
128
130
131
132 self._mappings = {}
133 self._prefixes = {}
134
135 self._socketlistener = None
136
137 self._socketPath = None
138 self._username = None
139 self._password = None
140 self._port = None
141 self._iptablesPort = None
142 self._porterProtocol = None
143
144 self._interface = ''
145 self._external_interface = ''
146
148 """
149 Register a path as being served by a streamer represented by this
150 avatar. Will remove any previous registration at this path.
151
152 @param path: The path to register
153 @type path: str
154 @param avatar: The avatar representing the streamer to direct this path
155 to
156 @type avatar: L{PorterAvatar}
157 """
158 self.debug("Registering porter path \"%s\" to %r" % (path, avatar))
159 if path in self._mappings:
160 self.warning("Replacing existing mapping for path \"%s\"" % path)
161
162 self._mappings[path] = avatar
163
165 """
166 Attempt to deregister the given path. A deregistration will only be
167 accepted if the mapping is to the avatar passed.
168
169 @param path: The path to deregister
170 @type path: str
171 @param avatar: The avatar representing the streamer being deregistered
172 @type avatar: L{PorterAvatar}
173 """
174 if path in self._mappings:
175 if self._mappings[path] == avatar:
176 self.debug("Removing porter mapping for \"%s\"" % path)
177 del self._mappings[path]
178 else:
179 self.warning(
180 "Mapping not removed: refers to a different avatar")
181 else:
182 self.warning("Mapping not removed: no mapping found")
183
185 """
186 Register a destination for all requests directed to anything beginning
187 with a specified prefix. Where there are multiple matching prefixes,
188 the longest is selected.
189
190 @param avatar: The avatar being registered
191 @type avatar: L{PorterAvatar}
192 """
193
194 self.debug("Setting prefix \"%s\" for porter", prefix)
195 if prefix in self._prefixes:
196 self.warning("Overwriting prefix")
197
198 self._prefixes[prefix] = avatar
199
201 """
202 Attempt to deregister the destination registered for the given
203 prefix. This will only succeed if the prefix is currently mapped to
204 this avatar.
205
206 @param avatar: The avatar being deregistered
207 @type avatar: L{PorterAvatar}
208 """
209 if prefix not in self._prefixes:
210 self.warning("Mapping not removed: no mapping found")
211 return
212
213 if self._prefixes[prefix] == avatar:
214 self.debug("Removing prefix destination from porter")
215 del self._prefixes[prefix]
216 else:
217 self.warning(
218 "Not removing prefix destination: expected avatar not found")
219
def findPrefixMatch(self, path):
    """
    Find the avatar registered for the longest prefix that path starts
    with.

    @param path: The requested path
    @type  path: str
    @returns: The avatar registered for the longest matching prefix, or
              None if no registered prefix matches.
    """
    found = None

    for prefix in self._prefixes:
        self.log("Checking: %r, %r" % (prefix, path))
        # Compare against None explicitly: the empty prefix matches every
        # path and must not be mistaken for "nothing found yet".
        if (path.startswith(prefix) and
            (found is None or len(found) < len(prefix))):
            found = prefix

    if found is None:
        return None
    return self._prefixes[found]
232
def findDestination(self, path):
    """
    Look up the avatar that should receive requests for path.

    An exact path mapping takes precedence; only when there is none are
    the registered prefixes consulted.

    @param path: The requested path
    @returns: The Avatar for this mapping, or None.
    """
    try:
        return self._mappings[path]
    except KeyError:
        return self.findPrefixMatch(path)
243
def generateSocketPath(self):
    """
    Generate a socket pathname in an appropriate location.

    A temporary file named with the prefix "flumotion.porter." and the
    suffix ".<pid>" is created and closed again; the file itself is left
    in place and only its name is handed back.

    @returns: the path of the temporary file
    @rtype:   str
    """
    from tempfile import mkstemp

    pidSuffix = '.%d' % os.getpid()
    handle, socketPath = mkstemp(pidSuffix, 'flumotion.porter.')
    # Only the name is needed; release the descriptor straight away.
    os.close(handle)
    return socketPath
255
def generateRandomString(self, numchars=12):
    """
    Generate a random US-ASCII string of length numchars.

    @param numchars: number of characters to generate
    @type  numchars: int
    @returns: a string made of numchars ASCII letters
    @rtype:   str
    """
    chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
    # The porter uses these strings as its login username and password,
    # so draw them from the operating system's entropy source rather
    # than from the predictable default generator.
    rng = random.SystemRandom()
    return "".join([rng.choice(chars) for _ in range(numchars)])
266
def have_properties(self):
    """
    Copy the component's configured properties into the porter's
    attributes.

    Reads self.config['properties'] and sets the socket path, login
    credentials, password policy, socket mode, ports, protocol name and
    interfaces.

    @raises KeyError: if 'port' is missing, or if 'socket-path' is given
                      without 'username' and 'password'.
    """
    props = self.config['properties']

    # presumably maps the legacy 'socket_path' key onto 'socket-path' --
    # confirm against fixRenamedProperties
    self.fixRenamedProperties(props,
        [('socket_path', 'socket-path')])

    if 'socket-path' in props:
        # Explicitly configured: username and password are then required
        # alongside the path.
        self._socketPath = props['socket-path']
        self._username = props['username']
        self._password = props['password']
    else:
        # Nothing configured: make up credentials and a socket path.
        self._username = self.generateRandomString(12)
        self._password = self.generateRandomString(12)
        self._socketPath = self.generateSocketPath()

    self._requirePassword = props.get('require-password', True)
    # 0666 is a Python 2 octal literal (decimal 438).
    self._socketMode = props.get('socket-mode', 0666)
    self._port = int(props['port'])
    # The iptables port falls back to the listening port.
    self._iptablesPort = int(props.get('iptables-port', self._port))
    self._porterProtocol = props.get('protocol',
        'flumotion.component.misc.porter.porter.HTTPPorterProtocol')
    self._interface = props.get('interface', '')

    # The external interface falls back to the listening interface.
    self._external_interface = props.get('external-interface',
        self._interface)
300
302 d = None
303 if self._socketlistener:
304
305
306 d = self._socketlistener.stopListening()
307 self._socketlistener = None
308 return d
309
311
312 self.have_properties()
313 realm = PorterRealm(self)
314 checker = checkers.FlexibleCredentialsChecker()
315 checker.addUser(self._username, self._password)
316 if not self._requirePassword:
317 checker.allowPasswordless(True)
318
319 p = portal.Portal(realm, [checker])
320 serverfactory = pb.PBServerFactory(p)
321
322 try:
323
324
325
326 try:
327 os.unlink(self._socketPath)
328 except OSError:
329 pass
330
331 self._socketlistener = reactor.listenWith(
332 fdserver.FDPort, self._socketPath,
333 serverfactory, mode=self._socketMode)
334 self.info("Now listening on socketPath %s", self._socketPath)
335 except error.CannotListenError, e:
336 self.warning("Failed to create socket %s" % self._socketPath)
337 m = messages.Error(T_(N_(
338 "Network error: socket path %s is not available."),
339 self._socketPath))
340 self.addMessage(m)
341 self.setMood(moods.sad)
342 return defer.fail(errors.ComponentSetupHandledError())
343
344
345
346 try:
347 proto = reflect.namedAny(self._porterProtocol)
348 self.debug("Created proto %r" % proto)
349 except (ImportError, AttributeError):
350 self.warning("Failed to import protocol '%s', defaulting to HTTP" %
351 self._porterProtocol)
352 proto = HTTPPorterProtocol
353
354
355
356 factory = PorterProtocolFactory(self, proto)
357 try:
358 reactor.listenWith(
359 fdserver.PassableServerPort, self._port, factory,
360 interface=self._interface)
361 self.info("Now listening on interface %r on port %d",
362 self._interface, self._port)
363 except error.CannotListenError, e:
364 self.warning("Failed to listen on interface %r on port %d",
365 self._interface, self._port)
366 m = messages.Error(T_(N_(
367 "Network error: TCP port %d is not available."), self._port))
368 self.addMessage(m)
369 self.setMood(moods.sad)
370 return defer.fail(errors.ComponentSetupHandledError())
371
372
374
def __init__(self, porter, protocol):
    """
    @param porter:   the porter handed to every protocol instance that
                     this factory creates
    @param protocol: callable (protocol class) invoked with the porter to
                     create a protocol instance
    """
    self.protocol = protocol
    self._porter = porter
378
def buildProtocol(self, addr):
    """
    Create a protocol instance for a new connection.

    @param addr: address of the connecting peer; not used here
    @returns: a new instance of self.protocol, constructed with the
              porter and with its factory attribute set to this factory
    """
    instance = self.protocol(self._porter)
    instance.factory = self
    return instance
383
384
386 """
387 The base porter is capable of accepting HTTP-like protocols (including
388 RTSP) - it reads the first line of a request, and makes the decision
389 solely on that.
390
391 We can't guarantee that we read precisely a line, so the buffer we
392 accumulate will actually be larger than what we actually parse.
393
394 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line
395 @cvar delimiters: a list of valid line delimiters I check for
396 """
397
398 logCategory = 'porterprotocol'
399
400
401 MAX_SIZE = 4096
402
403
404
405 PORTER_CLIENT_TIMEOUT = 30
406
407
408
409
410
411 delimiters = ['\r\n', '\n', '\r']
412
420
422
def connectionMade(self):
    """
    Assign a request-id to the freshly accepted connection, log the
    acceptance and chain up to protocol.Protocol.connectionMade.
    """
    # Request-ids are optional: the base generateRequestId() raises
    # NotImplementedError, and that must not prevent the connection from
    # being served. dataReceived skips the injection when the id is unset.
    try:
        self.requestId = self.generateRequestId()
    except NotImplementedError:
        self.requestId = None

    self.debug("[fd %5d] (ts %f) (request-id %r) accepted connection",
               self.transport.fileno(), time.time(), self.requestId)

    protocol.Protocol.connectionMade(self)
429
431 self._timeoutDC = None
432 self.debug("Timing out porter client after %d seconds",
433 self.PORTER_CLIENT_TIMEOUT)
434 self.transport.loseConnection()
435
437 if self._timeoutDC:
438 self._timeoutDC.cancel()
439 self._timeoutDC = None
440
def dataReceived(self, data):
    """
    Accumulate incoming data until the first request line is complete,
    then pass the connection's file descriptor, together with everything
    buffered so far, to the streamer registered for the request.

    The connection is dropped when the buffer exceeds MAX_SIZE without a
    complete line, when the line cannot be parsed, or when it yields no
    identifier. A not-found response is written when no attached
    destination exists, and a service-unavailable response when sending
    the descriptor fails.

    @param data: the bytes that just arrived on the connection
    @type  data: str
    """
    self._buffer = self._buffer + data
    self.log("Got data, buffer now \"%s\"" % self._buffer)

    # Split the first line off the buffer, trying the delimiters in the
    # order they are listed.
    # NOTE(review): the order is by list position, not by position in the
    # buffer -- a '\n'-terminated request line followed by a
    # '\r\n'-terminated header is split at the later '\r\n'.
    for delim in self.delimiters:
        try:
            line, remaining = self._buffer.split(delim, 1)
            break
        except ValueError:
            # This delimiter does not occur in the buffer; try the next.
            pass
    else:
        # Loop ended without a break: the first line is not complete yet.
        self.log("No valid delimiter found")
        if len(self._buffer) > self.MAX_SIZE:
            # Too much data without a line end: give up on this client.
            self.debug("[fd %5d] (ts %f) (request-id %r) dropping, "
                       "buffer exceeded",
                       self.transport.fileno(), time.time(),
                       self.requestId)

            return self.transport.loseConnection()
        else:
            # Wait for the next chunk of data.
            return

    # A complete first line is available: work out which stream it names.
    parsed = self.parseLine(line)
    if not parsed:
        self.log("Couldn't parse the first line")
        return self.transport.loseConnection()

    identifier = self.extractIdentifier(parsed)
    if not identifier:
        self.log("Couldn't find identifier in first line")
        return self.transport.loseConnection()

    if self.requestId:
        self.log("Injecting request-id %r", self.requestId)
        parsed = self.injectRequestId(parsed, self.requestId)
        # The parsed line may have changed, so rebuild the buffer from the
        # unparsed line, the delimiter that was split on and the rest.
        # NOTE(review): assumed to belong to the request-id branch (it is
        # only needed when the line was modified) -- confirm against the
        # original indentation.
        self._buffer = delim.join((self.unparseLine(parsed), remaining))

    self.debug("[fd %5d] (ts %f) (request-id %r) identifier %s",
               self.transport.fileno(), time.time(), self.requestId,
               identifier)

    # Ask the porter which streamer avatar serves this identifier.
    destinationAvatar = self._porter.findDestination(identifier)

    if not destinationAvatar or not destinationAvatar.isAttached():
        if destinationAvatar:
            self.debug("There was an avatar, but it logged out?")

        self.debug(
            "[fd %5d] (ts %f) (request-id %r) no destination avatar found",
            self.transport.fileno(), time.time(), self.requestId)

        self.writeNotFoundResponse()
        return self.transport.loseConnection()

    self.debug("[fd %5d] (ts %f) (request-id %r) send fd to avatarId %s",
               self.transport.fileno(), time.time(), self.requestId,
               destinationAvatar.avatarId)

    # Send our descriptor plus the buffered data over the avatar's own
    # connection to the porter.
    try:
        destinationAvatar.mind.broker.transport.sendFileDescriptor(
            self.transport.fileno(), self._buffer)
    except OSError, e:
        self.warning("[fd %5d] failed to send FD: %s",
                     self.transport.fileno(), log.getExceptionMessage(e))
        self.writeServiceUnavailableResponse()
        return self.transport.loseConnection()

    self.debug("[fd %5d] (ts %f) (request-id %r) sent fd to avatarId %s",
               self.transport.fileno(), time.time(), self.requestId,
               destinationAvatar.avatarId)

    # presumably keeps the transport from shutting the socket down now
    # that the streamer owns it -- confirm in the fdserver transport
    self.transport.keepSocketAlive = True
    self.transport.loseConnection()
545
def parseLine(self, line):
    """
    Parse the initial line of the request. Return an object that can be
    used to uniquely identify the stream being requested by passing it to
    extractIdentifier, or None if the request is unreadable.

    Subclasses should override this.

    @param line: the first line of the request, without its delimiter
    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
555
def unparseLine(self, parsed):
    """
    Recreate the initial request line from the parsed representation. The
    recreated line does not need to be exactly identical, but both
    parseLine(unparseLine(parsed)) and parsed should contain the same
    information (i.e. unparseLine should not lose information).

    unparseLine has to return a valid line from the porter protocol's
    scheme point of view (for instance, HTTP).

    Subclasses should override this.

    @param parsed: the parsed representation, as returned by parseLine
    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
569
def extractIdentifier(self, parsed):
    """
    Extract a string that uniquely identifies the requested stream from the
    parsed representation of the first request line.

    Subclasses should override this, depending on how they implemented
    parseLine.

    @param parsed: the parsed representation, as returned by parseLine
    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
579
def generateRequestId(self):
    """
    Return a string that will uniquely identify the request.

    Subclasses should override this if they want to use request-ids and
    also implement injectRequestId.

    @raises NotImplementedError: always, in this base implementation
    """
    # NOTE(review): connectionMade calls this for every connection, so a
    # subclass that does not override it fails there with this exception.
    raise NotImplementedError
588
def injectRequestId(self, parsed, requestId):
    """
    Take the parsed representation of the first request line and a string
    token, return a parsed representation of the request line with the
    request-id possibly mixed into it.

    Subclasses should override this if they generate request-ids.

    @param parsed:    the parsed representation, as returned by parseLine
    @param requestId: the request-id token
    @returns: parsed, unchanged, in this base implementation
    """
    # By default the request-id is not injected anywhere.
    return parsed
599
def writeNotFoundResponse(self):
    """
    Write a response indicating that the requested resource was not found
    in this protocol.

    Subclasses should override this to use the correct protocol.

    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
608
def writeServiceUnavailableResponse(self):
    """
    Write a response indicating that the requested resource was
    temporarily unavailable in this protocol.

    Subclasses should override this to use the correct protocol.

    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
617
618
620 scheme = 'http'
621 protos = ["HTTP/1.0", "HTTP/1.1"]
622 requestIdParameter = 'FLUREQID'
623 requestIdBitsNo = 256
624
def parseLine(self, line):
    """
    Parse a request line of the form "<method> <location> <protocol>".

    @param line: the first line of the request, without its delimiter
    @type  line: str
    @returns: a (method, parsed_url, proto) tuple, where parsed_url is the
              result of urlparse.urlparse on the location; None if the
              line does not split into three fields, names a protocol
              that is not in self.protos, or has an unparsable location.
    """
    try:
        # The str.strip method replaces the deprecated string.strip
        # module function.
        method, location, proto = [
            part.strip() for part in line.split(' ', 2)]
    except ValueError:
        # Fewer than three space-separated fields.
        return None

    if proto not in self.protos:
        return None

    try:
        parsed_url = urlparse.urlparse(location)
    except ValueError:
        return None

    return method, parsed_url, proto
639
def unparseLine(self, parsed):
    """
    Rebuild the request line from its parsed representation.

    @param parsed: (method, parsed_url, proto) as returned by parseLine
    @returns: "<method> <location> <protocol>", the location being
              reassembled with urlparse.urlunparse
    @rtype:   str
    """
    method, url_parts, proto = parsed
    location = urlparse.urlunparse(url_parts)
    return ' '.join([method, location, proto])
643
649
def injectRequestId(self, parsed, requestId):
    """
    Append the request-id to the query string of the parsed request URL.

    @param parsed:    (method, parsed_url, proto) as returned by parseLine
    @param requestId: the request-id token to add
    @type  requestId: str
    @returns: a new (method, parsed_url, proto) tuple whose query
              component additionally carries
              "<requestIdParameter>=<requestId>"
    """
    method, url_parts, proto = parsed

    addition = ''.join((self.requestIdParameter, '=', requestId))
    previous = url_parts[4]
    if previous != '':
        # Keep the existing query arguments and chain ours after them.
        query = ''.join((previous, '&', addition))
    else:
        query = addition

    # Index 4 is the query component; everything else is carried over.
    rebuilt = url_parts[:4] + (query, ) + url_parts[5:]
    return method, rebuilt, proto
663
def extractIdentifier(self, parsed):
    """
    Return the component at index 2 of the parsed request URL (the path,
    when the URL comes from urlparse.urlparse).

    @param parsed: (method, parsed_url, proto) as returned by parseLine
    @returns: the third component of parsed_url
    """
    _method, url_parts, _proto = parsed
    path = url_parts[2]
    return path
668
def writeNotFoundResponse(self):
    """
    Write an HTTP/1.0 404 response with a short plain body to the
    transport.
    """
    response = "HTTP/1.0 404 Not Found\r\n\r\nResource unknown"
    self.transport.write(response)
671
def writeServiceUnavailableResponse(self):
    """
    Write an HTTP/1.0 503 response with a short plain body to the
    transport.
    """
    response = ("HTTP/1.0 503 Service Unavailable\r\n\r\n"
                "Service temporarily unavailable")
    self.transport.write(response)
675
676
678 scheme = 'rtsp'
679 protos = ["RTSP/1.0"]
680
def writeNotFoundResponse(self):
    """
    Write an RTSP/1.0 404 response with a short plain body to the
    transport.
    """
    response = "RTSP/1.0 404 Not Found\r\n\r\nResource unknown"
    self.transport.write(response)
683
def writeServiceUnavailableResponse(self):
    """
    Write an RTSP/1.0 503 response with a short plain body to the
    transport.
    """
    response = ("RTSP/1.0 503 Service Unavailable\r\n\r\n"
                "Service temporarily unavailable")
    self.transport.write(response)
687