# coding: utf-8
"""
brownie.datastructures.sequences
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:copyright: 2010-2011 by Daniel Neuhäuser
:license: BSD, see LICENSE.rst for details
"""
import textwrap
from keyword import iskeyword
from functools import wraps
from itertools import count
from brownie.itools import chain
try:
    _string_types = basestring
except NameError:  # pragma: no cover
    # Python 3 has no ``basestring``; ``str`` is the closest equivalent.
    _string_types = str


class LazyList(object):
    r"""
    Implements a lazy list which computes items based on the given `iterable`.

    This allows you to create :class:`list`\-like objects of unlimited size.
    However although most operations don't exhaust the internal iterator
    completely some of them do, so if the given iterable is of unlimited size
    making such an operation will eventually cause a :exc:`MemoryError`.

    Cost in terms of laziness of supported operators, this does not include
    supported operators without any cost:

    +-----------------+-------------------------------------------------------+
    | Operation       | Result                                                |
    +=================+=======================================================+
    | ``list[i]``     | This exhausts the `list` up until the given index.    |
    +-----------------+                                                       |
    | ``list[i] = x`` |                                                       |
    +-----------------+                                                       |
    | ``del list[i]`` |                                                       |
    +-----------------+-------------------------------------------------------+
    | ``len(list)``   | Exhausts the internal iterator.                       |
    +-----------------+-------------------------------------------------------+
    | ``x in list``   | Exhausts the `list` up until `x` or until the `list`  |
    |                 | is exhausted.                                         |
    +-----------------+-------------------------------------------------------+
    | ``l1 == l2``    | Exhausts both lists.                                  |
    +-----------------+-------------------------------------------------------+
    | ``l1 != l2``    | Exhausts both lists.                                  |
    +-----------------+-------------------------------------------------------+
    | ``bool(list)``  | Exhausts the `list` up to the first item.             |
    +-----------------+-------------------------------------------------------+
    | ``l1 < l2``     | Exhausts the list up to the first item which shows    |
    |                 | the result. In the worst case this exhausts both      |
    +-----------------+ lists.                                                |
    | ``l1 > l2``     |                                                       |
    +-----------------+-------------------------------------------------------+
    | ``l1 + l2``     | Creates a new :class:`LazyList` without exhausting    |
    |                 | `l1` or `l2`.                                         |
    +-----------------+-------------------------------------------------------+
    | ``list * n``    | Exhausts the `list`.                                  |
    +-----------------+                                                       |
    | ``list *= n``   |                                                       |
    +-----------------+-------------------------------------------------------+

    Indexing with a slice only consumes as many items as are required to
    resolve it; slices with a negative bound, or without an upper bound,
    exhaust the internal iterator.

    .. versionadded:: 0.5
       It is now possible to pickle :class:`LazyList`\s, however this will
       exhaust the list.
    """

    @classmethod
    def factory(cls, callable):
        """
        Returns a wrapper for a given callable which takes the return value
        of the wrapped callable and converts it into a :class:`LazyList`.
        """
        @wraps(callable)
        def wrap(*args, **kwargs):
            return cls(callable(*args, **kwargs))
        return wrap

    def exhausting(func):
        # Class-body-only decorator (deleted at the end of the class body):
        # exhausts the internal iterator before calling the wrapped method.
        @wraps(func)
        def wrap(self, *args, **kwargs):
            self._exhaust()
            return func(self, *args, **kwargs)
        return wrap

    def __init__(self, iterable):
        if isinstance(iterable, (list, tuple, _string_types)):
            #: ``True`` if the internal iterator is exhausted.
            self.exhausted = True
            self._collected_data = list(iterable)
        else:
            self._iterator = iter(iterable)
            self.exhausted = False
            self._collected_data = []

    @staticmethod
    def _required_length(i):
        """
        Returns the number of leading items which have to be known to resolve
        the index or slice `i`, or ``None`` if the internal iterator has to be
        exhausted completely.
        """
        if i is None:
            return None
        if isinstance(i, slice):
            backwards = i.step is not None and i.step < 0
            if backwards:
                # a reversed slice starts at its highest index
                lowest, highest = i.stop, i.start
            else:
                lowest, highest = i.start, i.stop
            if highest is None or highest < 0:
                # open-ended or relative to the end: the length is required
                return None
            if lowest is not None and lowest < 0:
                return None
            # ``start`` is inclusive, ``stop`` is exclusive
            return highest + 1 if backwards else highest
        if i < 0:
            # negative indices are relative to the end of the list
            return None
        return i + 1

    def _exhaust(self, i=None):
        """
        Takes items from the internal iterator until the index or slice `i`
        can be resolved, if `i` is ``None`` the iterator is exhausted
        completely.

        :attr:`exhausted` is set once the iterator raises
        :exc:`StopIteration`.
        """
        if self.exhausted:
            return
        needed = self._required_length(i)
        data = self._collected_data
        while needed is None or len(data) < needed:
            try:
                data.append(next(self._iterator))
            except StopIteration:
                self.exhausted = True
                break

    @property
    def known_length(self):
        """
        The number of items which have been taken from the internal iterator.
        """
        return len(self._collected_data)

    def append(self, object):
        """
        Appends the given `object` to the list.
        """
        self.extend([object])

    def extend(self, objects):
        """
        Extends the list with the given `objects`.

        This does not consume the internal iterator; if it is not exhausted
        yet the `objects` are chained behind it.
        """
        if self.exhausted:
            self._collected_data.extend(objects)
        else:
            self._iterator = chain(self._iterator, objects)

    def insert(self, index, object):
        """
        Inserts the given `object` at the given `index`.

        This method exhausts the internal iterator up until the given `index`.
        """
        self._exhaust(index)
        self._collected_data.insert(index, object)

    def pop(self, index=None):
        """
        Removes and returns the item at the given `index`, if no `index` is
        given the last item is used.

        This method exhausts the internal iterator up until the given `index`.
        """
        self._exhaust(index)
        if index is None:
            return self._collected_data.pop()
        return self._collected_data.pop(index)

    def remove(self, object):
        """
        Looks for the given `object` in the list and removes the first
        occurrence.

        If the item is not found a :exc:`ValueError` is raised.

        This method exhausts the internal iterator up until the first
        occurrence of the given `object` or entirely if it is not found.
        """
        while True:
            try:
                self._collected_data.remove(object)
                return
            except ValueError:
                if self.exhausted:
                    raise
                # fetch one more item and try again
                self._exhaust(self.known_length)

    @exhausting
    def reverse(self):
        """
        Reverses the list.

        This method exhausts the internal iterator.
        """
        self._collected_data.reverse()

    @exhausting
    def sort(self, cmp=None, key=None, reverse=False):
        """
        Sorts the list using the given `cmp` or `key` function and reverses it
        if `reverse` is ``True``.

        This method exhausts the internal iterator.
        """
        if cmp is None:
            # ``cmp`` is only forwarded if given, :meth:`list.sort` does not
            # accept it on every Python version
            self._collected_data.sort(key=key, reverse=reverse)
        else:
            self._collected_data.sort(cmp=cmp, key=key, reverse=reverse)

    @exhausting
    def count(self, object):
        """
        Counts the occurrences of the given `object` in the list.

        This method exhausts the internal iterator.
        """
        return self._collected_data.count(object)

    def index(self, object):
        """
        Returns first index of the `object` in list

        If the `object` is not found a :exc:`ValueError` is raised.

        This method exhausts the internal iterator up until the given `object`.
        """
        for i, obj in enumerate(self):
            if obj == object:
                return i
        # the 1-tuple keeps the formatting working if `object` is a tuple
        raise ValueError('%s not in LazyList' % (object,))

    def __getitem__(self, i):
        """
        Returns the object or objects at the given index.

        This method exhausts the internal iterator up until the given index.
        """
        self._exhaust(i)
        return self._collected_data[i]

    def __setitem__(self, i, obj):
        """
        Sets the given object or objects at the given index.

        This method exhausts the internal iterator up until the given index.
        """
        self._exhaust(i)
        self._collected_data[i] = obj

    def __delitem__(self, i):
        """
        Removes the item or items at the given index.

        This method exhausts the internal iterator up until the given index.
        """
        self._exhaust(i)
        del self._collected_data[i]

    @exhausting
    def __len__(self):
        """
        Returns the length of the list.

        This method exhausts the internal iterator.
        """
        return self.known_length

    def __contains__(self, other):
        for item in self:
            if item == other:
                return True
        return False

    @exhausting
    def __eq__(self, other):
        """
        Returns ``True`` if the list is equal to the given `other` list, which
        may be another :class:`LazyList`, a :class:`list` or a subclass of
        either.

        This method exhausts the internal iterator.
        """
        if isinstance(other, (self.__class__, list)):
            # if `other` is a LazyList the reflected comparison exhausts it
            return self._collected_data == other
        return False

    def __ne__(self, other):
        """
        Returns ``True`` if the list is unequal to the given `other` list, which
        may be another :class:`LazyList`, a :class:`list` or a subclass of
        either.

        This method exhausts the internal iterator.
        """
        return not self.__eq__(other)

    __hash__ = None

    def __nonzero__(self):
        """
        Returns ``True`` if the list is not empty.

        This method takes one item from the internal iterator.
        """
        self._exhaust(0)
        return bool(self._collected_data)

    # Python 3 spelling of ``__nonzero__``
    __bool__ = __nonzero__

    def _compare_lazily(self, other, greater):
        """
        Compares this list item by item with the `other` list, following the
        ordering rules of :class:`list`, and stops at the first pair of items
        which decides the result.

        Returns whether this list is greater than `other` if `greater` is
        true, otherwise whether it is lower.
        """
        missing = object()
        mine, theirs = iter(self), iter(other)
        while True:
            a, b = next(mine, missing), next(theirs, missing)
            if a is missing or b is missing:
                # one list is a prefix of the other, the shorter one is lower
                if greater:
                    return b is missing and a is not missing
                return a is missing and b is not missing
            if not (a is b or a == b):
                return a > b if greater else a < b

    def __lt__(self, other):
        """
        This method returns ``True`` if this list is "lower than" the given
        `other` list. This is the case if...

        - this list is empty and the other is not.
        - the first nth item in this list which is unequal to the
          corresponding item in the other list, is lower than the corresponding
          item.

        If this and the other list is empty this method will return ``False``.

        Both lists are only consumed up to the item which shows the result.
        """
        if isinstance(other, (self.__class__, list)):
            return self._compare_lazily(other, greater=False)
        return list(self) < other

    def __gt__(self, other):
        """
        This method returns ``True`` if this list is "greater than" the given
        `other` list. This is the case if...

        - this list is not empty and the other is
        - the first nth item in this list which is unequal to the
          corresponding item in the other list, is greater than the
          corresponding item.

        If this and the other list is empty this method will return ``False``.

        Both lists are only consumed up to the item which shows the result.
        """
        if isinstance(other, (self.__class__, list)):
            return self._compare_lazily(other, greater=True)
        return list(self) > other

    def __add__(self, other):
        if isinstance(other, (list, self.__class__)):
            return self.__class__(chain(self, other))
        raise TypeError("can't concatenate with non-list: {0}".format(other))

    def __iadd__(self, other):
        self.extend(other)
        return self

    def __mul__(self, other):
        if isinstance(other, int):
            self._exhaust()
            return self.__class__(self._collected_data * other)
        raise TypeError("can't multiply sequence by non-int: {0}".format(other))

    def __imul__(self, other):
        if isinstance(other, int):
            self._exhaust()
            self._collected_data *= other
            return self
        else:
            raise TypeError(
                "can't multiply sequence by non-int: {0}".format(other)
            )

    @exhausting
    def __getstate__(self):
        # The data is wrapped in a tuple so that the state is never false;
        # pickle skips ``__setstate__`` for false states under some protocols
        # which would leave an unpickled empty list without any attributes.
        return (self._collected_data,)

    def __setstate__(self, state):
        if isinstance(state, tuple):
            state, = state
        # otherwise `state` is the bare list written by earlier versions
        self.exhausted = True
        self._collected_data = state

    def __repr__(self):
        """
        Returns the representation string of the list, if the list exhausted
        this looks like the representation of any other list, otherwise the
        "lazy" part is represented by "...", like "[1, 2, 3, ...]".
        """
        if self.exhausted:
            return repr(self._collected_data)
        elif not self._collected_data:
            return '[...]'
        return '[%s, ...]' % ', '.join(
            repr(obj) for obj in self._collected_data
        )

    del exhausting
