Package dpkt :: Module diameter
[hide private]
[frames] | no frames]

Source Code for Module dpkt.diameter

  1  # $Id: diameter.py 23 2006-11-08 15:45:33Z dugsong $ 
  2  # -*- coding: utf-8 -*- 
  3  """Diameter.""" 
  4  from __future__ import print_function 
  5  from __future__ import absolute_import 
  6   
  7  import struct 
  8   
  9  from . import dpkt 
 10  from .decorators import deprecated 
 11  from .compat import compat_ord 
 12   
 13  # Diameter Base Protocol - RFC 3588 
 14  # http://tools.ietf.org/html/rfc3588 
 15   
 16  # Request/Answer Command Codes 
 17  ABORT_SESSION = 274 
 18  ACCOUTING = 271 
 19  CAPABILITIES_EXCHANGE = 257 
 20  DEVICE_WATCHDOG = 280 
 21  DISCONNECT_PEER = 282 
 22  RE_AUTH = 258 
 23  SESSION_TERMINATION = 275 
class Diameter(dpkt.Packet):
    """Diameter message: a 20-byte header followed by a sequence of AVPs.

    ``len`` and ``cmd`` are 3-byte big-endian fields on the wire.  After
    ``unpack`` they are plain integers, and ``pack_hdr`` converts them back
    to 3-byte strings only for the duration of the packing call.

    After ``unpack``, ``data`` and ``avps`` refer to the same list of parsed
    ``AVP`` objects.

    Attributes:
        __hdr__: Header fields of Diameter.
            v: version (1 byte, default 1).
            len: message length (3 bytes); used by ``unpack`` to bound the
                AVP area.
            flags: command flags (1 byte); see the ``*_flag`` properties.
            cmd: command code (3 bytes).
            app_id: 32-bit field.
            hop_id: 32-bit field.
            end_id: 32-bit field.
    """

    __hdr__ = (
        ('v', 'B', 1),
        ('len', '3s', 0),
        ('flags', 'B', 0),
        ('cmd', '3s', 0),
        ('app_id', 'I', 0),
        ('hop_id', 'I', 0),
        ('end_id', 'I', 0),
    )

    @property
    def request_flag(self):
        """Bit 7 of ``flags`` (0 or 1)."""
        return (self.flags >> 7) & 0x1

    @request_flag.setter
    def request_flag(self, r):
        self.flags = (self.flags & ~0x80) | ((r & 0x1) << 7)

    @property
    def proxiable_flag(self):
        """Bit 6 of ``flags`` (0 or 1)."""
        return (self.flags >> 6) & 0x1

    @proxiable_flag.setter
    def proxiable_flag(self, p):
        self.flags = (self.flags & ~0x40) | ((p & 0x1) << 6)

    @property
    def error_flag(self):
        """Bit 5 of ``flags`` (0 or 1)."""
        return (self.flags >> 5) & 0x1

    @error_flag.setter
    def error_flag(self, e):
        self.flags = (self.flags & ~0x20) | ((e & 0x1) << 5)

    @property
    def retransmit_flag(self):
        """Bit 4 of ``flags`` (0 or 1)."""
        return (self.flags >> 4) & 0x1

    @retransmit_flag.setter
    def retransmit_flag(self, t):
        self.flags = (self.flags & ~0x10) | ((t & 0x1) << 4)

    def unpack(self, buf):
        """Parse ``buf`` into header fields and a list of AVPs.

        The 3-byte ``cmd`` and ``len`` fields are converted to integers, the
        payload is cut to ``len`` minus the header size, and the remainder is
        consumed one ``AVP`` at a time.
        """
        dpkt.Packet.unpack(self, buf)
        self.cmd = (compat_ord(self.cmd[0]) << 16) | \
                   (compat_ord(self.cmd[1]) << 8) | \
                   (compat_ord(self.cmd[2]))
        self.len = (compat_ord(self.len[0]) << 16) | \
                   (compat_ord(self.len[1]) << 8) | \
                   (compat_ord(self.len[2]))
        self.data = self.data[:self.len - self.__hdr_len__]

        # Walk a local cursor rather than self.data, so a failure part-way
        # through leaves self.data holding the whole (truncated) payload.
        # NOTE(review): the cursor advances by len(avp), which is not rounded
        # up to a 4-byte boundary.  RFC 3588 pads AVPs to 32-bit alignment, so
        # AVPs whose length is not a multiple of 4 are presumably mis-parsed
        # here -- confirm before relying on this for such traffic.
        remaining = self.data
        avps = []
        while remaining:
            avp = AVP(remaining)
            avps.append(avp)
            remaining = remaining[len(avp):]
        self.data = self.avps = avps

    def pack_hdr(self):
        """Return the packed header bytes.

        ``len`` and ``cmd`` are swapped for their 3-byte encodings while the
        base class packs the header, then restored to integers.  Without the
        restore, a second call would fail because the fields would already be
        byte strings.
        """
        length, cmd = self.len, self.cmd
        packed_len = struct.pack("BBB", (length >> 16) & 0xff, (length >> 8) & 0xff, length & 0xff)
        packed_cmd = struct.pack("BBB", (cmd >> 16) & 0xff, (cmd >> 8) & 0xff, cmd & 0xff)
        self.len, self.cmd = packed_len, packed_cmd
        try:
            return dpkt.Packet.pack_hdr(self)
        finally:
            self.len, self.cmd = length, cmd

    def __len__(self):
        """Header size plus the summed ``len()`` of every item in ``data``."""
        return self.__hdr_len__ + sum(map(len, self.data))

    def __bytes__(self):
        """Packed header followed by each item of ``data`` serialised in order."""
        return self.pack_hdr() + b''.join(map(bytes, self.data))
