Package dns :: Package rdtypes :: Package IN :: Module APL
[hide private]
[frames | no frames]

Source Code for Module dns.rdtypes.IN.APL

  1  # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc. 
  2  # 
  3  # Permission to use, copy, modify, and distribute this software and its 
  4  # documentation for any purpose with or without fee is hereby granted, 
  5  # provided that the above copyright notice and this permission notice 
  6  # appear in all copies. 
  7  # 
  8  # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 
  9  # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
 10  # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 
 11  # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
 12  # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
 13  # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 
 14  # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 15   
 16  import cStringIO 
 17  import struct 
 18   
 19  import dns.exception 
 20  import dns.inet 
 21  import dns.rdata 
 22  import dns.tokenizer 
 23   
class APLItem(object):
    """An APL list item.

    @ivar family: the address family (IANA address family registry)
    @type family: int
    @ivar negation: is this item negated?
    @type negation: bool
    @ivar address: the address
    @type address: string
    @ivar prefix: the prefix length
    @type prefix: int
    """

    __slots__ = ['family', 'negation', 'address', 'prefix']

    def __init__(self, family, negation, address, prefix):
        """Store the four fields of the item unchanged.

        @param family: the address family number
        @param negation: True if the item is negated
        @param address: the address in text form (dotted quad for family 1,
        IPv6 text for family 2, hex digits for any other family)
        @param prefix: the prefix length
        """
        self.family = family
        self.negation = negation
        self.address = address
        self.prefix = prefix

    def __str__(self):
        """Return the item as C{[!]family:address/prefix}."""
        if self.negation:
            return "!%d:%s/%s" % (self.family, self.address, self.prefix)
        else:
            return "%d:%s/%s" % (self.family, self.address, self.prefix)

    def to_wire(self, file):
        """Write this item to I{file} in APL wire format.

        The output is a 4-octet header (address family, prefix length, and
        a length octet whose high bit carries the negation flag) followed
        by the address with its trailing zero octets removed.

        @param file: the object to write to; only its C{write} method is
        used
        @raises ValueError: the trimmed address is 128 octets or longer, so
        its length cannot be stored next to the negation bit
        """
        # Imported here so the block does not depend on a file-level import.
        import binascii

        if self.family == 1:
            address = dns.inet.inet_pton(dns.inet.AF_INET, self.address)
        elif self.family == 2:
            address = dns.inet.inet_pton(dns.inet.AF_INET6, self.address)
        else:
            # Unknown families carry their address as hex digits.
            address = binascii.unhexlify(self.address)
        #
        # Truncate least significant zero bytes.  struct.pack() is used to
        # build the NUL octet so that it has the same byte-string type as
        # the packed address.
        #
        address = address.rstrip(struct.pack('B', 0))
        length = len(address)
        # The length shares its octet with the negation bit (0x80), so only
        # seven bits are available.  This is a real input check, not a
        # debugging aid, so it must not be an assert (stripped under -O).
        if length >= 128:
            raise ValueError('APL address part too long: %d octets' % length)
        if self.negation:
            length |= 0x80
        header = struct.pack('!HBB', self.family, self.prefix, length)
        file.write(header)
        file.write(address)
74
class APL(dns.rdata.Rdata):
    """APL record.

    @ivar items: a list of APL items
    @type items: list of APLItem
    @see: RFC 3123"""

    __slots__ = ['items']

    def __init__(self, rdclass, rdtype, items):
        """Initialize the rdata with its list of items.

        @param rdclass: passed through to the base class initializer
        @param rdtype: passed through to the base class initializer
        @param items: the APL items
        @type items: list of APLItem
        """
        super(APL, self).__init__(rdclass, rdtype)
        self.items = items

    def to_text(self, origin=None, relativize=True, **kw):
        """Return the items in text form, separated by single spaces.

        I{origin}, I{relativize} and any extra keywords are accepted but
        not used.
        """
        return ' '.join(map(str, self.items))

    @classmethod
    def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an APL rdata by reading items from a tokenizer.

        Tokens are read until end of line or end of file.  Each token has
        the form C{[!]family:address/prefix}; zero tokens yields an rdata
        with an empty item list.

        @param tok: the tokenizer to read from
        @raises ValueError: a token lacks the C{:} or C{/} separator, or
        its family or prefix is not an integer
        @rtype: APL
        """
        items = []
        while 1:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            item = token.value
            # A leading '!' marks a negated item.
            if item[0] == '!':
                negation = True
                item = item[1:]
            else:
                negation = False
            (family, rest) = item.split(':', 1)
            family = int(family)
            (address, prefix) = rest.split('/', 1)
            prefix = int(prefix)
            items.append(APLItem(family, negation, address, prefix))

        return cls(rdclass, rdtype, items)

    def to_wire(self, file, compress=None, origin=None):
        """Write every item to I{file} in order.

        I{compress} and I{origin} are accepted but not used.
        """
        for item in self.items:
            item.to_wire(file)

    @classmethod
    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
        """Build an APL rdata from wire format.

        RFC 3123 allows an APL RR to hold zero items, so rdata of length
        zero yields an rdata with an empty item list.

        @param wire: the message data; sliced as
        C{wire[current : current + n]}
        @param current: offset in I{wire} of the first rdata octet
        @param rdlen: number of rdata octets
        @raises dns.exception.FormError: fewer than 4 octets remain for an
        item header, or an item's address part is longer than the
        remaining rdata
        @rtype: APL
        """
        items = []
        # Loop only while octets remain, so that empty rdata is accepted
        # instead of failing the header-length check below.
        while rdlen > 0:
            if rdlen < 4:
                raise dns.exception.FormError
            header = struct.unpack('!HBB', wire[current : current + 4])
            # The high bit of the length octet is the negation flag; the
            # low seven bits are the length of the address part.
            afdlen = header[2]
            if afdlen > 127:
                negation = True
                afdlen -= 128
            else:
                negation = False
            current += 4
            rdlen -= 4
            if rdlen < afdlen:
                raise dns.exception.FormError
            # NOTE(review): unwrap() presumably returns the plain byte
            # string behind the wire slice -- confirm against dns.wiredata.
            address = wire[current : current + afdlen].unwrap()
            l = len(address)
            if header[0] == 1:
                # Restore the trailing zero octets dropped on the wire.
                if l < 4:
                    address += '\x00' * (4 - l)
                address = dns.inet.inet_ntop(dns.inet.AF_INET, address)
            elif header[0] == 2:
                if l < 16:
                    address += '\x00' * (16 - l)
                address = dns.inet.inet_ntop(dns.inet.AF_INET6, address)
            else:
                #
                # This isn't really right according to the RFC, but it
                # seems better than throwing an exception
                #
                address = address.encode('hex_codec')
            current += afdlen
            rdlen -= afdlen
            items.append(APLItem(header[0], negation, address, header[1]))
        return cls(rdclass, rdtype, items)

    def _cmp(self, other):
        """Compare two APL rdatas by their wire-format encodings.

        @param other: the rdata to compare with; its C{to_wire} method is
        called
        @returns: the result of C{cmp()} on the two encodings
        """
        f = cStringIO.StringIO()
        self.to_wire(f)
        wire1 = f.getvalue()
        # Empty the buffer so it can be reused for the other rdata.
        f.seek(0)
        f.truncate()
        other.to_wire(f)
        wire2 = f.getvalue()
        f.close()

        return cmp(wire1, wire2)
171