Package dpkt :: Module pcapng
[hide private]
[frames | no frames]

Source Code for Module dpkt.pcapng

  1  """pcap Next Generation file format""" 
  2  # Spec: https://pcapng.github.io/pcapng/ 
  3   
  4  # pylint: disable=no-member 
  5  # pylint: disable=attribute-defined-outside-init 
  6  from __future__ import print_function 
  7  from __future__ import absolute_import 
  8   
  9  from struct import pack as struct_pack, unpack as struct_unpack 
 10  from time import time 
 11  import sys 
 12   
 13  from . import dpkt 
 14  from .compat import BytesIO 
 15   
 16  BYTE_ORDER_MAGIC = 0x1A2B3C4D 
 17  BYTE_ORDER_MAGIC_LE = 0x4D3C2B1A 
 18   
 19  PCAPNG_VERSION_MAJOR = 1 
 20  PCAPNG_VERSION_MINOR = 0 
 21   
 22  # Block types 
 23  PCAPNG_BT_IDB = 0x00000001     # Interface Description Block 
 24  PCAPNG_BT_PB = 0x00000002      # Packet Block (deprecated) 
 25  PCAPNG_BT_SPB = 0x00000003     # Simple Packet Block 
 26  PCAPNG_BT_EPB = 0x00000006     # Enhanced Packet Block 
 27  PCAPNG_BT_SHB = 0x0A0D0D0A     # Section Header Block 
 28   
 29  # Options 
 30  PCAPNG_OPT_ENDOFOPT = 0        # end of options 
 31  PCAPNG_OPT_COMMENT = 1         # comment 
 32   
 33  # SHB options 
 34  PCAPNG_OPT_SHB_HARDWARE = 2    # description of the hardware 
 35  PCAPNG_OPT_SHB_OS = 3          # name of the operating system 
 36  PCAPNG_OPT_SHB_USERAPPL = 4    # name of the application 
 37   
 38  # IDB options 
 39  PCAPNG_OPT_IF_NAME = 2         # interface name 
 40  PCAPNG_OPT_IF_DESCRIPTION = 3  # interface description 
 41  PCAPNG_OPT_IF_IPV4ADDR = 4     # IPv4 network address and netmask for the interface 
 42  PCAPNG_OPT_IF_IPV6ADDR = 5     # IPv6 network address and prefix length for the interface 
 43  PCAPNG_OPT_IF_MACADDR = 6      # interface hardware MAC address 
 44  PCAPNG_OPT_IF_EUIADDR = 7      # interface hardware EUI address 
 45  PCAPNG_OPT_IF_SPEED = 8        # interface speed in bits/s 
 46  PCAPNG_OPT_IF_TSRESOL = 9      # timestamp resolution 
 47  PCAPNG_OPT_IF_TZONE = 10       # time zone 
 48  PCAPNG_OPT_IF_FILTER = 11      # capture filter 
 49  PCAPNG_OPT_IF_OS = 12          # operating system 
 50  PCAPNG_OPT_IF_FCSLEN = 13      # length of the Frame Check Sequence in bits 
 51  PCAPNG_OPT_IF_TSOFFSET = 14    # offset (in seconds) that must be added to packet timestamp 
 52   
 53  # <copied from pcap.py> 
 54  DLT_NULL = 0 
 55  DLT_EN10MB = 1 
 56  DLT_EN3MB = 2 
 57  DLT_AX25 = 3 
 58  DLT_PRONET = 4 
 59  DLT_CHAOS = 5 
 60  DLT_IEEE802 = 6 
 61  DLT_ARCNET = 7 
 62  DLT_SLIP = 8 
 63  DLT_PPP = 9 
 64  DLT_FDDI = 10 
 65  DLT_PFSYNC = 18 
 66  DLT_IEEE802_11 = 105 
 67  DLT_LINUX_SLL = 113 
 68  DLT_PFLOG = 117 
 69  DLT_IEEE802_11_RADIO = 127 
 70   
 71  if sys.platform.find('openbsd') != -1: 
 72      DLT_LOOP = 12 
 73      DLT_RAW = 14 
 74  else: 
 75      DLT_LOOP = 108 
 76      DLT_RAW = 12 
 77   
 78  dltoff = {DLT_NULL: 4, DLT_EN10MB: 14, DLT_IEEE802: 22, DLT_ARCNET: 6, 
 79            DLT_SLIP: 16, DLT_PPP: 4, DLT_FDDI: 21, DLT_PFLOG: 48, DLT_PFSYNC: 4, 
 80            DLT_LOOP: 4, DLT_LINUX_SLL: 16} 
# </copied from pcap.py>


def _swap32b(i):
    """Return the uint32 `i` with its four bytes in reverse order"""
    # pack as big-endian, then read the very same bytes back as little-endian
    (swapped,) = struct_unpack('<I', struct_pack('>I', i))
    return swapped
