Package dpkt :: Module ip6
[hide private]
[frames | no frames]

Source Code for Module dpkt.ip6

  1  # $Id: ip6.py 58 2010-03-06 00:06:14Z dugsong $ 
  2   
  3  """Internet Protocol, version 6.""" 
  4   
  5  import dpkt 
  6   
class IP6(dpkt.Packet):
    """IPv6 packet: fixed header, optional extension headers and payload.

    Attributes set by ``unpack``:
        extension_hdrs: dict keyed by every protocol number listed in the
            module-level ``ext_hdrs``; the value is the parsed extension
            header object, or None when that header is absent.
        p: protocol number of the payload that follows the last
            extension header.
    """

    __hdr__ = (
        ('v_fc_flow', 'I', 0x60000000),
        ('plen', 'H', 0),       # payload length (not including header)
        ('nxt', 'B', 0),        # next header protocol
        ('hlim', 'B', 0),       # hop limit
        ('src', '16s', ''),
        ('dst', '16s', '')
    )
    _protosw = {}               # XXX - shared with IP

    # v_fc_flow packs three fields into one 32-bit word:
    # version (bits 28-31), fc (bits 20-27) and flow label (bits 0-19).

    def _get_v(self):
        return self.v_fc_flow >> 28

    def _set_v(self, v):
        self.v_fc_flow = (self.v_fc_flow & ~0xf0000000) | (v << 28)
    v = property(_get_v, _set_v)

    def _get_fc(self):
        return (self.v_fc_flow >> 20) & 0xff

    def _set_fc(self, v):
        self.v_fc_flow = (self.v_fc_flow & ~0xff00000) | (v << 20)
    fc = property(_get_fc, _set_fc)

    def _get_flow(self):
        return self.v_fc_flow & 0xfffff

    def _set_flow(self, v):
        self.v_fc_flow = (self.v_fc_flow & ~0xfffff) | (v & 0xfffff)
    flow = property(_get_flow, _set_flow)

    def unpack(self, buf):
        """Parse the fixed header, then walk the extension header chain.

        Each extension header found is stored in ``self.extension_hdrs``.
        The first protocol number that is not an extension header is
        stored in ``self.p`` and used to pick the payload class from
        ``_protosw``; if no class is registered, or the payload fails to
        unpack, ``self.data`` is left as the raw remaining bytes.
        """
        dpkt.Packet.unpack(self, buf)
        self.extension_hdrs = dict((proto, None) for proto in ext_hdrs)

        # Only plen bytes after the fixed header belong to this packet.
        buf = self.data[:self.plen]

        proto = self.nxt
        while proto in ext_hdrs:
            ext = ext_hdrs_cls[proto](buf)
            self.extension_hdrs[proto] = ext
            buf = buf[ext.length:]
            proto = ext.nxt

        # set the payload protocol id
        self.p = proto

        try:
            self.data = self._protosw[proto](buf)
            setattr(self, self.data.__class__.__name__.lower(), self.data)
        except (KeyError, dpkt.UnpackError):
            self.data = buf

    def headers_str(self):
        """
        Output extension headers in order defined in RFC1883 (except dest opts)

        Returns an empty string when the packet carries no extension
        headers, including packets built from keyword arguments on which
        ``unpack`` never ran and ``extension_hdrs`` was never created.
        """
        present = getattr(self, 'extension_hdrs', None)
        if not present:
            return ""
        return "".join([str(present[proto]) for proto in ext_hdrs
                        if present.get(proto) is not None])

    def __str__(self):
        """Serialize the packet, filling in a zero payload checksum first."""
        payload = self.data
        # A raw-string payload has no ``sum`` attribute; skip it instead of
        # raising AttributeError.
        if self.nxt in (6, 17, 58) and hasattr(payload, 'sum') \
                and not payload.sum:
            # XXX - set TCP, UDP, and ICMPv6 checksums
            p = str(payload)
            s = dpkt.struct.pack('>16s16sxBH', self.src, self.dst,
                                 self.nxt, len(p))
            s = dpkt.in_cksum_add(0, s)
            s = dpkt.in_cksum_add(s, p)
            try:
                payload.sum = dpkt.in_cksum_done(s)
            except AttributeError:
                pass
        return self.pack_hdr() + self.headers_str() + str(self.data)

    @classmethod
    def set_proto(cls, p, pktclass):
        """Register ``pktclass`` as the payload parser for protocol ``p``."""
        cls._protosw[p] = pktclass

    @classmethod
    def get_proto(cls, p):
        """Return the payload class registered for protocol ``p``.

        Raises KeyError when nothing is registered for ``p``.
        """
        return cls._protosw[p]
