1
2
3 """Internet Protocol, version 6."""
4
5 import dpkt
6
7 -class IP6(dpkt.Packet):
8 __hdr__ = (
9 ('v_fc_flow', 'I', 0x60000000L),
10 ('plen', 'H', 0),
11 ('nxt', 'B', 0),
12 ('hlim', 'B', 0),
13 ('src', '16s', ''),
14 ('dst', '16s', '')
15 )
16 _protosw = {}
17
22 v = property(_get_v, _set_v)
23
28 fc = property(_get_fc, _set_fc)
29
34 flow = property(_get_flow, _set_flow)
35
58
60 """
61 Output extension headers in order defined in RFC1883 (except dest opts)
62 """
63
64 header_str = ""
65
66 for hdr in ext_hdrs:
67 if not self.extension_hdrs[hdr] is None:
68 header_str += str(self.extension_hdrs[hdr])
69 return header_str
70
71
85
88 set_proto = classmethod(set_proto)
89
92 get_proto = classmethod(get_proto)
93
94
95 import ip
96 IP6._protosw.update(ip.IP._protosw)
97
99 """
100 An extension header is very similar to a 'sub-packet'.
101 We just want to re-use all the hdr unpacking etc.
102 """
103 pass
104
106 __hdr__ = (
107 ('nxt', 'B', 0),
108 ('len', 'B', 0)
109 )
110
112 dpkt.Packet.unpack(self, buf)
113 setattr(self, 'length', (self.len + 1) * 8)
114 options = []
115
116 index = 0
117
118 while (index < self.length - 2):
119 opt_type = ord(self.data[index])
120
121
122 if opt_type == 0:
123 index += 1
124 continue;
125
126 opt_length = ord(self.data[index + 1])
127
128 if opt_type == 1:
129
130 index += opt_length + 2
131 continue
132
133 options.append({'type': opt_type, 'opt_length': opt_length, 'data': self.data[index + 2:index + 2 + opt_length]})
134
135
136 index += opt_length + 2
137
138 setattr(self, 'options', options)
139
141
143
145 __hdr__ = (
146 ('nxt', 'B', 0),
147 ('len', 'B', 0),
148 ('type', 'B', 0),
149 ('segs_left', 'B', 0),
150 ('rsvd_sl_bits', 'I', 0),
151 )
152
157 sl_bits = property(_get_sl_bits, _set_sl_bits)
158
160 hdr_size = 8
161 addr_size = 16
162
163 dpkt.Packet.unpack(self, buf)
164
165 addresses = []
166 num_addresses = self.len / 2
167 buf = buf[hdr_size:hdr_size + num_addresses * addr_size]
168
169 for i in range(num_addresses):
170 addresses.append(buf[i * addr_size: i * addr_size + addr_size])
171
172 self.data = buf
173 setattr(self, 'addresses', addresses)
174 setattr(self, 'length', self.len * 8 + 8)
175
177 __hdr__ = (
178 ('nxt', 'B', 0),
179 ('resv', 'B', 0),
180 ('frag_off_resv_m', 'H', 0),
181 ('id', 'I', 0)
182 )
183
187
192 frag_off = property(_get_frag_off, _set_frag_off)
193
198 m_flag = property(_get_m_flag, _set_m_flag)
199
201 __hdr__ = (
202 ('nxt', 'B', 0),
203 ('len', 'B', 0),
204 ('resv', 'H', 0),
205 ('spi', 'I', 0),
206 ('seq', 'I', 0)
207 )
208
210 dpkt.Packet.unpack(self, buf)
211 setattr(self, 'length', (self.len + 2) * 4)
212 setattr(self, 'auth_data', self.data[:(self.len - 1) * 4])
213
214
217 raise NotImplementedError("ESP extension headers are not supported.")
218
219
220 ext_hdrs = [ip.IP_PROTO_HOPOPTS, ip.IP_PROTO_ROUTING, ip.IP_PROTO_FRAGMENT, ip.IP_PROTO_AH, ip.IP_PROTO_ESP, ip.IP_PROTO_DSTOPTS]
221 ext_hdrs_cls = {ip.IP_PROTO_HOPOPTS: IP6HopOptsHeader,
222 ip.IP_PROTO_ROUTING: IP6RoutingHeader,
223 ip.IP_PROTO_FRAGMENT: IP6FragmentHeader,
224 ip.IP_PROTO_ESP: IP6ESPHeader,
225 ip.IP_PROTO_AH: IP6AHHeader,
226 ip.IP_PROTO_DSTOPTS: IP6DstOptsHeader}
227
228 if __name__ == '__main__':
229 import unittest
230
232
234 s = '`\x00\x00\x00\x00(\x06@\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11$\xff\xfe\x8c\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80r\xcd\xca\x00\x16\x04\x84F\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\t\x00\x00\x02\x04\x05\xa0\x01\x03\x03\x00\x01\x01\x08\n}\x185?\x00\x00\x00\x00'
235 ip = IP6(s)
236
237 ip.data.sum = 0
238 s2 = str(ip)
239 ip2 = IP6(s)
240
241 assert(s == s2)
242
244 s = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
245 ip = IP6(s)
246 s2 = str(ip)
247
248 assert(len(ip.extension_hdrs[43].addresses) == 2)
249 assert(ip.tcp)
250 assert(s == s2)
251
252
254 s = '\x06\xee\xff\xfb\x00\x00\xff\xff'
255 fh = IP6FragmentHeader(s)
256 s2 = str(fh)
257 assert(fh.nxt == 6)
258 assert(fh.id == 65535)
259 assert(fh.frag_off == 8191)
260 assert(fh.m_flag == 1)
261
263 s = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
264 options = IP6OptsHeader(s).options
265 assert(len(options) == 3)
266
268 s = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
269 ah = IP6AHHeader(s)
270 assert(ah.length == 24)
271 assert(ah.auth_data == 'xxxxxxxx')
272 assert(ah.spi == 0x2020202)
273 assert(ah.seq == 0x1010101)
274
276 p = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
277 ip = IP6(p)
278
279 o = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
280 options = IP6HopOptsHeader(o)
281
282 ip.extension_hdrs[0] = options
283
284 fh = '\x06\xee\xff\xfb\x00\x00\xff\xff'
285 ip.extension_hdrs[44] = IP6FragmentHeader(fh)
286
287 ah = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
288 ip.extension_hdrs[51] = IP6AHHeader(ah)
289
290 do = ';\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
291 ip.extension_hdrs[60] = IP6DstOptsHeader(do)
292
293 assert(len([k for k in ip.extension_hdrs if (not ip.extension_hdrs[k] is None)]) == 5)
294
295 unittest.main()
296