Package dns :: Module rdata
[hide private]
[frames] | [no frames]

Source Code for Module dns.rdata

  1  # Copyright (C) 2001-2007, 2009-2011 Nominum, Inc. 
  2  # 
  3  # Permission to use, copy, modify, and distribute this software and its 
  4  # documentation for any purpose with or without fee is hereby granted, 
  5  # provided that the above copyright notice and this permission notice 
  6  # appear in all copies. 
  7  # 
  8  # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 
  9  # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
 10  # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 
 11  # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
 12  # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
 13  # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 
 14  # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 15   
 16  """DNS rdata. 
 17   
 18  @var _rdata_modules: A dictionary mapping a (rdclass, rdtype) tuple to 
 19  the module which implements that type. 
 20  @type _rdata_modules: dict 
 21  @var _module_prefix: The prefix to use when forming modules names.  The 
 22  default is 'dns.rdtypes'.  Changing this value will break the library. 
 23  @type _module_prefix: string 
 24  @var _hex_chunksize: At most this many hex characters will be represented 
 25  in each chunk of hexstring that _hexify() produces before whitespace occurs. 
 26  @type _hex_chunksize: int""" 
 27   
 28  import base64 
 29  import io 
 30   
 31  import dns.exception 
 32  import dns.name 
 33  import dns.rdataclass 
 34  import dns.rdatatype 
 35  import dns.tokenizer 
 36  import dns.wiredata 
 37  import dns.util 
 38   
 39  _hex_chunksize = 32 
40 41 -def _hexify(data, chunksize=None):
42 """Convert a binary string into its hex encoding, broken up into chunks 43 of I{chunksize} characters separated by a space. 44 45 @param data: the binary string 46 @type data: string 47 @param chunksize: the chunk size. Default is L{dns.rdata._hex_chunksize} 48 @rtype: string 49 """ 50 51 if chunksize is None: 52 chunksize = _hex_chunksize 53 hex = base64.b16encode(data).decode('ascii').lower() 54 l = len(hex) 55 if l > chunksize: 56 chunks = [] 57 i = 0 58 while i < l: 59 chunks.append(hex[i : i + chunksize]) 60 i += chunksize 61 hex = ' '.join(chunks) 62 return hex
_base64_chunksize = 32


def _base64ify(data, chunksize=None):
    """Convert a binary string into its base64 encoding, broken up into chunks
    of I{chunksize} characters separated by a space.

    @param data: the binary string
    @type data: bytes
    @param chunksize: the chunk size, counted in base64 characters.  Default
    is L{dns.rdata._base64_chunksize}
    @type chunksize: int
    @raises ValueError: I{chunksize} is not positive while the encoded data
    is non-empty and would have to be split
    @rtype: string
    """

    if chunksize is None:
        chunksize = _base64_chunksize
    encoded = base64.b64encode(data).decode('ascii')
    # Nothing to split: empty output, or output that fits in a single chunk.
    if not encoded or len(encoded) <= chunksize:
        return encoded
    if chunksize <= 0:
        # A non-positive step can never consume the string; fail loudly
        # instead of looping forever.
        raise ValueError('chunksize must be a positive integer')
    return ' '.join(encoded[i : i + chunksize]
                    for i in range(0, len(encoded), chunksize))
_escaped = frozenset('"\\')


def _escapify(qstring):
    """Escape the characters in a quoted string which need it.

    Double quotes and backslashes are prefixed with a backslash; characters
    outside the printable ASCII range 0x20-0x7E are written as a backslash
    followed by their code point as at least three decimal digits.

    @param qstring: the string; bytes are decoded as latin-1 first
    @type qstring: string or bytes
    @returns: the escaped string
    @rtype: string
    """

    if isinstance(qstring, bytes):
        qstring = qstring.decode('latin_1')
    # Collect the pieces and join once rather than growing a string with +=.
    pieces = []
    for c in qstring:
        code = ord(c)
        if c in _escaped:
            pieces.append('\\' + c)
        elif 0x20 <= code < 0x7F:
            pieces.append(c)
        else:
            pieces.append('\\%03d' % code)
    return ''.join(pieces)
112
def _truncate_bitmap(what):
    """Determine the index of greatest byte that isn't all zeros, and
    return the bitmap that contains all the bytes up to and including
    that index.

    If every byte is zero, only the first byte is kept; an empty bitmap
    yields an empty result.

    @param what: a string of octets representing a bitmap.
    @type what: bytes
    @rtype: bytes
    """

    for i in range(len(what) - 1, -1, -1):
        if what[i] != 0:
            return bytes(what[0 : i + 1])
    # No non-zero octet was found.  The slice keeps the first octet when
    # there is one and is simply empty for empty input.
    return bytes(what[0:1])
