1
2
3 """Diameter."""
4 from __future__ import print_function
5 from __future__ import absolute_import
6
7 import struct
8
9 from . import dpkt
10 from .decorators import deprecated
11 from .compat import compat_ord
12
13
14
15
16
# Request/Answer command codes (values carried in the header's 3-byte 'cmd').
ABORT_SESSION = 274
# NOTE: 'ACCOUTING' is a long-standing misspelling. It is kept so existing
# callers keep working; new code should use the ACCOUNTING alias below.
ACCOUTING = 271
ACCOUNTING = ACCOUTING
CAPABILITIES_EXCHANGE = 257
DEVICE_WATCHDOG = 280
DISCONNECT_PEER = 282
RE_AUTH = 258
SESSION_TERMINATION = 275
class Diameter(dpkt.Packet):
    """Diameter message: a fixed header followed by a sequence of AVPs.

    Four single-bit command flags are packed into ``flags`` and exposed as
    properties: ``request_flag`` (0x80), ``proxiable_flag`` (0x40),
    ``error_flag`` (0x20) and ``retransmit_flag`` (0x10).

    Attributes:
        __hdr__: Header fields of Diameter.
            v: 1 byte, defaults to 1.
            len: 3 bytes; total message length including this header.
                Converted to an int by unpack().
            flags: 1 byte of command flags (see the properties).
            cmd: 3 bytes; command code. Converted to an int by unpack().
            app_id, hop_id, end_id: unsigned 32-bit integers.
        avps: list of AVP objects built by unpack(); ``data`` is bound to
            the same list.
    """

    __hdr__ = (
        ('v', 'B', 1),
        ('len', '3s', 0),
        ('flags', 'B', 0),
        ('cmd', '3s', 0),
        ('app_id', 'I', 0),
        ('hop_id', 'I', 0),
        ('end_id', 'I', 0)
    )

    @property
    def request_flag(self):
        """Bit 7 (0x80) of ``flags`` as 0 or 1."""
        return (self.flags >> 7) & 0x1

    @request_flag.setter
    def request_flag(self, r):
        # Clear bit 7, then store the low bit of r there.
        self.flags = (self.flags & ~0x80) | ((r & 0x1) << 7)

    @property
    def proxiable_flag(self):
        """Bit 6 (0x40) of ``flags`` as 0 or 1."""
        return (self.flags >> 6) & 0x1

    @proxiable_flag.setter
    def proxiable_flag(self, p):
        # Clear bit 6, then store the low bit of p there.
        self.flags = (self.flags & ~0x40) | ((p & 0x1) << 6)

    @property
    def error_flag(self):
        """Bit 5 (0x20) of ``flags`` as 0 or 1."""
        return (self.flags >> 5) & 0x1

    @error_flag.setter
    def error_flag(self, e):
        # Clear bit 5, then store the low bit of e there.
        self.flags = (self.flags & ~0x20) | ((e & 0x1) << 5)

    @property
    def retransmit_flag(self):
        """Bit 4 (0x10) of ``flags`` as 0 or 1."""
        return (self.flags >> 4) & 0x1

    @retransmit_flag.setter
    def retransmit_flag(self, t):
        # Clear bit 4, then store the low bit of t there.
        self.flags = (self.flags & ~0x10) | ((t & 0x1) << 4)

    def unpack(self, buf):
        """Parse the header from ``buf`` and split the body into AVPs.

        After the call ``cmd`` and ``len`` are ints, and both ``data`` and
        ``avps`` refer to the list of parsed AVP objects.
        """
        dpkt.Packet.unpack(self, buf)
        # 'cmd' and 'len' are unpacked as 3-byte strings ('3s'); fold each
        # into an int, most significant byte first.
        self.cmd = (compat_ord(self.cmd[0]) << 16) | \
                   (compat_ord(self.cmd[1]) << 8) | \
                   (compat_ord(self.cmd[2]))
        self.len = (compat_ord(self.len[0]) << 16) | \
                   (compat_ord(self.len[1]) << 8) | \
                   (compat_ord(self.len[2]))
        # 'len' counts the header too, so drop anything past the declared end.
        self.data = self.data[:self.len - self.__hdr_len__]

        avps = []
        while self.data:
            avp = AVP(self.data)
            avps.append(avp)
            # Advance by however many bytes this AVP reports as its length.
            self.data = self.data[len(avp):]
        self.data = self.avps = avps
