"""
Implement the base channel construct that is used by rabbitpy objects
such as :py:class:`Exchange <rabbitpy.Exchange>` or
:py:class:`Exchange <rabbitpy.Queue>`. It is responsible for coordinating
the communication between the IO thread and the higher-level objects.
"""
import logging
try:
import queue
except ImportError:
import Queue as queue
from pamqp import specification
from pamqp import PYTHON3
from rabbitpy import base
from rabbitpy import exceptions
from rabbitpy import message
LOGGER = logging.getLogger(__name__)
[docs]class Channel(base.AMQPChannel):
"""The Channel object is the communications object used by Exchanges,
Messages, Queues, and Transactions. It is created by invoking the
:py:meth:`rabbitpy.Connection.channel()
<rabbitpy.connection.Connection.channel>` method. It can act as a context
manager, allowing for quick shorthand use:
.. code:: python
with connection.channel():
# Do something
To create a new channel, invoke
py:meth:`rabbitpy.connection.Connection.channel`
"""
DEFAULT_CLOSE_CODE = 200
DEFAULT_CLOSE_REASON = 'Normal Shutdown'
REMOTE_CLOSED = 0x04
STATES = base.AMQPChannel.STATES
STATES[0x04] = 'Remotely Closed'
def __init__(self, channel_id, server_capabilities, events,
exception_queue, read_queue, write_queue,
maximum_frame_size, write_trigger, blocking_read=False):
"""Create a new instance of the Channel class
To improve performance, pass blocking_read to True. Note that doing
so prevents KeyboardInterrupt/CTRL-C from exiting the Python
interpreter.
:param int channel_id: The channel # to use for this instance
:param dict server_capabilities: Features the server supports
:param events rabbitpy.Events: Event management object
:param queue.Queue exception_queue: Exception queue
:param queue.Queue read_queue: Queue to read pending frames from
:param queue.Queue write_queue: Queue to write pending AMQP objs to
:param int maximum_frame_size: The max frame size for msg bodies
:param socket write_trigger: Write to this socket to break IO waiting
:param bool blocking: Use blocking Queue.get to improve performance
"""
super(Channel, self).__init__(exception_queue, write_trigger,
blocking_read)
self._channel_id = channel_id
self._consumers = []
self._events = events
self._maximum_frame_size = maximum_frame_size
self._publisher_confirms = False
self._read_queue = read_queue
self._write_queue = write_queue
self._server_capabilities = server_capabilities
def __enter__(self):
"""For use as a context manager, return a handle to this object
instance.
:rtype: Channel
"""
return self
def __exit__(self, exc_type, exc_val, unused_exc_tb):
"""When leaving the context, examine why the context is leaving, if
it's an exception or what.
"""
if exc_type:
self._set_state(self.CLOSED)
raise
if self.open:
self.close()
[docs] def close(self):
"""Close the channel, cancelling any active consumers, purging the read
queue, while looking to see if a Basic.Nack should be sent, sending it
if so.
"""
if self.closed:
LOGGER.debug('Channel %i close invoked when already closed',
self._channel_id)
return
self._set_state(self.CLOSING)
# Empty the queue and nack the max id (and all previous)
if self._consumers:
delivery_tag = 0
discard_counter = 0
ack_tags = []
for queue_obj, no_ack in self._consumers:
self._cancel_consumer(queue_obj)
if not no_ack:
LOGGER.debug('Channel %i will nack messages for %s',
self._channel_id, queue_obj.consumer_tag)
ack_tags.append(queue_obj.consumer_tag)
# If there are any ack tags, get the last msg to nack
if ack_tags:
while not self._read_queue.empty():
frame_value = self._get_from_read_queue()
if not frame_value:
break
if (frame_value.name == 'Basic.Deliver' and
frame_value.consumer_tag in ack_tags):
if delivery_tag < frame_value.delivery_tag:
delivery_tag = frame_value.delivery_tag
discard_counter += 1
if delivery_tag:
self._multi_nack(delivery_tag)
super(Channel, self).close()
[docs] def enable_publisher_confirms(self):
"""Turn on Publisher Confirms. If confirms are turned on, the
Message.publish command will return a bool indicating if a message has
been successfully published.
"""
if not self._supports_publisher_confirms:
raise exceptions.NotSupportedError('Confirm.Select')
self.rpc(specification.Confirm.Select())
self._publisher_confirms = True
@property
def id(self):
"""Return the channel id
:rtype: int
"""
return self._channel_id
@property
def maximum_frame_size(self):
return self._maximum_frame_size
[docs] def open(self):
"""Open the channel, invoked directly upon creation by the Connection
"""
self._set_state(self.OPENING)
self._write_frame(self._build_open_frame())
self._wait_on_frame(specification.Channel.OpenOk)
self._set_state(self.OPEN)
LOGGER.debug('Channel #%i open', self._channel_id)
[docs] def prefetch_count(self, value, all_channels=False):
"""Set a prefetch count for the channel (or all channels on the same
connection).
:param int value: The prefetch count to set
:param bool all_channels: Set the prefetch count on all channels on the
same connection
"""
self.rpc(specification.Basic.Qos(prefetch_count=value,
global_=all_channels))
[docs] def prefetch_size(self, value, all_channels=False):
"""Set a prefetch size in bytes for the channel (or all channels on the
same connection).
:param int value: The prefetch size to set
:param bool all_channels: Set the prefetch size on all channels on the
same connection
"""
if value is None:
return
self.rpc(specification.Basic.Qos(prefetch_count=value,
global_=all_channels))
@property
def publisher_confirms(self):
"""Returns True if publisher confirms are enabled.
:rtype: bool
"""
return self._publisher_confirms
[docs] def recover(self, requeue=False):
"""Recover all unacknowledged messages that are associated with this
channel.
:param bool requeue: Requeue the message
"""
self.rpc(specification.Basic.Recover(requeue=requeue))
@staticmethod
def _build_open_frame():
"""Build and return a channel open frame
:rtype: pamqp.specification.Channel.Open
"""
return specification.Channel.Open()
def _cancel_consumer(self, obj):
"""Cancel the consuming of a queue.
:param rabbitpy.amqp_queue.Queue obj: The queue to cancel
"""
frame_value = specification.Basic.Cancel(consumer_tag=obj.consumer_tag)
self._write_frame(frame_value)
if not self.closed:
self._wait_on_frame(specification.Basic.CancelOk)
LOGGER.debug('Basic.CancelOk received')
def _check_for_rpc_request(self, value):
"""Inspect a frame to see if it's a RPC request from RabbitMQ.
:param specification.Frame value:
"""
if isinstance(value, specification.Channel.Close):
self._on_remote_close(value)
elif isinstance(value, specification.Basic.Cancel):
pass
elif isinstance(value, specification.Basic.Return):
self._on_basic_return(self._wait_for_content_frames(value))
def _consume(self, obj, no_ack, priority):
"""Register a Queue object as a consumer, issuing Basic.Consume.
:param rabbitpy.amqp_queue.Queue obj: The queue to consume
:param bool no_ack: no_ack mode
:param int priority: Consumer priority
:raises: ValueError
"""
args = dict()
if priority is not None:
if not self._supports_consumer_priorities:
raise exceptions.NotSupportedError('consumer_priorities')
if not isinstance(priority, int):
raise ValueError('Consumer priority must be an int')
args['x-priority'] = priority
self.rpc(specification.Basic.Consume(queue=obj.name,
consumer_tag=obj.consumer_tag,
no_ack=no_ack,
arguments=args))
self._consumers.append((obj, no_ack))
def _consume_message(self):
"""Get a message from the stack, blocking while doing so. If a consumer
is cancelled out-of-band, we will receive a Basic.CancelOk
instead.
:rtype: rabbitpy.message.Message
"""
frame_value = self._wait_on_frame([specification.Basic.Deliver,
specification.Basic.CancelOk])
if isinstance(frame_value, specification.Basic.CancelOk):
return None
return self._wait_for_content_frames(frame_value)
def _create_message(self, method_frame, header_frame, body):
"""Create a message instance with the channel it was received on and
the dictionary of message parts. Will return None if no message can be
created.
:param pamqp.specification.Frame method_frame: The method frame value
:param header_frame: Header frame value
:type header_frame: pamqp.header.ContentHeader or None
:param body: The message body
:type body: str or None
:rtype: rabbitpy.message.Message or None
"""
if not method_frame:
LOGGER.warning('Received empty method_frame, returning None')
return None
if not header_frame:
LOGGER.debug('Malformed header frame: %r', header_frame)
props = header_frame.properties.to_dict() if header_frame else dict()
msg = message.Message(self, body, props)
msg.method = method_frame
msg.name = method_frame.name
return msg
def _get_from_read_queue(self):
"""Fetch a frame from the read queue and return it, otherwise return
None
:rtype: pamqp.specification.Frame
"""
try:
frame_value = self._read_queue.get(False)
self._read_queue.task_done()
except queue.Empty:
return None
return frame_value
def _get_message(self):
"""Try and get a delivered message from the connection's message stack.
:rtype: rabbitpy.message.Message or None
"""
LOGGER.debug('Waiting on GetOk or GetEmpty')
frame_value = self._wait_on_frame([specification.Basic.GetOk,
specification.Basic.GetEmpty])
LOGGER.debug('Returned with %r', frame_value)
if isinstance(frame_value, specification.Basic.GetEmpty):
return None
LOGGER.debug('Waiting on content frames for %r', frame_value)
return self._wait_for_content_frames(frame_value)
def _multi_nack(self, delivery_tag):
"""Send a multiple negative acknowledgement, re-queueing the items
:param int delivery_tag: The delivery tag for this channel
"""
if not self._supports_basic_nack:
raise exceptions.NotSupportedError('Basic.Nack')
LOGGER.debug('Sending Basic.Nack with requeue')
self.rpc(specification.Basic.Nack(delivery_tag=delivery_tag,
multiple=True,
requeue=True))
def _on_basic_return(self, msg):
"""Raise a MessageReturnedException so the publisher can handle
returned messages.
:param pmqid.message.message msg: The message to add
:raises: rabbitpy.exceptions.MessageReturnedException
"""
# Could happen when closing
if not msg:
return
LOGGER.warning('Basic.Return received on channel %i', self._channel_id)
message_id = msg.properties.get('message_id', 'Unknown')
raise exceptions.MessageReturnedException(message_id,
msg.method.reply_code,
msg.method.reply_text)
def _on_remote_close(self, value):
"""Handle RabbitMQ remotely closing the channel
:param value: The Channel.Close method frame
:type value: pamqp.specification.Channel.Close
"""
self._set_state(self.REMOTE_CLOSED)
if value.reply_code in exceptions.AMQP:
LOGGER.error('Received remote close (%s): %s',
value.reply_code, value.reply_text)
raise exceptions.AMQP[value.reply_code](value)
else:
raise exceptions.RemoteClosedChannelException(self._channel_id,
value.reply_code,
value.reply_text)
def _wait_for_confirmation(self):
"""Used by the Message.publish method when publisher confirmations are
enabled.
:rtype: pamqp.frame.Frame
"""
return self._wait_on_frame([specification.Basic.Ack,
specification.Basic.Nack])
def _wait_for_content_frames(self, method_frame):
"""Used by both Channel._get_message and Channel._consume_message for
getting a message parts off the queue and returning the fully
constructed message.
:param method_frame: The method frame for the message
:type method_frame: Basic.Deliver or Basic.Get or Basic.Return
:rtype: rabbitpy.Message
"""
if self.closing or self.closed:
return None
header_value = self._wait_on_frame(['ContentHeader',
specification.Channel.CloseOk])
self._check_for_rpc_request(header_value)
if not header_value:
return self._create_message(method_frame, None, None)
body_value = bytes() if PYTHON3 else str()
while len(body_value) < header_value.body_size:
body_part = self._wait_on_frame(['ContentBody',
specification.Channel.CloseOk])
self._check_for_rpc_request(body_part)
if not body_part:
break
body_value += body_part.value
if len(body_value) == header_value.body_size:
break
if self.closing or self.closed:
return None
return self._create_message(method_frame, header_value, body_value)
@property
def _supports_basic_nack(self):
"""Indicates if the server supports Basic.Nack
:rtype: bool
"""
return self._server_capabilities.get(b'basic_nack', False)
@property
def _supports_consumer_cancel_notify(self):
"""Indicates if the server supports sending consumer cancellation
notifications
:rtype: bool
"""
return self._server_capabilities.get(b'consumer_cancel_notify', False)
@property
def _supports_consumer_priorities(self):
"""Indicates if the server supports consumer priorities
:rtype: bool
"""
return self._server_capabilities.get(b'consumer_priorities', False)
@property
def _supports_per_consumer_qos(self):
"""Indicates if the server supports per consumer qos
:rtype: bool
"""
return self._server_capabilities.get(b'per_consumer_qos', False)
@property
def _supports_publisher_confirms(self):
"""Indicates if the server supports publisher confirmations
:rtype: bool
"""
return self._server_capabilities.get(b'publisher_confirms', False)