Package dpkt :: Module ip6
[hide private]
[frames | no frames]

Source Code for Module dpkt.ip6

  1  # $Id: ip6.py 87 2013-03-05 19:41:04Z andrewflnr@gmail.com $ 
  2  # -*- coding: utf-8 -*- 
  3  """Internet Protocol, version 6.""" 
  4  from __future__ import print_function 
  5  from __future__ import absolute_import 
  6   
  7  from . import dpkt 
  8  from . import ip 
  9  from .decorators import deprecated 
 10  from .compat import compat_ord 
class IP6(dpkt.Packet):
    """Internet Protocol, version 6.

    ``unpack()`` parses the fixed header described by ``__hdr__``, then walks
    the chain of extension headers whose protocol numbers appear in the
    module-level ``ext_hdrs`` list, and finally hands the remaining bytes to
    the upper-layer parser registered in ``_protosw``.

    Attributes:
        __hdr__: Header fields of IPv6.
        _protosw: Protocol-number -> packet-class table. This is the very same
            dict object as ``ip.IP._protosw``, so ``set_proto`` affects both.
        extension_hdrs: Set by ``unpack()``. Dict mapping an extension header
            protocol number to the parsed header object. Because it is keyed
            by type, a second header of the same type replaces the first.
        p: Set by ``unpack()`` when known. Protocol number of the payload that
            follows the last extension header.
    """

    __hdr__ = (
        ('_v_fc_flow', 'I', 0x60000000),
        ('plen', 'H', 0),  # payload length (not including header)
        ('nxt', 'B', 0),  # next header protocol
        ('hlim', 'B', 0),  # hop limit
        # bytes defaults: a text '' cannot be packed with '16s' on Python 3
        ('src', '16s', b''),
        ('dst', '16s', b'')
    )

    _protosw = ip.IP._protosw

    @property
    def v(self):
        """IP version: the top 4 bits of ``_v_fc_flow``."""
        return self._v_fc_flow >> 28

    @v.setter
    def v(self, v):
        # Mask the input so an oversized value cannot overflow the 32-bit field.
        self._v_fc_flow = (self._v_fc_flow & ~0xf0000000) | ((v & 0xf) << 28)

    @property
    def fc(self):
        """Traffic class: bits 20-27 of ``_v_fc_flow``."""
        return (self._v_fc_flow >> 20) & 0xff

    @fc.setter
    def fc(self, v):
        # Mask the input so it cannot spill into the version bits.
        self._v_fc_flow = (self._v_fc_flow & ~0xff00000) | ((v & 0xff) << 20)

    @property
    def flow(self):
        """Flow label: the low 20 bits of ``_v_fc_flow``."""
        return self._v_fc_flow & 0xfffff

    @flow.setter
    def flow(self, v):
        self._v_fc_flow = (self._v_fc_flow & ~0xfffff) | (v & 0xfffff)

    def unpack(self, buf):
        """Parse ``buf`` into the fixed header, extension headers and payload.

        Sets ``extension_hdrs``, ``p`` (when the final next-header value is
        known) and ``data``. ``data`` becomes a parsed upper-layer packet when
        ``_protosw`` has a parser for it and parsing succeeds; otherwise it is
        left as the raw remaining bytes.
        """
        dpkt.Packet.unpack(self, buf)
        self.extension_hdrs = {}

        if self.plen:
            buf = self.data[:self.plen]
        else:  # due to jumbo payload or TSO
            buf = self.data

        next_ext_hdr = self.nxt

        while next_ext_hdr in ext_hdrs:
            ext = ext_hdrs_cls[next_ext_hdr](buf)
            self.extension_hdrs[next_ext_hdr] = ext
            buf = buf[ext.length:]
            # Headers without a 'nxt' field (ESP) end the chain with None.
            next_ext_hdr = getattr(ext, 'nxt', None)

        # set the payload protocol id
        if next_ext_hdr is not None:
            self.p = next_ext_hdr

        try:
            self.data = self._protosw[next_ext_hdr](buf)
            setattr(self, self.data.__class__.__name__.lower(), self.data)
        except (KeyError, dpkt.UnpackError):
            self.data = buf

    def headers_str(self):
        """Output extension headers in order defined in RFC1883 (except dest opts)"""
        # A packet built from keyword arguments never ran unpack(), so it may
        # have no extension_hdrs attribute; treat that as "no headers".
        present = getattr(self, 'extension_hdrs', {})
        return b"".join(bytes(present[hdr]) for hdr in ext_hdrs if hdr in present)

    def __bytes__(self):
        """Serialize the header, extension headers and payload.

        When the payload is TCP (6), UDP (17) or ICMPv6 (58) and its ``sum``
        field is zero, the checksum is computed and stored on the payload
        before serializing.
        """
        # 'p' only exists after unpack(); without it the payload protocol is
        # whatever the fixed header's next-header field says.
        proto = getattr(self, 'p', self.nxt)
        payload = self.data
        # hasattr guard: the payload may be raw bytes when upper-layer parsing
        # failed, and raw bytes have no checksum field to fill in.
        if proto in (6, 17, 58) and hasattr(payload, 'sum') and not payload.sum:
            # XXX - set TCP, UDP, and ICMPv6 checksums
            p = bytes(payload)
            # The pseudo-header carries the upper-layer protocol, which is not
            # self.nxt when extension headers are present.
            s = dpkt.struct.pack('>16s16sxBH', self.src, self.dst, proto, len(p))
            s = dpkt.in_cksum_add(0, s)
            s = dpkt.in_cksum_add(s, p)
            try:
                payload.sum = dpkt.in_cksum_done(s)
            except AttributeError:
                pass
        return self.pack_hdr() + self.headers_str() + bytes(self.data)

    @classmethod
    def set_proto(cls, p, pktclass):
        """Register ``pktclass`` as the parser for protocol number ``p``."""
        cls._protosw[p] = pktclass

    @classmethod
    def get_proto(cls, p):
        """Return the parser class for protocol number ``p`` (KeyError if none)."""
        return cls._protosw[p]
