Package dpkt :: Module dpkt
[hide private]
[frames] | no frames]

Source Code for Module dpkt.dpkt

  1  # $Id: dpkt.py 43 2007-08-02 22:42:59Z jon.oberheide $ 
  2  # -*- coding: utf-8 -*- 
  3  """Simple packet creation and parsing.""" 
  4  from __future__ import absolute_import  
  5   
  6  import copy 
  7  import itertools 
  8  import socket 
  9  import struct 
 10  import array 
 11   
 12  from .compat import compat_ord, compat_izip, iteritems 
 13   
 14   
15 -class Error(Exception):
16 pass
17 18
19 -class UnpackError(Error):
20 pass
21 22
23 -class NeedData(UnpackError):
24 pass
25 26
27 -class PackError(Error):
28 pass
29 30
31 -class _MetaPacket(type):
32 - def __new__(cls, clsname, clsbases, clsdict):
33 t = type.__new__(cls, clsname, clsbases, clsdict) 34 st = getattr(t, '__hdr__', None) 35 if st is not None: 36 # XXX - __slots__ only created in __new__() 37 clsdict['__slots__'] = [x[0] for x in st] + ['data'] 38 t = type.__new__(cls, clsname, clsbases, clsdict) 39 t.__hdr_fields__ = [x[0] for x in st] 40 t.__hdr_fmt__ = getattr(t, '__byte_order__', '>') + ''.join([x[1] for x in st]) 41 t.__hdr_len__ = struct.calcsize(t.__hdr_fmt__) 42 t.__hdr_defaults__ = dict(compat_izip( 43 t.__hdr_fields__, [x[2] for x in st])) 44 return t
45 46
47 -class Packet(_MetaPacket("Temp", (object,), {})):
48 """Base packet class, with metaclass magic to generate members from self.__hdr__. 49 50 Attributes: 51 __hdr__: Packet header should be defined as a list of 52 (name, structfmt, default) tuples. 53 __byte_order__: Byte order, can be set to override the default ('>') 54 55 Example: 56 >>> class Foo(Packet): 57 ... __hdr__ = (('foo', 'I', 1), ('bar', 'H', 2), ('baz', '4s', 'quux')) 58 ... 59 >>> foo = Foo(bar=3) 60 >>> foo 61 Foo(bar=3) 62 >>> str(foo) 63 '\x00\x00\x00\x01\x00\x03quux' 64 >>> foo.bar 65 3 66 >>> foo.baz 67 'quux' 68 >>> foo.foo = 7 69 >>> foo.baz = 'whee' 70 >>> foo 71 Foo(baz='whee', foo=7, bar=3) 72 >>> Foo('hello, world!') 73 Foo(baz=' wor', foo=1751477356L, bar=28460, data='ld!') 74 """ 75
76 - def __init__(self, *args, **kwargs):
77 """Packet constructor with ([buf], [field=val,...]) prototype. 78 79 Arguments: 80 81 buf -- optional packet buffer to unpack 82 83 Optional keyword arguments correspond to members to set 84 (matching fields in self.__hdr__, or 'data'). 85 """ 86 self.data = b'' 87 if args: 88 try: 89 self.unpack(args[0]) 90 except struct.error: 91 if len(args[0]) < self.__hdr_len__: 92 raise NeedData 93 raise UnpackError('invalid %s: %r' % 94 (self.__class__.__name__, args[0])) 95 else: 96 for k in self.__hdr_fields__: 97 setattr(self, k, copy.copy(self.__hdr_defaults__[k])) 98 for k, v in iteritems(kwargs): 99 setattr(self, k, v)
100
101 - def __len__(self):
102 return self.__hdr_len__ + len(self.data)
103
104 - def __getitem__(self, k):
105 try: 106 return getattr(self, k) 107 except AttributeError: 108 raise KeyError
109
110 - def __repr__(self):
111 # Collect and display protocol fields in order: 112 # 1. public fields defined in __hdr__, unless their value is default 113 # 2. properties derived from _private fields defined in __hdr__ 114 # 3. dynamically added fields from self.__dict__, unless they are _private 115 # 4. self.data when it's present 116 117 l = [] 118 # maintain order of fields as defined in __hdr__ 119 for field_name, _, _ in getattr(self, '__hdr__', []): 120 field_value = getattr(self, field_name) 121 if field_value != self.__hdr_defaults__[field_name]: 122 if field_name[0] != '_': 123 l.append('%s=%r' % (field_name, field_value)) # (1) 124 else: 125 # interpret _private fields as name of properties joined by underscores 126 for prop_name in field_name.split('_'): # (2) 127 if isinstance(getattr(self.__class__, prop_name, None), property): 128 l.append('%s=%r' % (prop_name, getattr(self, prop_name))) 129 # (3) 130 l.extend( 131 ['%s=%r' % (attr_name, attr_value) 132 for attr_name, attr_value in iteritems(self.__dict__) 133 if attr_name[0] != '_' # exclude _private attributes 134 and attr_name != self.data.__class__.__name__.lower()]) # exclude fields like ip.udp 135 # (4) 136 if self.data: 137 l.append('data=%r' % self.data) 138 return '%s(%s)' % (self.__class__.__name__, ', '.join(l))
139
140 - def __str__(self):
141 return str(self.__bytes__())
142
143 - def __bytes__(self):
144 return self.pack_hdr() + bytes(self.data)
145
146 - def pack_hdr(self):
147 """Return packed header string.""" 148 try: 149 return struct.pack(self.__hdr_fmt__, 150 *[getattr(self, k) for k in self.__hdr_fields__]) 151 except struct.error: 152 vals = [] 153 for k in self.__hdr_fields__: 154 v = getattr(self, k) 155 if isinstance(v, tuple): 156 vals.extend(v) 157 else: 158 vals.append(v) 159 try: 160 return struct.pack(self.__hdr_fmt__, *vals) 161 except struct.error as e: 162 raise PackError(str(e))
163
164 - def pack(self):
165 """Return packed header + self.data string.""" 166 return bytes(self)
167
168 - def unpack(self, buf):
169 """Unpack packet header fields from buf, and set self.data.""" 170 for k, v in compat_izip(self.__hdr_fields__, 171 struct.unpack(self.__hdr_fmt__, buf[:self.__hdr_len__])): 172 setattr(self, k, v) 173 self.data = buf[self.__hdr_len__:]
174 175 # XXX - ''.join([(len(`chr(x)`)==3) and chr(x) or '.' for x in range(256)]) 176 __vis_filter = b'................................ !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[.]^_`abcdefghijklmnopqrstuvwxyz{|}~.................................................................................................................................' 177 178
179 -def hexdump(buf, length=16):
180 """Return a hexdump output string of the given buffer.""" 181 n = 0 182 res = [] 183 while buf: 184 line, buf = buf[:length], buf[length:] 185 hexa = ' '.join(['%02x' % compat_ord(x) for x in line]) 186 line = line.translate(__vis_filter).decode('utf-8') 187 res.append(' %04d: %-*s %s' % (n, length * 3, hexa, line)) 188 n += length 189 return '\n'.join(res)
190 191
192 -def in_cksum_add(s, buf):
193 n = len(buf) 194 cnt = (n // 2) * 2 195 a = array.array('H', buf[:cnt]) 196 if cnt != n: 197 a.append(compat_ord(buf[-1])) 198 return s + sum(a)
199 200
201 -def in_cksum_done(s):
202 s = (s >> 16) + (s & 0xffff) 203 s += (s >> 16) 204 return socket.ntohs(~s & 0xffff)
205 206
207 -def in_cksum(buf):
208 """Return computed Internet checksum.""" 209 return in_cksum_done(in_cksum_add(0, buf))
210 211
212 -def test_utils():
213 __buf = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e' 214 __hd = ' 0000: 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e ...............' 215 h = hexdump(__buf) 216 assert (h == __hd) 217 c = in_cksum(__buf) 218 assert (c == 51150)
219