Package dpkt :: Module radiotap
[hide private]
[frames] | no frames]

Source Code for Module dpkt.radiotap

  1  # -*- coding: utf-8 -*- 
  2  """Radiotap""" 
  3  from __future__ import print_function 
  4  from __future__ import absolute_import 
  5   
  6  import socket 
  7   
  8  from . import dpkt 
  9  from . import ieee80211 
 10  from .decorators import deprecated 
 11   
 12  # Ref: http://www.radiotap.org 
 13  # Fields Ref: http://www.radiotap.org/defined-fields/all 
 14   
 15  # Present flags 
 16  _TSFT_MASK = 0x1000000 
 17  _FLAGS_MASK = 0x2000000 
 18  _RATE_MASK = 0x4000000 
 19  _CHANNEL_MASK = 0x8000000 
 20  _FHSS_MASK = 0x10000000 
 21  _ANT_SIG_MASK = 0x20000000 
 22  _ANT_NOISE_MASK = 0x40000000 
 23  _LOCK_QUAL_MASK = 0x80000000 
 24  _TX_ATTN_MASK = 0x10000 
 25  _DB_TX_ATTN_MASK = 0x20000 
 26  _DBM_TX_POWER_MASK = 0x40000 
 27  _ANTENNA_MASK = 0x80000 
 28  _DB_ANT_SIG_MASK = 0x100000 
 29  _DB_ANT_NOISE_MASK = 0x200000 
 30  _RX_FLAGS_MASK = 0x400000 
 31  _CHANNELPLUS_MASK = 0x200 
 32  _EXT_MASK = 0x1 
 33   
 34  _TSFT_SHIFT = 24 
 35  _FLAGS_SHIFT = 25 
 36  _RATE_SHIFT = 26 
 37  _CHANNEL_SHIFT = 27 
 38  _FHSS_SHIFT = 28 
 39  _ANT_SIG_SHIFT = 29 
 40  _ANT_NOISE_SHIFT = 30 
 41  _LOCK_QUAL_SHIFT = 31 
 42  _TX_ATTN_SHIFT = 16 
 43  _DB_TX_ATTN_SHIFT = 17 
 44  _DBM_TX_POWER_SHIFT = 18 
 45  _ANTENNA_SHIFT = 19 
 46  _DB_ANT_SIG_SHIFT = 20 
 47  _DB_ANT_NOISE_SHIFT = 21 
 48  _RX_FLAGS_SHIFT = 22 
 49  _CHANNELPLUS_SHIFT = 10 
 50  _EXT_SHIFT = 0 
 51   
 52  # Flags elements 
 53  _FLAGS_SIZE = 2 
 54  _CFP_FLAG_SHIFT = 0 
 55  _PREAMBLE_SHIFT = 1 
 56  _WEP_SHIFT = 2 
 57  _FRAG_SHIFT = 3 
 58  _FCS_SHIFT = 4 
 59  _DATA_PAD_SHIFT = 5 
 60  _BAD_FCS_SHIFT = 6 
 61  _SHORT_GI_SHIFT = 7 
 62   
 63  # Channel type 
 64  _CHAN_TYPE_SIZE = 4 
 65  _CHANNEL_TYPE_SHIFT = 4 
 66  _CCK_SHIFT = 5 
 67  _OFDM_SHIFT = 6 
 68  _TWO_GHZ_SHIFT = 7 
 69  _FIVE_GHZ_SHIFT = 8 
 70  _PASSIVE_SHIFT = 9 
 71  _DYN_CCK_OFDM_SHIFT = 10 
 72  _GFSK_SHIFT = 11 
 73  _GSM_SHIFT = 12 
 74  _STATIC_TURBO_SHIFT = 13 
 75  _HALF_RATE_SHIFT = 14 
 76  _QUARTER_RATE_SHIFT = 15 
 77   
 78  # Flags offsets and masks 
 79  _FCS_SHIFT = 4 
 80  _FCS_MASK = 0x10 