115
class IP6ExtensionHeader(dpkt.Packet):
    """Common base class for the IPv6 extension headers in this module.

    Each extension header is modelled as a small packet of its own so that it
    can reuse the generic header packing and unpacking of ``dpkt.Packet``.
    ``IP6.unpack`` reads a ``length`` attribute from every parsed header to
    know how many bytes to skip, so subclasses set it in their ``unpack``.
    """
123
class IP6OptsHeader(IP6ExtensionHeader):
    """IPv6 options extension header (base for Hop-by-Hop and Destination).

    After ``unpack()``:
        length: Total header size in bytes, ``(len + 1) * 8``.
        options: List of dicts with keys ``'type'``, ``'opt_length'`` and
            ``'data'``, one per option. Pad1 and PadN padding is skipped.
        data: The raw option bytes of this header only (padding included), so
            that ``bytes(header)`` reproduces exactly ``length`` bytes.
    """
    __hdr__ = (
        ('nxt', 'B', 0),  # next extension header protocol
        ('len', 'B', 0)  # option data length in 8 octet units (ignoring first 8 octets) so, len 0 == 64bit header
    )

    def unpack(self, buf):
        """Parse the header and its TLV-encoded options from ``buf``.

        Raises:
            dpkt.UnpackError: if ``buf`` ends in the middle of the option list.
        """
        dpkt.Packet.unpack(self, buf)
        self.length = (self.len + 1) * 8
        opts_len = self.length - self.__hdr_len__
        options = []

        index = 0
        try:
            while index < opts_len:
                opt_type = compat_ord(self.data[index])

                # PAD1 option: a single zero byte with no length field
                if opt_type == 0:
                    index += 1
                    continue

                opt_length = compat_ord(self.data[index + 1])

                if opt_type == 1:  # PADN option
                    # PADN occupies its 2-byte type/length prefix plus opt_length bytes
                    index += opt_length + 2
                    continue

                options.append({
                    'type': opt_type,
                    'opt_length': opt_length,
                    'data': self.data[index + 2:index + 2 + opt_length],
                })

                # add the two chars and the option_length, to move to the next option
                index += opt_length + 2
        except IndexError:
            # Ran off the end of the buffer: report it like other parse failures.
            raise dpkt.UnpackError('truncated IPv6 options header')

        self.options = options
        # Keep only this header's own bytes. Without the cut, data would also
        # hold every following header and the payload, and re-serializing the
        # enclosing IP6 packet would emit them twice.
        self.data = self.data[:opts_len]