87
def _align32b(i):
    """Round int `i` up to the next multiple of 4 (32-bit boundary)"""
    # -i % 4 is the gap to the next multiple of 4, and 0 when already aligned
    return i + (-i % 4)
93
def _padded(s):
    """Return bytes `s` with trailing zero bytes added up to the 32-bit boundary"""
    width = _align32b(len(s))
    # the '<N>s' struct format fills the unused tail of the field with zeroes
    return struct_pack('{0}s'.format(width), s)
98
def _padlen(s):
    """Return the number of padding bytes needed to align `s` to the 32-bit boundary"""
    size = len(s)
    return _align32b(size) - size
103
class _PcapngBlock(dpkt.Packet):

    """Base class for a pcapng block with Options

    On the wire a block is: the fixed header fields, a variable-size body
    (here: a list of options), and a trailing copy of the total length.
    The trailing copy is declared as the last `__hdr__` field (`_len`), so
    packing and unpacking splice the body in front of the last 4 header bytes.
    """

    __hdr__ = (
        ('type', 'I', 0),  # block type
        ('len', 'I', 12),  # block total length: total size of this block, in octets
        # ( body, variable size )
        ('_len', 'I', 12),  # dup of len
    )

    def unpack_hdr(self, buf):
        """Unpack only the fixed header fields from `buf`.

        No length check and no option parsing is done; Reader uses this to
        peek at a Section Header Block before its byte order is known.
        """
        dpkt.Packet.unpack(self, buf)

    def unpack(self, buf):
        """Unpack the header and the options from `buf`.

        Raises dpkt.NeedData if `buf` is shorter than the declared block length.
        """
        dpkt.Packet.unpack(self, buf)
        if self.len > len(buf):
            raise dpkt.NeedData
        self._do_unpack_options(buf)

    def _do_unpack_options(self, buf, oo=None):
        """Parse the options of the block into `self.opts` and verify the trailing length.

        buf -- the raw block, starting at the block type field
        oo  -- offset of the first option inside `buf`; defaults to the end of
               the fixed header fields (header length minus the trailing `_len`)

        Raises dpkt.UnpackError if the trailing length does not match `self.len`.
        """
        self.opts = []
        self.data = ''
        oo = oo or self.__hdr_len__ - 4  # options offset
        ol = self.len - oo - 4  # length

        opts_buf = buf[oo:oo + ol]
        while opts_buf:
            opt = (PcapngOptionLE(opts_buf) if self.__hdr_fmt__[0] == '<'
                   else PcapngOption(opts_buf))
            self.opts.append(opt)

            opts_buf = opts_buf[len(opt):]
            if opt.code == PCAPNG_OPT_ENDOFOPT:
                break

        # duplicate total length field: it sits in the last 4 bytes of the
        # *block*, which is not the end of `buf` when `buf` holds more data
        # than this one block
        trailer = buf[max(self.len - 4, 0):self.len]
        if len(trailer) != 4:
            raise dpkt.UnpackError('length fields do not match')
        self._len = struct_unpack(self.__hdr_fmt__[0] + 'I', trailer)[0]
        if self._len != self.len:
            raise dpkt.UnpackError('length fields do not match')

    def _do_pack_options(self):
        """Return the packed options, or b'' if the block has none.

        Raises dpkt.PackError if the last option is not opt_endofopt.
        """
        if not getattr(self, 'opts', None):
            return b''
        if self.opts[-1].code != PCAPNG_OPT_ENDOFOPT:
            raise dpkt.PackError('options must end with opt_endofopt')
        return b''.join(bytes(o) for o in self.opts)

    def __bytes__(self):
        """Pack the block; both length fields are recomputed from the options."""
        opts_buf = self._do_pack_options()
        self.len = self._len = self.__hdr_len__ + len(opts_buf)

        # the options go between the fixed fields and the trailing length copy
        hdr_buf = dpkt.Packet.pack_hdr(self)
        return hdr_buf[:-4] + opts_buf + hdr_buf[-4:]

    def __len__(self):
        """Return the packed size: the header length plus the size of all options."""
        if not getattr(self, 'opts', None):
            return self.__hdr_len__

        opts_len = sum(len(o) for o in self.opts)
        return self.__hdr_len__ + opts_len
166
class PcapngBlockLE(_PcapngBlock):

    """Same block as `_PcapngBlock`, declared with the '<' (little-endian) byte order"""

    __byte_order__ = '<'