81 82 83 -class Radiotap(dpkt.Packet):
84 """Radiotap. 85 86 TODO: Longer class information.... 87 88 Attributes: 89 __hdr__: Header fields of Radiotap. 90 TODO. 91 """ 92 93 __hdr__ = ( 94 ('version', 'B', 0), 95 ('pad', 'B', 0), 96 ('length', 'H', 0), 97 ('present_flags', 'I', 0) 98 ) 99 100 @property
101 - def tsft_present(self):
102 return (self.present_flags & _TSFT_MASK) >> _TSFT_SHIFT
103 104 @tsft_present.setter
105 - def tsft_present(self, val):
106 self.present_flags |= val << _TSFT_SHIFT
107 108 @property
109 - def flags_present(self):
110 return (self.present_flags & _FLAGS_MASK) >> _FLAGS_SHIFT
111 112 @flags_present.setter
113 - def flags_present(self, val):
115 116 @property
117 - def rate_present(self):
118 return (self.present_flags & _RATE_MASK) >> _RATE_SHIFT
119 120 @rate_present.setter
121 - def rate_present(self, val):
122 self.present_flags |= val << _RATE_SHIFT
123 124 @property
125 - def channel_present(self):
127 128 @channel_present.setter
129 - def channel_present(self, val):
131 132 @property
133 - def fhss_present(self):
134 return (self.present_flags & _FHSS_MASK) >> _FHSS_SHIFT
135 136 @fhss_present.setter
137 - def fhss_present(self, val):
138 self.present_flags |= val << _FHSS_SHIFT
139 140 @property
141 - def ant_sig_present(self):
143 144 @ant_sig_present.setter
145 - def ant_sig_present(self, val):
147 148 @property
149 - def ant_noise_present(self):
151 152 @ant_noise_present.setter
153 - def ant_noise_present(self, val):
155 156 @property
157 - def lock_qual_present(self):
159 160 @lock_qual_present.setter
161 - def lock_qual_present(self, val):
163 164 @property
165 - def tx_attn_present(self):
167 168 @tx_attn_present.setter
169 - def tx_attn_present(self, val):
171 172 @property
173 - def db_tx_attn_present(self):
175 176 @db_tx_attn_present.setter
177 - def db_tx_attn_present(self, val):
179 180 @property
181 - def dbm_tx_power_present(self):
183 184 @dbm_tx_power_present.setter
185 - def dbm_tx_power_present(self, val):
187 188 @property
189 - def ant_present(self):
191 192 @ant_present.setter
193 - def ant_present(self, val):
195 196 @property
197 - def db_ant_sig_present(self):
199 200 @db_ant_sig_present.setter
201 - def db_ant_sig_present(self, val):
203 204 @property
205 - def db_ant_noise_present(self):
207 208 @db_ant_noise_present.setter
209 - def db_ant_noise_present(self, val):
211 212 @property
213 - def rx_flags_present(self):
215 216 @rx_flags_present.setter
217 - def rx_flags_present(self, val):
219 220 @property
221 - def chanplus_present(self):
223 224 @chanplus_present.setter
225 - def chanplus_present(self, val):
227 228 @property
229 - def ext_present(self):
230 return (self.present_flags & _EXT_MASK) >> _EXT_SHIFT
231 232 @ext_present.setter
233 - def ext_present(self, val):
234 self.present_flags |= val << _EXT_SHIFT
235
236 - def unpack(self, buf):
237 dpkt.Packet.unpack(self, buf) 238 self.data = buf[socket.ntohs(self.length):] 239 240 self.fields = [] 241 buf = buf[self.__hdr_len__:] 242 243 # decode each field into self.<name> (eg. self.tsft) as well as append it self.fields list 244 field_decoder = [ 245 ('tsft', self.tsft_present, self.TSFT), 246 ('flags', self.flags_present, self.Flags), 247 ('rate', self.rate_present, self.Rate), 248 ('channel', self.channel_present, self.Channel), 249 ('fhss', self.fhss_present, self.FHSS), 250 ('ant_sig', self.ant_sig_present, self.AntennaSignal), 251 ('ant_noise', self.ant_noise_present, self.AntennaNoise), 252 ('lock_qual', self.lock_qual_present, self.LockQuality), 253 ('tx_attn', self.tx_attn_present, self.TxAttenuation), 254 ('db_tx_attn', self.db_tx_attn_present, self.DbTxAttenuation), 255 ('dbm_tx_power', self.dbm_tx_power_present, self.DbmTxPower), 256 ('ant', self.ant_present, self.Antenna), 257 ('db_ant_sig', self.db_ant_sig_present, self.DbAntennaSignal), 258 ('db_ant_noise', self.db_ant_noise_present, self.DbAntennaNoise), 259 ('rx_flags', self.rx_flags_present, self.RxFlags) 260 ] 261 for name, present_bit, parser in field_decoder: 262 if present_bit: 263 field = parser(buf) 264 field.data = b'' 265 setattr(self, name, field) 266 self.fields.append(field) 267 buf = buf[len(field):] 268 269 if len(self.data) > 0: 270 if self.flags_present and self.flags.fcs: 271 self.data = ieee80211.IEEE80211(self.data, fcs=self.flags.fcs) 272 else: 273 self.data = ieee80211.IEEE80211(self.data)
274
275 - class Antenna(dpkt.Packet):
276 __hdr__ = ( 277 ('index', 'B', 0), 278 )
279
280 - class AntennaNoise(dpkt.Packet):
281 __hdr__ = ( 282 ('db', 'B', 0), 283 )
284
285 - class AntennaSignal(dpkt.Packet):
286 __hdr__ = ( 287 ('db', 'B', 0), 288 )
289
290 - class Channel(dpkt.Packet):
291 __hdr__ = ( 292 ('freq', 'H', 0), 293 ('flags', 'H', 0), 294 )
295
296 - class FHSS(dpkt.Packet):
297 __hdr__ = ( 298 ('set', 'B', 0), 299 ('pattern', 'B', 0), 300 )
301
302 - class Flags(dpkt.Packet):
303 __hdr__ = ( 304 ('val', 'B', 0), 305 ) 306 307 @property
308 - def fcs(self): return (self.val & _FCS_MASK) >> _FCS_SHIFT
309 310 # TODO statement seems to have no effect 311 @fcs.setter
312 - def fcs(self, v): (v << _FCS_SHIFT) | (self.val & ~_FCS_MASK)
313 314
315 - class LockQuality(dpkt.Packet):
316 __hdr__ = ( 317 ('val', 'H', 0), 318 )
319
320 - class RxFlags(dpkt.Packet):
321 __hdr__ = ( 322 ('val', 'H', 0), 323 )
324
325 - class Rate(dpkt.Packet):
326 __hdr__ = ( 327 ('val', 'B', 0), 328 )
329
330 - class TSFT(dpkt.Packet):
331 __hdr__ = ( 332 ('usecs', 'Q', 0), 333 )
334
335 - class TxAttenuation(dpkt.Packet):
336 __hdr__ = ( 337 ('val', 'H', 0), 338 )
339
340 - class DbTxAttenuation(dpkt.Packet):
341 __hdr__ = ( 342 ('db', 'H', 0), 343 )
344
345 - class DbAntennaNoise(dpkt.Packet):
346 __hdr__ = ( 347 ('db', 'B', 0), 348 )
349
350 - class DbAntennaSignal(dpkt.Packet):
351 __hdr__ = ( 352 ('db', 'B', 0), 353 )
354
355 - class DbmTxPower(dpkt.Packet):
356 __hdr__ = ( 357 ('dbm', 'B', 0), 358 )
359
360 361 -def test_Radiotap():
362 s = b'\x00\x00\x00\x18\x6e\x48\x00\x00\x00\x02\x6c\x09\xa0\x00\xa8\x81\x02\x00\x00\x00\x00\x00\x00\x00' 363 rad = Radiotap(s) 364 assert(rad.version == 0) 365 assert(rad.present_flags == 0x6e480000) 366 assert(rad.tsft_present == 0) 367 assert(rad.flags_present == 1) 368 assert(rad.rate_present == 1) 369 assert(rad.channel_present == 1) 370 assert(rad.fhss_present == 0) 371 assert(rad.ant_sig_present == 1) 372 assert(rad.ant_noise_present == 1) 373 assert(rad.lock_qual_present == 0) 374 assert(rad.db_tx_attn_present == 0) 375 assert(rad.dbm_tx_power_present == 0) 376 assert(rad.ant_present == 1) 377 assert(rad.db_ant_sig_present == 0) 378 assert(rad.db_ant_noise_present == 0) 379 assert(rad.rx_flags_present == 1) 380 assert(rad.channel.freq == 0x6c09) 381 assert(rad.channel.flags == 0xa000) 382 assert(len(rad.fields) == 7)
383
384 385 -def test_fcs():
386 s = b'\x00\x00\x1a\x00\x2f\x48\x00\x00\x34\x8f\x71\x09\x00\x00\x00\x00\x10\x0c\x85\x09\xc0\x00\xcc\x01\x00\x00' 387 rt = Radiotap(s) 388 assert(rt.flags_present == 1) 389 assert(rt.flags.fcs == 1)
390 391 392 if __name__ == '__main__': 393 test_Radiotap() 394 test_fcs() 395 print('Tests Successful...') 396