1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """DNS TSIG support."""
17
18 import hashlib
19 import hmac
20 import struct
21 import sys
22
23 import dns.exception
24 import dns.hash
25 import dns.rdataclass
26 import dns.name
27
class BadTime(dns.exception.DNSException):
    """Raised if the current time is not within the TSIG's validity time."""
    # NOTE(review): the docstring above is kept verbatim on purpose; some
    # dnspython versions appear to use an exception class's docstring as its
    # default message, so rewording it could change runtime output -- confirm.
31
33 """Raised if the TSIG signature fails to verify."""
34 pass
35
37 """Base class for all TSIG errors generated by the remote peer"""
38 pass
39
41 """Raised if the peer didn't know the key we used"""
42 pass
43
45 """Raised if the peer didn't like the signature we sent"""
46 pass
47
49 """Raised if the peer didn't like the time we sent"""
50 pass
51
53 """Raised if the peer didn't like amount of truncation in the TSIG we sent"""
54 pass
55
56
57
# TSIG algorithm identifiers, each held as a dns.name.Name.
HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT")
HMAC_SHA1 = dns.name.from_text("hmac-sha1")
HMAC_SHA224 = dns.name.from_text("hmac-sha224")
HMAC_SHA256 = dns.name.from_text("hmac-sha256")
HMAC_SHA384 = dns.name.from_text("hmac-sha384")
HMAC_SHA512 = dns.name.from_text("hmac-sha512")

# Algorithm used by sign() when the caller does not pass one explicitly.
default_algorithm = HMAC_MD5

# TSIG error codes carried in the TSIG rdata; validate() turns a non-zero
# code received from the peer into the matching Peer* exception.
BADSIG = 16
BADKEY = 17
BADTIME = 18
BADTRUNC = 22
71
def sign(wire, keyname, secret, time, fudge, original_id, error,
         other_data, request_mac, ctx=None, multi=False, first=True,
         algorithm=default_algorithm):
    """Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata
    for the input parameters, the HMAC MAC calculated by applying the
    TSIG signature algorithm, and the TSIG digest context.

    @param wire: the message to sign, in wire format.  Its first two bytes
    are not digested; the packed I{original_id} is digested in their place.
    @param keyname: the key name; must provide C{to_digestable()}
    @param secret: the HMAC key
    @param time: the signing time as an integer; packed as a 48-bit value
    @param fudge: the permitted time skew; packed as a 16-bit value
    @param original_id: the message id to digest and to embed in the rdata
    @param error: the TSIG error code to embed in the rdata
    @param other_data: the TSIG other data (at most 65535 bytes)
    @param request_mac: the MAC of the related request; when non-empty and
    I{first} is true it is digested with a 16-bit length prefix
    @param ctx: the digest context to continue when I{first} is false; it
    is ignored (a new one is created) when I{first} is true
    @param multi: if true, a new context seeded with this message's MAC is
    returned so a following message can be chained to it; otherwise the
    returned context is None
    @param first: true if this is the first message being signed
    @param algorithm: the TSIG algorithm, resolved with get_algorithm()
    @rtype: (bytes, bytes, hmac.HMAC object)
    @raises ValueError: I{other_data} is too long
    @raises NotImplementedError: I{algorithm} is not supported
    """

    (algorithm_name, digestmod) = get_algorithm(algorithm)
    if first:
        ctx = hmac.new(secret, digestmod=digestmod)
        request_mac_len = len(request_mac)
        if request_mac_len > 0:
            ctx.update(struct.pack('!H', request_mac_len))
            ctx.update(request_mac)

    # The digest covers the message with the original id in place of the
    # id currently stored in the first two bytes of the wire data.
    packed_id = struct.pack('!H', original_id)
    ctx.update(packed_id)
    ctx.update(wire[2:])
    if first:
        ctx.update(keyname.to_digestable())
        ctx.update(struct.pack('!H', dns.rdataclass.ANY))
        ctx.update(struct.pack('!I', 0))

    # The time is carried as 48 bits: a 16-bit high part and 32-bit low part.
    upper_time = (time >> 32) & 0xffff
    lower_time = time & 0xffffffff
    time_mac = struct.pack('!HIH', upper_time, lower_time, fudge)
    pre_mac = algorithm_name + time_mac

    other_len = len(other_data)
    if other_len > 65535:
        raise ValueError('TSIG Other Data is > 65535 bytes')
    post_mac = struct.pack('!HH', error, other_len) + other_data

    if first:
        ctx.update(pre_mac)
        ctx.update(post_mac)
    else:
        # Subsequent messages only digest the timers.
        ctx.update(time_mac)

    mac = ctx.digest()
    tsig_rdata = (pre_mac + struct.pack('!H', len(mac)) + mac +
                  packed_id + post_mac)

    if multi:
        # Seed the context for the next message with this message's MAC.
        # The digestmod must be passed explicitly: omitting it raises
        # TypeError on Python 3.8+ and formerly fell back to MD5 regardless
        # of the algorithm in use.
        ctx = hmac.new(secret, digestmod=digestmod)
        ctx.update(struct.pack('!H', len(mac)))
        ctx.update(mac)
    else:
        ctx = None
    return (tsig_rdata, mac, ctx)
