Package dpkt :: Module tcp
[hide private]
[frames] | no frames]

Source Code for Module dpkt.tcp

  1  # $Id: tcp.py 42 2007-08-02 22:38:47Z jon.oberheide $ 
  2  # -*- coding: utf-8 -*- 
  3  """Transmission Control Protocol.""" 
  4  from __future__ import print_function 
  5  from __future__ import absolute_import 
  6   
  7  from . import dpkt 
  8  from .decorators import deprecated 
  9  from .compat import compat_ord 
 10   
 11  # TCP control flags 
 12  TH_FIN = 0x01  # end of data 
 13  TH_SYN = 0x02  # synchronize sequence numbers 
 14  TH_RST = 0x04  # reset connection 
 15  TH_PUSH = 0x08  # push 
 16  TH_ACK = 0x10  # acknowledgment number set 
 17  TH_URG = 0x20  # urgent pointer set 
 18  TH_ECE = 0x40  # ECN echo, RFC 3168 
 19  TH_CWR = 0x80  # congestion window reduced 
 20   
 21  TCP_PORT_MAX = 65535  # maximum port 
 22  TCP_WIN_MAX = 65535  # maximum (unscaled) window 
class TCP(dpkt.Packet):
    """Transmission Control Protocol segment.

    The fixed header fields are declared in ``__hdr__``. Bytes between the
    fixed header and the payload are kept raw in ``opts``; everything after
    them is stored in ``data``.

    Attributes:
        __hdr__: Header fields of TCP, one (name, format code, default)
            tuple per field: sport, dport, seq, ack, _off, flags, win,
            sum, urp. NOTE(review): the format codes are presumably
            struct-style codes consumed by dpkt.Packet -- confirm there.
        opts: Raw option bytes; empty unless ``unpack`` finds a header
            longer than the fixed part.
        off: Property exposing the upper four bits of ``_off``. ``unpack``
            multiplies it by four to get the full header length in bytes.
    """

    __hdr__ = (
        ('sport', 'H', 0xdead),
        ('dport', 'H', 0),
        ('seq', 'I', 0xdeadbeef),
        ('ack', 'I', 0),
        # Upper nibble = 5 (header offset), lower nibble = 0.
        ('_off', 'B', (5 << 4) | 0),
        ('flags', 'B', TH_SYN),
        ('win', 'H', TCP_WIN_MAX),
        ('sum', 'H', 0),
        ('urp', 'H', 0),
    )
    opts = b''

    @property
    def off(self):
        """Return the header offset held in the upper four bits of ``_off``."""
        return self._off >> 4

    @off.setter
    def off(self, off):
        """Store ``off`` in the upper four bits of ``_off``.

        The lower four bits of ``_off`` are carried over unchanged.
        """
        low_nibble = self._off & 0xf
        self._off = (off << 4) | low_nibble

    def __len__(self):
        """Return fixed header length plus option and payload lengths."""
        options_len = len(self.opts)
        payload_len = len(self.data)
        return self.__hdr_len__ + options_len + payload_len

    def __bytes__(self):
        """Serialize as packed header, then options, then payload."""
        return b''.join((self.pack_hdr(), bytes(self.opts), bytes(self.data)))

    def unpack(self, buf):
        """Parse ``buf`` into header fields, ``opts`` and ``data``.

        Args:
            buf: Raw bytes of the segment, starting at the TCP header.

        Raises:
            dpkt.UnpackError: If the header length derived from the offset
                field is smaller than the fixed header length.
        """
        dpkt.Packet.unpack(self, buf)
        fixed_len = self.__hdr_len__
        # The offset field counts 4-byte units; convert it to bytes.
        header_len = (self._off >> 4) << 2
        if header_len < fixed_len:
            raise dpkt.UnpackError('invalid header length')
        # Options occupy whatever lies between the fixed part and header end.
        self.opts = buf[fixed_len:header_len]
        self.data = buf[header_len:]
