Source code for ldap3.utils.asn1

"""
"""

# Created on 2015.08.19
#
# Author: Giovanni Cannata
#
# Copyright 2015 - 2018 Giovanni Cannata
#
# This file is part of ldap3.
#
# ldap3 is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ldap3 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with ldap3 in the COPYING and COPYING.LESSER files.
# If not, see <http://www.gnu.org/licenses/>.

from pyasn1 import __version__ as pyasn1_version
from pyasn1.codec.ber import decoder  # for usage in other modules
from pyasn1.codec.ber.encoder import Encoder # for monkeypatching of boolean value
from ..core.results import RESULT_CODES
from ..utils.conv import to_unicode
from ..protocol.convert import referrals_to_list

# BER tag class number, keyed by the two highest bits of the identifier octet
# expressed as booleans: (bit 0x80 set, bit 0x40 set)
CLASSES = {
    (False, False): 0,  # Universal
    (False, True): 1,  # Application
    (True, False): 2,  # Context
    (True, True): 3,  # Private
}


# Monkeypatching of pyasn1 for encoding Boolean with the value 0xFF for TRUE
# THIS IS NOT PART OF THE FAST BER DECODER
# NOTE(review): 'xxx0.2.3' does not look like a real pyasn1 version string, so this
# branch appears to be deliberately disabled - confirm before removing it
if pyasn1_version == 'xxx0.2.3':
    from pyasn1.codec.ber.encoder import tagMap, BooleanEncoder, encode
    from pyasn1.type.univ import Boolean
    from pyasn1.compat.octets import ints2octs


    class BooleanCEREncoder(BooleanEncoder):
        """Boolean encoder whose TRUE octet is 0xFF"""
        _true = ints2octs((255,))

    # patches the pyasn1 tag map in place
    tagMap[Boolean.tagSet] = BooleanCEREncoder()
else:
    from pyasn1.codec.ber.encoder import tagMap, typeMap, AbstractItemEncoder
    from pyasn1.type.univ import Boolean
    from copy import deepcopy


    class LDAPBooleanEncoder(AbstractItemEncoder):
        """
        Boolean encoder that emits 0xFF for TRUE and 0x00 for FALSE

        Exactly one encodeValue() is defined, chosen by comparing the
        pyasn1 version string, because the signature and the return
        shape differ between the version ranges checked below
        """
        supportIndefLenMode = False

        # the version checks are plain string comparisons
        if pyasn1_version <= '0.2.3':
            from pyasn1.compat.octets import ints2octs
            _true = ints2octs((255,))
            _false = ints2octs((0,))

            def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
                return (self._true if value else self._false), 0
        elif pyasn1_version <= '0.3.1':
            def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
                return ((255,) if value else (0,)), False, False
        elif pyasn1_version <= '0.3.4':
            def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
                return ((255,) if value else (0,)), False, False
        elif pyasn1_version <= '0.3.7':
            def encodeValue(self, value, encodeFun, **options):
                return ((255,) if value else (0,)), False, False
        else:
            def encodeValue(self, value, asn1Spec, encodeFun, **options):
                return ((255,) if value else (0,)), False, False

    # work on copies so the maps imported from pyasn1 are left untouched
    customTagMap = deepcopy(tagMap)
    customTypeMap = deepcopy(typeMap)
    customTagMap[Boolean.tagSet] = LDAPBooleanEncoder()
    customTypeMap[Boolean.typeId] = LDAPBooleanEncoder()

    encode = Encoder(customTagMap, customTypeMap)
# end of monkey patching

# a fast BER decoder for LDAP responses only
def compute_ber_size(data):
    """
    Compute size according to BER definite length rules

    :param data: indexable sequence of integer octets; data[1] is read as the first length octet
    :return: tuple (size of the value, offset of the value from the start of data)
    """
    length_octet = data[1]
    if length_octet <= 127:
        # short form: highest bit is 0, the octet itself is the value length
        return length_octet, 2

    # long form: highest bit is 1, the low 7 bits count the octets that follow
    # and hold the value length, most significant octet first
    octet_count = length_octet - 128
    header_size = octet_count + 2
    value_length = sum(octet * 256 ** (octet_count - position)
                       for position, octet in enumerate(data[2:header_size], 1))
    return value_length, header_size
