Module brod_consumer

Kafka consumers work in poll mode.

Behaviours: gen_server.

Description

Kafka consumers work in poll mode. In brod, brod_consumer is the poller: it continuously requests more data from the Kafka broker that is the leader for the given partition.

A process that subscribes to brod_consumer receives the polled message sets (not individual messages) in its mailbox. The shape of the message is documented at brod:subscribe/5.

Messages processed by the subscriber have to be acknowledged by calling ack/2 (or brod:consume_ack/4). This notifies the consumer that all messages up to the acknowledged offset have been processed, so more messages can be fetched and sent to the subscriber without overwhelming it.
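The receive-then-ack cycle described above can be sketched as follows. This is a minimal illustration, not brod's own example: the module name demo_subscriber and the helpers handle_message/1 and last_offset/1 are hypothetical, and the message shape {ConsumerPid, #kafka_message_set{}} with records taken from brod/include/brod.hrl is an assumption that should be checked against brod:subscribe/5.

```erlang
-module(demo_subscriber).
-export([start/1, last_offset/1]).

%% Assumption: brod.hrl defines #kafka_message_set{} and #kafka_message{}.
-include_lib("brod/include/brod.hrl").

%% Spawn a subscriber for an already running brod_consumer process.
start(ConsumerPid) ->
    spawn_link(fun() ->
        ok = brod_consumer:subscribe(ConsumerPid, self(), []),
        loop(ConsumerPid)
    end).

loop(ConsumerPid) ->
    receive
        {ConsumerPid, #kafka_message_set{messages = Messages}} ->
            lists:foreach(fun handle_message/1, Messages),
            %% Ack the last processed offset so the consumer fetches more.
            case last_offset(Messages) of
                none   -> ok;
                Offset -> ok = brod_consumer:ack(ConsumerPid, Offset)
            end,
            loop(ConsumerPid);
        stop ->
            brod_consumer:unsubscribe(ConsumerPid, self())
    end.

%% Hypothetical per-message handler: just print offset and value.
handle_message(#kafka_message{offset = Offset, value = Value}) ->
    io:format("offset ~p: ~p~n", [Offset, Value]).

%% Offset of the last message in a set, or 'none' for an empty set.
last_offset([]) -> none;
last_offset(Messages) ->
    (lists:last(Messages))#kafka_message.offset.
```

Acknowledging once per message set, rather than once per message, keeps the number of calls to the consumer low while still releasing the whole set.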

Each consumer can have only one subscriber.

See the overview for some more information and examples.

Data Types

config()

config() = brod:consumer_config()

isolation_level()

isolation_level() = kpro:isolation_level()

offset_reset_policy()

offset_reset_policy() = reset_by_subscriber | reset_to_earliest | reset_to_latest

partition()

partition() = brod:partition()

topic()

topic() = brod:topic()

Function Index

ack/2              The subscriber confirms that a message (identified by its offset) has been consumed; the consumer process may now continue to fetch more messages.
debug/2            Enable/disable debugging on the consumer process.
get_connection/1   Get connection pid.
init/1
start_link/4       Equivalent to start_link(ClientPid, Topic, Partition, Config, []).
start_link/5       Start (link) a partition consumer.
stop/1
stop_maybe_kill/2
subscribe/3        Subscribe or resubscribe to messages from a partition.
unsubscribe/2      Unsubscribe the current subscriber.

Function Details

ack/2

ack(Pid::pid(), Offset::brod:offset()) -> ok

The subscriber confirms that a message (identified by its offset) has been consumed; the consumer process may now continue to fetch more messages.

debug/2

debug(Pid::pid(), File::print | string() | none) -> ok

Enable/disable debugging on the consumer process.

debug(Pid, print) prints debug info to stdout.

debug(Pid, File) prints debug info to the file File.

debug(Pid, none) disables debugging.

get_connection/1

get_connection(Pid) -> any()

Get connection pid. Test/debug only.

init/1

init(X1) -> any()

start_link/4

start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config()) -> {ok, pid()} | {error, any()}

Equivalent to start_link(ClientPid, Topic, Partition, Config, []).

start_link/5

start_link(Bootstrap::pid() | brod:bootstrap(), Topic::topic(), Partition::partition(), Config::config(), Debug::[any()]) -> {ok, pid()} | {error, any()}

Start (link) a partition consumer.

Possible configs:
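As a hedged illustration of passing a config to start_link/4, the sketch below builds a proplist from the option names mentioned elsewhere on this page (begin_offset, max_bytes, max_wait_time). The module name demo_consumer_start and the chosen values are assumptions for illustration only, not recommended settings, and the value earliest for begin_offset is assumed to be accepted by brod.

```erlang
-module(demo_consumer_start).
-export([config/0, start/3]).

%% Illustrative consumer config; the values are assumptions.
config() ->
    [ {begin_offset, earliest}   %% assumed: start from the earliest offset
    , {max_bytes, 1048576}       %% fetch at most 1 MiB per request
    , {max_wait_time, 1000}      %% assumed to be in milliseconds
    ].

%% Start a partition consumer linked to the caller.
%% ClientPid is the pid of an already running brod client.
start(ClientPid, Topic, Partition) ->
    brod_consumer:start_link(ClientPid, Topic, Partition, config()).
```

A call such as demo_consumer_start:start(ClientPid, <<"my_topic">>, 0) would then return {ok, Pid} or {error, Reason}, as stated in the spec above.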

stop/1

stop(Pid::pid()) -> ok | {error, any()}

stop_maybe_kill/2

stop_maybe_kill(Pid::pid(), Timeout::timeout()) -> ok

subscribe/3

subscribe(Pid::pid(), SubscriberPid::pid(), ConsumerOptions::config()) -> ok | {error, any()}

Subscribe or resubscribe to messages from a partition.

Caller may specify a set of options extending consumer config. It is possible to update parameters such as max_bytes and max_wait_time, or the starting point (begin_offset) of the data stream. Note that you currently cannot update isolation_level.

Possible options are documented at start_link/5.
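Resubscribing with a new starting point might look like the sketch below. The module name demo_resubscribe and its helpers are hypothetical; only subscribe/3 and the begin_offset option come from this page.

```erlang
-module(demo_resubscribe).
-export([options/1, resubscribe_from/2]).

%% Options that extend the consumer config with a new starting offset.
options(Offset) ->
    [{begin_offset, Offset}].

%% Resubscribe the calling process, restarting the stream at Offset.
resubscribe_from(ConsumerPid, Offset) ->
    brod_consumer:subscribe(ConsumerPid, self(), options(Offset)).
```

Since each consumer can have only one subscriber, this is meant to be called from the process that is already subscribed.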

unsubscribe/2

unsubscribe(Pid::pid(), SubscriberPid::pid()) -> ok | {error, any()}

Unsubscribe the current subscriber.