126
class Rdata(object):
    """Base class for all DNS rdata types.

    Concrete rdata types are expected to override to_text(), to_wire(),
    from_text(), from_wire() and _cmp(); the versions defined here raise
    NotImplementedError.
    """

    __slots__ = ['rdclass', 'rdtype']

    def __init__(self, rdclass, rdtype):
        """Initialize an rdata.
        @param rdclass: The rdata class
        @type rdclass: int
        @param rdtype: The rdata type
        @type rdtype: int
        """

        self.rdclass = rdclass
        self.rdtype = rdtype

    def covers(self):
        """DNS SIG/RRSIG rdatas apply to a specific type; this type is
        returned by the covers() function.  If the rdata type is not
        SIG or RRSIG, dns.rdatatype.NONE is returned.  This is useful when
        creating rdatasets, allowing the rdataset to contain only RRSIGs
        of a particular type, e.g. RRSIG(NS).
        @rtype: int
        """

        return dns.rdatatype.NONE

    def extended_rdatatype(self):
        """Return a 32-bit type value, the least significant 16 bits of
        which are the ordinary DNS type, and the upper 16 bits of which are
        the "covered" type, if any.
        @rtype: int
        """

        return self.covers() << 16 | self.rdtype

    def to_text(self, origin=None, relativize=True, **kw):
        """Convert an rdata to text format.

        Must be overridden; this base version always raises.
        @raises NotImplementedError: always
        @rtype: string
        """
        raise NotImplementedError

    def to_wire(self, file, compress=None, origin=None):
        """Convert an rdata to wire format.

        The wire form is written to I{file} (to_digestable() passes an
        io.BytesIO).  Must be overridden; this base version always raises.
        @param file: the object the wire-format octets are written to
        @raises NotImplementedError: always
        """

        raise NotImplementedError

    def to_digestable(self, origin=None):
        """Convert rdata to a format suitable for digesting in hashes.  This
        is also the DNSSEC canonical form.

        The value is whatever to_wire() writes when called with no
        compression table and the given origin.
        @rtype: bytes
        """
        f = io.BytesIO()
        self.to_wire(f, None, origin)
        return f.getvalue()

    def validate(self):
        """Check that the current contents of the rdata's fields are
        valid.  If you change an rdata by assigning to its fields,
        it is a good idea to call validate() when you are done making
        changes.

        The check re-parses the output of to_text() with
        dns.rdata.from_text(); any exception raised there propagates.
        """
        dns.rdata.from_text(self.rdclass, self.rdtype, self.to_text())

    def __repr__(self):
        covers = self.covers()
        if covers == dns.rdatatype.NONE:
            ctext = ''
        else:
            ctext = '(' + dns.rdatatype.to_text(covers) + ')'
        return '<DNS %s %s%s rdata: %s>' % (
            dns.rdataclass.to_text(self.rdclass),
            dns.rdatatype.to_text(self.rdtype),
            ctext,
            str(self))

    def __str__(self):
        return self.to_text()

    def _cmp(self, other):
        """Compare an rdata with another rdata of the same rdtype and
        rdclass.  Return < 0 if self < other in the DNSSEC ordering,
        0 if self == other, and > 0 if self > other.
        @raises NotImplementedError: always, in this base version
        """
        raise NotImplementedError

    def _ordering(self, other):
        """Three-way comparison shared by all rich comparison methods.

        Rdatas whose (rdclass, rdtype) pairs differ are ordered by that
        pair, and _cmp() is not consulted; otherwise the result of _cmp()
        is returned.
        @param other: the rdata to compare with
        @type other: dns.rdata.Rdata instance
        @rtype: int (negative, zero, or positive)
        """
        mine = (self.rdclass, self.rdtype)
        theirs = (other.rdclass, other.rdtype)
        if mine != theirs:
            return -1 if mine < theirs else 1
        return self._cmp(other)

    # Equality treats a non-Rdata operand as simply unequal, whereas the
    # ordering operators return NotImplemented for it.

    def __eq__(self, other):
        if not isinstance(other, Rdata):
            return False
        return self._ordering(other) == 0

    def __ne__(self, other):
        if not isinstance(other, Rdata):
            return True
        return self._ordering(other) != 0

    def __lt__(self, other):
        if not isinstance(other, Rdata):
            return NotImplemented
        return self._ordering(other) < 0

    def __le__(self, other):
        if not isinstance(other, Rdata):
            return NotImplemented
        return self._ordering(other) <= 0

    def __ge__(self, other):
        if not isinstance(other, Rdata):
            return NotImplemented
        return self._ordering(other) >= 0

    def __gt__(self, other):
        if not isinstance(other, Rdata):
            return NotImplemented
        return self._ordering(other) > 0

    def __hash__(self):
        return hash(self.to_digestable(dns.name.root))

    def _wire_cmp(self, other):
        # A number of types compare rdata in wire form, so we provide
        # the method here instead of duplicating it.
        #
        # We specify an arbitrary origin of '.' when doing the
        # comparison, since the rdata may have relative names and we
        # can't convert a relative name to wire without an origin.
        b1 = io.BytesIO()
        self.to_wire(b1, None, dns.name.root)
        b2 = io.BytesIO()
        other.to_wire(b2, None, dns.name.root)
        return dns.util.cmp(b1.getvalue(), b2.getvalue())

    @classmethod
    def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an rdata object from text format.

        Must be overridden; this base version always raises.

        @param rdclass: The rdata class
        @type rdclass: int
        @param rdtype: The rdata type
        @type rdtype: int
        @param tok: The tokenizer
        @type tok: dns.tokenizer.Tokenizer
        @param origin: The origin to use for relative names
        @type origin: dns.name.Name
        @param relativize: should names be relativized?
        @type relativize: bool
        @raises NotImplementedError: always
        @rtype: dns.rdata.Rdata instance
        """

        raise NotImplementedError

    @classmethod
    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
        """Build an rdata object from wire format

        Must be overridden; this base version always raises.

        @param rdclass: The rdata class
        @type rdclass: int
        @param rdtype: The rdata type
        @type rdtype: int
        @param wire: The wire-format message
        @type wire: string
        @param current: The offset in wire of the beginning of the rdata.
        @type current: int
        @param rdlen: The length of the wire-format rdata
        @type rdlen: int
        @param origin: The origin to use for relative names
        @type origin: dns.name.Name
        @raises NotImplementedError: always
        @rtype: dns.rdata.Rdata instance
        """

        raise NotImplementedError

    def choose_relativity(self, origin=None, relativize=True):
        """Convert any domain names in the rdata to the specified
        relativization.

        The base implementation does nothing.
        """

        pass
322
class GenericRdata(Rdata):
    """Generic Rdata Class

    This class is used for rdata types for which we have no better
    implementation.  It implements the DNS "unknown RRs" scheme.
    """

    __slots__ = ['data']

    def __init__(self, rdclass, rdtype, data):
        """Initialize a generic rdata.
        @param rdclass: The rdata class
        @type rdclass: int
        @param rdtype: The rdata type
        @type rdtype: int
        @param data: The raw rdata octets
        @type data: bytes
        """
        super(GenericRdata, self).__init__(rdclass, rdtype)
        self.data = data

    def to_text(self, origin=None, relativize=True, **kw):
        """Render as a backslash-hash marker, the data length, and the
        hex-encoded data.
        @rtype: string
        """
        return r'\# %d ' % len(self.data) + _hexify(self.data)

    @classmethod
    def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Parse the generic text form produced by to_text().

        @param tok: The tokenizer
        @type tok: dns.tokenizer.Tokenizer
        @raises dns.exception.SyntaxError: the text does not start with the
        backslash-hash marker, or the decoded data does not have the
        declared length
        @rtype: dns.rdata.GenericRdata instance
        """
        token = tok.get()
        if not token.is_identifier() or token.value != r'\#':
            raise dns.exception.SyntaxError(r'generic rdata does not start with \#')
        length = tok.get_int()
        # The hex data may be split over several whitespace-separated
        # tokens; gather them all up to the end of the line or file.
        chunks = []
        while True:
            token = tok.get()
            if token.is_eol_or_eof():
                break
            chunks.append(token.value)
        data = bytes.fromhex(''.join(chunks))
        if len(data) != length:
            raise dns.exception.SyntaxError('generic rdata hex data has wrong length')
        return cls(rdclass, rdtype, data)

    def to_wire(self, file, compress=None, origin=None):
        """Write the raw rdata octets to I{file} unchanged."""
        file.write(self.data)

    @classmethod
    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
        """Build a generic rdata from the I{rdlen} octets of I{wire}
        starting at offset I{current}.
        @rtype: dns.rdata.GenericRdata instance
        """
        return cls(rdclass, rdtype, wire[current : current + rdlen])

    def _cmp(self, other):
        """Order generic rdatas by comparing their raw data with
        dns.util.cmp()."""
        return dns.util.cmp(self.data, other.data)