def decode_message_fast(message):
    """
    Decode an encoded LDAP message into a dictionary

    :param message: the encoded message, starting at the outer sequence
    :return: dict with the keys 'messageID', 'protocolOp', 'payload' and 'controls';
             'controls' is None unless the outer sequence holds exactly three elements
    """
    # only the first octets are needed to read the outer sequence header
    header = get_bytes(message[:10])
    ber_len, ber_value_offset = compute_ber_size(header)
    elements = decode_sequence(message, ber_value_offset, ber_len + ber_value_offset, LDAP_MESSAGE_CONTEXT)

    # each element is a tuple (ber_class, ber_constructed, ber_type, value)
    result = dict()
    result['messageID'] = elements[0][3]
    result['protocolOp'] = elements[1][2]
    result['payload'] = elements[1][3]
    if len(elements) == 3:
        result['controls'] = elements[2][3]
    else:
        result['controls'] = None
    return result
def decode_sequence(message, start, stop, context_decoders=None):
    """
    Decode the consecutive BER elements found in message[start:stop]

    :param message: the encoded message
    :param start: index of the first identifier octet
    :param stop: index at which decoding ends
    :param context_decoders: mapping from tag number to decode function, used for
                             elements whose class is neither universal nor application
    :return: list of tuples (ber_class, ber_constructed, ber_type, value)
    """
    elements = []
    position = start
    while position < stop:
        identifier = get_byte(message[position])
        ber_class = CLASSES[(bool(identifier & 0b10000000), bool(identifier & 0b01000000))]
        ber_constructed = bool(identifier & 0b00100000)
        ber_type = identifier & 0b00011111
        # universal and application classes use the module-wide decoder table
        if ber_class < 2:
            ber_decoder = DECODERS[(ber_class, ber_type)]
        else:
            ber_decoder = None
        ber_len, ber_value_offset = compute_ber_size(get_bytes(message[position: position + 10]))
        position += ber_value_offset
        value_end = position + ber_len
        if ber_decoder:
            value = ber_decoder(message, position, value_end, context_decoders)
        else:
            # the other classes are resolved through the caller supplied table
            value = context_decoders[ber_type](message, position, value_end)
        elements.append((ber_class, ber_constructed, ber_type, value))
        position = value_end

    return elements
def decode_integer(message, start, stop, context_decoders=None):
    """
    Decode message[start:stop] as a big-endian two's complement integer

    :param message: the encoded message
    :param start: index of the first content octet
    :param stop: index just past the last content octet
    :param context_decoders: not used, accepted so every decoder shares one signature
    :return: the decoded integer
    """
    # a set high bit in the first octet marks a negative number: starting from -1
    # keeps the sign while the octets are shifted in
    sign_bit = get_byte(message[start]) & 0x80
    value = -1 if sign_bit else 0
    for octet in message[start: stop]:
        value = (value << 8) | get_byte(octet)

    return value
def decode_octet_string(message, start, stop, context_decoders=None):
    """
    Return the content octets message[start:stop] unchanged

    :param context_decoders: not used, accepted so every decoder shares one signature
    """
    content = slice(start, stop)
    return message[content]
def decode_boolean(message, start, stop, context_decoders=None):
    """
    Decode message[start:stop] as a BER boolean

    The previous implementation compared the sliced octets with the integer 0,
    which is never equal, so every boolean was decoded as True

    :param message: the encoded message
    :param start: index of the first content octet
    :param stop: index just past the last content octet
    :param context_decoders: not used, accepted so every decoder shares one signature
    :return: True if any content octet is non-zero, False otherwise (including empty content)
    """
    # bytearray yields integer octets from both a Python 2 str and a Python 3 bytes
    return any(bytearray(message[start: stop]))
def decode_bind_response(message, start, stop, context_decoders=None):
    """
    Decode message[start:stop] as a sequence, resolving the context class
    elements through BIND_RESPONSE_CONTEXT

    :param context_decoders: ignored, BIND_RESPONSE_CONTEXT is always used
    """
    return decode_sequence(message, start, stop, context_decoders=BIND_RESPONSE_CONTEXT)