170
class PcapngOption(dpkt.Packet):

    """One option of a block: a code, a value length and the value itself

    When packed, the value is zero-padded to the 32-bit boundary; `len`
    counts the value without the padding.
    """

    __hdr__ = (
        ('code', 'H', PCAPNG_OPT_ENDOFOPT),
        ('len', 'H', 0),
    )

    def unpack(self, buf):
        """Unpack the option header and take `len` bytes of value from `buf`."""
        dpkt.Packet.unpack(self, buf)
        start = self.__hdr_len__
        self.data = buf[start:start + self.len]

        # comments additionally get a decoded copy in `text`
        if self.code == PCAPNG_OPT_COMMENT:
            self.text = self.data.decode('utf-8')

    def __bytes__(self):
        """Pack the option; `len` is recomputed from the value."""
        if self.code == PCAPNG_OPT_COMMENT:
            # `text` wins over `data` for comments; encode it unless it is bytes already
            text = getattr(self, 'text', self.data)
            self.data = text if isinstance(text, bytes) else text.encode('utf-8')

        self.len = len(self.data)
        return dpkt.Packet.pack_hdr(self) + _padded(self.data)

    def __len__(self):
        """Return the packed size: the header plus the value and its padding."""
        value_len = len(self.data)
        return self.__hdr_len__ + value_len + _padlen(self.data)

    def __repr__(self):
        """Show the end-of-options marker by name, everything else the default way."""
        if self.code != PCAPNG_OPT_ENDOFOPT:
            return dpkt.Packet.__repr__(self)
        return '{0}(opt_endofopt)'.format(self.__class__.__name__)
209
class PcapngOptionLE(PcapngOption):

    """Same option as `PcapngOption`, declared with the '<' (little-endian) byte order"""

    __byte_order__ = '<'
213
class SectionHeaderBlock(_PcapngBlock):

    """Section Header block

    Carries the byte-order magic and the format version; Reader expects it
    to be the first block of the file.
    """

    __hdr__ = (
        ('type', 'I', PCAPNG_BT_SHB),
        ('len', 'I', 28),
        ('bom', 'I', BYTE_ORDER_MAGIC),          # byte-order magic
        ('v_major', 'H', PCAPNG_VERSION_MAJOR),
        ('v_minor', 'H', PCAPNG_VERSION_MINOR),
        ('sec_len', 'q', -1),                    # section length, -1 = auto
        # ( options, variable size )
        ('_len', 'I', 28)                        # dup of len
    )
229
class SectionHeaderBlockLE(SectionHeaderBlock):

    """Section Header block declared with the '<' (little-endian) byte order"""

    __byte_order__ = '<'
233
class InterfaceDescriptionBlock(_PcapngBlock):

    """Interface Description block

    Holds the link type and snap length of a capture interface; Reader also
    looks in its options for the timestamp resolution and offset.
    """

    __hdr__ = (
        ('type', 'I', PCAPNG_BT_IDB),
        ('len', 'I', 20),
        ('linktype', 'H', DLT_EN10MB),   # one of the DLT_* values
        ('_reserved', 'H', 0),
        ('snaplen', 'I', 1500),
        # ( options, variable size )
        ('_len', 'I', 20)                # dup of len
    )
248
class InterfaceDescriptionBlockLE(InterfaceDescriptionBlock):

    """Interface Description block declared with the '<' (little-endian) byte order"""

    __byte_order__ = '<'
252
class EnhancedPacketBlock(_PcapngBlock):

    """Enhanced Packet block

    The variable part of the block is the packet data (`pkt_data`), padded to
    the 32-bit boundary, followed by the options.
    """

    __hdr__ = (
        ('type', 'I', PCAPNG_BT_EPB),
        ('len', 'I', 64),
        ('iface_id', 'I', 0),
        ('ts_high', 'I', 0),  # timestamp high
        ('ts_low', 'I', 0),  # timestamp low
        ('caplen', 'I', 0),  # captured len, size of pkt_data
        ('pkt_len', 'I', 0),  # actual packet len
        # ( pkt_data, variable size )
        # ( options, variable size )
        ('_len', 'I', 64)
    )

    def unpack(self, buf):
        """Unpack the header, the packet data and the options from `buf`.

        Raises dpkt.NeedData if `buf` is shorter than the declared block length.
        """
        dpkt.Packet.unpack(self, buf)
        if self.len > len(buf):
            raise dpkt.NeedData

        # packet data
        po = self.__hdr_len__ - 4  # offset of pkt_data
        self.pkt_data = buf[po:po + self.caplen]

        # skip padding between pkt_data and options
        opts_offset = po + _align32b(self.caplen)
        self._do_unpack_options(buf, opts_offset)

    def __bytes__(self):
        """Pack the block.

        `caplen` and `pkt_len` are both overwritten with the size of
        `pkt_data`, and the length fields are recomputed.
        """
        pkt_buf = self.pkt_data
        self.caplen = self.pkt_len = len(pkt_buf)

        opts_buf = self._do_pack_options()
        self.len = self._len = self.__hdr_len__ + _align32b(self.caplen) + len(opts_buf)

        # packet data and options go between the fixed fields and the trailing length copy
        hdr_buf = dpkt.Packet.pack_hdr(self)
        return hdr_buf[:-4] + _padded(pkt_buf) + opts_buf + hdr_buf[-4:]

    def __len__(self):
        """Return the packed size: header, padded packet data and all options.

        A block created without options (e.g. the one Writer.writepkt builds
        around a raw buffer) has no `opts` attribute; it counts as zero
        options, the same way the base class treats it.
        """
        opts = getattr(self, 'opts', None) or ()
        opts_len = sum(len(o) for o in opts)
        return self.__hdr_len__ + _align32b(self.caplen) + opts_len