class CombinedSequence(object):
    """
    A sequence combining other sequences.

    The given sequences are not copied; items are looked up in them on
    access.

    .. versionadded:: 0.5
    """

    def __init__(self, sequences):
        self.sequences = list(sequences)

    def at_index(self, index):
        """
        Returns the sequence and the 'sequence local' index::

            >>> foo = [1, 2, 3]
            >>> bar = [4, 5, 6]
            >>> cs = CombinedSequence([foo, bar])
            >>> cs[3]
            4
            >>> cs.at_index(3)
            ([4, 5, 6], 0)

        For a negative `index` the returned local index is negative as well.
        If the `index` is out of range an :exc:`IndexError` is raised.
        """
        seen = 0
        if index >= 0:
            for sequence in self.sequences:
                length = len(sequence)
                if seen <= index < seen + length:
                    return sequence, index - seen
                seen += length
        else:
            # walk backwards; `seen` is the (non-positive) offset of the end
            # of the current sequence relative to the end of the combination
            for sequence in reversed(self.sequences):
                length = len(sequence)
                if seen - length <= index < seen:
                    return sequence, index - seen
                seen -= length
        raise IndexError(index)

    def __getitem__(self, index):
        if isinstance(index, slice):
            # slices are resolved on a flat copy and return a plain list
            return list(iter(self))[index]
        sequence, index = self.at_index(index)
        return sequence[index]

    def __len__(self):
        return sum(map(len, self.sequences))

    def __iter__(self):
        return chain.from_iterable(self.sequences)

    def __reversed__(self):
        # reverse the order of the sequences as well as each sequence itself
        return chain.from_iterable(map(reversed, reversed(self.sequences)))

    def __eq__(self, other):
        if isinstance(other, list):
            return list(self) == other
        elif isinstance(other, self.__class__):
            return self.sequences == other.sequences
        return False

    def __ne__(self, other):
        return not self == other

    __hash__ = None

    def __mul__(self, times):
        if not isinstance(times, int):
            return NotImplemented
        return list(self) * times

    def __rmul__(self, times):
        if not isinstance(times, int):
            return NotImplemented
        return times * list(self)

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.sequences)
class CombinedList(CombinedSequence):
    """
    A list combining other lists.

    Modifications are applied to the combined lists themselves.

    .. versionadded:: 0.5
    """

    def count(self, item):
        """
        Returns the number of occurrences of the given `item`.
        """
        return sum(sequence.count(item) for sequence in self.sequences)

    def index(self, item, start=None, stop=None):
        """
        Returns the index of the first occurence of the given `item` between
        `start` and `stop`.

        If the `item` is not found a :exc:`ValueError` is raised.
        """
        # normalise `start` so that the returned index is never negative
        first = slice(start, stop).indices(len(self))[0]
        for offset, candidate in enumerate(self[start:stop]):
            if item == candidate:
                return first + offset
        raise ValueError('%r not in list' % item)

    def __setitem__(self, index, item):
        if isinstance(index, slice):
            # ``slice.indices`` resolves omitted and negative bounds as well
            # as negative steps; as the combined lists cannot be resized
            # through a slice, surplus positions or values are ignored
            positions = range(*index.indices(len(self)))
            for position, value in zip(positions, item):
                sequence, local_index = self.at_index(position)
                sequence[local_index] = value
        else:
            sequence, local_index = self.at_index(index)
            sequence[local_index] = item

    def __delitem__(self, index):
        if isinstance(index, slice):
            # delete from the highest position downwards so that the
            # positions which are still to be deleted do not shift
            positions = sorted(
                range(*index.indices(len(self))), reverse=True
            )
            for position in positions:
                sequence, local_index = self.at_index(position)
                del sequence[local_index]
        else:
            sequence, local_index = self.at_index(index)
            del sequence[local_index]

    def append(self, item):
        """
        Appends the given `item` to the end of the list.

        The `item` is appended to the last of the combined lists.
        """
        self.sequences[-1].append(item)

    def extend(self, items):
        """
        Extends the list by appending from the given iterable.

        The `items` are appended to the last of the combined lists.
        """
        self.sequences[-1].extend(items)

    def insert(self, index, item):
        """
        Inserts the given `item` before the item at the given `index`.

        Like :meth:`list.insert` an `index` beyond either end of the list
        inserts the `item` at that end.
        """
        length = len(self)
        if index >= length:
            self.sequences[-1].append(item)
        elif index < -length:
            self.sequences[0].insert(0, item)
        else:
            sequence, local_index = self.at_index(index)
            sequence.insert(local_index, item)

    def pop(self, index=-1):
        """
        Removes and returns the item at the given `index`.

        An :exc:`IndexError` is raised if the index is out of range.
        """
        sequence, local_index = self.at_index(index)
        return sequence.pop(local_index)

    def remove(self, item):
        """
        Removes the first occurence of the given `item` from the list.

        If the `item` is not found a :exc:`ValueError` is raised.
        """
        for sequence in self.sequences:
            try:
                return sequence.remove(item)
            except ValueError:
                # we may find a value in the next sequence
                pass
        raise ValueError('%r not in list' % item)

    def _set_values(self, values):
        # Distributes the flat list `values` over the combined lists, each
        # list keeps its current length.
        lengths = [len(sequence) for sequence in self.sequences]
        previous_length = 0
        for length in lengths:
            stop = previous_length + length
            self[previous_length:stop] = values[previous_length:stop]
            previous_length += length

    def reverse(self):
        """
        Reverses the list in-place::

            >>> a = [1, 2, 3]
            >>> b = [4, 5, 6]
            >>> l = CombinedList([a, b])
            >>> l.reverse()
            >>> a
            [6, 5, 4]
        """
        self._set_values(self[::-1])

    def sort(self, cmp=None, key=None, reverse=False):
        """
        Sorts the list in-place, see :meth:`list.sort`.
        """
        if cmp is None:
            # ``cmp`` is only forwarded if given, :func:`sorted` does not
            # accept it on every Python version
            ordered = sorted(self, key=key, reverse=reverse)
        else:
            ordered = sorted(self, cmp=cmp, key=key, reverse=reverse)
        self._set_values(ordered)
