Package dns :: Package rdtypes :: Package IN :: Module APL
[hide private]
[frames] | no frames]

Source Code for Module dns.rdtypes.IN.APL

  1  # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc. 
  2  # 
  3  # Permission to use, copy, modify, and distribute this software and its 
  4  # documentation for any purpose with or without fee is hereby granted, 
  5  # provided that the above copyright notice and this permission notice 
  6  # appear in all copies. 
  7  # 
  8  # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 
  9  # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
 10  # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 
 11  # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
 12  # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
 13  # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 
 14  # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 15   
 16  import io 
 17  import struct 
 18   
 19  import dns.exception 
 20  import dns.inet 
 21  import dns.rdata 
 22  import dns.tokenizer 
 23  import dns.util 
 24   
25 -class APLItem(object):
26 """An APL list item. 27 28 @ivar family: the address family (IANA address family registry) 29 @type family: int 30 @ivar negation: is this item negated? 31 @type negation: bool 32 @ivar address: the address 33 @type address: string 34 @ivar prefix: the prefix length 35 @type prefix: int 36 """ 37 38 __slots__ = ['family', 'negation', 'address', 'prefix'] 39
40 - def __init__(self, family, negation, address, prefix):
41 self.family = family 42 self.negation = negation 43 self.address = address 44 self.prefix = prefix
45
46 - def __str__(self):
47 if self.negation: 48 return "!%d:%s/%s" % (self.family, self.address, self.prefix) 49 else: 50 return "%d:%s/%s" % (self.family, self.address, self.prefix)
51
52 - def to_wire(self, file):
53 if self.family == 1: 54 address = dns.inet.inet_pton(dns.inet.AF_INET, self.address) 55 elif self.family == 2: 56 address = dns.inet.inet_pton(dns.inet.AF_INET6, self.address) 57 else: 58 address = self.address.decode('hex_codec') 59 # 60 # Truncate least significant zero bytes. 61 # 62 last = 0 63 for i in range(len(address) - 1, -1, -1): 64 if address[i] != chr(0): 65 last = i + 1 66 break 67 address = address[0 : last] 68 l = len(address) 69 assert l < 128 70 if self.negation: 71 l |= 0x80 72 header = struct.pack('!HBB', self.family, self.prefix, l) 73 file.write(header) 74 file.write(address)
75
76 -class APL(dns.rdata.Rdata):
77 """APL record. 78 79 @ivar items: a list of APL items 80 @type items: list of APL_Item 81 @see: RFC 3123""" 82 83 __slots__ = ['items'] 84
85 - def __init__(self, rdclass, rdtype, items):
86 super(APL, self).__init__(rdclass, rdtype) 87 self.items = items
88
89 - def to_text(self, origin=None, relativize=True, **kw):
90 return ' '.join(map(lambda x: str(x), self.items))
91
92 - def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
93 items = [] 94 while 1: 95 token = tok.get().unescape() 96 if token.is_eol_or_eof(): 97 break 98 item = token.value 99 if item[0] == '!': 100 negation = True 101 item = item[1:] 102 else: 103 negation = False 104 (family, rest) = item.split(':', 1) 105 family = int(family) 106 (address, prefix) = rest.split('/', 1) 107 prefix = int(prefix) 108 item = APLItem(family, negation, address, prefix) 109 items.append(item) 110 111 return cls(rdclass, rdtype, items)
112 113 from_text = classmethod(from_text) 114
115 - def to_wire(self, file, compress = None, origin = None):
116 for item in self.items: 117 item.to_wire(file)
118
119 - def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
120 items = [] 121 while 1: 122 if rdlen < 4: 123 raise dns.exception.FormError 124 header = struct.unpack('!HBB', wire[current : current + 4]) 125 afdlen = header[2] 126 if afdlen > 127: 127 negation = True 128 afdlen -= 128 129 else: 130 negation = False 131 current += 4 132 rdlen -= 4 133 if rdlen < afdlen: 134 raise dns.exception.FormError 135 address = wire[current : current + afdlen].unwrap() 136 l = len(address) 137 if header[0] == 1: 138 if l < 4: 139 address += b'\x00' * (4 - l) 140 address = dns.inet.inet_ntop(dns.inet.AF_INET, address) 141 elif header[0] == 2: 142 if l < 16: 143 address += b'\x00' * (16 - l) 144 address = dns.inet.inet_ntop(dns.inet.AF_INET6, address) 145 else: 146 # 147 # This isn't really right according to the RFC, but it 148 # seems better than throwing an exception 149 # 150 address = dns.rdata._hexify(address) 151 current += afdlen 152 rdlen -= afdlen 153 item = APLItem(header[0], negation, address, header[1]) 154 items.append(item) 155 if rdlen == 0: 156 break 157 return cls(rdclass, rdtype, items)
158 159 from_wire = classmethod(from_wire) 160
161 - def _cmp(self, other):
162 f = io.BytesIO() 163 self.to_wire(f) 164 wire1 = f.getvalue() 165 f.seek(0) 166 f.truncate() 167 other.to_wire(f) 168 wire2 = f.getvalue() 169 f.close() 170 171 return dns.util.cmp(wire1, wire2)
172