297
class EnhancedPacketBlockLE(EnhancedPacketBlock):

    """Enhanced Packet block declared with the '<' (little-endian) byte order"""

    __byte_order__ = '<'
301
class Writer(object):

    """Simple pcapng dumpfile writer."""

    def __init__(self, fileobj, snaplen=1500, linktype=DLT_EN10MB, shb=None, idb=None):
        """
        Create a pcapng dumpfile writer for the given fileobj.

        The Section Header Block and the Interface Description Block are
        written to fileobj right away. Blocks use the byte order of the
        host (sys.byteorder).

        shb can be an instance of SectionHeaderBlock(LE)
        idb can be an instance of InterfaceDescriptionBlock(LE)
        snaplen and linktype are only used when idb is not given
        """
        self.__f = fileobj
        self.__le = sys.byteorder == 'little'

        # user-supplied blocks must be of the right class and byte order
        if shb:
            self._validate_block('shb', shb, SectionHeaderBlock)
        if idb:
            self._validate_block('idb', idb, InterfaceDescriptionBlock)

        # pick the block classes matching the host byte order for the defaults
        if self.__le:
            shb_cls, idb_cls = SectionHeaderBlockLE, InterfaceDescriptionBlockLE
        else:
            shb_cls, idb_cls = SectionHeaderBlock, InterfaceDescriptionBlock
        shb = shb or shb_cls()
        idb = idb or idb_cls(snaplen=snaplen, linktype=linktype)

        self.__f.write(bytes(shb))
        self.__f.write(bytes(idb))

    def _validate_block(self, arg_name, blk, expected_cls):
        """Check a user-defined block for correct type and endianness

        Raises ValueError when blk is not an instance of expected_cls or when
        its byte order differs from the byte order of the host.
        """
        cls_name = expected_cls.__name__
        if not isinstance(blk, expected_cls):
            raise ValueError('{0}: expecting class {1}'.format(
                arg_name, cls_name))

        byte_order = blk.__hdr_fmt__[0]
        if self.__le:
            if byte_order == '>':
                raise ValueError('{0}: expecting class {1}LE on a little-endian system'.format(
                    arg_name, cls_name))
        elif byte_order == '<':
            raise ValueError('{0}: expecting class {1} on a big-endian system'.format(
                arg_name, cls_name.replace('LE', '')))

    def writepkt(self, pkt, ts=None):
        """
        Write a single packet with its timestamp.

        pkt can be a buffer or an instance of EnhancedPacketBlock(LE)
        ts is a Unix timestamp in seconds since Epoch (e.g. 1454725786.99)

        For an EnhancedPacketBlock, ts (when given) replaces the timestamp
        stored in the block; when ts is not given and the block's timestamp
        is zero, the current time is used.
        """
        if isinstance(pkt, EnhancedPacketBlock):
            self._validate_block('pkt', pkt, EnhancedPacketBlock)

            usec = None
            if ts is not None:  # ts as an argument gets precedence
                usec = int(round(ts * 1e6))
            elif pkt.ts_high == pkt.ts_low == 0:
                usec = int(round(time() * 1e6))

            # otherwise the timestamp already stored in the block is kept
            if usec is not None:
                pkt.ts_high = usec >> 32
                pkt.ts_low = usec & 0xffffffff

            self.__f.write(bytes(pkt))
            return

        # pkt is a buffer - wrap it into an EPB
        seconds = time() if ts is None else ts
        usec = int(round(seconds * 1e6))  # to int microseconds

        payload = bytes(pkt)
        size = len(payload)

        epb_cls = EnhancedPacketBlockLE if self.__le else EnhancedPacketBlock
        epb = epb_cls(ts_high=usec >> 32, ts_low=usec & 0xffffffff,
                      caplen=size, pkt_len=size, pkt_data=payload)
        self.__f.write(bytes(epb))

    def close(self):
        """Close the underlying file object."""
        self.__f.close()