# TCP option kinds (opt_type).
# Registry: http://www.iana.org/assignments/tcp-parameters
TCP_OPT_EOL = 0          # end of option list
TCP_OPT_NOP = 1          # no operation
TCP_OPT_MSS = 2          # maximum segment size
TCP_OPT_WSCALE = 3       # window scale factor, RFC 1072
TCP_OPT_SACKOK = 4       # SACK permitted, RFC 2018
TCP_OPT_SACK = 5         # SACK, RFC 2018
TCP_OPT_ECHO = 6         # echo (obsolete), RFC 1072
TCP_OPT_ECHOREPLY = 7    # echo reply (obsolete), RFC 1072
TCP_OPT_TIMESTAMP = 8    # timestamp, RFC 1323
TCP_OPT_POCONN = 9       # partial order conn, RFC 1693
TCP_OPT_POSVC = 10       # partial order service, RFC 1693
TCP_OPT_CC = 11          # connection count, RFC 1644
TCP_OPT_CCNEW = 12       # CC.NEW, RFC 1644
TCP_OPT_CCECHO = 13      # CC.ECHO, RFC 1644
TCP_OPT_ALTSUM = 14      # alt checksum request, RFC 1146
TCP_OPT_ALTSUMDATA = 15  # alt checksum data, RFC 1146
TCP_OPT_SKEETER = 16     # Skeeter
TCP_OPT_BUBBA = 17       # Bubba
TCP_OPT_TRAILSUM = 18    # trailer checksum
TCP_OPT_MD5 = 19         # MD5 signature, RFC 2385
TCP_OPT_SCPS = 20        # SCPS capabilities
TCP_OPT_SNACK = 21       # selective negative acks
TCP_OPT_REC = 22         # record boundaries
TCP_OPT_CORRUPT = 23     # corruption experienced
TCP_OPT_SNAP = 24        # SNAP
# No constant is defined for value 25.
TCP_OPT_TCPCOMP = 26     # TCP compression filter
TCP_OPT_MAX = 27
def parse_opts(buf):
    """Parse TCP option buffer into a list of (option, data) tuples.

    Args:
        buf: Raw option bytes, e.g. the ``opts`` attribute of a TCP packet.

    Returns:
        A list with one ``(kind, data)`` tuple per option, in buffer order.
        Kinds 0 and 1 always carry empty data. If an option is cut off
        before its length byte, ``None`` is appended and parsing stops.
    """
    parsed = []
    while buf:
        kind = compat_ord(buf[0])
        if kind <= TCP_OPT_NOP:
            # options 0 and 1 are not followed by length byte
            value = b''
            buf = buf[1:]
        else:
            try:
                # advance buffer at least 2 bytes = 1 type + 1 length,
                # so a bogus length of 0 or 1 cannot stall the loop
                length = max(2, compat_ord(buf[1]))
                value = buf[2:length]
                buf = buf[length:]
            except (IndexError, ValueError):
                # Length byte is missing: mark the malformed tail and stop.
                parsed.append(None)  # XXX
                break
        parsed.append((kind, value))
    return parsed
119
def test_parse_opts():
    """Check parse_opts against well-formed and malformed option buffers."""
    cases = (
        # normal scenarios
        (
            b'\x02\x04\x23\x00\x01\x01\x04\x02',
            [
                (TCP_OPT_MSS, b'\x23\x00'),
                (TCP_OPT_NOP, b''),
                (TCP_OPT_NOP, b''),
                (TCP_OPT_SACKOK, b''),
            ],
        ),
        (
            b'\x01\x01\x05\x0a\x37\xf8\x19\x70\x37\xf8\x29\x78',
            [
                (TCP_OPT_NOP, b''),
                (TCP_OPT_NOP, b''),
                (TCP_OPT_SACK, b'\x37\xf8\x19\x70\x37\xf8\x29\x78'),
            ],
        ),
        # test a zero-length option
        (
            b'\x02\x00\x01',
            [
                (TCP_OPT_MSS, b''),
                (TCP_OPT_NOP, b''),
            ],
        ),
        # test a one-byte malformed option
        (b'\xff', [None]),
    )
    for raw, expected in cases:
        opts = parse_opts(raw)
        assert opts == expected
152
def test_offset():
    """Check reading and writing the header offset through ``TCP.off``."""
    raw_header = (
        b'\x01\xbb\xc0\xd7\xb6\x56\xa8\xb9\xd1\xac\xaa\xb1'
        b'\x50\x18\x40\x00\x56\xf8\x00\x00'
    )
    tcpheader = TCP(raw_header)
    # Byte 12 is 0x50, so the upper nibble yields an offset of 5.
    assert tcpheader.off == 5

    # test setting header offset: only byte 12 changes (0x50 -> 0x80)
    tcpheader.off = 8
    expected = (
        b'\x01\xbb\xc0\xd7\xb6\x56\xa8\xb9\xd1\xac\xaa\xb1'
        b'\x80\x18\x40\x00\x56\xf8\x00\x00'
    )
    assert bytes(tcpheader) == expected
if __name__ == '__main__':
    # Run every self-test in this file, then report success.
    for self_test in (test_parse_opts, test_offset):
        self_test()
    print('Tests Successful...')