160
class IP6HopOptsHeader(IP6OptsHeader):
    """Hop-by-Hop Options header; parsed exactly like ``IP6OptsHeader``."""
164
class IP6DstOptsHeader(IP6OptsHeader):
    """Destination Options header; parsed exactly like ``IP6OptsHeader``."""
168
class IP6RoutingHeader(IP6ExtensionHeader):
    """IPv6 Routing extension header.

    After ``unpack()``:
        addresses: List of 16-byte address strings, ``len // 2`` of them.
        data: The concatenated raw address bytes.
        length: Total header size in bytes, ``len * 8 + 8``.
    """
    __hdr__ = (
        ('nxt', 'B', 0),  # next extension header protocol
        ('len', 'B', 0),  # extension data length in 8 octet units (ignoring first 8 octets) (<= 46 for type 0)
        ('type', 'B', 0),  # routing type (currently, only 0 is used)
        ('segs_left', 'B', 0),  # remaining segments in route, until destination (<= 23)
        ('rsvd_sl_bits', 'I', 0),  # reserved (1 byte), strict/loose bitmap for addresses
    )

    @property
    def sl_bits(self):
        """Strict/loose bitmap: the low 24 bits of ``rsvd_sl_bits``."""
        return self.rsvd_sl_bits & 0xffffff

    @sl_bits.setter
    def sl_bits(self, v):
        # Use the same 24-bit mask as the getter. The previous 20-bit mask
        # silently dropped the top four bitmap bits.
        self.rsvd_sl_bits = (self.rsvd_sl_bits & ~0xffffff) | (v & 0xffffff)

    def unpack(self, buf):
        """Parse the fixed 8-byte part and the list of addresses from ``buf``."""
        hdr_size = 8
        addr_size = 16

        dpkt.Packet.unpack(self, buf)

        # 'len' counts 8-octet units and each address takes two of them.
        num_addresses = self.len // 2
        buf = buf[hdr_size:hdr_size + num_addresses * addr_size]

        self.data = buf
        self.addresses = [
            buf[i * addr_size:(i + 1) * addr_size] for i in range(num_addresses)
        ]
        self.length = self.len * 8 + 8
203
class IP6FragmentHeader(IP6ExtensionHeader):
    """IPv6 Fragment extension header.

    ``frag_off_resv_m`` is a 16-bit field holding the fragment offset in its
    upper 13 bits, two reserved bits, and the More-Fragments flag in bit 0.
    After ``unpack()``, ``length`` is the fixed header size and ``data`` is
    empty.
    """
    __hdr__ = (
        ('nxt', 'B', 0),  # next extension header protocol
        ('resv', 'B', 0),  # reserved, set to 0
        ('frag_off_resv_m', 'H', 0),  # frag offset (13 bits), reserved zero (2 bits), More frags flag
        ('id', 'I', 0)  # fragments id
    )

    def unpack(self, buf):
        """Parse the fixed header; anything after it belongs to the caller."""
        dpkt.Packet.unpack(self, buf)
        self.length = self.__hdr_len__
        self.data = b''

    @property
    def frag_off(self):
        """Fragment offset: the upper 13 bits of ``frag_off_resv_m``."""
        return self.frag_off_resv_m >> 3

    @frag_off.setter
    def frag_off(self, v):
        # Keep the low three bits (reserved + M flag), replace the offset.
        # Masking v stops an oversized offset from overflowing the 16-bit field.
        self.frag_off_resv_m = (self.frag_off_resv_m & 0x0007) | ((v & 0x1fff) << 3)

    @property
    def m_flag(self):
        """More-Fragments flag: bit 0 of ``frag_off_resv_m``."""
        return self.frag_off_resv_m & 1

    @m_flag.setter
    def m_flag(self, v):
        # Clear only bit 0 and keep the offset and reserved bits. The previous
        # mask (~0xfffe) did the opposite: it wiped the offset and could never
        # clear the flag.
        self.frag_off_resv_m = (self.frag_off_resv_m & 0xfffe) | (v & 1)