382
class Reader(object):

    """Simple pypcap-compatible pcapng file reader.

    Iterating over a Reader yields (timestamp, packet data) tuples for every
    Enhanced Packet Block; all other block types are skipped.
    """

    def __init__(self, fileobj):
        """Read the Section Header Block and the first Interface Description Block.

        Raises ValueError if the file does not start with a valid SHB, if the
        byte order or major version is not recognised, or if no IDB is found.
        """
        self.name = getattr(fileobj, 'name', '<{0}>'.format(fileobj.__class__.__name__))
        self.__f = fileobj

        shb = SectionHeaderBlock()
        buf = self.__f.read(shb.__hdr_len__)
        if len(buf) < shb.__hdr_len__:
            raise ValueError('invalid pcapng header')

        # unpack just the header since endianness is not known
        shb.unpack_hdr(buf)
        if shb.type != PCAPNG_BT_SHB:
            raise ValueError('invalid pcapng header: not a SHB')

        # determine the correct byte order and reload full SHB
        if shb.bom == BYTE_ORDER_MAGIC_LE:
            self.__le = True
            # the header was unpacked big-endian, so the length needs swapping
            buf += self.__f.read(_swap32b(shb.len) - shb.__hdr_len__)
            shb = SectionHeaderBlockLE(buf)
        elif shb.bom == BYTE_ORDER_MAGIC:
            self.__le = False
            buf += self.__f.read(shb.len - shb.__hdr_len__)
            shb = SectionHeaderBlock(buf)
        else:
            raise ValueError('unknown endianness')

        # check if this version is supported
        if shb.v_major != PCAPNG_VERSION_MAJOR:
            raise ValueError('unknown pcapng version {0}.{1}'.format(shb.v_major, shb.v_minor,))

        # look for a mandatory IDB
        idb = None
        while 1:
            buf = self.__f.read(8)
            if len(buf) < 8:
                break

            blk_type, blk_len = struct_unpack('<II' if self.__le else '>II', buf)
            buf += self.__f.read(blk_len - 8)

            if blk_type == PCAPNG_BT_IDB:
                idb = (InterfaceDescriptionBlockLE(buf) if self.__le
                       else InterfaceDescriptionBlock(buf))
                break
            # just skip other blocks

        if idb is None:
            raise ValueError('IDB not found')

        # set timestamp resolution and offset
        self._divisor = float(1e6)  # defaults
        self._tsoffset = 0
        for opt in idb.opts:
            if opt.code == PCAPNG_OPT_IF_TSRESOL:
                # if MSB=0, the remaining bits is a neg power of 10 (e.g. 6 means microsecs)
                # if MSB=1, the remaining bits is a neg power of 2 (e.g. 10 means 1/1024 of second)
                opt_val = struct_unpack('b', opt.data)[0]
                pow_num = 2 if opt_val & 0b10000000 else 10
                self._divisor = float(pow_num ** (opt_val & 0b01111111))

            elif opt.code == PCAPNG_OPT_IF_TSOFFSET:
                # 64-bit int that specifies an offset (in seconds) that must be added to the
                # timestamp of each packet
                self._tsoffset = struct_unpack('<q' if self.__le else '>q', opt.data)[0]

        if idb.linktype in dltoff:
            self.dloff = dltoff[idb.linktype]
        else:
            self.dloff = 0

        self.idb = idb
        self.snaplen = idb.snaplen
        self.filter = ''
        self.__iter = iter(self)

    @property
    def fd(self):
        """File descriptor of the underlying file object."""
        return self.__f.fileno()

    def fileno(self):
        """Return the file descriptor of the underlying file object."""
        return self.fd

    def datalink(self):
        """Return the link type (a DLT_* value) of the interface description block."""
        return self.idb.linktype

    def setfilter(self, value, optimize=1):
        """Not supported; always raises NotImplementedError."""
        raise NotImplementedError

    def readpkts(self):
        """Return all remaining (timestamp, packet data) tuples as a list."""
        return list(self)

    def __next__(self):
        """Return the next (timestamp, packet data) tuple.

        Raises StopIteration when no more packets can be read.
        """
        return next(self.__iter)

    next = __next__  # Python 2 spelling of the same method

    def dispatch(self, cnt, callback, *args):
        """Collect and process packets with a user callback.

        Return the number of packets processed.

        Arguments:

        cnt      -- number of packets to process;
                    or 0 to process all packets until EOF
        callback -- function with (timestamp, pkt, *args) prototype
        *args    -- optional arguments passed to callback on execution
        """
        processed = 0
        if cnt > 0:
            for _ in range(cnt):
                try:
                    ts, pkt = next(iter(self))
                except StopIteration:
                    break
                callback(ts, pkt, *args)
                processed += 1
        else:
            for ts, pkt in self:
                callback(ts, pkt, *args)
                processed += 1
        return processed

    def loop(self, callback, *args):
        """Call callback(timestamp, pkt, *args) for every remaining packet."""
        self.dispatch(0, callback, *args)

    def __iter__(self):
        """Yield (timestamp, packet data) for each Enhanced Packet Block until EOF.

        The timestamp is the 64-bit block timestamp divided by the resolution
        found in the interface description block, plus its timestamp offset.
        """
        while 1:
            buf = self.__f.read(8)
            if len(buf) < 8:
                break

            blk_type, blk_len = struct_unpack('<II' if self.__le else '>II', buf)
            buf += self.__f.read(blk_len - 8)

            if blk_type == PCAPNG_BT_EPB:
                epb = EnhancedPacketBlockLE(buf) if self.__le else EnhancedPacketBlock(buf)
                ts = self._tsoffset + (((epb.ts_high << 32) | epb.ts_low) / self._divisor)
                yield (ts, epb.pkt_data)