_rdata_modules = {}
_module_prefix = 'dns.rdtypes'


def get_rdata_class(rdclass, rdtype):
    """Return the class which implements the specified rdata class and type.

    Implementation modules are looked up in the L{_rdata_modules} cache,
    first under (rdclass, rdtype) and then under (dns.rdataclass.ANY,
    rdtype).  On a cache miss the module named
    I{_module_prefix}.<class text>.<type text> is imported, falling back to
    I{_module_prefix}.ANY.<type text>; a successfully imported module is
    cached.  Any '-' in the type's text form is replaced by '_' to form the
    module and class name.

    @param rdclass: The rdata class
    @type rdclass: int
    @param rdtype: The rdata type
    @type rdtype: int
    @returns: the attribute of the implementation module named after the
    type, or L{GenericRdata} if no module could be imported
    @rtype: class
    """

    def import_module(name):
        # __import__ returns the top-level package, so walk down to the
        # requested submodule one attribute at a time.
        mod = __import__(name)
        for comp in name.split('.')[1:]:
            mod = getattr(mod, comp)
        return mod

    mod = _rdata_modules.get((rdclass, rdtype))
    rdclass_text = dns.rdataclass.to_text(rdclass)
    rdtype_text = dns.rdatatype.to_text(rdtype)
    rdtype_text = rdtype_text.replace('-', '_')
    if not mod:
        # Class-independent implementations are cached under the ANY
        # *class*, matching the key used when they are stored below.
        mod = _rdata_modules.get((dns.rdataclass.ANY, rdtype))
        if not mod:
            try:
                mod = import_module('.'.join([_module_prefix,
                                              rdclass_text, rdtype_text]))
                _rdata_modules[(rdclass, rdtype)] = mod
            except ImportError:
                try:
                    mod = import_module('.'.join([_module_prefix,
                                                  'ANY', rdtype_text]))
                    _rdata_modules[(dns.rdataclass.ANY, rdtype)] = mod
                except ImportError:
                    mod = None
    if mod:
        cls = getattr(mod, rdtype_text)
    else:
        cls = GenericRdata
    return cls