# XXX - auto-load IP6 dispatch table from IP dispatch table
# This runs after the IP6 class body so that IP6._protosw exists; update()
# copies every entry of ip.IP._protosw into it. The `ip` name bound here is
# also used further down for the IP_PROTO_* extension header constants.
import ip
IP6._protosw.update(ip.IP._protosw)

class IP6ExtensionHeader(dpkt.Packet):
    """Common base class for the IPv6 extension headers.

    An extension header is handled much like a small 'sub-packet': it
    derives from dpkt.Packet purely to re-use the generic header
    unpacking and packing machinery.
    """

class IP6OptsHeader(IP6ExtensionHeader):
    """Options-style extension header: a 2-byte header plus TLV options.

    Attributes set by ``unpack``:
        length: total size of this header in bytes, ``(len + 1) * 8``.
        options: list of dicts with keys 'type', 'opt_length' and 'data',
            one per option that is neither Pad1 nor PadN.
        data: the raw option bytes of this header only (padding included).
    """

    __hdr__ = (
        ('nxt', 'B', 0),    # next extension header protocol
        ('len', 'B', 0)     # option data length in 8 octet units (ignoring first 8 octets) so, len 0 == 64bit header
    )

    def unpack(self, buf):
        dpkt.Packet.unpack(self, buf)
        self.length = (self.len + 1) * 8
        options = []

        index = 0
        # self.data starts right after the 2-byte nxt/len header.
        while index < self.length - 2:
            opt_type = ord(self.data[index])

            # Pad1 option: a lone zero byte with no length field
            if opt_type == 0:
                index += 1
                continue

            opt_length = ord(self.data[index + 1])

            # PadN (type 1) occupies opt_length + 2 bytes in total and is
            # skipped; every other option is recorded.
            if opt_type != 1:
                options.append({
                    'type': opt_type,
                    'opt_length': opt_length,
                    'data': self.data[index + 2:index + 2 + opt_length],
                })

            # skip the type byte, the length byte and the option data
            index += opt_length + 2

        self.options = options
        # Drop whatever follows this header (next headers, payload) so that
        # str(self) reproduces exactly ``length`` bytes and the enclosing
        # packet does not serialize its payload twice.
        self.data = self.data[:self.length - 2]

139
class IP6HopOptsHeader(IP6OptsHeader):
    """Options header registered for ip.IP_PROTO_HOPOPTS; adds nothing to IP6OptsHeader."""
141
class IP6DstOptsHeader(IP6OptsHeader):
    """Options header registered for ip.IP_PROTO_DSTOPTS; adds nothing to IP6OptsHeader."""