105
class AVP(dpkt.Packet):
    """Diameter AVP: an 8-byte header, an optional vendor id, then the payload.

    ``len`` is a 3-byte big-endian field on the wire.  After ``unpack`` it is
    a plain integer, and ``pack_hdr`` converts it back to a 3-byte string only
    for the duration of the packing call.

    When bit 7 of ``flags`` is set, a 4-byte big-endian ``vendor`` value sits
    between the header and the payload; ``unpack`` stores it in
    ``self.vendor`` and ``pack_hdr`` appends it to the packed header.

    Attributes:
        __hdr__: Header fields of the AVP.
            code: AVP code (32 bits).
            flags: AVP flags (1 byte); see the ``*_flag`` properties.
            len: AVP length (3 bytes); used by ``unpack`` to bound the payload.
    """

    __hdr__ = (
        ('code', 'I', 0),
        ('flags', 'B', 0),
        ('len', '3s', 0),
    )

    @property
    def vendor_flag(self):
        """Bit 7 of ``flags`` (0 or 1)."""
        return (self.flags >> 7) & 0x1

    @vendor_flag.setter
    def vendor_flag(self, v):
        self.flags = (self.flags & ~0x80) | ((v & 0x1) << 7)

    @property
    def mandatory_flag(self):
        """Bit 6 of ``flags`` (0 or 1)."""
        return (self.flags >> 6) & 0x1

    @mandatory_flag.setter
    def mandatory_flag(self, m):
        self.flags = (self.flags & ~0x40) | ((m & 0x1) << 6)

    @property
    def protected_flag(self):
        """Bit 5 of ``flags`` (0 or 1)."""
        return (self.flags >> 5) & 0x1

    @protected_flag.setter
    def protected_flag(self, p):
        self.flags = (self.flags & ~0x20) | ((p & 0x1) << 5)

    def unpack(self, buf):
        """Parse ``buf`` into header fields, optional vendor id and payload."""
        dpkt.Packet.unpack(self, buf)
        self.len = (compat_ord(self.len[0]) << 16) | \
                   (compat_ord(self.len[1]) << 8) | \
                   (compat_ord(self.len[2]))

        # Number of bytes after the fixed header that belong to this AVP.
        body_len = self.len - self.__hdr_len__
        if self.vendor_flag:
            # The first four body bytes are the vendor id, not payload.
            self.vendor = struct.unpack('>I', self.data[:4])[0]
            self.data = self.data[4:body_len]
        else:
            self.data = self.data[:body_len]

    def pack_hdr(self):
        """Return the packed header, plus the vendor id when the flag is set.

        ``len`` is swapped for its 3-byte encoding while the base class packs
        the header, then restored to an integer.  Without the restore, a
        second call would fail because the field would already be a byte
        string.
        """
        length = self.len
        self.len = struct.pack("BBB", (length >> 16) & 0xff, (length >> 8) & 0xff, length & 0xff)
        try:
            data = dpkt.Packet.pack_hdr(self)
        finally:
            self.len = length
        if self.vendor_flag:
            data += struct.pack('>I', self.vendor)
        return data

    def __len__(self):
        """Header size plus payload size, plus 4 when the vendor flag is set."""
        length = self.__hdr_len__ + len(self.data)
        if self.vendor_flag:
            length += 4
        return length
