1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import cStringIO
17 import struct
18
19 import dns.exception
20 import dns.inet
21 import dns.name
22
24 """IPSECKEY record
25
26 @ivar precedence: the precedence for this key data
27 @type precedence: int
28 @ivar gateway_type: the gateway type
29 @type gateway_type: int
30 @ivar algorithm: the algorithm to use
31 @type algorithm: int
32 @ivar gateway: the gateway
33 @type gateway: None, IPv4 address, IPv6 address, or domain name
34 @ivar key: the public key
35 @type key: string
36 @see: RFC 4025"""
37
38 __slots__ = ['precedence', 'gateway_type', 'algorithm', 'gateway', 'key']
39
40 - def __init__(self, rdclass, rdtype, precedence, gateway_type, algorithm,
41 gateway, key):
62
def to_text(self, origin=None, relativize=True, **kw):
    """Render this record in text form.

    The result is the five space-separated fields
    ``precedence gateway_type algorithm gateway key``, where the key is
    passed through dns.rdata._base64ify().

    @param origin: origin passed to the gateway name's
    choose_relativity(); only consulted for gateway type 3
    @param relativize: relativize flag passed to choose_relativity();
    only consulted for gateway type 3
    @rtype: string
    @raises ValueError: the gateway type is not 0, 1, 2, or 3
    """
    kind = self.gateway_type
    if kind == 0:
        # No gateway: rendered as a lone dot.
        gateway_text = '.'
    elif kind in (1, 2):
        # Types 1 and 2 emit the stored gateway value unchanged.
        gateway_text = self.gateway
    elif kind == 3:
        # Domain-name gateway: apply the caller's relativity choice.
        name = self.gateway.choose_relativity(origin, relativize)
        gateway_text = str(name)
    else:
        raise ValueError('invalid gateway type')
    return '%d %d %d %s %s' % (
        self.precedence, kind, self.algorithm, gateway_text,
        dns.rdata._base64ify(self.key))
77
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """Build a record from its text form.

    Reads three 8-bit unsigned fields (precedence, gateway type,
    algorithm), then the gateway, then joins every remaining identifier
    token up to end-of-line/end-of-file and base64-decodes the result
    as the key.

    @param tok: tokenizer supplying the rdata tokens
    @param origin: origin passed to the gateway name's
    choose_relativity(); only consulted for gateway type 3
    @param relativize: relativize flag passed to choose_relativity();
    only consulted for gateway type 3
    @raises dns.exception.SyntaxError: a token in the key data is not
    an identifier
    """
    precedence = tok.get_uint8()
    gateway_type = tok.get_uint8()
    algorithm = tok.get_uint8()
    if gateway_type != 3:
        # Every non-name gateway is kept as the raw string token.
        gateway = tok.get_string()
    else:
        gateway = tok.get_name().choose_relativity(origin, relativize)
    # The key may be split across several tokens; gather them all.
    pieces = []
    token = tok.get().unescape()
    while not token.is_eol_or_eof():
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        pieces.append(token.value)
        token = tok.get().unescape()
    key = ''.join(pieces).decode('base64_codec')
    return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
               gateway, key)

from_text = classmethod(from_text)
100
101 - def to_wire(self, file, compress = None, origin = None):
116
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
    """Build a record from its wire form.

    The rdata starts with three octets (precedence, gateway type,
    algorithm), followed by a gateway whose encoding depends on the
    gateway type, followed by the key, which occupies whatever remains
    of the rdata.

    @param wire: the message being parsed
    @param current: offset in wire at which the rdata starts
    @param rdlen: length of the rdata in octets
    @param origin: not used by this method
    @raises dns.exception.FormError: the rdata is too short for its
    fixed header or for the gateway it announces, or the gateway type
    is not 0, 1, 2, or 3
    """
    if rdlen < 3:
        raise dns.exception.FormError
    header = struct.unpack('!BBB', wire[current : current + 3])
    gateway_type = header[1]
    current += 3
    rdlen -= 3
    if gateway_type == 0:
        gateway = None
    elif gateway_type == 1:
        # Without this check a short record would slice past the end of
        # the rdata and leave rdlen negative, yielding an empty key.
        if rdlen < 4:
            raise dns.exception.FormError(
                'IPSECKEY IPv4 gateway is truncated')
        gateway = dns.inet.inet_ntop(dns.inet.AF_INET,
                                     wire[current : current + 4])
        current += 4
        rdlen -= 4
    elif gateway_type == 2:
        if rdlen < 16:
            raise dns.exception.FormError(
                'IPSECKEY IPv6 gateway is truncated')
        gateway = dns.inet.inet_ntop(dns.inet.AF_INET6,
                                     wire[current : current + 16])
        current += 16
        rdlen -= 16
    elif gateway_type == 3:
        # A wire-format name is at least one octet long (the root label).
        if rdlen < 1:
            raise dns.exception.FormError(
                'IPSECKEY gateway name is missing')
        # The wire is cut at the end of the rdata so the name parser
        # cannot consume octets belonging to whatever follows.
        (gateway, cused) = dns.name.from_wire(wire[: current + rdlen],
                                              current)
        current += cused
        rdlen -= cused
    else:
        raise dns.exception.FormError('invalid IPSECKEY gateway type')
    # Everything left in the rdata is the key.
    key = wire[current : current + rdlen].unwrap()
    return cls(rdclass, rdtype, header[0], gateway_type, header[2],
               gateway, key)

from_wire = classmethod(from_wire)
148
def _cmp(self, other):
    """Compare this record with another by their wire encodings.

    Each record is serialized with to_wire() using its default
    arguments, and the two resulting strings are compared with cmp().

    @param other: the record to compare against
    @rtype: int (the result of cmp())
    """
    def wire_form(rdata):
        # Serialize one record into a scratch buffer and return the bytes.
        buf = cStringIO.StringIO()
        rdata.to_wire(buf)
        data = buf.getvalue()
        buf.close()
        return data

    return cmp(wire_form(self), wire_form(other))
160