kafka.consumer package¶
Submodules¶
kafka.consumer.base module¶
- class kafka.consumer.base.Consumer(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000)[source]¶
Bases: object
Base class to be used by other consumers. Not to be used directly.
This base class provides logic for:
- initialization and fetching metadata of partitions
- auto-commit logic
- APIs for fetching pending message count
- commit(partitions=None)[source]¶
Commit offsets for this consumer
Keyword Arguments: partitions (list) – list of partitions to commit, default is to commit all of them
kafka.consumer.kafka module¶
- class kafka.consumer.kafka.KafkaConsumer(*topics, **configs)[source]¶
Bases: object
A simpler Kafka consumer
# A very basic 'tail' consumer, with no stored offset management
kafka = KafkaConsumer('topic1', metadata_broker_list=['localhost:9092'])
for m in kafka:
    print(m)

# Alternate interface: next()
print(kafka.next())

# Alternate interface: batch iteration
while True:
    for m in kafka.fetch_messages():
        print(m)
    print("Done with batch - let's do another!")
# more advanced consumer -- multiple topics w/ auto commit offset management
kafka = KafkaConsumer('topic1', 'topic2',
                      metadata_broker_list=['localhost:9092'],
                      group_id='my_consumer_group',
                      auto_commit_enable=True,
                      auto_commit_interval_ms=30 * 1000,
                      auto_offset_reset='smallest')

# Infinite iteration
for m in kafka:
    process_message(m)
    kafka.task_done(m)

# Alternate interface: next()
m = kafka.next()
process_message(m)
kafka.task_done(m)

# If auto_commit_enable is False, remember to commit() periodically
kafka.commit()

# Batch process interface
while True:
    for m in kafka.fetch_messages():
        process_message(m)
        kafka.task_done(m)
Messages (m) are namedtuples with the following attributes (a short access sketch follows the list):
- m.topic: topic name (str)
- m.partition: partition number (int)
- m.offset: message offset on topic-partition log (int)
- m.key: key (bytes - can be None)
- m.value: message (output of deserializer_class - default is raw bytes)
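For instance, a minimal sketch of reading these attributes; the broker address and topic name below are placeholders:

from kafka.consumer.kafka import KafkaConsumer

kafka = KafkaConsumer('topic1', metadata_broker_list=['localhost:9092'])

for m in kafka:
    # Each message exposes the attributes listed above
    print('%s:%d:%d key=%r value=%r'
          % (m.topic, m.partition, m.offset, m.key, m.value))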
Configuration settings can be passed to the constructor; otherwise defaults will be used:
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi
- commit()[source]¶
Store consumed message offsets (marked via task_done()) to the kafka cluster for this consumer_group.
Note: this functionality requires server version >= 0.8.1.1. See the Kafka documentation wiki for details.
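A minimal sketch of the task_done()/commit() flow, assuming auto_commit_enable=False and a reachable broker; the broker address, topic, group, and handle() function are placeholders:

from kafka.consumer.kafka import KafkaConsumer

kafka = KafkaConsumer('topic1',
                      metadata_broker_list=['localhost:9092'],
                      group_id='my_consumer_group',
                      auto_commit_enable=False)

processed = 0
for m in kafka:
    handle(m)           # hypothetical processing function
    kafka.task_done(m)  # mark consumed; stored on the next commit()
    processed += 1
    if processed % 100 == 0:
        kafka.commit()  # push task_done offsets to the cluster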
- configure(**configs)[source]¶
Configuration settings can be passed to the constructor; otherwise defaults will be used:
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
auto_commit_interval_messages=None,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi
- fetch_messages()[source]¶
Sends FetchRequests for all topic/partitions set for consumption. Returns a generator that yields KafkaMessage structs after deserializing with the configured deserializer_class.
Refreshes metadata on errors, and resets fetch offset on OffsetOutOfRange, per the configured auto_offset_reset policy.
Key configuration parameters:
- fetch_message_max_bytes
- fetch_wait_max_ms
- fetch_min_bytes
- deserializer_class
- auto_offset_reset
- get_partition_offsets(topic, partition, request_time_ms, max_num_offsets)[source]¶
Request available fetch offsets for a single topic/partition
Parameters: - topic (str)
- partition (int)
- request_time_ms (int) – Used to ask for all messages before a certain time (ms). There are two special values: specify -1 to receive the latest offset (i.e. the offset of the next message to be appended) and -2 to receive the earliest available offset. Note that because offsets are pulled in descending order, asking for the earliest offset will always return a single element.
- max_num_offsets (int)
Returns: offsets (list)
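For example, a sketch using the two special request_time_ms values; the broker address, topic, and partition number are placeholders:

from kafka.consumer.kafka import KafkaConsumer

kafka = KafkaConsumer('topic1', metadata_broker_list=['localhost:9092'])

# -1 asks for the latest offset; -2 for the earliest available offset
latest = kafka.get_partition_offsets('topic1', 0, -1, 1)[0]
earliest = kafka.get_partition_offsets('topic1', 0, -2, 1)[0]
print('partition 0 spans offsets %d..%d' % (earliest, latest))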
- next()[source]¶
Return a single message from the message iterator. If consumer_timeout_ms is set, will raise ConsumerTimeout if no message is available. Otherwise blocks indefinitely.
Note that this is also the method called internally during iteration:
for m in consumer:
    pass
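A sketch of next() with a bounded wait; the import location of ConsumerTimeout is an assumption, and the broker address and topic are placeholders:

from kafka.common import ConsumerTimeout  # assumed location
from kafka.consumer.kafka import KafkaConsumer

kafka = KafkaConsumer('topic1',
                      metadata_broker_list=['localhost:9092'],
                      consumer_timeout_ms=5000)  # give up after 5 seconds

try:
    while True:
        m = kafka.next()
        print(m.value)
except ConsumerTimeout:
    print('no message available within 5 seconds')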
- offsets(group=None)[source]¶
Keyword Arguments: group – Either “fetch”, “commit”, “task_done”, or “highwater”. If no group is specified, returns all groups.
Returns: A copy of the internal offsets struct
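For example, a sketch that approximates per-partition lag; it assumes the returned structs are dicts keyed by (topic, partition), and the broker address, topic, and group are placeholders:

from kafka.consumer.kafka import KafkaConsumer

kafka = KafkaConsumer('topic1',
                      metadata_broker_list=['localhost:9092'],
                      group_id='my_consumer_group')

highwater = kafka.offsets('highwater')  # assumed: {(topic, partition): offset}
fetch = kafka.offsets('fetch')
for tp, hw in highwater.items():
    print('%s lag: %d' % (tp, hw - fetch[tp]))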
- set_topic_partitions(*topics)[source]¶
Set the topic/partitions to consume. Optionally specify offsets to start from.
Accepts types:
- str (utf-8): topic name (will consume all available partitions)
- tuple: (topic, partition)
- dict:
- { topic: partition }
- { topic: [partition list] }
- { topic: (partition tuple,) }
Optionally, offsets can be specified directly:
- tuple: (topic, partition, offset)
- dict: { (topic, partition): offset, ... }
Example:
kafka = KafkaConsumer()

# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))

# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
- class kafka.consumer.kafka.OffsetsStruct¶
Bases: tuple
OffsetsStruct(fetch, highwater, commit, task_done)
- __getnewargs__()¶
Return self as a plain tuple. Used by copy and pickle.
- __getstate__()¶
Exclude the OrderedDict from pickling
- __repr__()¶
Return a nicely formatted representation string
- commit¶
Alias for field number 2
- fetch¶
Alias for field number 0
- highwater¶
Alias for field number 1
- task_done¶
Alias for field number 3
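A small sketch of the alias-to-index mapping documented above:

from kafka.consumer.kafka import OffsetsStruct

s = OffsetsStruct(fetch={}, highwater={}, commit={}, task_done={})
assert s.fetch is s[0]      # field number 0
assert s.highwater is s[1]  # field number 1
assert s.commit is s[2]     # field number 2
assert s.task_done is s[3]  # field number 3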
kafka.consumer.multiprocess module¶
- class kafka.consumer.multiprocess.MultiProcessConsumer(client, group, topic, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0)[source]¶
Bases: kafka.consumer.base.Consumer
A consumer implementation that consumes partitions for a topic in parallel using multiple processes
Parameters: - client – a connected KafkaClient
- group – a name for this consumer, used for offset storage; must be unique
- topic – the topic to consume
Keyword Arguments: - auto_commit – default True. Whether or not to auto commit the offsets
- auto_commit_every_n – default 100. How many messages to consume before a commit
- auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
- num_procs – Number of processes to start for consuming messages. The available partitions will be divided among these processes
- partitions_per_proc – Number of partitions to be allocated per process (overrides num_procs)
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.
- get_messages(count=1, block=True, timeout=10)[source]¶
Fetch the specified number of messages
Keyword Arguments: - count – Indicates the maximum number of messages to be fetched
- block – If True, the API will block until some messages are fetched.
- timeout – If block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
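A minimal usage sketch; the KafkaClient constructor argument form, broker address, group, topic, and process_message() are assumptions/placeholders:

from kafka import KafkaClient
from kafka.consumer.multiprocess import MultiProcessConsumer

client = KafkaClient('localhost:9092')  # assumed 'host:port' form
consumer = MultiProcessConsumer(client, 'my-group', 'topic1', num_procs=4)

# Pull up to 100 messages, waiting at most 10 seconds
for message in consumer.get_messages(count=100, block=True, timeout=10):
    process_message(message)  # hypothetical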
kafka.consumer.simple module¶
- class kafka.consumer.simple.FetchContext(consumer, block, timeout)[source]¶
Bases: object
Class for managing the state of a consumer during fetch
- class kafka.consumer.simple.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None)[source]¶
Bases: kafka.consumer.base.Consumer
A simple consumer implementation that consumes all/specified partitions for a topic
Parameters: - client – a connected KafkaClient
- group – a name for this consumer, used for offset storage; must be unique
- topic – the topic to consume
Keyword Arguments: - partitions – An optional list of partitions to consume the data from
- auto_commit – default True. Whether or not to auto commit the offsets
- auto_commit_every_n – default 100. How many messages to consume before a commit
- auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
- fetch_size_bytes – number of bytes to request in a FetchRequest
- buffer_size – default 4K. Initial number of bytes to tell kafka we have available. This will double as needed.
- max_buffer_size – default 32K. Max number of bytes to tell kafka we have available. None means no limit.
- iter_timeout – default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever.
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.
- get_messages(count=1, block=True, timeout=0.1)[source]¶
Fetch the specified number of messages
Keyword Arguments: - count – Indicates the maximum number of messages to be fetched
- block – If True, the API will block until some messages are fetched.
- timeout – If block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
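A minimal usage sketch along the same lines; the client construction, broker address, group, topic, and process_message() are assumptions/placeholders:

from kafka import KafkaClient
from kafka.consumer.simple import SimpleConsumer

client = KafkaClient('localhost:9092')  # assumed 'host:port' form
consumer = SimpleConsumer(client, 'my-group', 'topic1')

# Pull up to 10 messages, waiting at most 0.1 seconds
for message in consumer.get_messages(count=10, block=True, timeout=0.1):
    process_message(message)  # hypothetical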
- seek(offset, whence)[source]¶
Alter the current offset in the consumer, similar to fseek; a short usage sketch follows the parameter list.
Parameters: - offset – how much to modify the offset
- whence – where to modify it from
- 0 is relative to the earliest available offset (head)
- 1 is relative to the current offset
- 2 is relative to the latest known offset (tail)
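For instance, given a SimpleConsumer constructed as in the sketch above, the three whence modes look like this:

consumer.seek(0, 0)    # jump to the earliest available offset (head)
consumer.seek(10, 1)   # move 10 messages forward from the current offset
consumer.seek(-5, 2)   # position 5 messages before the tail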
Module contents¶
- class kafka.consumer.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None)¶
Bases: kafka.consumer.base.Consumer
A simple consumer implementation that consumes all/specified partitions for a topic
Parameters: - client – a connected KafkaClient
- group – a name for this consumer, used for offset storage; must be unique
- topic – the topic to consume
Keyword Arguments: - partitions – An optional list of partitions to consume the data from
- auto_commit – default True. Whether or not to auto commit the offsets
- auto_commit_every_n – default 100. How many messages to consume before a commit
- auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
- fetch_size_bytes – number of bytes to request in a FetchRequest
- buffer_size – default 4K. Initial number of bytes to tell kafka we have available. This will double as needed.
- max_buffer_size – default 32K. Max number of bytes to tell kafka we have available. None means no limit.
- iter_timeout – default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever.
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.
- get_message(block=True, timeout=0.1, get_partition_info=None)¶
- get_messages(count=1, block=True, timeout=0.1)¶
Fetch the specified number of messages
Keyword Arguments: - count – Indicates the maximum number of messages to be fetched
- block – If True, the API will block until some messages are fetched.
- timeout – If block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
- provide_partition_info()¶
Indicates that partition info must be returned by the consumer
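A sketch combining provide_partition_info() with get_message(), given a SimpleConsumer constructed as in the earlier sketch; the (partition, message) return shape is an assumption:

consumer.provide_partition_info()
result = consumer.get_message(block=True, timeout=0.1)
if result is not None:
    partition, message = result  # assumed return shape
    print('received a message from partition %d' % partition)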
- seek(offset, whence)¶
Alter the current offset in the consumer, similar to fseek
Parameters: - offset – how much to modify the offset
- whence – where to modify it from
- 0 is relative to the earliest available offset (head)
- 1 is relative to the current offset
- 2 is relative to the latest known offset (tail)
- class kafka.consumer.MultiProcessConsumer(client, group, topic, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0)¶
Bases: kafka.consumer.base.Consumer
A consumer implementation that consumes partitions for a topic in parallel using multiple processes
Parameters: - client – a connected KafkaClient
- group – a name for this consumer, used for offset storage and must be unique
- topic – the topic to consume
Keyword Arguments: - auto_commit – default True. Whether or not to auto commit the offsets
- auto_commit_every_n – default 100. How many messages to consume before a commit
- auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
- num_procs – Number of processes to start for consuming messages. The available partitions will be divided among these processes
- partitions_per_proc – Number of partitions to be allocated per process (overrides num_procs)
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.
- __iter__()¶
Iterator to consume the messages available on this consumer
- get_messages(count=1, block=True, timeout=10)¶
Fetch the specified number of messages
Keyword Arguments: - count – Indicates the maximum number of messages to be fetched
- block – If True, the API will block until some messages are fetched.
- timeout – If block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.
- stop()¶
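A sketch of shutting the worker processes down cleanly; the client construction, broker address, group, topic, and process_message() are assumptions/placeholders:

from kafka import KafkaClient
from kafka.consumer import MultiProcessConsumer

client = KafkaClient('localhost:9092')  # assumed 'host:port' form
consumer = MultiProcessConsumer(client, 'my-group', 'topic1', num_procs=2)
try:
    for message in consumer:  # __iter__ yields messages from the workers
        process_message(message)  # hypothetical
finally:
    consumer.stop()  # terminate the consumer processes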
- class kafka.consumer.KafkaConsumer(*topics, **configs)¶
Bases: object
A simpler Kafka consumer
# A very basic 'tail' consumer, with no stored offset management
kafka = KafkaConsumer('topic1', metadata_broker_list=['localhost:9092'])
for m in kafka:
    print(m)

# Alternate interface: next()
print(kafka.next())

# Alternate interface: batch iteration
while True:
    for m in kafka.fetch_messages():
        print(m)
    print("Done with batch - let's do another!")
# more advanced consumer -- multiple topics w/ auto commit offset management
kafka = KafkaConsumer('topic1', 'topic2',
                      metadata_broker_list=['localhost:9092'],
                      group_id='my_consumer_group',
                      auto_commit_enable=True,
                      auto_commit_interval_ms=30 * 1000,
                      auto_offset_reset='smallest')

# Infinite iteration
for m in kafka:
    process_message(m)
    kafka.task_done(m)

# Alternate interface: next()
m = kafka.next()
process_message(m)
kafka.task_done(m)

# If auto_commit_enable is False, remember to commit() periodically
kafka.commit()

# Batch process interface
while True:
    for m in kafka.fetch_messages():
        process_message(m)
        kafka.task_done(m)
Messages (m) are namedtuples with the following attributes:
- m.topic: topic name (str)
- m.partition: partition number (int)
- m.offset: message offset on topic-partition log (int)
- m.key: key (bytes - can be None)
- m.value: message (output of deserializer_class - default is raw bytes)
Configuration settings can be passed to the constructor; otherwise defaults will be used:
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi
- commit()¶
Store consumed message offsets (marked via task_done()) to the kafka cluster for this consumer_group.
Note: this functionality requires server version >= 0.8.1.1. See the Kafka documentation wiki for details.
- configure(**configs)¶
Configuration settings can be passed to the constructor; otherwise defaults will be used:
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
auto_commit_interval_messages=None,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi
- fetch_messages()¶
Sends FetchRequests for all topic/partitions set for consumption. Returns a generator that yields KafkaMessage structs after deserializing with the configured deserializer_class.
Refreshes metadata on errors, and resets fetch offset on OffsetOutOfRange, per the configured auto_offset_reset policy.
Key configuration parameters:
- fetch_message_max_bytes
- fetch_wait_max_ms
- fetch_min_bytes
- deserializer_class
- auto_offset_reset
- get_partition_offsets(topic, partition, request_time_ms, max_num_offsets)¶
Request available fetch offsets for a single topic/partition
Parameters: - topic (str)
- partition (int)
- request_time_ms (int) – Used to ask for all messages before a certain time (ms). There are two special values: specify -1 to receive the latest offset (i.e. the offset of the next message to be appended) and -2 to receive the earliest available offset. Note that because offsets are pulled in descending order, asking for the earliest offset will always return a single element.
- max_num_offsets (int)
Returns: offsets (list)
- next()¶
Return a single message from the message iterator. If consumer_timeout_ms is set, will raise ConsumerTimeout if no message is available. Otherwise blocks indefinitely.
Note that this is also the method called internally during iteration:
for m in consumer:
    pass
- offsets(group=None)¶
Keyword Arguments: group – Either “fetch”, “commit”, “task_done”, or “highwater”. If no group is specified, returns all groups.
Returns: A copy of the internal offsets struct
- set_topic_partitions(*topics)¶
Set the topic/partitions to consume. Optionally specify offsets to start from.
Accepts types:
- str (utf-8): topic name (will consume all available partitions)
- tuple: (topic, partition)
- dict:
- { topic: partition }
- { topic: [partition list] }
- { topic: (partition tuple,) }
Optionally, offsets can be specified directly:
- tuple: (topic, partition, offset)
- dict: { (topic, partition): offset, ... }
Example:
kafka = KafkaConsumer()

# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))

# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
- task_done(message)¶
Mark a fetched message as consumed. Offsets for messages marked as “task_done” will be stored back to the kafka cluster for this consumer group on commit().