Package dpkt :: Module dns
[hide private]
[frames] | no frames]

Source Code for Module dpkt.dns

  1  # $Id: dns.py 27 2006-11-21 01:22:52Z dahelder $ 
  2  # -*- coding: utf-8 -*- 
  3  """Domain Name System.""" 
  4  from __future__ import print_function 
  5  from __future__ import absolute_import 
  6   
  7  import struct 
  8  import codecs 
  9   
 10  from . import dpkt 
 11  from .decorators import deprecated 
 12  from .compat import compat_ord 
 13   
 14  DNS_Q = 0 
 15  DNS_R = 1 
 16   
 17  # Opcodes 
 18  DNS_QUERY = 0 
 19  DNS_IQUERY = 1 
 20  DNS_STATUS = 2 
 21  DNS_NOTIFY = 4 
 22  DNS_UPDATE = 5 
 23   
 24  # Flags 
 25  DNS_CD = 0x0010  # checking disabled 
 26  DNS_AD = 0x0020  # authenticated data 
 27  DNS_Z = 0x0040  # unused 
 28  DNS_RA = 0x0080  # recursion available 
 29  DNS_RD = 0x0100  # recursion desired 
 30  DNS_TC = 0x0200  # truncated 
 31  DNS_AA = 0x0400  # authoritative answer 
 32  DNS_QR = 0x8000  # response ( query / response ) 
 33   
 34  # Response codes 
 35  DNS_RCODE_NOERR = 0 
 36  DNS_RCODE_FORMERR = 1 
 37  DNS_RCODE_SERVFAIL = 2 
 38  DNS_RCODE_NXDOMAIN = 3 
 39  DNS_RCODE_NOTIMP = 4 
 40  DNS_RCODE_REFUSED = 5 
 41  DNS_RCODE_YXDOMAIN = 6 
 42  DNS_RCODE_YXRRSET = 7 
 43  DNS_RCODE_NXRRSET = 8 
 44  DNS_RCODE_NOTAUTH = 9 
 45  DNS_RCODE_NOTZONE = 10 
 46   
 47  # RR types 
 48  DNS_A = 1 
 49  DNS_NS = 2 
 50  DNS_CNAME = 5 
 51  DNS_SOA = 6 
 52  DNS_NULL = 10 
 53  DNS_PTR = 12 
 54  DNS_HINFO = 13 
 55  DNS_MX = 15 
 56  DNS_TXT = 16 
 57  DNS_AAAA = 28 
 58  DNS_SRV = 33 
 59  DNS_OPT = 41 
 60   
 61  # RR classes 
 62  DNS_IN = 1 
 63  DNS_CHAOS = 3 
 64  DNS_HESIOD = 4 
 65  DNS_ANY = 255 