143
class IP6RoutingHeader(IP6ExtensionHeader):
    """Routing extension header carrying a list of 16-byte addresses.

    Attributes set by ``unpack``:
        addresses: list of 16-byte address strings, ``len // 2`` entries.
        length: total size of this header in bytes, ``len * 8 + 8``.
        data: the concatenated address bytes of this header only.
    """

    __hdr__ = (
        ('nxt', 'B', 0),            # next extension header protocol
        ('len', 'B', 0),            # extension data length in 8 octet units (ignoring first 8 octets) (<= 46 for type 0)
        ('type', 'B', 0),           # routing type (currently, only 0 is used)
        ('segs_left', 'B', 0),      # remaining segments in route, until destination (<= 23)
        ('rsvd_sl_bits', 'I', 0),   # reserved (1 byte), strict/loose bitmap for addresses
    )

    # The strict/loose bitmap is the low 24 bits of rsvd_sl_bits; the top
    # byte is reserved. Getter and setter use the same 24-bit mask.
    def _get_sl_bits(self):
        return self.rsvd_sl_bits & 0xffffff

    def _set_sl_bits(self, v):
        self.rsvd_sl_bits = (self.rsvd_sl_bits & ~0xffffff) | (v & 0xffffff)
    sl_bits = property(_get_sl_bits, _set_sl_bits)

    def unpack(self, buf):
        hdr_size = 8
        addr_size = 16

        dpkt.Packet.unpack(self, buf)

        # len counts 8-byte units, so every address takes two of them.
        num_addresses = self.len // 2
        buf = buf[hdr_size:hdr_size + num_addresses * addr_size]

        self.addresses = [buf[start:start + addr_size]
                          for start in range(0, num_addresses * addr_size,
                                             addr_size)]
        self.data = buf
        self.length = self.len * 8 + 8

175
class IP6FragmentHeader(IP6ExtensionHeader):
    """Fixed-size fragment extension header.

    ``frag_off_resv_m`` packs the fragment offset (upper 13 bits), two
    reserved bits and the more-fragments flag (lowest bit); ``frag_off``
    and ``m_flag`` expose the two meaningful parts.

    Attributes set by ``unpack``:
        length: size of this header in bytes (always ``__hdr_len__``).
        data: always empty; the header has no variable part.
    """

    __hdr__ = (
        ('nxt', 'B', 0),                # next extension header protocol
        ('resv', 'B', 0),               # reserved, set to 0
        ('frag_off_resv_m', 'H', 0),    # frag offset (13 bits), reserved zero (2 bits), More frags flag
        ('id', 'I', 0)                  # fragments id
    )

    def unpack(self, buf):
        dpkt.Packet.unpack(self, buf)
        self.length = self.__hdr_len__
        # Everything after the fixed header belongs to the enclosing packet;
        # keeping it here would make str(self) emit the payload as well.
        self.data = self.data[:0]

    def _get_frag_off(self):
        return self.frag_off_resv_m >> 3

    def _set_frag_off(self, v):
        self.frag_off_resv_m = (self.frag_off_resv_m & ~0xfff8) | (v << 3)
    frag_off = property(_get_frag_off, _set_frag_off)

    def _get_m_flag(self):
        return self.frag_off_resv_m & 1

    def _set_m_flag(self, v):
        # Replace only the lowest bit; offset and reserved bits are kept.
        self.frag_off_resv_m = (self.frag_off_resv_m & ~0x1) | (v & 0x1)
    m_flag = property(_get_m_flag, _set_m_flag)

199
class IP6AHHeader(IP6ExtensionHeader):
    """Authentication (AH) extension header.

    Attributes set by ``unpack``:
        length: total size of this header in bytes, ``(len + 2) * 4``.
        auth_data: the ``(len - 1) * 4`` bytes that follow the fixed
            fields (empty when ``len`` is 0).
        data: same bytes as ``auth_data``.
    """

    __hdr__ = (
        ('nxt', 'B', 0),    # next extension header protocol
        ('len', 'B', 0),    # length of header in 4 octet units (ignoring first 2 units)
        ('resv', 'H', 0),   # reserved, 2 bytes of 0
        ('spi', 'I', 0),    # SPI security parameter index
        ('seq', 'I', 0)     # sequence no.
    )

    def unpack(self, buf):
        dpkt.Packet.unpack(self, buf)
        self.length = (self.len + 2) * 4
        # Clamp at zero: with len == 0 a negative bound would slice from
        # the end of the buffer instead of yielding no data.
        auth_len = max(self.len - 1, 0) * 4
        self.auth_data = self.data[:auth_len]
        # Keep only this header's own bytes so str(self) does not also emit
        # the headers/payload that follow it.
        self.data = self.auth_data

