Behaviours: gen_server.
Kafka consumers work in poll mode. In brod, brod_consumer is the poller,
which is constantly asking for more data from the kafka node which is a leader
for the given partition.
By subscribing to brod_consumer a process should receive the polled message
sets (not individual messages) into its mailbox. Shape of the message is
documented at brod:subscribe/5.
Messages processed by the subscriber has to be acked by calling
ack/2 (or brod:consume_ack/4) to notify the consumer
that all messages before the acknowledged offsets are processed,
hence more messages can be fetched and sent to the subscriber and the
subscriber won't be overwhelmed by it.
Each consumer can have only one subscriber.
See the overview for some more information and examples.config() = brod:consumer_config()
isolation_level() = kpro:isolation_level()
offset_reset_policy() = reset_by_subscriber | reset_to_earliest | reset_to_latest
partition() = brod:partition()
topic() = brod:topic()
| ack/2 | Subscriber confirms that a message (identified by offset) has been consumed, consumer process now may continue to fetch more messages. |
| debug/2 | Enable/disable debugging on the consumer process. |
| get_connection/1 | Get connection pid. |
| init/1 | |
| start_link/4 | Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
|
| start_link/5 | Start (link) a partition consumer. |
| stop/1 | |
| stop_maybe_kill/2 | |
| subscribe/3 | Subscribe or resubscribe on messages from a partition. |
| unsubscribe/2 | Unsubscribe the current subscriber. |
ack(Pid::pid(), Offset::brod:offset()) -> ok
Subscriber confirms that a message (identified by offset) has been consumed, consumer process now may continue to fetch more messages.
debug(Pid::pid(), File::print | string() | none) -> ok
Enable/disable debugging on the consumer process.
debug(Pid, print) prints debug info to stdout.
debug(Pid, File) prints debug info to a file File.
get_connection(Pid) -> any()
Get connection pid. Test/debug only.
init(X1) -> any()
start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config()) -> {ok, pid()} | {error, any()}
Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config(), Debug::[any()]) -> {ok, pid()} | {error, any()}
Start (link) a partition consumer.
Possible configs:min_bytes (optional, default = 0)
max_bytes (optional, default = 1MB)
Maximum bytes to fetch in a batch of messages.
NOTE: this value might be expanded to retry when it is not enough to fetch even a single message, then slowly shrunk back to the given value.max_wait_time (optional, default = 10000 ms)
min_bytes of messages in fetch responsesleep_timeout (optional, default = 1000 ms)
prefetch_count (optional, default = 10)
prefetch_bytes (optional, default = 100KB)
brod_consumer is greed, it only stops fetching more messages in
when number of unacked messages has exceeded prefetch_count AND
the unacked total volume has exceeded prefetch_bytesbegin_offset (optional, default = latest)
last_processed_offset + 1 as the begin_offset
to proceed. The offset has to already exist at the time of calling.offset_reset_policy (optional, default = reset_by_subscriber)
How to reset begin_offset if OffsetOutOfRange exception is received.
reset_by_subscriber: consumer is suspended
(is_suspended=true in state) and wait
for subscriber to re-subscribe with a new
begin_offset option.
reset_to_earliest: consume from the earliest offset.
reset_to_latest: consume from the last available offset.size_stat_window: (optional, default = 5)
max_bytes in
fetch requests after it has been expanded to fetch a large
message. Use 0 to immediately shrink back to original
max_bytes from config. A size estimation allows users to set
a relatively small max_bytes, then let it dynamically adjust
to a number around PrefetchCount * AverageSizeisolation_level: (optional, default = read_committed)
read_uncommitted to retrieve
all records, independently on the transaction outcome (if any),
and read_committed to get only the records from committed
transactionsshare_leader_conn: (optional, default = false)
true to consume less TCP connections towards Kafka,
but may lead to higher fetch latency. This is because Kafka can
ony accumulate messages for the oldest fetch request, later
requests behind it may get blocked until max_wait_time expires
for the oldest onestop(Pid::pid()) -> ok | {error, any()}
stop_maybe_kill(Pid::pid(), Timeout::timeout()) -> ok
subscribe(Pid::pid(), SubscriberPid::pid(), ConsumerOptions::config()) -> ok | {error, any()}
Subscribe or resubscribe on messages from a partition.
Caller may specify a set of options extending consumer config.
It is possible to update parameters such as max_bytes and
max_wait_time, or the starting point (begin_offset) of the data
stream. Note that you currently cannot update isolation_level.
start_link/5.
unsubscribe(Pid::pid(), SubscriberPid::pid()) -> ok | {error, any()}
Unsubscribe the current subscriber.
Generated by EDoc