def namedtuple(typename, field_names, verbose=False, rename=False, doc=None):
    """
    Returns a :class:`tuple` subclass named `typename` with a limited number
    of possible items who are accessible under their field name respectively.

    Due to the implementation `typename` as well as all `field_names` have to
    be valid python identifiers also the names used in `field_names` may not
    repeat themselves. A :exc:`ValueError` is raised if that is not the case.

    You can solve the latter issue for `field_names` by passing ``rename=True``,
    any given name which is either a keyword, a repetition or not a valid
    identifier is then replaced with `_n` where `n` is an integer increasing
    with every rename starting by 1.

    `field_names` may be given as an iterable of names or as a single string
    in which the names are separated by whitespace and/or commas.

    :func:`namedtuple` creates the code for the subclass and executes it
    internally you can view that code by passing ``verbose=True``, which will
    print the code.

    If `doc` is given it is used as docstring of the subclass.

    Unlike :class:`tuple` a named tuple provides several methods as helpers:

    .. class:: SomeNamedTuple(foo, bar)

       .. classmethod:: _make(iterable)

          Returns a :class:`SomeNamedTuple` populated with the items from the
          given `iterable`. A :exc:`TypeError` is raised if the number of
          items does not match the number of fields.

       .. method:: _asdict()

          Returns a :class:`dict` mapping the field names to their values.

       .. method:: _replace(**kwargs)

          Returns a :class:`SomeNamedTuple` values replaced with the given
          ones::

              >>> t = SomeNamedTuple(1, 2)
              >>> t._replace(bar=3)
              SomeNamedTuple(foo=1, bar=3)
              # doctest: DEACTIVATE

    .. note::
       :func:`namedtuple` is compatible with :func:`collections.namedtuple`.

    .. versionadded:: 0.5
    """
    # NOTE: this function must not contain closures, older Python 2 versions
    # reject the ``exec(code, namespace)`` spelling used below otherwise.
    import re

    try:
        string_types = basestring
    except NameError:  # pragma: no cover
        # Python 3 has no ``basestring``
        string_types = str

    # All names end up in executed source code, so anything but a plain
    # identifier has to be rejected.
    is_identifier = re.compile(r'[A-Za-z_][A-Za-z0-9_]*\Z').match

    if iskeyword(typename):
        raise ValueError('the given typename is a keyword: %s' % typename)
    if not is_identifier(typename):
        raise ValueError(
            'the given typename is not a valid identifier: %r' % (typename, )
        )

    if isinstance(field_names, string_types):
        field_names = field_names.replace(',', ' ').split()

    replacement_numbers = count(1)
    real_field_names = []
    seen_names = set()
    for name in field_names:
        if not is_identifier(name):
            problem = 'a given field name is not a valid identifier: %s'
        elif iskeyword(name):
            problem = 'a given field name is a keyword: %s'
        elif name in seen_names:
            problem = 'a field name has been repeated: %s'
        else:
            problem = None
        if problem is not None:
            if not rename:
                raise ValueError(problem % (name, ))
            name = '_%d' % next(replacement_numbers)
            # never hand out a replacement which is already in use
            while name in seen_names:
                name = '_%d' % next(replacement_numbers)
        real_field_names.append(name)
        seen_names.add(name)

    joined_names = ', '.join(real_field_names)
    if real_field_names:
        signature = 'cls, ' + joined_names
        # the trailing comma keeps this a tuple even for a single field
        values = '(%s,)' % joined_names
    else:
        signature = 'cls'
        values = '()'

    code = textwrap.dedent("""
        from operator import itemgetter
        class %(typename)s(tuple):
            __doc__ = %(docstring)r
            _fields = %(fields)s
            @classmethod
            def _make(cls, iterable):
                result = tuple.__new__(cls, iterable)
                if len(result) != %(field_count)d:
                    raise TypeError(
                        'expected %(field_count)d arguments, got %%d' %% len(result)
                    )
                return result
            def __new__(%(signature)s):
                return tuple.__new__(cls, %(values)s)
            def _asdict(self):
                return dict(zip(self._fields, self))
            def _replace(self, **kwargs):
                result = self._make(map(kwargs.pop, %(fields)s, self))
                if kwargs:
                    raise ValueError(
                        'got unexpected arguments: %%r' %% list(kwargs)
                    )
                return result
            def __getnewargs__(self):
                return tuple(self)
            def __repr__(self):
                return '%(typename)s(%(reprtext)s)' %% self
    """) % {
        'typename': typename,
        'fields': repr(tuple(real_field_names)),
        'signature': signature,
        'values': values,
        'field_count': len(real_field_names),
        'reprtext': ', '.join(name + '=%r' for name in real_field_names),
        'docstring': doc or '%s(%s)' % (typename, joined_names)
    }
    for i, name in enumerate(real_field_names):
        code += '    %s = property(itemgetter(%d))\n' % (name, i)
    if verbose:
        print(code)
    namespace = {}
    # there should never occur an exception here but if one does I'd rather
    # have the source to see what is going on
    try:
        exec(code, namespace)
    except SyntaxError as e:  # pragma: no cover
        raise SyntaxError(e.args[0] + ':\n' + code)
    result = namespace[typename]
    return result
# Names exported by ``from brownie.datastructures.sequences import *``.
__all__ = [
    'LazyList',
    'CombinedSequence',
    'CombinedList',
    'namedtuple',
]