94
99
102
104 return self.pack_hdr() + b''.join(map(bytes, self.data))
105
class AVP(dpkt.Packet):
    """Diameter Attribute-Value Pair.

    Three single-bit flags are packed into ``flags`` and exposed as
    properties: ``vendor_flag`` (0x80), ``mandatory_flag`` (0x40) and
    ``protected_flag`` (0x20).

    Attributes:
        __hdr__: Header fields of an AVP.
            code: unsigned 32-bit AVP code.
            flags: 1 byte of AVP flags (see the properties).
            len: 3 bytes; AVP length field.
    """

    __hdr__ = (
        ('code', 'I', 0),
        ('flags', 'B', 0),
        ('len', '3s', 0),
    )

    @property
    def vendor_flag(self):
        """Bit 7 (0x80) of ``flags`` as 0 or 1."""
        return (self.flags >> 7) & 0x1

    @vendor_flag.setter
    def vendor_flag(self, v):
        # Clear bit 7, then store the low bit of v there.
        self.flags = (self.flags & ~0x80) | ((v & 0x1) << 7)

    @property
    def mandatory_flag(self):
        """Bit 6 (0x40) of ``flags`` as 0 or 1."""
        return (self.flags >> 6) & 0x1

    @mandatory_flag.setter
    def mandatory_flag(self, m):
        # Clear bit 6, then store the low bit of m there.
        self.flags = (self.flags & ~0x40) | ((m & 0x1) << 6)

    @property
    def protected_flag(self):
        """Bit 5 (0x20) of ``flags`` as 0 or 1."""
        return (self.flags >> 5) & 0x1

    @protected_flag.setter
    def protected_flag(self, p):
        # Clear bit 5, then store the low bit of p there.
        self.flags = (self.flags & ~0x20) | ((p & 0x1) << 5)
148
155
161
162
# Fixture: 40-byte message (flags 0x80) carrying two AVPs; the first has
# flags 0x40, length 12 and the 4-byte payload b'h002'.
__s = b'\x01\x00\x00\x28\x80\x00\x01\x18\x00\x00\x00\x00\x00\x00\x41\xc8\x00\x00\x00\x0c\x00\x00\x01\x08\x40\x00\x00\x0c\x68\x30\x30\x32\x00\x00\x01\x28\x40\x00\x00\x08'
# Fixture: same message, but the first AVP has flags 0xc0 and four extra
# bytes (0xdeadbeef) ahead of the payload, making the message 44 bytes long.
__t = b'\x01\x00\x00\x2c\x80\x00\x01\x18\x00\x00\x00\x00\x00\x00\x41\xc8\x00\x00\x00\x0c\x00\x00\x01\x08\xc0\x00\x00\x10\xde\xad\xbe\xef\x68\x30\x30\x32\x00\x00\x01\x28\x40\x00\x00\x08'


def test_unpack():
    """Check header and AVP fields decoded from both fixtures."""
    d = Diameter(__s)
    assert d.len == 40

    assert d.request_flag == 1
    assert d.error_flag == 0
    assert len(d.avps) == 2

    avp = d.avps[0]

    assert avp.mandatory_flag == 1
    assert avp.vendor_flag == 0
    assert avp.len == 12
    assert len(avp) == 12
    assert avp.data == b'\x68\x30\x30\x32'

    # Second fixture: the first AVP has the vendor bit set.
    d = Diameter(__t)
    assert d.len == 44
    avp = d.avps[0]
    assert avp.vendor_flag == 1
    assert avp.len == 16
    assert len(avp) == 16
    assert avp.vendor == 3735928559  # 0xdeadbeef
    assert avp.data == b'\x68\x30\x30\x32'
199
200
# Run the module's self-tests when executed as a script.
if __name__ == '__main__':
    test_pack()
    test_unpack()
    print('Tests Successful...')
205