Queue
The Queue class is used to work with RabbitMQ queues on an open channel. The following example shows how you can create a queue using the Queue.declare method.
import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'my-queue')
        queue.durable = True
        queue.declare()
To consume messages, you can iterate over the Queue object itself if the defaults for the Queue.__iter__() method work for your needs:
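import rabbitpy

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        # Iterating the queue consumes with the default settings
        for message in rabbitpy.Queue(channel, 'example'):
            print('Message: %r' % message)
            message.ack()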
Alternatively, use the Queue.consume() method if you would like to specify no_ack, prefetch, or priority:
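import rabbitpy

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        queue = rabbitpy.Queue(channel, 'example')
        # consume() accepts no_ack, prefetch, priority and consumer_tag
        for message in queue.consume():
            print('Message: %r' % message)
            message.ack()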
Warning
If you consume messages in PyPy, either by using the Queue as an iterator or by calling Queue.consume(), you must manually invoke Queue.stop_consuming(). This is because PyPy does not predictably clean up the generator used to iterate over messages. To test whether your code is being executed in PyPy, evaluate the boolean constant rabbitpy.PYPY.
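One way to follow this advice is to wrap the consume loop in try/finally and cancel the consumer explicitly only when asked to. The sketch below is an illustration, not part of rabbitpy: consume_up_to and its manual_stop flag are hypothetical names, message.body is assumed to hold the payload, and the queue can be any object that offers consume() and stop_consuming().

```python
def consume_up_to(queue, limit, manual_stop):
    """Collect at most `limit` message bodies from `queue`.

    `manual_stop` should be True when the interpreter does not clean up
    the generator promptly (for example, pass rabbitpy.PYPY).
    """
    bodies = []
    if limit <= 0:
        return bodies
    try:
        for message in queue.consume():
            bodies.append(message.body)
            message.ack()
            if len(bodies) >= limit:
                break
    finally:
        if manual_stop:
            # Cancel the consumer explicitly instead of relying on
            # generator cleanup.
            queue.stop_consuming()
    return bodies
```

With a real channel this might be called as consume_up_to(rabbitpy.Queue(channel, 'example'), 10, rabbitpy.PYPY).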
API Documentation
- class rabbitpy.Queue(channel, name='', durable=False, exclusive=False, auto_delete=False, max_length=None, message_ttl=None, expires=None, dead_letter_exchange=None, dead_letter_routing_key=None, arguments=None)
Create and manage RabbitMQ queues.
- Parameters:
channel (Channel) – The channel object to communicate on
name (str) – The name of the queue
exclusive (bool) – Queue can only be used by this channel and will auto-delete once the channel is closed.
durable (bool) – Indicates if the queue should survive a RabbitMQ restart
auto_delete (bool) – Automatically delete when all consumers disconnect
max_length (int) – Maximum queue length
message_ttl (int) – Time-to-live of a message in milliseconds
expires (int) – Milliseconds until a queue is removed after becoming idle
dead_letter_exchange (str) – Dead letter exchange for rejected messages
dead_letter_routing_key (str) – Routing key for dead lettered messages
arguments (dict) – Custom arguments for the queue
- Attributes:
consumer_tag (str) – Contains the consumer tag used to register with RabbitMQ. Can be overwritten with custom value prior to consuming.
- Raises:
- Raises:
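The constructor parameters above can be collected into a single options mapping, which makes the millisecond units explicit. This is a minimal sketch with made-up values: WORK_QUEUE_OPTIONS, make_work_queue, and the exchange and routing key names are hypothetical, and queue_class stands in for rabbitpy.Queue so the helper can be exercised without a broker.

```python
# Hypothetical settings for a durable work queue; every name and value
# here is illustrative only.
WORK_QUEUE_OPTIONS = {
    'durable': True,                  # survive a RabbitMQ restart
    'max_length': 10000,              # maximum queue length
    'message_ttl': 60 * 1000,         # 60 seconds, in milliseconds
    'expires': 30 * 60 * 1000,        # removed after 30 idle minutes
    'dead_letter_exchange': 'dead-letters',
    'dead_letter_routing_key': 'work.expired',
}


def make_work_queue(queue_class, channel, name):
    """Build a queue object from the options above without declaring it."""
    return queue_class(channel, name, **WORK_QUEUE_OPTIONS)
```

A caller would typically pass rabbitpy.Queue as queue_class and then invoke declare() on the result.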
- __init__(channel, name='', durable=False, exclusive=False, auto_delete=False, max_length=None, message_ttl=None, expires=None, dead_letter_exchange=None, dead_letter_routing_key=None, arguments=None)
Create a new Queue object instance. Only the rabbitpy.Channel object is required.
Warning
You should only use a single Queue instance per channel when consuming or getting messages. Failure to do so can have unintended consequences.
- __iter__()
Quick way to consume messages using the defaults: no_ack=False, with prefetch and priority not set.
Warning
You should only use a single Queue instance per channel when consuming messages. Failure to do so can have unintended consequences.
- Yields:
- __len__()
Return the pending number of messages in the queue by doing a passive Queue declare.
- Return type:
int
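Because len() is supported, a backlog check can be written in one line. The helper below is a hypothetical sketch (has_backlog and threshold are not part of rabbitpy) that works with any object implementing __len__.

```python
def has_backlog(queue, threshold):
    """Return True when more than `threshold` messages are pending."""
    # len(queue) performs a passive declare, so every call talks to RabbitMQ.
    return len(queue) > threshold
```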
- __setattr__(name, value)
Validate the data types for specific attributes when setting them, otherwise fall through to the parent __setattr__.
- Parameters:
name (str) – The attribute to set
value (mixed) – The value to set
- Raises:
ValueError
- bind(source, routing_key=None, arguments=None)
Bind the queue to the specified exchange, optionally using a routing key.
- Parameters:
source (str or rabbitpy.exchange.Exchange) – The exchange to bind to
routing_key (str) – The routing key to use
arguments (dict) – Optional arguments for RabbitMQ
- Returns:
bool
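As an illustration of the boolean return value, the following sketch binds one queue to an exchange under several routing keys and reports the keys that did not bind. bind_all is a hypothetical helper, not part of rabbitpy; it assumes only the bind(source, routing_key) call documented above.

```python
def bind_all(queue, exchange, routing_keys):
    """Bind `queue` to `exchange` once per routing key.

    Returns the routing keys for which bind() did not return True.
    """
    failed = []
    for key in routing_keys:
        if not queue.bind(exchange, key):
            failed.append(key)
    return failed
```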
- consume(no_ack=False, prefetch=None, priority=None, consumer_tag=None)
Consume messages from the queue as a generator.
You can use this method instead of the queue object as an iterator if you need to alter the prefetch count, set the consumer priority, or consume in no_ack mode.
New in version 0.26.
Warning
You should only use a single Queue instance per channel when consuming messages. Failure to do so can have unintended consequences.
- Parameters:
no_ack (bool) – Do not require acknowledgements
prefetch (int) – Set a prefetch count for the channel
priority (int) – Consumer priority
consumer_tag (str) – Optional consumer tag
- Return type:
generator
- Raises:
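A typical use of consume() with a prefetch count is a worker loop that acknowledges a message only after it has been handled. The sketch below makes several assumptions: process and handler are hypothetical names, and message.nack(requeue=True) is assumed to be available on the message object for negative acknowledgement. Requeueing a message that always fails can loop indefinitely, so a real worker would likely want a retry limit or a dead letter exchange.

```python
def process(queue, handler, prefetch=10):
    """Run `handler` on each consumed message, acking on success.

    Messages whose handler raises are negatively acknowledged and requeued.
    Returns the number of messages handled successfully.
    """
    handled = 0
    for message in queue.consume(prefetch=prefetch):
        try:
            handler(message)
        except Exception:
            # Hand the message back to RabbitMQ for redelivery.
            message.nack(requeue=True)
        else:
            message.ack()
            handled += 1
    return handled
```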
- consume_messages(no_ack=False, prefetch=None, priority=None)
Consume messages from the queue as a generator.
Warning
This method is deprecated in favor of Queue.consume() and will be removed in future releases.
Deprecated since version 0.26.
You can use this method instead of the queue object as an iterator if you need to alter the prefetch count, set the consumer priority, or consume in no_ack mode.
- Parameters:
no_ack (bool) – Do not require acknowledgements
prefetch (int) – Set a prefetch count for the channel
priority (int) – Consumer priority
- Return type:
Generator
- Raises:
- consumer(no_ack=False, prefetch=None, priority=None)
Method for returning the contextmanager for consuming messages. You should not use this directly.
Warning
This method is deprecated and will be removed in a future release.
Deprecated since version 0.26.
- Parameters:
no_ack (bool) – Do not require acknowledgements
prefetch (int) – Set a prefetch count for the channel
priority (int) – Consumer priority
- Returns:
None
- declare(passive=False)
Declare the queue on the RabbitMQ channel passed into the constructor, returning the current message count for the queue and its consumer count as a tuple.
- Parameters:
passive (bool) – Passive declare to retrieve message count and consumer count information
- Returns:
Message count, Consumer count
- Return type:
tuple(int, int)
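The returned tuple can be unpacked directly. As a small sketch, the hypothetical helper queue_stats below performs a passive declare and labels the two counts; it relies only on declare(passive=True) returning the pair described above.

```python
def queue_stats(queue):
    """Return the message and consumer counts for `queue` as a dict."""
    # A passive declare only retrieves the counts for the queue.
    message_count, consumer_count = queue.declare(passive=True)
    return {'messages': message_count, 'consumers': consumer_count}
```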
- delete(if_unused=False, if_empty=False)
Delete the queue.
- Parameters:
if_unused (bool) – Delete only if unused
if_empty (bool) – Delete only if empty
- get(acknowledge=True)
Request a single message from RabbitMQ using the Basic.Get AMQP command.
Warning
You should only use a single Queue instance per channel when getting messages. Failure to do so can have unintended consequences.
- Parameters:
acknowledge (bool) – Let RabbitMQ know if you will manually acknowledge or negatively acknowledge the message after each get.
- Return type:
Message or None
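Since get() returns either a message or None, polling code needs to check for None before acknowledging. The sketch below is a hypothetical helper (drain and limit are not part of rabbitpy) that assumes None means no message was available and that message.body holds the payload.

```python
def drain(queue, limit=100):
    """Fetch up to `limit` messages with get(), acking each one."""
    bodies = []
    while len(bodies) < limit:
        message = queue.get()
        if message is None:
            # Nothing was returned, so stop polling.
            break
        bodies.append(message.body)
        message.ack()
    return bodies
```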
- ha_declare(nodes=None)
Declare the queue as highly available, passing in a list of nodes the queue should live on. If no nodes are passed, the queue will be declared across all nodes in the cluster.
- Parameters:
nodes (list) – A list of nodes to declare. If left empty, queue will be declared on all cluster nodes.
- Returns:
Message count, Consumer count
- Return type:
tuple(int, int)
- stop_consuming()
Stop consuming messages. This is usually invoked if you want to cancel your consumer from outside the context manager or generator.
If you invoke this, there is a possibility that the generator method will return None instead of a rabbitpy.Message.
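Code that iterates over the generator may therefore want to guard against None. A minimal sketch, using a hypothetical wrapper named iter_messages that ends iteration at the first None it sees:

```python
def iter_messages(queue):
    """Yield messages from queue.consume(), ending at the first None."""
    for message in queue.consume():
        if message is None:
            # Treat None as the signal that the consumer was cancelled.
            return
        yield message
```

Callers can then loop over iter_messages(queue) and call ack() on each item without a separate None check.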
- unbind(source, routing_key=None)
Unbind the queue from the specified exchange where it is bound with the routing key. If routing key is None, use the queue name.
- Parameters:
source (str or rabbitpy.exchange.Exchange) – The exchange to unbind from
routing_key (str) – The routing key that binds them
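bind() and unbind() can be combined to move a binding from one routing key to another. The helper below is a hypothetical sketch, not part of rabbitpy; it adds the new binding first so that the queue is never left without a binding, and it removes the old one only if bind() reported success.

```python
def rebind(queue, exchange, old_key, new_key):
    """Move a binding from `old_key` to `new_key` on the same exchange."""
    # Add the new binding before removing the old one.
    bound = queue.bind(exchange, new_key)
    if bound:
        queue.unbind(exchange, old_key)
    return bound
```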