525
526 # just ignore other blocks 527 528 529 ######### 530 # TESTS # 531 ######### 532 533 534 -def test_shb():
535 """Test SHB with options""" 536 buf = ( 537 b'\x0a\x0d\x0d\x0a\x58\x00\x00\x00\x4d\x3c\x2b\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff' 538 b'\xff\xff\x04\x00\x31\x00\x54\x53\x68\x61\x72\x6b\x20\x31\x2e\x31\x30\x2e\x30\x72\x63\x32' 539 b'\x20\x28\x53\x56\x4e\x20\x52\x65\x76\x20\x34\x39\x35\x32\x36\x20\x66\x72\x6f\x6d\x20\x2f' 540 b'\x74\x72\x75\x6e\x6b\x2d\x31\x2e\x31\x30\x29\x00\x00\x00\x00\x00\x00\x00\x58\x00\x00\x00') 541 542 opt_buf = b'\x04\x00\x31\x00TShark 1.10.0rc2 (SVN Rev 49526 from /trunk-1.10)\x00\x00\x00' 543 544 # block unpacking 545 shb = SectionHeaderBlockLE(buf) 546 assert shb.type == PCAPNG_BT_SHB 547 assert shb.bom == BYTE_ORDER_MAGIC 548 assert shb.v_major == 1 549 assert shb.v_minor == 0 550 assert shb.sec_len == -1 551 assert shb.data == '' 552 553 # options unpacking 554 assert len(shb.opts) == 2 555 assert shb.opts[0].code == PCAPNG_OPT_SHB_USERAPPL 556 assert shb.opts[0].data == b'TShark 1.10.0rc2 (SVN Rev 49526 from /trunk-1.10)' 557 assert shb.opts[0].len == len(shb.opts[0].data) 558 559 assert shb.opts[1].code == PCAPNG_OPT_ENDOFOPT 560 assert shb.opts[1].len == 0 561 562 # option packing 563 assert str(shb.opts[0]) == str(opt_buf) 564 assert len(shb.opts[0]) == len(opt_buf) 565 assert bytes(shb.opts[1]) == b'\x00\x00\x00\x00' 566 567 # block packing 568 assert str(shb) == str(buf) 569 assert len(shb) == len(buf)
570
def test_idb():
    """Round-trip a little-endian IDB that carries a timestamp resolution option"""
    buf = (
        b'\x01\x00\x00\x00\x20\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\x09\x00\x01\x00\x06\x00'
        b'\x00\x00\x00\x00\x00\x00\x20\x00\x00\x00')

    # fixed fields
    idb = InterfaceDescriptionBlockLE(buf)
    assert idb.type == PCAPNG_BT_IDB
    assert idb.linktype == DLT_EN10MB
    assert idb.snaplen == 0xffff
    assert idb.data == ''

    # parsed options: if_tsresol, then the end-of-options marker
    assert len(idb.opts) == 2
    tsresol, endofopt = idb.opts[0], idb.opts[1]
    assert tsresol.code == PCAPNG_OPT_IF_TSRESOL
    assert tsresol.len == 1
    assert tsresol.data == b'\x06'

    assert endofopt.code == PCAPNG_OPT_ENDOFOPT
    assert endofopt.len == 0

    # options pack back to the original bytes (1 byte of value + 3 of padding)
    assert bytes(tsresol) == b'\x09\x00\x01\x00\x06\x00\x00\x00'
    assert len(tsresol) == 8
    assert bytes(endofopt) == b'\x00\x00\x00\x00'

    # and so does the whole block
    assert str(idb) == str(buf)
    assert len(idb) == len(buf)