121
def hmac_md5(wire, keyname, secret, time, fudge, original_id, error,
             other_data, request_mac, ctx=None, multi=False, first=True,
             algorithm=default_algorithm):
    """Forward every argument unchanged to sign() and return its result.

    Despite the name, the digest used is whatever I{algorithm} selects.

    @rtype: (bytes, bytes, hmac.HMAC object)
    """
    return sign(wire=wire, keyname=keyname, secret=secret, time=time,
                fudge=fudge, original_id=original_id, error=error,
                other_data=other_data, request_mac=request_mac, ctx=ctx,
                multi=multi, first=first, algorithm=algorithm)
127
def validate(wire, keyname, secret, now, request_mac, tsig_start, tsig_rdata,
             tsig_rdlen, ctx=None, multi=False, first=True):
    """Validate the specified TSIG rdata against the other input parameters.

    The TSIG fields are parsed out of I{wire}, the MAC is recomputed with
    sign() over the message as it was before the TSIG record was added, and
    the result is compared with the MAC that was received.

    @param wire: the received message, in wire format
    @param keyname: the key name, passed through to sign()
    @param secret: the HMAC key, passed through to sign()
    @param now: the current time, checked against the signed time +/- fudge
    @param request_mac: the MAC of the related request, passed to sign()
    @param tsig_start: offset in I{wire} at which the TSIG record starts;
    everything from there on is excluded from the digest
    @param tsig_rdata: offset in I{wire} at which the TSIG rdata starts
    @param tsig_rdlen: length of the TSIG rdata
    @param ctx: digest context from the previous message, if any
    @param multi: passed through to sign()
    @param first: passed through to sign()
    @raises FormError: The TSIG is badly formed.
    @raises BadTime: There is too much time skew between the client and the
    server.
    @raises BadSignature: The TSIG signature did not validate
    @raises PeerError: The peer reported a TSIG error (a more specific
    subclass is raised for the known error codes)
    @rtype: hmac.HMAC object (None when sign() returns no context)"""

    # Rebuild the message as it was signed: drop the TSIG record and
    # decrement the count held in header bytes 10-11 accordingly.
    (adcount,) = struct.unpack("!H", wire[10:12])
    if adcount == 0:
        raise dns.exception.FormError
    adcount -= 1
    new_wire = wire[0:10] + struct.pack("!H", adcount) + wire[12:tsig_start]

    # Walk the rdata: algorithm name, timers and MAC size, MAC, then the
    # original id, error code and other data.
    current = tsig_rdata
    (aname, used) = dns.name.from_wire(wire, current)
    current += used
    (upper_time, lower_time, fudge, mac_size) = \
        struct.unpack("!HIHH", wire[current:current + 10])
    signed_time = (upper_time << 32) + lower_time
    current += 10
    mac = wire[current:current + mac_size]
    current += mac_size
    (original_id, error, other_size) = \
        struct.unpack("!HHH", wire[current:current + 6])
    current += 6
    other_data = wire[current:current + other_size]
    current += other_size
    if current != tsig_rdata + tsig_rdlen:
        raise dns.exception.FormError

    if error != 0:
        if error == BADSIG:
            raise PeerBadSignature
        elif error == BADKEY:
            raise PeerBadKey
        elif error == BADTIME:
            raise PeerBadTime
        elif error == BADTRUNC:
            raise PeerBadTruncation
        else:
            raise PeerError('unknown TSIG error code %d' % error)

    if now < signed_time - fudge or now > signed_time + fudge:
        raise BadTime

    (_, our_mac, ctx) = sign(new_wire, keyname, secret, signed_time, fudge,
                             original_id, error, other_data,
                             request_mac, ctx, multi, first, aname)
    # Constant-time comparison: the received MAC is attacker-controlled, so
    # an early-exit comparison would leak how many leading bytes matched.
    if not hmac.compare_digest(our_mac, mac):
        raise BadSignature
    return ctx
180
181 _hashes = None
182
184 try:
185 _hashes[tsig_alg] = dns.hash.get(hash_alg)
186 except KeyError:
187 pass
188
198
200 """Returns the wire format string and the hash module to use for the
201 specified TSIG algorithm
202
203 @rtype: (string, hash constructor)
204 @raises NotImplementedError: I{algorithm} is not supported
205 """
206
207 global _hashes
208 if _hashes is None:
209 _setup_hashes()
210
211 if isinstance(algorithm, str):
212 algorithm = dns.name.from_text(algorithm)
213
214 try:
215 return (algorithm.to_digestable(), _hashes[algorithm])
216 except KeyError:
217 raise NotImplementedError("TSIG algorithm " + str(algorithm) +
218 " is not supported")
219