def from_text(rdclass, rdtype, tok, origin=None, relativize=True):
    """Build an rdata object from text format.

    The implementing class is chosen with get_rdata_class(), which yields
    GenericRdata when there is no class-and-type-specific implementation,
    and that class's from_text() class method is called with the
    parameters to this function.

    A known type may also be written in the generic syntax (first token is
    a backslash followed by '#'); in that case the wire form is extracted
    from the generic syntax and handed to from_wire().

    If I{tok} is a string, then a tokenizer is created and the string
    is used as its input.

    @param rdclass: The rdata class
    @type rdclass: int
    @param rdtype: The rdata type
    @type rdtype: int
    @param tok: The tokenizer or input text
    @type tok: dns.tokenizer.Tokenizer or string
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name
    @param relativize: Should names be relativized?
    @type relativize: bool
    @rtype: dns.rdata.Rdata instance"""

    if isinstance(tok, str):
        tok = dns.tokenizer.Tokenizer(tok)
    rdata_cls = get_rdata_class(rdclass, rdtype)
    if rdata_cls == GenericRdata:
        return rdata_cls.from_text(rdclass, rdtype, tok, origin, relativize)

    # Look at the first token without consuming it.
    first = tok.get()
    tok.unget(first)
    uses_generic_syntax = first.is_identifier() and first.value == r'\#'
    if not uses_generic_syntax:
        return rdata_cls.from_text(rdclass, rdtype, tok, origin, relativize)

    # Known type using the generic syntax: parse it generically, then
    # decode the resulting octets as wire-format rdata.
    generic = GenericRdata.from_text(rdclass, rdtype, tok, origin,
                                     relativize)
    octets = generic.data
    return from_wire(rdclass, rdtype, octets, 0, len(octets), origin)
449
def from_wire(rdclass, rdtype, wire, current, rdlen, origin=None):
    """Build an rdata object from wire format

    The message is first passed through dns.wiredata.maybe_wrap().  The
    implementing class is then chosen with get_rdata_class(), which yields
    GenericRdata when there is no class-and-type-specific implementation,
    and that class's from_wire() class method is called with the wrapped
    message and the remaining parameters to this function.

    @param rdclass: The rdata class
    @type rdclass: int
    @param rdtype: The rdata type
    @type rdtype: int
    @param wire: The wire-format message
    @type wire: string
    @param current: The offset in wire of the beginning of the rdata.
    @type current: int
    @param rdlen: The length of the wire-format rdata
    @type rdlen: int
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name
    @rtype: dns.rdata.Rdata instance"""

    wrapped = dns.wiredata.maybe_wrap(wire)
    rdata_cls = get_rdata_class(rdclass, rdtype)
    return rdata_cls.from_wire(rdclass, rdtype, wrapped, current, rdlen,
                               origin)
478