233
class IP6AHHeader(IP6ExtensionHeader):
    """IPv6 Authentication Header.

    After ``unpack()``:
        length: Total header size in bytes, ``(len + 2) * 4``.
        auth_data: The authentication data that follows the fixed fields, at
            most ``(len - 1) * 4`` bytes.
        data: Same bytes as ``auth_data``, so that ``bytes(header)`` covers
            this header only.
    """
    __hdr__ = (
        ('nxt', 'B', 0),  # next extension header protocol
        ('len', 'B', 0),  # length of header in 4 octet units (ignoring first 2 units)
        ('resv', 'H', 0),  # reserved, 2 bytes of 0
        ('spi', 'I', 0),  # SPI security parameter index
        ('seq', 'I', 0)  # sequence no.
    )

    def unpack(self, buf):
        """Parse the fixed fields and slice out the authentication data."""
        dpkt.Packet.unpack(self, buf)
        self.length = (self.len + 2) * 4
        # The fixed fields take three 4-octet units, 'len' excludes two of
        # them, so the authentication data is (len - 1) units long. Clamp at
        # zero so a malformed len of 0 cannot become a negative slice bound.
        auth_len = max(self.len - 1, 0) * 4
        self.auth_data = self.data[:auth_len]
        # Drop whatever follows this header. Otherwise re-serializing the
        # enclosing IP6 packet would emit the payload twice.
        self.data = self.auth_data
248
class IP6ESPHeader(IP6ExtensionHeader):
    """IPv6 Encapsulating Security Payload header.

    Only the SPI and sequence number are parsed. Everything after them stays
    in ``data`` and is counted in ``length``, so this header consumes the rest
    of the buffer it is given. It has no ``nxt`` field.
    """
    __hdr__ = (
        ('spi', 'I', 0),
        ('seq', 'I', 0)
    )

    def unpack(self, buf):
        """Parse the two fixed fields and record the total size as ``length``."""
        dpkt.Packet.unpack(self, buf)
        remaining = len(self.data)
        self.length = remaining + self.__hdr_len__
# Protocol numbers that IP6.unpack treats as extension headers. The list order
# is also the order in which IP6.headers_str serializes them.
ext_hdrs = [
    ip.IP_PROTO_HOPOPTS,
    ip.IP_PROTO_ROUTING,
    ip.IP_PROTO_FRAGMENT,
    ip.IP_PROTO_AH,
    ip.IP_PROTO_ESP,
    ip.IP_PROTO_DSTOPTS,
]

# Parser class for each extension header protocol number.
ext_hdrs_cls = {
    ip.IP_PROTO_HOPOPTS: IP6HopOptsHeader,
    ip.IP_PROTO_ROUTING: IP6RoutingHeader,
    ip.IP_PROTO_FRAGMENT: IP6FragmentHeader,
    ip.IP_PROTO_ESP: IP6ESPHeader,
    ip.IP_PROTO_AH: IP6AHHeader,
    ip.IP_PROTO_DSTOPTS: IP6DstOptsHeader,
}
def test_ipg():
    """Round-trip an IPv6 packet after zeroing its payload checksum."""
    raw = b'`\x00\x00\x00\x00(\x06@\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11$\xff\xfe\x8c\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80r\xcd\xca\x00\x16\x04\x84F\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\t\x00\x00\x02\x04\x05\xa0\x01\x03\x03\x00\x01\x01\x08\n}\x185?\x00\x00\x00\x00'
    packet = IP6(raw)
    # A zero checksum makes IP6.__bytes__ recompute it during serialization.
    packet.data.sum = 0
    repacked = bytes(packet)
    # Parsing the same bytes a second time must not raise either.
    IP6(raw)
    assert raw == repacked
