1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import base64
17 import io
18 import string
19 import struct
20
21 import dns.exception
22 import dns.rdata
23 import dns.rdatatype
24 import dns.util
25
26 -class HIP(dns.rdata.Rdata):
27 """HIP record
28
29 @ivar hit: the host identity tag
30 @type hit: bytes
31 @ivar algorithm: the public key cryptographic algorithm
32 @type algorithm: int
33 @ivar key: the public key
34 @type key: bytes
35 @ivar servers: the rendezvous servers
36 @type servers: list of dns.name.Name objects
37 @see: RFC 5205"""
38
39 __slots__ = ['hit', 'algorithm', 'key', 'servers']
40
41 - def __init__(self, rdclass, rdtype, hit, algorithm, key, servers):
47
def to_text(self, origin=None, relativize=True, **kw):
    """Render this HIP rdata in text form.

    The result is the algorithm number, the HIT in lowercase hexadecimal
    and the public key in base64, followed by each rendezvous server name,
    all separated by single spaces.

    @param origin: origin handed to each server's choose_relativity()
    @param relativize: relativize flag handed to choose_relativity()
    @rtype: string
    """
    fields = [
        '%u' % self.algorithm,
        base64.b16encode(self.hit).lower().decode('ascii'),
        base64.b64encode(self.key).decode('ascii'),
    ]
    # Server names are optional; with none, the text ends after the key.
    fields.extend(str(name.choose_relativity(origin, relativize))
                  for name in self.servers)
    return ' '.join(fields)
58
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
    """Build a HIP rdata from its text form.

    Reads, in order: the algorithm (uint8), the HIT as hexadecimal, the
    public key as base64, then zero or more rendezvous server names up to
    the end of the line or file.

    @param rdclass: rdata class, passed through to the constructor
    @param rdtype: rdata type, passed through to the constructor
    @param tok: tokenizer supplying get_uint8(), get_string() and get()
    @param origin: origin used when parsing each server name
    @param relativize: whether server names are made relative to origin
    @raises dns.exception.SyntaxError: the decoded HIT exceeds 255 bytes
    @returns: cls(rdclass, rdtype, hit, algorithm, key, servers)
    """
    algorithm = tok.get_uint8()
    hit = bytes.fromhex(tok.get_string())
    # to_wire() packs the HIT length into a single octet.
    if len(hit) > 255:
        raise dns.exception.SyntaxError("HIT too long")
    key = base64.b64decode(tok.get_string().encode('ascii'))
    servers = []
    while True:
        token = tok.get()
        if token.is_eol_or_eof():
            break
        server = dns.name.from_text(token.value, origin)
        # choose_relativity() returns the adjusted name rather than
        # changing the name in place, so its result must be the one stored;
        # discarding it made the relativize argument a no-op.
        servers.append(server.choose_relativity(origin, relativize))
    return cls(rdclass, rdtype, hit, algorithm, key, servers)
74
75 from_text = classmethod(from_text)
76
def to_wire(self, file, compress=None, origin=None):
    """Write this HIP rdata to file in wire form.

    Output is a 4-byte header (HIT length, algorithm, key length packed as
    "!BBH"), the HIT, the public key, and then each rendezvous server name.

    @param file: object with a write() method accepting bytes
    @param compress: not used here; every server is written with None
    as its compression argument
    @param origin: passed through to each server's to_wire()
    """
    header = struct.pack("!BBH", len(self.hit), self.algorithm,
                         len(self.key))
    for chunk in (header, self.hit, self.key):
        file.write(chunk)
    for name in self.servers:
        name.to_wire(file, None, origin)
85
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
    """Build a HIP rdata from wire form.

    Parses the 4-byte "!BBH" header (HIT length, algorithm, key length) at
    offset current, then the HIT, the public key, and finally server names
    until rdlen bytes have been consumed.

    @param wire: sliceable message data whose slices provide unwrap()
    @param current: offset in wire where the rdata starts
    @param rdlen: length of the rdata in bytes
    @param origin: if not None, each server name is relativized to it
    @returns: cls(rdclass, rdtype, hit, algorithm, key, servers)
    """
    hit_len, algorithm, key_len = struct.unpack(
        '!BBH', wire[current:current + 4])
    hit_start = current + 4
    key_start = hit_start + hit_len
    pos = key_start + key_len
    hit = wire[hit_start:key_start].unwrap()
    key = wire[key_start:pos].unwrap()
    remaining = rdlen - 4 - hit_len - key_len
    # pos + remaining stays constant while names are consumed: it is the
    # offset one past the last byte of this rdata.
    rdata_end = pos + remaining
    servers = []
    while remaining > 0:
        name, used = dns.name.from_wire(wire[:rdata_end], pos)
        pos += used
        remaining -= used
        if origin is not None:
            name = name.relativize(origin)
        servers.append(name)
    return cls(rdclass, rdtype, hit, algorithm, key, servers)
107
108 from_wire = classmethod(from_wire)
109
116
def _cmp(self, other):
    """Compare this HIP rdata with other.

    The packed "!BBH" header followed by the HIT and key of each side is
    compared first with dns.util.cmp. If those are equal, the servers are
    compared pairwise in order; if every shared position is equal, the
    result is the difference between the two server counts.

    @returns: the first non-zero dns.util.cmp result, otherwise
    len(self.servers) - len(other.servers)
    """
    def fixed_part(rdata):
        # Header, HIT and key laid out as to_wire() writes them.
        header = struct.pack("!BBH", len(rdata.hit), rdata.algorithm,
                             len(rdata.key))
        return b''.join((header, rdata.hit, rdata.key))

    result = dns.util.cmp(fixed_part(self), fixed_part(other))
    if result != 0:
        return result
    # zip() stops at the shorter list, matching min(len, len) iteration.
    for mine, theirs in zip(self.servers, other.servers):
        result = dns.util.cmp(mine, theirs)
        if result != 0:
            return result
    return len(self.servers) - len(other.servers)
143