602
def test_epb():
    """Round-trip a little-endian EPB whose comment option is not plain ASCII"""
    buf = (
        b'\x06\x00\x00\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73\xe6\x04\x00\xbe\x37\xe2\x19\x4a\x00'
        b'\x00\x00\x4a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x45\x00'
        b'\x00\x3c\x5d\xb3\x40\x00\x40\x06\xdf\x06\x7f\x00\x00\x01\x7f\x00\x00\x01\x98\x34\x11\x4e'
        b'\x95\xcb\x2d\x3a\x00\x00\x00\x00\xa0\x02\xaa\xaa\xfe\x30\x00\x00\x02\x04\xff\xd7\x04\x02'
        b'\x08\x0a\x05\x8f\x70\x89\x00\x00\x00\x00\x01\x03\x03\x07\x00\x00\x01\x00\x0a\x00\xd0\xbf'
        b'\xd0\xb0\xd0\xba\xd0\xb5\xd1\x82\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00')

    # fixed fields and packet data
    epb = EnhancedPacketBlockLE(buf)
    assert epb.type == PCAPNG_BT_EPB
    assert epb.caplen == len(epb.pkt_data)
    assert epb.pkt_len == len(epb.pkt_data)
    assert epb.caplen == 74
    assert epb.ts_high == 321139
    assert epb.ts_low == 434255806
    assert epb.data == ''

    # parsed options: a UTF-8 comment, then the end-of-options marker
    assert len(epb.opts) == 2
    comment, endofopt = epb.opts[0], epb.opts[1]
    assert comment.code == PCAPNG_OPT_COMMENT
    assert comment.text == u'\u043f\u0430\u043a\u0435\u0442'

    assert endofopt.code == PCAPNG_OPT_ENDOFOPT
    assert endofopt.len == 0

    # options pack back to the original bytes
    assert bytes(comment) == b'\x01\x00\x0a\x00\xd0\xbf\xd0\xb0\xd0\xba\xd0\xb5\xd1\x82\x00\x00'
    assert len(comment) == 16
    assert bytes(endofopt) == b'\x00\x00\x00\x00'

    # and so does the whole block
    assert str(epb) == str(buf)
    assert len(epb) == len(buf)
640
def test_simple_write_read():
    """Write a one-packet pcapng into memory, then read it back"""
    fobj = BytesIO()

    writer = Writer(fobj, snaplen=0x2000, linktype=DLT_LINUX_SLL)
    writer.writepkt(b'foo', ts=1454725786.526401)
    fobj.flush()
    fobj.seek(0)

    # the reader must see what the writer was configured with
    reader = Reader(fobj)
    assert reader.snaplen == 0x2000
    assert reader.datalink() == DLT_LINUX_SLL

    ts, buf1 = next(iter(reader))
    assert ts == 1454725786.526401
    assert buf1 == b'foo'

    # test dispatch(): one packet is available, after that there is none
    fobj.seek(0)
    reader = Reader(fobj)
    assert reader.dispatch(1, lambda ts, pkt: None) == 1
    assert reader.dispatch(1, lambda ts, pkt: None) == 0
    fobj.close()