213 214
class IP6ESPHeader(IP6ExtensionHeader):
    """Placeholder for the ESP extension header; parsing is not implemented."""

    def unpack(self, buf):
        """Always raise NotImplementedError, whatever ``buf`` contains."""
        message = "ESP extension headers are not supported."
        raise NotImplementedError(message)


# Extension header protocol numbers, in the order in which
# IP6.headers_str() writes the headers out.
ext_hdrs = [ip.IP_PROTO_HOPOPTS, ip.IP_PROTO_ROUTING, ip.IP_PROTO_FRAGMENT,
            ip.IP_PROTO_AH, ip.IP_PROTO_ESP, ip.IP_PROTO_DSTOPTS]

# Parser class for each extension header protocol number.
ext_hdrs_cls = {ip.IP_PROTO_HOPOPTS: IP6HopOptsHeader,
                ip.IP_PROTO_ROUTING: IP6RoutingHeader,
                ip.IP_PROTO_FRAGMENT: IP6FragmentHeader,
                ip.IP_PROTO_ESP: IP6ESPHeader,
                ip.IP_PROTO_AH: IP6AHHeader,
                ip.IP_PROTO_DSTOPTS: IP6DstOptsHeader}

if __name__ == '__main__':
    import unittest

    class IP6TestCase(unittest.TestCase):
        """Self-tests; unittest assertions are used so they survive -O."""

        def test_IP6(self):
            s = '`\x00\x00\x00\x00(\x06@\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11$\xff\xfe\x8c\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80r\xcd\xca\x00\x16\x04\x84F\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\t\x00\x00\x02\x04\x05\xa0\x01\x03\x03\x00\x01\x01\x08\n}\x185?\x00\x00\x00\x00'
            ip = IP6(s)
            # Zero the checksum so that str() has to recompute it.
            ip.data.sum = 0
            s2 = str(ip)
            self.assertEqual(s, s2)

        def test_IP6RoutingHeader(self):
            s = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
            ip = IP6(s)
            s2 = str(ip)
            # 43 is Routing header id
            self.assertEqual(len(ip.extension_hdrs[43].addresses), 2)
            self.assertTrue(ip.tcp)
            self.assertEqual(s, s2)

        def test_IP6FragmentHeader(self):
            s = '\x06\xee\xff\xfb\x00\x00\xff\xff'
            fh = IP6FragmentHeader(s)
            s2 = str(fh)
            self.assertEqual(s, s2)
            self.assertEqual(fh.nxt, 6)
            self.assertEqual(fh.id, 65535)
            self.assertEqual(fh.frag_off, 8191)
            self.assertEqual(fh.m_flag, 1)

        def test_IP6OptionsHeader(self):
            s = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
            options = IP6OptsHeader(s).options
            self.assertEqual(len(options), 3)

        def test_IP6AHHeader(self):
            s = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
            ah = IP6AHHeader(s)
            self.assertEqual(ah.length, 24)
            self.assertEqual(ah.auth_data, 'xxxxxxxx')
            self.assertEqual(ah.spi, 0x2020202)
            self.assertEqual(ah.seq, 0x1010101)

        def test_IP6ExtensionHeaders(self):
            p = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
            ip = IP6(p)

            o = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
            options = IP6HopOptsHeader(o)

            ip.extension_hdrs[0] = options

            fh = '\x06\xee\xff\xfb\x00\x00\xff\xff'
            ip.extension_hdrs[44] = IP6FragmentHeader(fh)

            ah = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
            ip.extension_hdrs[51] = IP6AHHeader(ah)

            do = ';\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
            ip.extension_hdrs[60] = IP6DstOptsHeader(do)

            present = [k for k in ip.extension_hdrs
                       if ip.extension_hdrs[k] is not None]
            self.assertEqual(len(present), 5)

    unittest.main()