# Fixture: a 40-byte message -- version 1, flags 0x80 (request bit set),
# command code 280, app_id 0, hop_id 0x41c8, end_id 0x0c -- carrying two AVPs:
# code 264 (flags 0x40, length 12, payload b'h002') and code 296 (flags 0x40,
# length 8, empty payload).
__s = b'\x01\x00\x00\x28\x80\x00\x01\x18\x00\x00\x00\x00\x00\x00\x41\xc8\x00\x00\x00\x0c\x00\x00\x01\x08\x40\x00\x00\x0c\x68\x30\x30\x32\x00\x00\x01\x28\x40\x00\x00\x08'
# Fixture: the same message with the vendor bit set on the first AVP (flags
# 0xc0, length 16, vendor id 0xdeadbeef), which grows the message to 44 bytes.
__t = b'\x01\x00\x00\x2c\x80\x00\x01\x18\x00\x00\x00\x00\x00\x00\x41\xc8\x00\x00\x00\x0c\x00\x00\x01\x08\xc0\x00\x00\x10\xde\xad\xbe\xef\x68\x30\x30\x32\x00\x00\x01\x28\x40\x00\x00\x08'
def test_pack():
    """Parse each fixture and check it serialises back to the same bytes."""
    for raw in (__s, __t):
        message = Diameter(raw)
        assert (raw == bytes(message))
172
def test_unpack():
    """Check the header fields and AVPs decoded from both fixtures.

    The command code and AVP code are checked as well; the earlier version
    had those assertions commented out because they named undefined
    constants.
    """
    d = Diameter(__s)
    assert (d.len == 40)
    assert (d.cmd == DEVICE_WATCHDOG)
    assert (d.request_flag == 1)
    assert (d.error_flag == 0)
    assert (len(d.avps) == 2)

    avp = d.avps[0]
    assert (avp.code == 264)  # Origin-Host
    assert (avp.mandatory_flag == 1)
    assert (avp.vendor_flag == 0)
    assert (avp.len == 12)
    assert (len(avp) == 12)
    assert (avp.data == b'\x68\x30\x30\x32')

    # also test the optional vendor id support
    d = Diameter(__t)
    assert (d.len == 44)
    avp = d.avps[0]
    assert (avp.vendor_flag == 1)
    assert (avp.len == 16)
    assert (len(avp) == 16)
    assert (avp.vendor == 3735928559)
    assert (avp.data == b'\x68\x30\x30\x32')
if __name__ == '__main__':
    # When run as a script, execute the module's self-tests in order and
    # report success only if none of them raised.
    for _check in (test_pack, test_unpack):
        _check()
    print('Tests Successful...')