665
def test_custom_read_write():
    """Read a full pcapng file with 1 ICMP packet, then rebuild it byte for byte"""
    buf = (
        b'\x0a\x0d\x0d\x0a\x7c\x00\x00\x00\x4d\x3c\x2b\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff'
        b'\xff\xff\x03\x00\x1e\x00\x36\x34\x2d\x62\x69\x74\x20\x57\x69\x6e\x64\x6f\x77\x73\x20\x38'
        b'\x2e\x31\x2c\x20\x62\x75\x69\x6c\x64\x20\x39\x36\x30\x30\x00\x00\x04\x00\x34\x00\x44\x75'
        b'\x6d\x70\x63\x61\x70\x20\x31\x2e\x31\x32\x2e\x37\x20\x28\x76\x31\x2e\x31\x32\x2e\x37\x2d'
        b'\x30\x2d\x67\x37\x66\x63\x38\x39\x37\x38\x20\x66\x72\x6f\x6d\x20\x6d\x61\x73\x74\x65\x72'
        b'\x2d\x31\x2e\x31\x32\x29\x00\x00\x00\x00\x7c\x00\x00\x00\x01\x00\x00\x00\x7c\x00\x00\x00'
        b'\x01\x00\x00\x00\x00\x00\x04\x00\x02\x00\x32\x00\x5c\x44\x65\x76\x69\x63\x65\x5c\x4e\x50'
        b'\x46\x5f\x7b\x33\x42\x42\x46\x32\x31\x41\x37\x2d\x39\x31\x41\x45\x2d\x34\x44\x44\x42\x2d'
        b'\x41\x42\x32\x43\x2d\x43\x37\x38\x32\x39\x39\x39\x43\x32\x32\x44\x35\x7d\x00\x00\x09\x00'
        b'\x01\x00\x06\x00\x00\x00\x0c\x00\x1e\x00\x36\x34\x2d\x62\x69\x74\x20\x57\x69\x6e\x64\x6f'
        b'\x77\x73\x20\x38\x2e\x31\x2c\x20\x62\x75\x69\x6c\x64\x20\x39\x36\x30\x30\x00\x00\x00\x00'
        b'\x00\x00\x7c\x00\x00\x00\x06\x00\x00\x00\x84\x00\x00\x00\x00\x00\x00\x00\x63\x20\x05\x00'
        b'\xd6\xc4\xab\x0b\x4a\x00\x00\x00\x4a\x00\x00\x00\x08\x00\x27\x96\xcb\x7c\x52\x54\x00\x12'
        b'\x35\x02\x08\x00\x45\x00\x00\x3c\xa4\x40\x00\x00\x1f\x01\x27\xa2\xc0\xa8\x03\x28\x0a\x00'
        b'\x02\x0f\x00\x00\x56\xf0\x00\x01\x00\x6d\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4a\x4b\x4c'
        b'\x4d\x4e\x4f\x50\x51\x52\x53\x54\x55\x56\x57\x41\x42\x43\x44\x45\x46\x47\x48\x49\x00\x00'
        b'\x01\x00\x0f\x00\x64\x70\x6b\x74\x20\x69\x73\x20\x61\x77\x65\x73\x6f\x6d\x65\x00\x00\x00'
        b'\x00\x00\x84\x00\x00\x00')

    fobj = BytesIO(buf)

    # test reading
    reader = Reader(fobj)
    assert reader.snaplen == 0x40000
    assert reader.datalink() == DLT_EN10MB

    # IDB options: interface name first, operating system third
    assert reader.idb.opts[0].data.decode('utf-8') == '\\Device\\NPF_{3BBF21A7-91AE-4DDB-AB2C-C782999C22D5}'
    assert reader.idb.opts[2].data.decode('utf-8') == '64-bit Windows 8.1, build 9600'

    ts, buf1 = next(iter(reader))
    assert ts == 1442984653.2108380
    assert len(buf1) == 74

    assert buf1.startswith(b'\x08\x00\x27\x96')
    assert buf1.endswith(b'FGHI')
    fobj.close()

    # test pcapng customized writing: rebuild the same three blocks by hand
    # (LE blocks are used, so Writer accepts them on a little-endian host only)
    shb = SectionHeaderBlockLE(opts=[
        PcapngOptionLE(code=3, data=b'64-bit Windows 8.1, build 9600'),
        PcapngOptionLE(code=4, data=b'Dumpcap 1.12.7 (v1.12.7-0-g7fc8978 from master-1.12)'),
        PcapngOptionLE()
    ])
    idb = InterfaceDescriptionBlockLE(snaplen=0x40000, opts=[
        PcapngOptionLE(code=2, data=b'\\Device\\NPF_{3BBF21A7-91AE-4DDB-AB2C-C782999C22D5}'),
        PcapngOptionLE(code=9, data=b'\x06'),
        PcapngOptionLE(code=12, data=b'64-bit Windows 8.1, build 9600'),
        PcapngOptionLE()
    ])
    epb = EnhancedPacketBlockLE(opts=[
        PcapngOptionLE(code=1, text=b'dpkt is awesome'),
        PcapngOptionLE()
    ], pkt_data=(
        b'\x08\x00\x27\x96\xcb\x7c\x52\x54\x00\x12\x35\x02\x08\x00\x45\x00\x00\x3c\xa4\x40\x00\x00'
        b'\x1f\x01\x27\xa2\xc0\xa8\x03\x28\x0a\x00\x02\x0f\x00\x00\x56\xf0\x00\x01\x00\x6d\x41\x42'
        b'\x43\x44\x45\x46\x47\x48\x49\x4a\x4b\x4c\x4d\x4e\x4f\x50\x51\x52\x53\x54\x55\x56\x57\x41'
        b'\x42\x43\x44\x45\x46\x47\x48\x49'
    ))

    # timestamp passed as an argument
    fobj = BytesIO()
    writer = Writer(fobj, shb=shb, idb=idb)
    writer.writepkt(epb, ts=1442984653.210838)
    assert fobj.getvalue() == buf
    fobj.close()

    # same with timestamps defined inside EPB
    epb.ts_high = 335971
    epb.ts_low = 195806422

    fobj = BytesIO()
    writer = Writer(fobj, shb=shb, idb=idb)
    writer.writepkt(epb)
    assert fobj.getvalue() == buf
    fobj.close()
if __name__ == '__main__':
    # TODO: big endian unit tests; could not find any examples..

    # run every test in order; any failing assert aborts the run
    for _test in (test_shb, test_idb, test_epb,
                  test_simple_write_read, test_custom_read_write):
        _test()
    repr(PcapngOptionLE())  # also exercise the opt_endofopt branch of __repr__

    print('Tests Successful...')