66 67 68 -def pack_name(name, off, label_ptrs):
69 name = codecs.encode(name, 'utf-8') 70 if name: 71 labels = name.split(b'.') 72 else: 73 labels = [] 74 labels.append(b'') 75 buf = b'' 76 for i, label in enumerate(labels): 77 key = b'.'.join(labels[i:]).upper() 78 ptr = label_ptrs.get(key) 79 if not ptr: 80 if len(key) > 1: 81 ptr = off + len(buf) 82 if ptr < 0xc000: 83 label_ptrs[key] = ptr 84 i = len(label) 85 buf += struct.pack("B", i) + label 86 else: 87 buf += struct.pack('>H', (0xc000 | ptr)) 88 break 89 return buf
90
91 92 -def unpack_name(buf, off):
93 name = [] 94 saved_off = 0 95 start_off = off 96 name_length = 0 97 while True: 98 if off >= len(buf): 99 raise dpkt.NeedData() 100 n = compat_ord(buf[off]) 101 if n == 0: 102 off += 1 103 break 104 elif (n & 0xc0) == 0xc0: 105 ptr = struct.unpack('>H', buf[off:off + 2])[0] & 0x3fff 106 if ptr >= start_off: 107 raise dpkt.UnpackError('Invalid label compression pointer') 108 off += 2 109 if not saved_off: 110 saved_off = off 111 start_off = off = ptr 112 elif (n & 0xc0) == 0x00: 113 off += 1 114 name.append(buf[off:off + n]) 115 name_length += n + 1 116 if name_length > 255: 117 raise dpkt.UnpackError('name longer than 255 bytes') 118 off += n 119 else: 120 raise dpkt.UnpackError('Invalid label length %02x' % n) 121 if not saved_off: 122 saved_off = off 123 return codecs.decode(b'.'.join(name), 'utf-8'), saved_off
124
125 126 -class DNS(dpkt.Packet):
127 """Domain Name System. 128 129 TODO: Longer class information.... 130 131 Attributes: 132 __hdr__: Header fields of DNS. 133 TODO. 134 """ 135 136 __hdr__ = ( 137 ('id', 'H', 0), 138 ('op', 'H', DNS_RD), # recursive query 139 # XXX - lists of query, RR objects 140 ('qd', 'H', []), 141 ('an', 'H', []), 142 ('ns', 'H', []), 143 ('ar', 'H', []) 144 ) 145 146 @property
147 - def qr(self):
148 return int((self.op & DNS_QR) == DNS_QR)
149 150 @qr.setter
151 - def qr(self, v):
152 if v: 153 self.op |= DNS_QR 154 else: 155 self.op &= ~DNS_QR
156 157 @property
158 - def opcode(self):
159 return (self.op >> 11) & 0xf
160 161 @opcode.setter
162 - def opcode(self, v):
163 self.op = (self.op & ~0x7800) | ((v & 0xf) << 11)
164 165 @property
166 - def aa(self):
167 return int((self.op & DNS_AA) == DNS_AA)
168 169 @aa.setter
170 - def aa(self, v):
171 if v: 172 self.op |= DNS_AA 173 else: 174 self.op &= ~DNS_AA
175 176 @property
177 - def tc(self):
178 return int((self.op & DNS_TC) == DNS_TC)
179 180 @tc.setter
181 - def tc(self, v):
182 if v: 183 self.op |= DNS_TC 184 else: 185 self.op &= ~DNS_TC
186 187 @property
188 - def rd(self):
189 return int((self.op & DNS_RD) == DNS_RD)
190 191 @rd.setter
192 - def rd(self, v):
193 if v: 194 self.op |= DNS_RD 195 else: 196 self.op &= ~DNS_RD
197 198 @property
199 - def ra(self):
200 return int((self.op & DNS_RA) == DNS_RA)
201 202 @ra.setter
203 - def ra(self, v):
204 if v: 205 self.op |= DNS_RA 206 else: 207 self.op &= ~DNS_RA
208 209 @property
210 - def zero(self):
211 return int((self.op & DNS_Z) == DNS_Z)
212 213 @zero.setter
214 - def zero(self, v):
215 if v: 216 self.op |= DNS_Z 217 else: 218 self.op &= ~DNS_Z
219 220 @property
221 - def rcode(self):
222 return self.op & 0xf
223 224 @rcode.setter
225 - def rcode(self, v):
226 self.op = (self.op & ~0xf) | (v & 0xf)
227
228 - class Q(dpkt.Packet):
229 """DNS question.""" 230 __hdr__ = ( 231 ('name', '1025s', b''), 232 ('type', 'H', DNS_A), 233 ('cls', 'H', DNS_IN) 234 ) 235 236 # XXX - suk
237 - def __len__(self):
238 raise NotImplementedError
239 240 __str__ = __len__ 241
242 - def unpack(self, buf):
243 raise NotImplementedError
244
245 - class RR(Q):
246 """DNS resource record.""" 247 __hdr__ = ( 248 ('name', '1025s', b''), 249 ('type', 'H', DNS_A), 250 ('cls', 'H', DNS_IN), 251 ('ttl', 'I', 0), 252 ('rlen', 'H', 4), 253 ('rdata', 's', b'') 254 ) 255
256 - def pack_rdata(self, off, label_ptrs):
257 # XXX - yeah, this sux 258 if self.rdata: 259 return self.rdata 260 if self.type == DNS_A: 261 return self.ip 262 elif self.type == DNS_NS: 263 return pack_name(self.nsname, off, label_ptrs) 264 elif self.type == DNS_CNAME: 265 return pack_name(self.cname, off, label_ptrs) 266 elif self.type == DNS_PTR: 267 return pack_name(self.ptrname, off, label_ptrs) 268 elif self.type == DNS_SOA: 269 l = [] 270 l.append(pack_name(self.mname, off, label_ptrs)) 271 l.append(pack_name(self.rname, off + len(l[0]), label_ptrs)) 272 l.append(struct.pack('>IIIII', self.serial, self.refresh, 273 self.retry, self.expire, self.minimum)) 274 return b''.join(l) 275 elif self.type == DNS_MX: 276 return struct.pack('>H', self.preference) + \ 277 pack_name(self.mxname, off + 2, label_ptrs) 278 elif self.type == DNS_TXT or self.type == DNS_HINFO: 279 return b''.join(['%s%s' % (chr(len(x)), x) 280 for x in self.text]) 281 elif self.type == DNS_AAAA: 282 return self.ip6 283 elif self.type == DNS_SRV: 284 return struct.pack('>HHH', self.priority, self.weight, self.port) + \ 285 pack_name(self.srvname, off + 6, label_ptrs) 286 elif self.type == DNS_OPT: 287 return b'' # self.rdata 288 else: 289 raise dpkt.PackError('RR type %s is not supported' % self.type)
290
291 - def unpack_rdata(self, buf, off):
292 if self.type == DNS_A: 293 self.ip = self.rdata 294 elif self.type == DNS_NS: 295 self.nsname, off = unpack_name(buf, off) 296 elif self.type == DNS_CNAME: 297 self.cname, off = unpack_name(buf, off) 298 elif self.type == DNS_PTR: 299 self.ptrname, off = unpack_name(buf, off) 300 elif self.type == DNS_SOA: 301 self.mname, off = unpack_name(buf, off) 302 self.rname, off = unpack_name(buf, off) 303 self.serial, self.refresh, self.retry, self.expire, \ 304 self.minimum = struct.unpack('>IIIII', buf[off:off + 20]) 305 elif self.type == DNS_MX: 306 self.preference = struct.unpack('>H', self.rdata[:2]) 307 self.mxname, off = unpack_name(buf, off + 2) 308 elif self.type == DNS_TXT or self.type == DNS_HINFO: 309 self.text = [] 310 buf = self.rdata 311 while buf: 312 n = compat_ord(buf[0]) 313 self.text.append(codecs.decode(buf[1:1 + n], 'utf-8')) 314 buf = buf[1 + n:] 315 elif self.type == DNS_AAAA: 316 self.ip6 = self.rdata 317 elif self.type == DNS_NULL: 318 self.null = codecs.encode(self.rdata, 'hex') 319 elif self.type == DNS_SRV: 320 self.priority, self.weight, self.port = struct.unpack('>HHH', self.rdata[:6]) 321 self.srvname, off = unpack_name(buf, off + 6) 322 elif self.type == DNS_OPT: 323 pass # RFC-6891: OPT is a pseudo-RR not carrying any DNS data 324 else: 325 raise dpkt.UnpackError('RR type %s is not supported' % self.type)
326
327 - def pack_q(self, buf, q):
328 """Append packed DNS question and return buf.""" 329 return buf + pack_name(q.name, len(buf), self.label_ptrs) + struct.pack('>HH', q.type, q.cls)
330
331 - def unpack_q(self, buf, off):
332 """Return DNS question and new offset.""" 333 q = self.Q() 334 q.name, off = unpack_name(buf, off) 335 q.type, q.cls = struct.unpack('>HH', buf[off:off + 4]) 336 off += 4 337 return q, off
338
339 - def pack_rr(self, buf, rr):
340 """Append packed DNS RR and return buf.""" 341 name = pack_name(rr.name, len(buf), self.label_ptrs) 342 rdata = rr.pack_rdata(len(buf) + len(name) + 10, self.label_ptrs) 343 return buf + name + struct.pack('>HHIH', rr.type, rr.cls, rr.ttl, len(rdata)) + rdata
344
345 - def unpack_rr(self, buf, off):
346 """Return DNS RR and new offset.""" 347 rr = self.RR() 348 rr.name, off = unpack_name(buf, off) 349 rr.type, rr.cls, rr.ttl, rdlen = struct.unpack('>HHIH', buf[off:off + 10]) 350 off += 10 351 rr.rdata = buf[off:off + rdlen] 352 rr.rlen = rdlen 353 rr.unpack_rdata(buf, off) 354 off += rdlen 355 return rr, off
356
357 - def unpack(self, buf):
358 dpkt.Packet.unpack(self, buf) 359 off = self.__hdr_len__ 360 cnt = self.qd # FIXME: This relies on this being properly set somewhere else 361 self.qd = [] 362 for _ in range(cnt): 363 q, off = self.unpack_q(buf, off) 364 self.qd.append(q) 365 for x in ('an', 'ns', 'ar'): 366 cnt = getattr(self, x, 0) 367 setattr(self, x, []) 368 for _ in range(cnt): 369 rr, off = self.unpack_rr(buf, off) 370 getattr(self, x).append(rr) 371 self.data = b''
372
373 - def __len__(self):
374 # XXX - cop out 375 return len(str(self))
376
377 - def __bytes__(self):
378 # XXX - compress names on the fly 379 self.label_ptrs = {} 380 buf = struct.pack(self.__hdr_fmt__, self.id, self.op, len(self.qd), 381 len(self.an), len(self.ns), len(self.ar)) 382 for q in self.qd: 383 buf = self.pack_q(buf, q) 384 for x in ('an', 'ns', 'ar'): 385 for rr in getattr(self, x): 386 buf = self.pack_rr(buf, rr) 387 del self.label_ptrs 388 return buf
389
390 391 -def test_basic():
392 from . import ip 393 394 s = b'E\x00\x02\x08\xc15\x00\x00\x80\x11\x92aBk0\x01Bk0w\x005\xc07\x01\xf4\xda\xc2d\xd2\x81\x80\x00\x01\x00\x03\x00\x0b\x00\x0b\x03www\x06google\x03com\x00\x00\x01\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x03V\x00\x17\x03www\x06google\x06akadns\x03net\x00\xc0,\x00\x01\x00\x01\x00\x00\x01\xa3\x00\x04@\xe9\xabh\xc0,\x00\x01\x00\x01\x00\x00\x01\xa3\x00\x04@\xe9\xabc\xc07\x00\x02\x00\x01\x00\x00KG\x00\x0c\x04usw5\x04akam\xc0>\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04usw6\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04usw7\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x08\x05asia3\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02za\xc07\xc07\x00\x02\x00\x01\x00\x00KG\x00\x0f\x02zc\x06akadns\x03org\x00\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02zf\xc07\xc07\x00\x02\x00\x01\x00\x00KG\x00\x05\x02zh\xc0\xd5\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04eur3\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04use2\xc0t\xc07\x00\x02\x00\x01\x00\x00KG\x00\x07\x04use4\xc0t\xc0\xc1\x00\x01\x00\x01\x00\x00\xfb4\x00\x04\xd0\xb9\x84\xb0\xc0\xd2\x00\x01\x00\x01\x00\x001\x0c\x00\x04?\xf1\xc76\xc0\xed\x00\x01\x00\x01\x00\x00\xfb4\x00\x04?\xd7\xc6S\xc0\xfe\x00\x01\x00\x01\x00\x001\x0c\x00\x04?\xd00.\xc1\x0f\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04\xc1-\x01g\xc1"\x00\x01\x00\x01\x00\x00\x101\x00\x04?\xd1\xaa\x88\xc15\x00\x01\x00\x01\x00\x00\r\x1a\x00\x04PCC\xb6\xc0o\x00\x01\x00\x01\x00\x00\x10\x7f\x00\x04?\xf1I\xd6\xc0\x87\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04\xce\x84dl\xc0\x9a\x00\x01\x00\x01\x00\x00\n\xdf\x00\x04A\xcb\xea\x1b\xc0\xad\x00\x01\x00\x01\x00\x00\x0b)\x00\x04\xc1l\x9a\t' 395 ip = ip.IP(s) 396 my_dns = DNS(ip.udp.data) 397 assert my_dns.qd[0].name == 'www.google.com' and my_dns.an[1].name == 'www.google.akadns.net' 398 s = b'\x05\xf5\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x03cnn\x03com\x00\x00\x01\x00\x01' 399 my_dns = DNS(s) 400 assert s == bytes(my_dns)
401
402 403 -def test_PTR():
404 s = b'g\x02\x81\x80\x00\x01\x00\x01\x00\x03\x00\x00\x011\x011\x03211\x03141\x07in-addr\x04arpa\x00\x00\x0c\x00\x01\xc0\x0c\x00\x0c\x00\x01\x00\x00\r6\x00$\x07default\nv-umce-ifs\x05umnet\x05umich\x03edu\x00\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\r\x06shabby\x03ifs\xc0O\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0f\x0cfish-license\xc0m\xc0\x0e\x00\x02\x00\x01\x00\x00\r6\x00\x0b\x04dns2\x03itd\xc0O' 405 my_dns = DNS(s) 406 assert my_dns.qd[0].name == '1.1.211.141.in-addr.arpa' and \ 407 my_dns.an[0].ptrname == 'default.v-umce-ifs.umnet.umich.edu' and \ 408 my_dns.ns[0].nsname == 'shabby.ifs.umich.edu' and \ 409 my_dns.ns[1].ttl == 3382 and \ 410 my_dns.ns[2].nsname == 'dns2.itd.umich.edu' 411 assert s == bytes(my_dns)
412
413 414 -def test_OPT():
415 s = b'\x8dn\x01\x10\x00\x01\x00\x00\x00\x00\x00\x01\x04x111\x06xxxx11\x06akamai\x03net\x00\x00\x01\x00\x01\x00\x00)\x0f\xa0\x00\x00\x80\x00\x00\x00' 416 my_dns = DNS(s) 417 my_rr = my_dns.ar[0] 418 assert my_rr.type == DNS_OPT 419 assert my_rr.rlen == 0 and my_rr.rdata == b'' 420 assert bytes(my_dns) == s 421 422 my_rr.rdata = b'\x00\x00\x00\x02\x00\x00' # add 1 attribute tlv 423 my_dns2 = DNS(bytes(my_dns)) 424 my_rr2 = my_dns2.ar[0] 425 assert my_rr2.rlen == 6 and my_rr2.rdata == b'\x00\x00\x00\x02\x00\x00'
426
427 428 -def test_pack_name():
429 # Empty name is \0 430 x = pack_name('', 0, {}) 431 assert x == b'\0'
432
433 434 -def test_random_data():
435 try: 436 DNS(b'\x83z0\xd2\x9a\xec\x94_7\xf3\xb7+\x85"?\xf0\xfb') 437 except dpkt.UnpackError: 438 pass 439 except: 440 assert False 441 else: 442 assert False
443
444 445 -def test_circular_pointers():
446 try: 447 DNS(b'\xc0\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\xc0\x00') 448 except dpkt.UnpackError: 449 pass 450 except: 451 assert False 452 else: 453 assert False
454
455 456 -def test_very_long_name():
457 try: 458 DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00' + (b'\x10abcdef0123456789' * 16) + b'\x00') 459 except dpkt.UnpackError: 460 pass 461 except: 462 assert False 463 else: 464 assert False
465
466 467 -def test_null_response():
468 s = b'\x12\xb0\x84\x00\x00\x01\x00\x01\x00\x00\x00\x00\x0bblahblah666\x06pirate\x03sea\x00\x00\n\x00\x01\xc0\x0c\x00\n\x00\x01\x00\x00\x00\x00\x00\tVACKD\x03\xc5\xe9\x01' 469 my_dns = DNS(s) 470 assert my_dns.qd[0].name == 'blahblah666.pirate.sea' and \ 471 my_dns.an[0].null == b'5641434b4403c5e901' 472 assert str(s) == str(my_dns)
473
474 475 -def test_txt_response():
476 buf = ( 477 b'\x10\x32\x81\x80\x00\x01\x00\x01\x00\x00\x00\x00\x06\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f' 478 b'\x6d\x00\x00\x10\x00\x01\xc0\x0c\x00\x10\x00\x01\x00\x00\x01\x0e\x00\x10\x0f\x76\x3d\x73' 479 b'\x70\x66\x31\x20\x70\x74\x72\x20\x3f\x61\x6c\x6c') 480 my_dns = DNS(buf) 481 my_rr = my_dns.an[0] 482 assert my_rr.type == DNS_TXT 483 assert my_rr.name == 'google.com' 484 assert my_rr.text == ['v=spf1 ptr ?all'] 485 assert str(my_dns) == str(buf) 486 assert bytes(my_dns) == buf
487 488 489 if __name__ == '__main__': 490 # Runs all the test associated with this class/file 491 test_basic() 492 test_PTR() 493 test_OPT() 494 test_pack_name() 495 test_random_data() 496 test_circular_pointers() 497 test_very_long_name() 498 test_null_response() 499 test_txt_response() 500 test_deprecated_methods() 501 test_deprecated_method_performance() 502 print('Tests Successful...') 503