1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import base64
17 import io
18 import struct
19
20 import dns.exception
21 import dns.inet
22 import dns.name
23 import dns.util
24
26 """IPSECKEY record
27
28 @ivar precedence: the precedence for this key data
29 @type precedence: int
30 @ivar gateway_type: the gateway type
31 @type gateway_type: int
32 @ivar algorithm: the algorithm to use
33 @type algorithm: int
34 @ivar gateway: the public key
35 @type gateway: None, IPv4 address, IPV6 address, or domain name
36 @ivar key: the public key
37 @type key: bytes
38 @see: RFC 4025"""
39
40 __slots__ = ['precedence', 'gateway_type', 'algorithm', 'gateway', 'key']
41
42 - def __init__(self, rdclass, rdtype, precedence, gateway_type, algorithm,
43 gateway, key):
64
65 - def to_text(self, origin=None, relativize=True, **kw):
66 if self.gateway_type == 0:
67 gateway = '.'
68 elif self.gateway_type == 1:
69 gateway = self.gateway
70 elif self.gateway_type == 2:
71 gateway = self.gateway
72 elif self.gateway_type == 3:
73 gateway = str(self.gateway.choose_relativity(origin, relativize))
74 else:
75 raise ValueError('invalid gateway type')
76 return '%d %d %d %s %s' % (self.precedence, self.gateway_type,
77 self.algorithm, gateway,
78 dns.rdata._base64ify(self.key))
79
80 - def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
81 precedence = tok.get_uint8()
82 gateway_type = tok.get_uint8()
83 algorithm = tok.get_uint8()
84 if gateway_type == 3:
85 gateway = tok.get_name().choose_relativity(origin, relativize)
86 else:
87 gateway = tok.get_string()
88 chunks = []
89 while 1:
90 t = tok.get().unescape()
91 if t.is_eol_or_eof():
92 break
93 if not t.is_identifier():
94 raise dns.exception.SyntaxError
95 chunks.append(t.value)
96 b64 = ''.join(chunks)
97 key = base64.b64decode(b64.encode('ascii'))
98 return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
99 gateway, key)
100
101 from_text = classmethod(from_text)
102
103 - def to_wire(self, file, compress = None, origin = None):
118
119 - def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
120 if rdlen < 3:
121 raise dns.exception.FormError
122 header = struct.unpack('!BBB', wire[current : current + 3])
123 gateway_type = header[1]
124 current += 3
125 rdlen -= 3
126 if gateway_type == 0:
127 gateway = None
128 elif gateway_type == 1:
129 gateway = dns.inet.inet_ntop(dns.inet.AF_INET,
130 wire[current : current + 4])
131 current += 4
132 rdlen -= 4
133 elif gateway_type == 2:
134 gateway = dns.inet.inet_ntop(dns.inet.AF_INET6,
135 wire[current : current + 16])
136 current += 16
137 rdlen -= 16
138 elif gateway_type == 3:
139 (gateway, cused) = dns.name.from_wire(wire[: current + rdlen],
140 current)
141 current += cused
142 rdlen -= cused
143 else:
144 raise dns.exception.FormError('invalid IPSECKEY gateway type')
145 key = wire[current : current + rdlen].unwrap()
146 return cls(rdclass, rdtype, header[0], gateway_type, header[2],
147 gateway, key)
148
149 from_wire = classmethod(from_wire)
150
151 - def _cmp(self, other):
152 f = io.BytesIO()
153 self.to_wire(f)
154 wire1 = f.getvalue()
155 f.seek(0)
156 f.truncate()
157 other.to_wire(f)
158 wire2 = f.getvalue()
159 f.close()
160
161 return dns.util.cmp(wire1, wire2)
162