Module brod_group_subscriber

A group subscriber is a gen_server which subscribes to partition consumers (pollers) and calls the user-defined callback functions for message processing.

Behaviours: brod_group_member, gen_server.

This module defines the brod_group_subscriber behaviour.
Required callback functions: init/2, handle_message/4.

Description

An overview of what it does behind the scenes:
  1. Start a consumer group coordinator to manage the consumer group states; see brod_group_coordinator:start_link/6.
  2. Start (if not already started) topic consumers (pollers) and subscribe to the partition workers when the group assignment is received from the group leader; see brod:start_consumer/3.
  3. Call CallbackModule:handle_message/4 when messages are received from the partition consumers.
  4. Send acknowledged offsets to the group coordinator, which commits them to Kafka periodically.
Callbacks are documented in the source code of this module.
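
As a rough illustration of the two required callbacks, a minimal callback module might look like the sketch below. The module name my_subscriber is hypothetical. The sketch also assumes that init/2 receives the group ID as its first argument and returns {ok, State}, and that single messages arrive as #kafka_message{} records from brod.hrl; the callback definitions in the module source are the authority on this.

```erlang
-module(my_subscriber).
-behaviour(brod_group_subscriber).

%% Provides the #kafka_message{} record.
-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/4]).

%% Assumption: the first argument is the group ID, the second is the
%% CbInitArg given to start_link; the callback state here is unused.
init(_GroupId, _InitArg) ->
  {ok, []}.

%% Print each message and acknowledge it right away.
handle_message(Topic, Partition,
               #kafka_message{offset = Offset, value = Value}, State) ->
  io:format("~s-~p offset ~p: ~p~n", [Topic, Partition, Offset, Value]),
  {ok, ack, State}.
```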

Data Types

member_id()

member_id() = brod:group_member_id()

Function Index

ack/4: Acknowledge and commit an offset.
ack/5: Acknowledge an offset.
assign_partitions/3: This function is called only when partition_assignment_strategy is set to callback_implemented in group config.
assignments_received/4: Called by group coordinator when there is new assignment received.
assignments_revoked/1: Called by group coordinator before re-joining the consumer group.
code_change/3
commit/1: Commit all acked offsets.
commit/4: Commit offset for a topic.
get_committed_offsets/2: Called by group coordinator when initializing the assignments for subscriber.
handle_call/3
handle_cast/2
handle_info/2
init/1
start_link/7: Equivalent to start_link/8 with MessageType set to message.
start_link/8: Start (link) a group subscriber.
stop/1: Stop group subscriber, wait for pid DOWN before return.
terminate/2
user_data/1

Function Details

ack/4

ack(Pid::pid(), Topic::brod:topic(), Partition::brod:partition(), Offset::brod:offset()) -> ok

Acknowledge and commit an offset. The subscriber may ack a later (greater) offset, which is treated as acknowledging all earlier (smaller) offsets as well. This also means that out-of-order acks may overwrite offset commits and lead to unnecessary message re-delivery in case of restart.
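
One way to keep acks in order is to hand messages to a single worker process that handles them sequentially and acks each one when done. The sketch below is illustrative only: the module name, worker_loop/1 and process/1 are hypothetical, and it assumes that init/2 runs inside the subscriber process, so that self() there is the pid to pass to ack/4.

```erlang
-module(async_ack_subscriber).
-behaviour(brod_group_subscriber).

-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/4]).

%% Assumption: init/2 is evaluated in the subscriber process, so self()
%% is the subscriber pid that ack/4 expects.
init(_GroupId, _InitArg) ->
  Subscriber = self(),
  Worker = spawn_link(fun() -> worker_loop(Subscriber) end),
  {ok, Worker}.

%% Forward the message and return without acknowledging it;
%% the worker acks later.
handle_message(Topic, Partition, #kafka_message{} = Msg, Worker) ->
  Worker ! {Topic, Partition, Msg},
  {ok, Worker}.

%% Messages are handled one at a time, so acks are sent in arrival order.
worker_loop(Subscriber) ->
  receive
    {Topic, Partition, #kafka_message{offset = Offset, value = Value}} ->
      ok = process(Value),
      ok = brod_group_subscriber:ack(Subscriber, Topic, Partition, Offset),
      worker_loop(Subscriber)
  end.

%% Hypothetical placeholder for the real processing logic.
process(_Value) ->
  ok.
```

Spawning one process per message would be simpler, but those processes could finish in any order and produce exactly the out-of-order acks described above.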

ack/5

ack(Pid::pid(), Topic::brod:topic(), Partition::brod:partition(), Offset::brod:offset(), Commit::boolean()) -> ok

Acknowledge an offset. This call may or may not commit the group subscriber offset, depending on the value of the Commit argument.

assign_partitions/3

assign_partitions(Pid::pid(), Members::[brod:group_member()], TopicPartitionList::[{brod:topic(), brod:partition()}]) -> [{member_id(), [brod:partition_assignment()]}]

This function is called only when partition_assignment_strategy is set to callback_implemented in group config.
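
To illustrate the shape of the return value, here is a sketch of a round-robin assignment written as a plain helper function. The module and function names are hypothetical and not part of brod. It assumes each member is a {MemberId, Metadata} tuple and that brod:partition_assignment() is a {Topic, [Partition]} tuple. How the helper is wired into a callback module is left out.

```erlang
-module(my_assignor).
-export([round_robin/2]).

%% Spread topic-partitions over members in round-robin order.
%% Returns [{MemberId, [{Topic, [Partition]}]}].
round_robin(Members, TopicPartitions) ->
  MemberIds = [Id || {Id, _Metadata} <- Members],
  N = length(MemberIds),
  %% Number the topic-partitions 0, 1, 2, ...
  Indexed = lists:zip(lists:seq(0, length(TopicPartitions) - 1),
                      TopicPartitions),
  %% The member at position Idx takes every item whose number is Idx modulo N.
  [{Id, group_by_topic([TP || {I, TP} <- Indexed, I rem N =:= Idx])}
   || {Idx, Id} <- lists:zip(lists:seq(0, N - 1), MemberIds)].

%% Turn [{Topic, Partition}] into [{Topic, [Partition]}].
group_by_topic(TopicPartitions) ->
  Topics = lists:usort([T || {T, _P} <- TopicPartitions]),
  [{T, [P || {T1, P} <- TopicPartitions, T1 =:= T]} || T <- Topics].
```

For example, with members <<"m1">> and <<"m2">> and partitions 0, 1 and 2 of topic <<"t">>, the helper returns [{<<"m1">>, [{<<"t">>, [0, 2]}]}, {<<"m2">>, [{<<"t">>, [1]}]}].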

assignments_received/4

assignments_received(Pid::pid(), MemberId::member_id(), GenerationId::integer(), TopicAssignments::brod:received_assignments()) -> ok

Called by group coordinator when there is new assignment received.

assignments_revoked/1

assignments_revoked(Pid::pid()) -> ok

Called by group coordinator before re-joining the consumer group.

code_change/3

code_change(OldVsn, State, Extra) -> any()

commit/1

commit(Pid::pid()) -> ok

Commit all acked offsets. NOTE: This is an asynchronous call.

commit/4

commit(Pid::pid(), Topic::brod:topic(), Partition::brod:partition(), Offset::brod:offset()) -> ok

Commit an offset for a topic partition. This is an asynchronous call.

get_committed_offsets/2

get_committed_offsets(Pid::pid(), TopicPartitions::[{brod:topic(), brod:partition()}]) -> {ok, [{{brod:topic(), brod:partition()}, brod:offset()}]}

Called by group coordinator when initializing the assignments for subscriber.

NOTE: This function is called only when offset_commit_policy is set to consumer_managed in group config.

NOTE: The committed offsets should be the offsets for successfully processed (acknowledged) messages, not the begin_offset to start fetching from.
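
As a sketch of the return shape only, the helper below looks up committed offsets in a map keyed by {Topic, Partition}. The module name, the function name and the map-based store are all hypothetical; a real consumer_managed setup would read from wherever the application persists its offsets.

```erlang
-module(offset_lookup).
-export([committed_offsets/2]).

%% Store: #{ {Topic, Partition} => LastProcessedOffset }.
%% Returns {ok, [{{Topic, Partition}, Offset}]} for the partitions that
%% have a stored offset; partitions without one are left out of the list.
committed_offsets(Store, TopicPartitions) ->
  {ok, [{TP, maps:get(TP, Store)}
        || TP <- TopicPartitions, maps:is_key(TP, Store)]}.
```

For example, offset_lookup:committed_offsets(#{{<<"t">>, 0} => 41}, [{<<"t">>, 0}, {<<"t">>, 1}]) returns {ok, [{{<<"t">>, 0}, 41}]}. In line with the note above, 41 is the offset of the last processed message, not the offset to resume fetching from.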

handle_call/3

handle_call(Call, From, State) -> any()

handle_cast/2

handle_cast(Cast, State) -> any()

handle_info/2

handle_info(Info, State0) -> any()

init/1

init(X1) -> any()

start_link/7

start_link(Client::brod:client(), GroupId::brod:group_id(), Topics::[brod:topic()], GroupConfig::brod:group_config(), ConsumerConfig::brod:consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

Equivalent to start_link(Client, GroupId, Topics, GroupConfig, ConsumerConfig, message, CbModule, CbInitArg).

The callback CbModule:handle_message/4 handles a message. It must return one of:

{ok, NewCallbackState}: The subscriber has received the message and will process it asynchronously. It should call brod_group_subscriber:ack/4 to acknowledge later.

{ok, ack, NewCallbackState}: The subscriber has completed processing the message.

{ok, ack_no_commit, NewCallbackState}: The subscriber has completed processing the message, but it is not ready to commit offset yet. It should call brod_group_subscriber:commit/4 later.

While this callback function is being evaluated, the fetch-ahead partition-consumers are fetching more messages behind the scene unless prefetch_count and prefetch_bytes are set to 0 in consumer config.
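
The ack_no_commit return value can be combined with commit/4 to commit less often than messages are acknowledged. The sketch below commits once every 100 messages per partition. The module name and the COMMIT_EVERY threshold are hypothetical, and it assumes that callbacks run inside the subscriber process, so that self() is the subscriber pid.

```erlang
-module(batch_commit_subscriber).
-behaviour(brod_group_subscriber).

-include_lib("brod/include/brod.hrl").

-export([init/2, handle_message/4]).

%% Hypothetical threshold: commit once per this many messages per partition.
-define(COMMIT_EVERY, 100).

%% The callback state maps {Topic, Partition} to the number of messages
%% acknowledged since the last commit.
init(_GroupId, _InitArg) ->
  {ok, #{}}.

handle_message(Topic, Partition, #kafka_message{offset = Offset}, Counts) ->
  Key = {Topic, Partition},
  case maps:get(Key, Counts, 0) + 1 of
    N when N >= ?COMMIT_EVERY ->
      %% commit/4 is asynchronous; assumption: self() is the subscriber pid.
      ok = brod_group_subscriber:commit(self(), Topic, Partition, Offset),
      {ok, ack_no_commit, Counts#{Key => 0}};
    N ->
      {ok, ack_no_commit, Counts#{Key => N}}
  end.
```

The trade-off is that up to COMMIT_EVERY - 1 already-processed messages per partition may be delivered again after a restart.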

start_link/8

start_link(Client::brod:client(), GroupId::brod:group_id(), Topics::[brod:topic()], GroupConfig::brod:group_config(), ConsumerConfig::brod:consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

Start (link) a group subscriber.

Client: Client ID (or pid, but not recommended) of the brod client.

GroupId: Consumer group ID, which should be unique per Kafka cluster.

Topics: Predefined set of topic names to join the group.

NOTE: The group leader member will collect topics from all members and assign all collected topic-partitions to members in the group. That is, members can join with an arbitrary set of topics.

GroupConfig: For group coordinator, see brod_group_coordinator:start_link/6

ConsumerConfig: For partition consumer, see brod_consumer:start_link/4

MessageType: The type of message that is going to be handled by the callback module. Can be either message or message_set.

CbModule: Callback module which should have the callback functions implemented for message processing.

CbInitArg: The term() that is passed to CbModule:init/2 as the second argument when initializing the subscriber.
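
Putting the arguments together, starting a subscriber might look like the following sketch. The client ID, broker address, group ID, topic name and callback module my_subscriber are all placeholders, and the two config lists show only a few commonly used options; see the referenced modules for the full set.

```erlang
-module(my_subscriber_app).
-export([start/0]).

start() ->
  {ok, _Apps} = application:ensure_all_started(brod),
  %% Placeholder broker endpoint and client ID.
  ok = brod:start_client([{"localhost", 9092}], my_client, []),
  GroupConfig = [{offset_commit_policy, commit_to_kafka_v2},
                 {offset_commit_interval_seconds, 5}],
  ConsumerConfig = [{begin_offset, earliest}],
  %% my_subscriber is a hypothetical module implementing init/2 and
  %% handle_message/4; [] is passed to it as CbInitArg.
  brod_group_subscriber:start_link(my_client, <<"my-group">>, [<<"my-topic">>],
                                   GroupConfig, ConsumerConfig,
                                   message, my_subscriber, []).
```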

stop/1

stop(Pid::pid()) -> ok

Stop the group subscriber and wait for the pid's DOWN message before returning.

terminate/2

terminate(Reason, State) -> any()

user_data/1

user_data(Pid) -> any()


Generated by EDoc