Package dpkt :: Module sctp
[hide private]
[frames] | no frames]

Source Code for Module dpkt.sctp

  1  # $Id: sctp.py 23 2006-11-08 15:45:33Z dugsong $ 
  2  # -*- coding: utf-8 -*- 
  3  """Stream Control Transmission Protocol.""" 
  4  from __future__ import print_function 
  5  from __future__ import absolute_import 
  6   
  7  from . import dpkt 
  8  from . import crc32c 
  9   
 10  # Stream Control Transmission Protocol 
 11  # http://tools.ietf.org/html/rfc2960 
 12   
 13  # Chunk Types 
 14  DATA = 0 
 15  INIT = 1 
 16  INIT_ACK = 2 
 17  SACK = 3 
 18  HEARTBEAT = 4 
 19  HEARTBEAT_ACK = 5 
 20  ABORT = 6 
 21  SHUTDOWN = 7 
 22  SHUTDOWN_ACK = 8 
 23  ERROR = 9 
 24  COOKIE_ECHO = 10 
 25  COOKIE_ACK = 11 
 26  ECNE = 12 
 27  CWR = 13 
 28  SHUTDOWN_COMPLETE = 14 
 29   
 30   
31 -class SCTP(dpkt.Packet):
32 """Stream Control Transmission Protocol. 33 34 TODO: Longer class information.... 35 36 Attributes: 37 __hdr__: Header fields of SCTP. 38 TODO. 39 """ 40 41 __hdr__ = ( 42 ('sport', 'H', 0), 43 ('dport', 'H', 0), 44 ('vtag', 'I', 0), 45 ('sum', 'I', 0) 46 ) 47
48 - def unpack(self, buf):
49 dpkt.Packet.unpack(self, buf) 50 l = [] 51 while self.data: 52 chunk = Chunk(self.data) 53 l.append(chunk) 54 self.data = self.data[len(chunk):] 55 self.data = self.chunks = l
56
57 - def __len__(self):
58 return self.__hdr_len__ + sum(map(len, self.data))
59
60 - def __bytes__(self):
61 l = [bytes(x) for x in self.data] 62 if self.sum == 0: 63 s = crc32c.add(0xffffffff, self.pack_hdr()) 64 for x in l: 65 s = crc32c.add(s, x) 66 self.sum = crc32c.done(s) 67 return self.pack_hdr() + b''.join(l)
68 69
70 -class Chunk(dpkt.Packet):
71 __hdr__ = ( 72 ('type', 'B', INIT), 73 ('flags', 'B', 0), 74 ('len', 'H', 0) 75 ) 76
77 - def unpack(self, buf):
78 dpkt.Packet.unpack(self, buf) 79 self.data = self.data[:self.len - self.__hdr_len__]
80 81 82 __s = b'\x80\x44\x00\x50\x00\x00\x00\x00\x30\xba\xef\x54\x01\x00\x00\x3c\x3b\xb9\x9c\x46\x00\x01\xa0\x00\x00\x0a\xff\xff\x2b\x2d\x7e\xb2\x00\x05\x00\x08\x9b\xe6\x18\x9b\x00\x05\x00\x08\x9b\xe6\x18\x9c\x00\x0c\x00\x06\x00\x05\x00\x00\x80\x00\x00\x04\xc0\x00\x00\x04\xc0\x06\x00\x08\x00\x00\x00\x00' 83 84
85 -def test_sctp_pack():
86 sctp = SCTP(__s) 87 assert (__s == bytes(sctp)) 88 sctp.sum = 0 89 assert (__s == bytes(sctp))
90 91
92 -def test_sctp_unpack():
93 sctp = SCTP(__s) 94 assert (sctp.sport == 32836) 95 assert (sctp.dport == 80) 96 assert (len(sctp.chunks) == 1) 97 assert (len(sctp) == 72) 98 chunk = sctp.chunks[0] 99 assert (chunk.type == INIT) 100 assert (chunk.len == 60)
101 102 103 if __name__ == '__main__': 104 test_sctp_pack() 105 test_sctp_unpack() 106 print('Tests Successful...') 107