280
def test_ip6_routing_header():
    """Parse a packet carrying a routing header and check it round-trips."""
    raw = b'`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
    packet = IP6(raw)
    repacked = bytes(packet)
    # 43 is Routing header id
    routing = packet.extension_hdrs[43]
    assert len(routing.addresses) == 2
    assert packet.tcp
    assert raw == repacked
    assert bytes(packet) == raw
291
def test_ip6_fragment_header():
    """Check fragment header field decoding, alone and inside an IP6 packet."""
    frag_raw = b'\x06\xee\xff\xfb\x00\x00\xff\xff'
    frag = IP6FragmentHeader(frag_raw)
    # Serialization on its own must not raise.
    bytes(frag)
    assert frag.nxt == 6
    assert frag.id == 65535
    assert frag.frag_off == 8191
    assert frag.m_flag == 1
    assert bytes(frag) == frag_raw

    # IP6 with fragment header
    packet_raw = b'\x60\x00\x00\x00\x00\x10\x2c\x00\x02\x22\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x03\x33\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x29\x00\x00\x01\x00\x00\x00\x00\x60\x00\x00\x00\x00\x10\x2c\x00'
    packet = IP6(packet_raw)
    assert bytes(packet) == packet_raw
308
def test_ip6_options_header():
    """Three non-padding options are extracted and the header round-trips."""
    raw = b';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
    parsed_options = IP6OptsHeader(raw).options
    assert len(parsed_options) == 3
    assert bytes(IP6OptsHeader(raw)) == raw
315
def test_ip6_ah_header():
    """Check AH field decoding and that the header round-trips."""
    raw = b';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
    header = IP6AHHeader(raw)
    assert header.length == 24
    assert header.auth_data == b'xxxxxxxx'
    assert header.spi == 0x2020202
    assert header.seq == 0x1010101
    assert bytes(header) == raw
325
def test_ip6_esp_header():
    """Check ESP field decoding and that the header round-trips."""
    raw = b'\x00\x00\x01\x00\x00\x00\x00\x44\xe2\x4f\x9e\x68\xf3\xcd\xb1\x5f\x61\x65\x42\x8b\x78\x0b\x4a\xfd\x13\xf0\x15\x98\xf5\x55\x16\xa8\x12\xb3\xb8\x4d\xbc\x16\xb2\x14\xbe\x3d\xf9\x96\xd4\xa0\x39\x1f\x85\x74\x25\x81\x83\xa6\x0d\x99\xb6\xba\xa3\xcc\xb6\xe0\x9a\x78\xee\xf2\xaf\x9a'
    header = IP6ESPHeader(raw)
    assert header.length == 68
    assert header.spi == 256
    assert bytes(header) == raw
333
def test_ip6_extension_headers():
    """Attach one header of every other type to a packet parsed with a routing header."""
    packet_raw = b'`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
    packet = IP6(packet_raw)

    hop_raw = b';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
    frag_raw = b'\x06\xee\xff\xfb\x00\x00\xff\xff'
    ah_raw = b';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
    dst_raw = b';\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

    # Keys are the extension header protocol numbers.
    packet.extension_hdrs[0] = IP6HopOptsHeader(hop_raw)
    packet.extension_hdrs[44] = IP6FragmentHeader(frag_raw)
    packet.extension_hdrs[51] = IP6AHHeader(ah_raw)
    packet.extension_hdrs[60] = IP6DstOptsHeader(dst_raw)

    # The parsed routing header plus the four added above.
    assert len(packet.extension_hdrs) == 5
if __name__ == '__main__':
    # Run every self-test in order; the first failing assert aborts the script.
    for _self_test in (
        test_ipg,
        test_ip6_routing_header,
        test_ip6_fragment_header,
        test_ip6_options_header,
        test_ip6_ah_header,
        test_ip6_esp_header,
        test_ip6_extension_headers,
    ):
        _self_test()
    print('Tests Successful...')