def decode_extended_response(message, start, stop, context_decoders=None):
    """
    Decode message[start:stop] as a sequence, resolving the context class
    elements through EXTENDED_RESPONSE_CONTEXT

    :param context_decoders: ignored, EXTENDED_RESPONSE_CONTEXT is always used
    """
    return decode_sequence(message, start, stop, context_decoders=EXTENDED_RESPONSE_CONTEXT)
def decode_intermediate_response(message, start, stop, context_decoders=None):
    """
    Decode message[start:stop] as a sequence, resolving the context class
    elements through INTERMEDIATE_RESPONSE_CONTEXT

    :param context_decoders: ignored, INTERMEDIATE_RESPONSE_CONTEXT is always used
    """
    return decode_sequence(message, start, stop, context_decoders=INTERMEDIATE_RESPONSE_CONTEXT)
def decode_controls(message, start, stop, context_decoders=None):
    """
    Decode message[start:stop] as a sequence, resolving the context class
    elements through CONTROLS_CONTEXT

    :param context_decoders: ignored, CONTROLS_CONTEXT is always used
    """
    return decode_sequence(message, start, stop, context_decoders=CONTROLS_CONTEXT)
def ldap_result_to_dict_fast(response):
    """
    Build the result dictionary from a decoded LDAP result

    :param response: sequence of decoded elements; index 3 of each element is read as its value
    :return: dict with the keys 'result', 'description', 'dn', 'message' and 'referrals';
             'referrals' is None unless response holds exactly four elements
    """
    result_code = int(response[0][3])  # resultCode
    response_dict = {
        'result': result_code,
        'description': RESULT_CODES[result_code],
        'dn': to_unicode(response[1][3], from_server=True),  # matchedDN
        'message': to_unicode(response[2][3], from_server=True),  # diagnosticMessage
    }

    if len(response) == 4:  # referrals
        uris = [to_unicode(referral[3], from_server=True) for referral in response[3][3]]
        response_dict['referrals'] = referrals_to_list(uris)
    else:
        response_dict['referrals'] = None

    return response_dict
######

# byte access helpers: indexing or iterating a Python 3 bytes yields integers,
# while a Python 2 str yields one-character strings that must be converted
if str is bytes:  # Python 2
    def get_byte(x):
        """Return the integer value of the single character x"""
        return ord(x)

    def get_bytes(x):
        """Return x as a bytearray, so that indexing yields integers"""
        return bytearray(x)
else:  # Python 3
    def get_byte(x):
        """Return x unchanged"""
        return x

    def get_bytes(x):
        """Return x unchanged"""
        return x
# decode functions for the universal (0) and application (1) classes,
# keyed by (tag class, tag number)
DECODERS = {
    # Universal
    (0, 1): decode_boolean,  # Boolean
    (0, 2): decode_integer,  # Integer
    (0, 4): decode_octet_string,  # Octet String
    (0, 10): decode_integer,  # Enumerated
    (0, 16): decode_sequence,  # Sequence
    (0, 17): decode_sequence,  # Set
    # Application
    (1, 1): decode_bind_response,  # Bind response
    (1, 4): decode_sequence,  # Search result entry
    (1, 5): decode_sequence,  # Search result done
    (1, 7): decode_sequence,  # Modify response
    (1, 9): decode_sequence,  # Add response
    (1, 11): decode_sequence,  # Delete response
    (1, 13): decode_sequence,  # ModifyDN response
    (1, 15): decode_sequence,  # Compare response
    (1, 19): decode_sequence,  # Search result reference
    (1, 24): decode_extended_response,  # Extended response
    (1, 25): decode_intermediate_response,  # intermediate response
    (2, 3): decode_octet_string  # context class entry
}

# decode functions for context class elements, keyed by tag number

BIND_RESPONSE_CONTEXT = {
    7: decode_octet_string  # SaslCredentials
}

EXTENDED_RESPONSE_CONTEXT = {
    10: decode_octet_string,  # ResponseName
    11: decode_octet_string  # Response Value
}

INTERMEDIATE_RESPONSE_CONTEXT = {
    0: decode_octet_string,  # IntermediateResponseName
    1: decode_octet_string  # IntermediateResponseValue
}

LDAP_MESSAGE_CONTEXT = {
    0: decode_controls,  # Controls
    3: decode_sequence  # Referral
}

CONTROLS_CONTEXT = {
    0: decode_sequence  # Control
}