kafka.consumer package

Submodules

kafka.consumer.base module

class kafka.consumer.base.Consumer(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000)[source]

Bases: object

Base class to be used by other consumers. Not to be used directly.

This base class provides logic for:

  • initialization and fetching of partition metadata
  • auto-commit logic
  • APIs for fetching the pending message count
commit(partitions=None)[source]

Commit offsets for this consumer

Keyword Arguments:
 partitions (list) – list of partitions to commit, default is to commit all of them
fetch_last_known_offsets(partitions=None)[source]

Fetch the last committed offsets for the given partitions from the broker and initialize this consumer's offsets from them (default is all partitions for the topic).

pending(partitions=None)[source]

Gets the pending message count

Keyword Arguments:
 partitions (list) – list of partitions to check for, default is to check all
stop()[source]
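
A minimal sketch exercising this API through a concrete subclass (broker address, group, and topic names are hypothetical, and the KafkaClient host-string form varies by kafka-python version):

from kafka import KafkaClient, SimpleConsumer

client = KafkaClient('localhost:9092')   # hypothetical broker
consumer = SimpleConsumer(client, 'my-group', 'my-topic')

print(consumer.pending())       # pending count across all partitions
print(consumer.pending([0]))    # pending count for partition 0 only
consumer.commit()               # commit current offsets for all partitions
consumer.stop()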

kafka.consumer.kafka module

class kafka.consumer.kafka.KafkaConsumer(*topics, **configs)[source]

Bases: object

A simpler Kafka consumer

# A very basic 'tail' consumer, with no stored offset management
kafka = KafkaConsumer('topic1',
                      metadata_broker_list=['localhost:9092'])
for m in kafka:
  print(m)

# Alternate interface: next()
print(kafka.next())

# Alternate interface: batch iteration
while True:
  for m in kafka.fetch_messages():
    print(m)
  print("Done with batch - let's do another!")

# A more advanced consumer -- multiple topics, with auto-commit offset
# management
kafka = KafkaConsumer('topic1', 'topic2',
                      metadata_broker_list=['localhost:9092'],
                      group_id='my_consumer_group',
                      auto_commit_enable=True,
                      auto_commit_interval_ms=30 * 1000,
                      auto_offset_reset='smallest')

# Infinite iteration
for m in kafka:
  process_message(m)
  kafka.task_done(m)

# Alternate interface: next()
m = kafka.next()
process_message(m)
kafka.task_done(m)

# If auto_commit_enable is False, remember to commit() periodically
kafka.commit()

# Batch process interface
while True:
  for m in kafka.fetch_messages():
    process_message(m)
    kafka.task_done(m)

Messages (m) are namedtuples with these attributes:

  • m.topic: topic name (str)
  • m.partition: partition number (int)
  • m.offset: message offset on topic-partition log (int)
  • m.key: key (bytes - can be None)
  • m.value: message (output of deserializer_class - default is raw bytes)

Configuration settings can be passed to the constructor; otherwise defaults will be used:

client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1

Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi

commit()[source]

Store consumed message offsets (marked via task_done()) to the Kafka cluster for this consumer_group.

Note: this functionality requires a Kafka server version >= 0.8.1.1.

configure(**configs)[source]

Configuration settings can be passed to this method; otherwise defaults will be used:

client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
auto_commit_interval_messages=None,
consumer_timeout_ms=-1

Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi

fetch_messages()[source]

Sends FetchRequests for all topic/partitions set for consumption. Returns a generator that yields KafkaMessage structs after deserializing them with the configured deserializer_class.

Refreshes metadata on errors, and resets the fetch offset on OffsetOutOfRange, per the configured auto_offset_reset policy. A minimal usage sketch follows the parameter list below.

Key configuration parameters:

  • fetch_message_max_bytes
  • fetch_wait_max_ms
  • fetch_min_bytes
  • deserializer_class
  • auto_offset_reset
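
As referenced above, a minimal single-batch sketch (topic name and broker address are hypothetical):

consumer = KafkaConsumer('topic1',
                         metadata_broker_list=['localhost:9092'])

# One batch: the generator ends once the current fetch responses are
# exhausted; call fetch_messages() again for the next batch.
for msg in consumer.fetch_messages():
    print('%s:%d:%d: %s' % (msg.topic, msg.partition, msg.offset, msg.value))
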
get_partition_offsets(topic, partition, request_time_ms, max_num_offsets)[source]

Request available fetch offsets for a single topic/partition

Parameters:
  • topic (str) – topic name
  • partition (int) – partition number
  • request_time_ms (int) – Used to ask for all messages before a certain time (ms). There are two special values: specify -1 to receive the latest offset (i.e. the offset of the next message to be written) and -2 to receive the earliest available offset. Note that because offsets are pulled in descending order, asking for the earliest offset will always return a single element.
  • max_num_offsets (int) – maximum number of offsets to return
Returns: offsets (list)
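
For example, the two special request_time_ms values can be used to find the ends of a partition's log (topic name is hypothetical; the consumer is assumed to be configured as in the examples above):

# Earliest available offset for partition 0 (special value -2)
(head,) = kafka.get_partition_offsets('topic1', 0, -2, 1)

# Latest offset, i.e. the offset of the next message to be written (-1)
(tail,) = kafka.get_partition_offsets('topic1', 0, -1, 1)

print('%d messages available' % (tail - head))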

next()[source]

Return a single message from the message iterator. If consumer_timeout_ms is set, raises ConsumerTimeout if no message is available within the timeout. Otherwise blocks indefinitely.

Note that this is also the method called internally during iteration:

for m in consumer:
    pass
offsets(group=None)[source]
Keyword Arguments:
 group – Either “fetch”, “commit”, “task_done”, or “highwater”. If no group specified, returns all groups.
Returns: A copy of the internal offsets struct
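
A small inspection sketch; it assumes (as a guess from the struct's field names, not a documented contract) that each group maps (topic, partition) tuples to integer offsets:

# All offset groups at once
print(kafka.offsets())

# A single group, e.g. the current fetch positions
for (topic, partition), offset in kafka.offsets('fetch').items():
    print('%s:%d -> %d' % (topic, partition, offset))
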
set_topic_partitions(*topics)[source]

Set the topic/partitions to consume. Optionally specify offsets to start from.

Accepts types:

  • str (utf-8): topic name (will consume all available partitions)

  • tuple: (topic, partition)

  • dict:
    • { topic: partition }
    • { topic: [partition list] }
    • { topic: (partition tuple,) }

Optionally, offsets can be specified directly:

  • tuple: (topic, partition, offset)
  • dict: { (topic, partition): offset, ... }

Example:

kafka = KafkaConsumer()

# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))

# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
task_done(message)[source]

Mark a fetched message as consumed. Offsets for messages marked as “task_done” will be stored back to the kafka cluster for this consumer group on commit()

class kafka.consumer.kafka.OffsetsStruct

Bases: tuple

OffsetsStruct(fetch, highwater, commit, task_done)

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__getstate__()

Exclude the OrderedDict from pickling

__repr__()

Return a nicely formatted representation string

commit

Alias for field number 2

fetch

Alias for field number 0

highwater

Alias for field number 1

task_done

Alias for field number 3

kafka.consumer.multiprocess module

class kafka.consumer.multiprocess.MultiProcessConsumer(client, group, topic, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0)[source]

Bases: kafka.consumer.base.Consumer

A consumer implementation that consumes partitions for a topic in parallel using multiple processes

Parameters:
  • client – a connected KafkaClient
  • group – a name for this consumer, used for offset storage; must be unique
  • topic – the topic to consume
Keyword Arguments:
 
  • auto_commit – default True. Whether or not to auto commit the offsets
  • auto_commit_every_n – default 100. How many messages to consume before a commit
  • auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
  • num_procs – Number of processes to start for consuming messages. The available partitions will be divided among these processes
  • partitions_per_proc – Number of partitions to be allocated per process (overrides num_procs)

Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.
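
A minimal usage sketch (broker address, group, and topic are hypothetical; the form of the KafkaClient host argument varies by kafka-python version):

from kafka import KafkaClient, MultiProcessConsumer

client = KafkaClient('localhost:9092')   # hypothetical broker
consumer = MultiProcessConsumer(client, 'my-group', 'my-topic',
                                num_procs=4)

# Blocking batch fetch: up to 100 messages, waiting at most 5 seconds
for message in consumer.get_messages(count=100, block=True, timeout=5):
    print(message)

consumer.stop()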

__iter__()[source]

Iterator to consume the messages available on this consumer

get_messages(count=1, block=True, timeout=10)[source]

Fetch the specified number of messages

Keyword Arguments:
 
  • count – Indicates the maximum number of messages to be fetched
  • block – If True, the API will block until some messages are fetched.
  • timeout – If block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
stop()[source]

kafka.consumer.simple module

class kafka.consumer.simple.FetchContext(consumer, block, timeout)[source]

Bases: object

Class for managing the state of a consumer during fetch

__enter__()[source]

Set fetch values based on blocking status

__exit__(type, value, traceback)[source]

Reset values

class kafka.consumer.simple.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None)[source]

Bases: kafka.consumer.base.Consumer

A simple consumer implementation that consumes all/specified partitions for a topic

Parameters:
  • client – a connected KafkaClient
  • group – a name for this consumer, used for offset storage; must be unique
  • topic – the topic to consume
Keyword Arguments:
 
  • partitions – An optional list of partitions to consume the data from
  • auto_commit – default True. Whether or not to auto commit the offsets
  • auto_commit_every_n – default 100. How many messages to consume before a commit
  • auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
  • fetch_size_bytes – number of bytes to request in a FetchRequest
  • buffer_size – default 4K. Initial number of bytes to tell Kafka we have available. This will double as needed.
  • max_buffer_size – default 32K. Max number of bytes to tell Kafka we have available. None means no limit.
  • iter_timeout – default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever.

Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.
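
A minimal usage sketch (broker address, group, and topic are hypothetical; passing max_buffer_size=None removes the buffer cap so unusually large messages can still be fetched):

from kafka import KafkaClient, SimpleConsumer

client = KafkaClient('localhost:9092')   # hypothetical broker
consumer = SimpleConsumer(client, 'my-group', 'my-topic',
                          max_buffer_size=None)   # let the fetch buffer grow

# Single message: returns None if nothing arrives within the timeout
message = consumer.get_message(block=True, timeout=0.1)

# Batch: up to 50 messages, waiting at most 1 second
for message in consumer.get_messages(count=50, block=True, timeout=1):
    print(message)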

get_message(block=True, timeout=0.1, get_partition_info=None)[source]

Fetch a single message; returns None if no message is available within the timeout. If partition info is requested, the message is returned as a (partition, message) tuple.

get_messages(count=1, block=True, timeout=0.1)[source]

Fetch the specified number of messages

Keyword Arguments:
 
  • count – Indicates the maximum number of messages to be fetched
  • block – If True, the API will block until some messages are fetched.
  • timeout – If block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
provide_partition_info()[source]

Indicates that partition info must be returned by the consumer

seek(offset, whence)[source]

Alter the current offset in the consumer, similar to fseek; an example follows the parameter list.

Parameters:
  • offset – how much to modify the offset
  • whence

    where to modify it from

    • 0 is relative to the earliest available offset (head)
    • 1 is relative to the current offset
    • 2 is relative to the latest known offset (tail)
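
For example (offsets are illustrative):

consumer.seek(0, 0)     # rewind to the earliest available offset (head)
consumer.seek(-10, 2)   # position 10 messages before the latest offset (tail)
consumer.seek(5, 1)     # skip forward 5 messages from the current offset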

Module contents

class kafka.consumer.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None)

Bases: kafka.consumer.base.Consumer

A simple consumer implementation that consumes all/specified partitions for a topic

Parameters:
  • client – a connected KafkaClient
  • group – a name for this consumer, used for offset storage; must be unique
  • topic – the topic to consume
Keyword Arguments:
 
  • partitions – An optional list of partitions to consume the data from
  • auto_commit – default True. Whether or not to auto commit the offsets
  • auto_commit_every_n – default 100. How many messages to consume before a commit
  • auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
  • fetch_size_bytes – number of bytes to request in a FetchRequest
  • buffer_size – default 4K. Initial number of bytes to tell Kafka we have available. This will double as needed.
  • max_buffer_size – default 32K. Max number of bytes to tell Kafka we have available. None means no limit.
  • iter_timeout – default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever.

Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.

get_message(block=True, timeout=0.1, get_partition_info=None)

Fetch a single message; returns None if no message is available within the timeout. If partition info is requested, the message is returned as a (partition, message) tuple.

get_messages(count=1, block=True, timeout=0.1)

Fetch the specified number of messages

Keyword Arguments:
 
  • count – Indicates the maximum number of messages to be fetched
  • block – If True, the API will block until some messages are fetched.
  • timeout – If block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
provide_partition_info()

Indicates that partition info must be returned by the consumer

seek(offset, whence)

Alter the current offset in the consumer, similar to fseek

Parameters:
  • offset – how much to modify the offset
  • whence

    where to modify it from

    • 0 is relative to the earliest available offset (head)
    • 1 is relative to the current offset
    • 2 is relative to the latest known offset (tail)
class kafka.consumer.MultiProcessConsumer(client, group, topic, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0)

Bases: kafka.consumer.base.Consumer

A consumer implementation that consumes partitions for a topic in parallel using multiple processes

Parameters:
  • client – a connected KafkaClient
  • group – a name for this consumer, used for offset storage; must be unique
  • topic – the topic to consume
Keyword Arguments:
 
  • auto_commit – default True. Whether or not to auto commit the offsets
  • auto_commit_every_n – default 100. How many messages to consume before a commit
  • auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
  • num_procs – Number of processes to start for consuming messages. The available partitions will be divided among these processes
  • partitions_per_proc – Number of partitions to be allocated per process (overrides num_procs)

Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.

__iter__()

Iterator to consume the messages available on this consumer

get_messages(count=1, block=True, timeout=10)

Fetch the specified number of messages

Keyword Arguments:
 
  • count – Indicates the maximum number of messages to be fetched
  • block – If True, the API will block until some messages are fetched.
  • timeout – If block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
stop()
class kafka.consumer.KafkaConsumer(*topics, **configs)

Bases: object

A simpler Kafka consumer

# A very basic 'tail' consumer, with no stored offset management
kafka = KafkaConsumer('topic1',
                      metadata_broker_list=['localhost:9092'])
for m in kafka:
  print(m)

# Alternate interface: next()
print(kafka.next())

# Alternate interface: batch iteration
while True:
  for m in kafka.fetch_messages():
    print(m)
  print("Done with batch - let's do another!")

# A more advanced consumer -- multiple topics, with auto-commit offset
# management
kafka = KafkaConsumer('topic1', 'topic2',
                      metadata_broker_list=['localhost:9092'],
                      group_id='my_consumer_group',
                      auto_commit_enable=True,
                      auto_commit_interval_ms=30 * 1000,
                      auto_offset_reset='smallest')

# Infinite iteration
for m in kafka:
  process_message(m)
  kafka.task_done(m)

# Alternate interface: next()
m = kafka.next()
process_message(m)
kafka.task_done(m)

# If auto_commit_enable is False, remember to commit() periodically
kafka.commit()

# Batch process interface
while True:
  for m in kafka.fetch_messages():
    process_message(m)
    kafka.task_done(m)

Messages (m) are namedtuples with these attributes:

  • m.topic: topic name (str)
  • m.partition: partition number (int)
  • m.offset: message offset on topic-partition log (int)
  • m.key: key (bytes - can be None)
  • m.value: message (output of deserializer_class - default is raw bytes)

Configuration settings can be passed to the constructor; otherwise defaults will be used:

client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1

Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi

commit()

Store consumed message offsets (marked via task_done()) to the Kafka cluster for this consumer_group.

Note: this functionality requires a Kafka server version >= 0.8.1.1.

configure(**configs)

Configuration settings can be passed to this method; otherwise defaults will be used:

client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
auto_commit_interval_messages=None,
consumer_timeout_ms=-1

Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi

fetch_messages()

Sends FetchRequests for all topic/partitions set for consumption. Returns a generator that yields KafkaMessage structs after deserializing them with the configured deserializer_class.

Refreshes metadata on errors, and resets the fetch offset on OffsetOutOfRange, per the configured auto_offset_reset policy.

Key configuration parameters:

  • fetch_message_max_bytes
  • fetch_wait_max_ms
  • fetch_min_bytes
  • deserializer_class
  • auto_offset_reset
get_partition_offsets(topic, partition, request_time_ms, max_num_offsets)

Request available fetch offsets for a single topic/partition

Parameters:
  • topic (str) – topic name
  • partition (int) – partition number
  • request_time_ms (int) – Used to ask for all messages before a certain time (ms). There are two special values: specify -1 to receive the latest offset (i.e. the offset of the next message to be written) and -2 to receive the earliest available offset. Note that because offsets are pulled in descending order, asking for the earliest offset will always return a single element.
  • max_num_offsets (int) – maximum number of offsets to return
Returns: offsets (list)

next()

Return a single message from the message iterator. If consumer_timeout_ms is set, raises ConsumerTimeout if no message is available within the timeout. Otherwise blocks indefinitely.

Note that this is also the method called internally during iteration:

for m in consumer:
    pass
offsets(group=None)
Keyword Arguments:
 group – Either “fetch”, “commit”, “task_done”, or “highwater”. If no group specified, returns all groups.
Returns: A copy of the internal offsets struct
set_topic_partitions(*topics)

Set the topic/partitions to consume. Optionally specify offsets to start from.

Accepts types:

  • str (utf-8): topic name (will consume all available partitions)

  • tuple: (topic, partition)

  • dict:
    • { topic: partition }
    • { topic: [partition list] }
    • { topic: (partition tuple,) }

Optionally, offsets can be specified directly:

  • tuple: (topic, partition, offset)
  • dict: { (topic, partition): offset, ... }

Example:

kafka = KafkaConsumer()

# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))

# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
task_done(message)

Mark a fetched message as consumed. Offsets for messages marked as “task_done” will be stored back to the kafka cluster for this consumer group on commit()