Module brod

Behaviours: application.

Data Types

batch_input()

batch_input() = [msg_input()]

bootstrap()

bootstrap() = [endpoint()] | {[endpoint()], client_config()}

call_ref()

call_ref() = #brod_call_ref{caller = undefined | pid(), callee = undefined | pid(), ref = undefined | reference()}

A record with caller, callee, and ref.

cg()

cg() = #brod_cg{id = brod:group_id(), protocol_type = brod:cg_protocol_type()}

cg_protocol_type()

cg_protocol_type() = binary()

client()

client() = client_id() | pid()

client_config()

client_config() = brod_client:config()

client_id()

client_id() = atom()

compression()

compression() = no_compression | gzip | snappy

conn_config()

conn_config() = [{atom(), term()}] | kpro:conn_config()

Connection configuration that will be passed to kpro calls.

For more info, see the kpro_connection:config() type.

connection()

connection() = kpro:connection()

consumer_config()

consumer_config() = [{begin_offset, offset_time()} | {min_bytes, non_neg_integer()} | {max_bytes, non_neg_integer()} | {max_wait_time, integer()} | {sleep_timeout, integer()} | {prefetch_count, integer()} | {prefetch_bytes, non_neg_integer()} | {offset_reset_policy, brod_consumer:offset_reset_policy()} | {size_stat_window, non_neg_integer()} | {isolation_level, brod_consumer:isolation_level()} | {share_leader_conn, boolean()}]

Consumer configuration.

The meaning of the options is documented at brod_consumer:start_link/5.

endpoint()

endpoint() = {hostname(), portnum()}

error_code()

error_code() = kpro:error_code()

fetch_opts()

fetch_opts() = kpro:fetch_opts()

fold_acc()

fold_acc() = term()

fold_fun()

fold_fun(Acc) = fun((message(), Acc) -> {ok, Acc} | {error, any()})

fold always returns when it reaches the high watermark offset. fold also returns when any of the limits is hit.

fold_limits()

fold_limits() = #{message_count => pos_integer(), reach_offset => offset()}

fold_result()

fold_result() = {fold_acc(), OffsetToContinue::offset(), fold_stop_reason()}

fold_stop_reason()

fold_stop_reason() = reached_end_of_partition | reached_message_count_limit | reached_target_offset | {error, any()}

OffsetToContinue: begin offset for the next fold call

group_config()

group_config() = proplists:proplist()

group_generation_id()

group_generation_id() = non_neg_integer()

group_id()

group_id() = kpro:group_id()

group_member()

group_member() = {group_member_id(), #kafka_group_member_metadata{version = non_neg_integer(), topics = [brod:topic()], user_data = binary()}}

group_member_id()

group_member_id() = binary()

hostname()

hostname() = kpro:hostname()

key()

key() = undefined | binary()

message()

message() = kpro:message()

A record with offset, key, value, ts_type, ts, and headers.

message_set()

message_set() = #kafka_message_set{topic = brod:topic(), partition = brod:partition(), high_wm_offset = integer(), messages = [brod:message()] | kpro:incomplete_batch()}

A record with topic, partition, high_wm_offset (max offset of the partition), and messages.

See the definition for more information.

msg_input()

msg_input() = kpro:msg_input()

msg_ts()

msg_ts() = kpro:msg_ts()

Unix time in milliseconds

offset()

offset() = kpro:offset()

Physical offset (an integer)

offset_time()

offset_time() = msg_ts() | earliest | latest

offsets_to_commit()

offsets_to_commit() = kpro:offsets_to_commit()

partition()

partition() = kpro:partition()

partition_assignment()

partition_assignment() = {topic(), [partition()]}

partition_fun()

partition_fun() = fun((topic(), pos_integer(), key(), value()) -> {ok, partition()})

partitioner()

partitioner() = partition_fun() | random | hash

portnum()

portnum() = pos_integer()

produce_ack_cb()

produce_ack_cb() = fun((partition(), offset()) -> term())

produce_reply()

produce_reply() = #brod_produce_reply{call_ref = brod:call_ref(), base_offset = undefined | brod:offset(), result = brod:produce_result()}

A record with call_ref, base_offset, and result.

See the definition for more information.

produce_result()

produce_result() = brod_produce_req_buffered | brod_produce_req_acked

producer_config()

producer_config() = brod_producer:config()

received_assignments()

received_assignments() = [#brod_received_assignment{topic = brod:topic(), partition = brod:partition(), begin_offset = undefined | brod:offset() | {begin_offset, brod:offset_time()}}]

topic()

topic() = kpro:topic()

topic_config()

topic_config() = kpro:struct()

topic_partition()

topic_partition() = {topic(), partition()}

transaction()

transaction() = brod_transaction:transaction()

transaction_config()

transaction_config() = brod_transaction:transaction_config()

transactional_id()

transactional_id() = brod_transaction:transactional_id()

txn_do_options()

txn_do_options() = brod_transaction_processor:do_options()

txn_function()

txn_function() = brod_transaction_processor:process_function()

value()

value() = undefined | iodata() | {msg_ts(), binary()} | [{key(), value()}] | [{msg_ts(), key(), value()}] | kpro:msg_input() | kpro:batch_input()

Function Index

abort/1Abort the transaction.
commit/1Commit the transaction.
connect_group_coordinator/3Connect to consumer group coordinator broker.
connect_leader/4Connect partition leader.
consume_ack/2See consume_ack/4 for more information.
consume_ack/4Acknowledge that one or more messages have been processed.
create_topics/3Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).
create_topics/4Create topic(s) in kafka.
delete_topics/3Equivalent to delete_topics(Hosts, Topics, Timeout, []).
delete_topics/4Delete topic(s) from kafka.
describe_groups/3Describe consumer groups.
fetch/4Fetch a single message set from the given topic-partition.
fetch/5Fetch a single message set from the given topic-partition.
fetch/7(Deprecated.) Equivalent to fetch(Hosts, Topic, Partition, Offset, Wait, MinBytes, MaxBytes, []).
fetch/8(Deprecated.) Fetch a single message set from the given topic-partition.
fetch_committed_offsets/2Same as fetch_committed_offsets/3, but works with a started brod_client.
fetch_committed_offsets/3Fetch committed offsets for ALL topics in the given consumer group.
fold/8Fold through messages in a partition.
get_consumer/3
get_metadata/1Fetch broker metadata for all topics.
get_metadata/2Fetch broker metadata for the given topics.
get_metadata/3Fetch broker metadata for the given topics using the given connection options.
get_partitions_count/2Get number of partitions for a given topic.
get_partitions_count_safe/2The same as get_partitions_count(Client, Topic) but ensured not to auto-create topics in Kafka even when Kafka has topic auto-creation configured.
get_producer/3Equivalent to brod_client:get_producer(Client, Topic, Partition).
list_all_groups/2List ALL consumer groups in the given kafka cluster.
list_groups/2List consumer groups in the given group coordinator broker.
produce/2Equivalent to produce(Pid, <<>>, Value).
produce/3Produce one or more messages.
produce/5Produce one or more messages.
produce_cb/4Same as produce/3, only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka (see the produce_ack_cb() type).
produce_cb/6Same as produce/5 only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka (see the produce_ack_cb() type).
produce_no_ack/5Find the partition worker and send message without any ack.
produce_sync/2Equivalent to produce_sync(Pid, <<>>, Value).
produce_sync/3Sync version of produce/3.
produce_sync/5Sync version of produce/5.
produce_sync_offset/5Version of produce_sync/5 that returns the offset assigned by Kafka.
resolve_offset/3Equivalent to resolve_offset(Hosts, Topic, Partition, latest, []).
resolve_offset/4Equivalent to resolve_offset(Hosts, Topic, Partition, Time, []).
resolve_offset/5Resolve semantic offset or timestamp to real offset.
resolve_offset/6Resolve semantic offset or timestamp to real offset.
start/0Start brod application.
start/2Application behaviour callback.
start_client/1Equivalent to start_client(BootstrapEndpoints, brod_default_client).
start_client/2Equivalent to start_client(BootstrapEndpoints, ClientId, []).
start_client/3Start a client (brod_client).
start_consumer/3Dynamically start topic consumer(s) and register it in the client.
start_link_client/1Equivalent to start_link_client(BootstrapEndpoints, brod_default_client).
start_link_client/2Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).
start_link_client/3
start_link_group_subscriber/7
start_link_group_subscriber/8
start_link_group_subscriber_v2/1Start group_subscriber_v2.
start_link_topic_subscriber/1
start_link_topic_subscriber/5(Deprecated.) Equivalent to start_link_topic_subscriber(Client, Topic, all, ConsumerConfig, CbModule, CbInitArg).
start_link_topic_subscriber/6(Deprecated.) Equivalent to start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).
start_link_topic_subscriber/7(Deprecated.)
start_producer/3Dynamically start a per-topic producer and register it in the client.
stop/0Stop brod application.
stop/1Application behaviour callback.
stop_client/1Stop a client.
subscribe/3Subscribe to a data stream from the given consumer.
subscribe/5Subscribe to a data stream from the given topic-partition.
sync_produce_request/1Equivalent to sync_produce_request(CallRef, infinity).
sync_produce_request/2Block wait for sent produced request to be acked by kafka.
sync_produce_request_offset/1Equivalent to sync_produce_request_offset(CallRef, infinity).
sync_produce_request_offset/2As sync_produce_request/2, but also returning assigned offset.
transaction/3Start a new transaction, TxId will be the id of the transaction.
txn_add_offsets/3Add the offset consumed by a group to the transaction.
txn_do/3Execute the function in the context of a fetch-produce cycle with access to an open transaction.
txn_produce/4Produce the batch of messages to the indicated topic-partition synchronously.
txn_produce/5Produce the message (key and value) to the indicated topic-partition synchronously.
unsubscribe/1Unsubscribe the current subscriber.
unsubscribe/2Unsubscribe the current subscriber.
unsubscribe/3Unsubscribe the current subscriber.
unsubscribe/4Unsubscribe the current subscriber.

Function Details

abort/1

abort(Transaction::transaction()) -> ok | {error, any()}

Abort the transaction

See also: brod_transaction:abort/1.

commit/1

commit(Transaction::transaction()) -> ok | {error, any()}

Commit the transaction

See also: brod_transaction:commit/1.

connect_group_coordinator/3

connect_group_coordinator(BootstrapEndpoints::[endpoint()], ConnCfg::conn_config(), GroupId::group_id()) -> {ok, pid()} | {error, any()}

Connect to consumer group coordinator broker.

Done in steps:
  1. Connect to any of the given bootstrap endpoints
  2. Send group_coordinator_request to resolve group coordinator endpoint
  3. Connect to the resolved endpoint and return the connection pid

connect_leader/4

connect_leader(Hosts::[endpoint()], Topic::topic(), Partition::partition(), ConnConfig::conn_config()) -> {ok, pid()}

Connect partition leader.

consume_ack/2

consume_ack(ConsumerPid::pid(), Offset::offset()) -> ok | {error, any()}

Equivalent to brod_consumer:ack(ConsumerPid, Offset).

See consume_ack/4 for more information.

consume_ack/4

consume_ack(Client::client(), Topic::topic(), Partition::partition(), Offset::offset()) -> ok | {error, any()}

Acknowledge that one or more messages have been processed.

brod_consumer sends message-sets to the subscriber process, and keeps the messages in a 'pending' queue. The subscriber may choose to ack any received offset. Acknowledging a greater offset will automatically acknowledge the messages before this offset. For example, if messages [1, 2, 3, 4] have been sent (as one or more message-sets) to the subscriber, the subscriber may acknowledge with offset 3 to indicate that the first three messages are successfully processed, leaving behind only message 4 pending.

The 'pending' queue has a size limit (see prefetch_count consumer config) which is to provide a mechanism to handle back-pressure. If there are too many messages pending on ack, the consumer will stop fetching new ones so the subscriber won't get overwhelmed.

Note, there is no range check done for the acknowledging offset, meaning if offset [M, N] are pending to be acknowledged, acknowledging with Offset > N will cause all offsets to be removed from the pending queue, and acknowledging with Offset < M has no effect.

Use this function only with plain partition subscribers (i.e., when you manually call subscribe/5). Behaviours like brod_topic_subscriber have their own way how to ack messages.

create_topics/3

create_topics(Hosts::[endpoint()], TopicConfigs::[topic_config()], RequestConfigs::#{timeout => kpro:int32()}) -> ok | {error, any()}

Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).

create_topics/4

create_topics(Hosts::[endpoint()], TopicConfigs::[topic_config()], RequestConfigs::#{timeout => kpro:int32()}, Options::conn_config()) -> ok | {error, any()}

Create topic(s) in kafka.

TopicConfigs is a list of topic configurations. A topic configuration is a map (or tuple list for backward compatibility) with the following keys (all of them are required). Example:
  > TopicConfigs = [
      #{
        name => <<"my_topic">>,
        num_partitions => 1,
        replication_factor => 1,
        assignments => [],
        configs => [ #{name  => <<"cleanup.policy">>, value => "compact"}]
      }
    ].
  > brod:create_topics([{"localhost", 9092}], TopicConfigs, #{timeout => 1000}, []).
  ok

delete_topics/3

delete_topics(Hosts::[endpoint()], Topics::[topic()], Timeout::pos_integer()) -> ok | {error, any()}

Equivalent to delete_topics(Hosts, Topics, Timeout, []).

delete_topics/4

delete_topics(Hosts::[endpoint()], Topics::[topic()], Timeout::pos_integer(), Options::conn_config()) -> ok | {error, any()}

Delete topic(s) from kafka.

Example:
  > brod:delete_topics([{"localhost", 9092}], ["my_topic"], 5000, []).
  ok

describe_groups/3

describe_groups(CoordinatorEndpoint::endpoint(), ConnCfg::conn_config(), IDs::[group_id()]) -> {ok, [kpro:struct()]} | {error, any()}

Describe consumer groups.

The given consumer group IDs should be all managed by the coordinator-broker running at the given endpoint. Otherwise error codes will be returned in the result structs. Return describe_groups response body field named groups. See kpro_schema.erl for struct details.

fetch/4

fetch(ConnOrBootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::integer()) -> {ok, {HwOffset::offset(), [message()]}} | {error, any()}

Fetch a single message set from the given topic-partition.

Calls fetch/5 with the default options: max_wait_time = 1 second, min_bytes = 1 B, and max_bytes = 2^20 B (1 MB).

See fetch/5 for more information.

fetch/5

fetch(ConnOrBootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts()) -> {ok, {HwOffset::offset(), [message()]}} | {error, any()}

Fetch a single message set from the given topic-partition.

The first arg can either be an already established connection to leader, or {Endpoints, ConnConfig} (or just Endpoints) so to establish a new connection before fetch.

The fourth argument is the start offset of the query. Messages with offset greater or equal will be fetched.

You can also pass options for the fetch query. See the kpro_req_lib:fetch_opts() type for their documentation. Only max_wait_time, min_bytes, max_bytes, and isolation_level options are currently supported. The defaults are the same as documented in the linked type, except for min_bytes which defaults to 1 in brod. Note that max_bytes will be rounded up so that full messages are retrieved. For example, if you specify max_bytes = 42 and there are three messages of size 40 bytes, two of them will be fetched.

On success, the function returns the messages along with the last stable offset (when using read_committed mode, the last committed offset) or the high watermark offset (offset of the last message that was successfully copied to all replicas, incremented by 1), whichever is lower. In essence, this is the offset up to which it was possible to read the messages at the time of fetching. This is similar to what resolve_offset/6 with latest returns. You can use this information to determine how far from the end of the topic you currently are. Note that when you use this offset as the start offset for a subsequent call, an empty list of messages will be returned (assuming the topic hasn't changed, e.g. no new message arrived). Only when you use an offset greater than this one, {error, offset_out_of_range} will be returned.

Note also that Kafka batches messages in a message set only up to the end of a topic segment in which the first retrieved message is, so there may actually be more messages behind the last fetched offset even if the fetched size is significantly less than max_bytes provided in fetch_opts(). See this issue for more details.

Example (the topic has only two messages):
  > brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 0, #{max_bytes => 1024}).
  {ok,{2,
       [{kafka_message,0,<<"some_key">>,<<"Hello world!">>,
                       create,1663940976473,[]},
        {kafka_message,1,<<"another_key">>,<<"This is a message with offset 1.">>,
                       create,1663940996335,[]}]}}
 
  > brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 2, #{max_bytes => 1024}).
  {ok,{2,[]}}
 
  > brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 3, #{max_bytes => 1024}).
  {error,offset_out_of_range}

fetch/7

fetch(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Offset::offset(), MaxWaitTime::non_neg_integer(), MinBytes::non_neg_integer(), MaxBytes::pos_integer()) -> {ok, [message()]} | {error, any()}

Equivalent to fetch(Hosts, Topic, Partition, Offset, Wait, MinBytes, MaxBytes, []).

This function is deprecated: Please use fetch/5 instead

fetch/8

fetch(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Offset::offset(), MaxWaitTime::non_neg_integer(), MinBytes::non_neg_integer(), MaxBytes::pos_integer(), ConnConfig::conn_config()) -> {ok, [message()]} | {error, any()}

This function is deprecated: Please use fetch/5 instead

Fetch a single message set from the given topic-partition.

fetch_committed_offsets/2

fetch_committed_offsets(Client::client(), GroupId::group_id()) -> {ok, [kpro:struct()]} | {error, any()}

Same as fetch_committed_offsets/3, but works with a started brod_client.

fetch_committed_offsets/3

fetch_committed_offsets(BootstrapEndpoints::[endpoint()], ConnCfg::conn_config(), GroupId::group_id()) -> {ok, [kpro:struct()]} | {error, any()}

Fetch committed offsets for ALL topics in the given consumer group.

Return the responses field of the offset_fetch response. See kpro_schema.erl for struct details.

fold/8

fold(Bootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts(), Acc, Fun::fold_fun(Acc), Limits::fold_limits()) -> fold_result()

Fold through messages in a partition.

Works like lists:foldl/3, but with additional stop conditions (see fold_stop_reason()). NOTE: Exceptions from evaluating FoldFun are not caught.

get_consumer/3

get_consumer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, Reason}

get_metadata/1

get_metadata(Hosts::[endpoint()]) -> {ok, kpro:struct()} | {error, any()}

Fetch broker metadata for all topics.

See get_metadata/3 for more information.

get_metadata/2

get_metadata(Hosts::[endpoint()], Topics::all | [topic()]) -> {ok, kpro:struct()} | {error, any()}

Fetch broker metadata for the given topics.

See get_metadata/3 for more information.

get_metadata/3

get_metadata(Hosts::[endpoint()], Topics::all | [topic()], Options::conn_config()) -> {ok, kpro:struct()} | {error, any()}

Fetch broker metadata for the given topics using the given connection options.

The response differs in each version of the Metadata API call. The last supported Metadata API version is 2, so this will be probably used (if your Kafka supports it too). See kafka.bnf (search for MetadataResponseV2) for response schema with comments.

Beware that when auto.create.topics.enable is set to true in the broker configuration, fetching metadata with a concrete topic specified (in the Topics parameter) may cause creation of the topic when it does not exist. If you want a safe get_metadata call, always pass all as Topics and then filter them.

  > brod:get_metadata([{"localhost", 9092}], [<<"my_topic">>], []).
  {ok,#{brokers =>
            [#{host => <<"localhost">>,node_id => 1,port => 9092,
               rack => <<>>}],
        cluster_id => <<"jTb2faMLRf6p21yD1y3v-A">>,
        controller_id => 1,
        topics =>
            [#{error_code => no_error,is_internal => false,
               name => <<"my_topic">>,
               partitions =>
                   [#{error_code => no_error,
                      isr_nodes => [1],
                      leader_id => 1,partition_index => 1,
                      replica_nodes => [1]},
                    #{error_code => no_error,
                      isr_nodes => [1],
                      leader_id => 1,partition_index => 0,
                      replica_nodes => [1]}]}]}}

get_partitions_count/2

get_partitions_count(Client::client(), Topic::topic()) -> {ok, pos_integer()} | {error, any()}

Get number of partitions for a given topic.

The higher level producers may need the partition numbers to find the partition producer pid – if the number of partitions is not statically configured for them. It is up to the callers how they want to distribute their data (e.g. random, roundrobin or consistent-hashing) to the partitions. NOTE: The partitions count is cached.

get_partitions_count_safe/2

get_partitions_count_safe(Client::client(), Topic::topic()) -> {ok, pos_integer()} | {error, any()}

The same as get_partitions_count(Client, Topic) but ensured not to auto-create topics in Kafka even when Kafka has topic auto-creation configured.

get_producer/3

get_producer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, Reason}

Equivalent to brod_client:get_producer(Client, Topic, Partition).

list_all_groups/2

list_all_groups(Endpoints::[endpoint()], ConnCfg::conn_config()) -> [{endpoint(), [cg()] | {error, any()}}]

List ALL consumer groups in the given kafka cluster.

NOTE: Exception if failed to connect any of the coordinator brokers.

list_groups/2

list_groups(CoordinatorEndpoint::endpoint(), ConnCfg::conn_config()) -> {ok, [cg()]} | {error, any()}

List consumer groups in the given group coordinator broker.

produce/2

produce(Pid::pid(), Value::value()) -> {ok, call_ref()} | {error, any()}

Equivalent to produce(Pid, <<>>, Value).

produce/3

produce(ProducerPid::pid(), Key::key(), Value::value()) -> {ok, call_ref()} | {error, any()}

Produce one or more messages.

See produce/5 for information about possible shapes of Value.

The pid should be a partition producer pid, NOT client pid.

The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.

produce/5

produce(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> {ok, call_ref()} | {error, any()}

Produce one or more messages.

Value can have many different forms:

When Value is a batch, the Key argument is only used as partitioner input and all messages are written on the same partition.

ts field is dropped for kafka prior to version 0.10 (produce API version 0, magic version 0). headers field is dropped for kafka prior to version 0.11 (produce API version 0-2, magic version 0-1).

Partition may be either a concrete partition (an integer) or a partitioner (see partitioner() for more info).

A producer for the particular topic has to be already started (by calling start_producer/3), unless you have specified auto_start_producers = true when starting the client.

This function first looks up the producer pid, then calls produce/3 to do the real work.

The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} (see the produce_reply() type) message after the produce request has been acked by Kafka.

Example:
  > brod:produce(my_client, <<"my_topic">>, 0, "key", <<"Hello from erlang!">>).
  {ok,{brod_call_ref,<0.83.0>,<0.133.0>,#Ref<0.3024768151.2556690436.92841>}}
  > flush().
  Shell got {brod_produce_reply,
                {brod_call_ref,<0.83.0>,<0.133.0>,
                    #Ref<0.3024768151.2556690436.92841>},
                12,brod_produce_req_acked}

produce_cb/4

produce_cb(ProducerPid::pid(), Key::key(), Value::value(), AckCb::produce_ack_cb()) -> ok | {error, any()}

Same as produce/3, only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka (see the produce_ack_cb() type).

produce_cb/6

produce_cb(Client::client(), Topic::topic(), Part::partition() | partitioner(), Key::key(), Value::value(), AckCb::produce_ack_cb()) -> ok | {ok, partition()} | {error, any()}

Same as produce/5 only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka (see the produce_ack_cb() type).

Return the partition to caller as {ok, Partition} for caller to correlate the callback when the 3rd arg is not a partition number.

produce_no_ack/5

produce_no_ack(Client::client(), Topic::topic(), Part::partition() | partitioner(), Key::key(), Value::value()) -> ok | {error, any()}

Find the partition worker and send message without any ack.

NOTE: This call has no back-pressure to the caller, excessive usage may cause BEAM to run out of memory.

produce_sync/2

produce_sync(Pid::pid(), Value::value()) -> ok | {error, any()}

Equivalent to produce_sync(Pid, <<>>, Value).

produce_sync/3

produce_sync(Pid::pid(), Key::key(), Value::value()) -> ok | {error, any()}

Sync version of produce/3.

This function will not return until the response is received from Kafka. But when producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process.

produce_sync/5

produce_sync(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> ok | {error, any()}

Sync version of produce/5.

This function will not return until a response is received from kafka, however if producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process.

produce_sync_offset/5

produce_sync_offset(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> {ok, offset()} | {error, any()}

Version of produce_sync/5 that returns the offset assigned by Kafka.

If producer is started with required_acks set to 0, the offset will be ?BROD_PRODUCE_UNKNOWN_OFFSET.

resolve_offset/3

resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition()) -> {ok, offset()} | {error, any()}

Equivalent to resolve_offset(Hosts, Topic, Partition, latest, []).

resolve_offset/4

resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time()) -> {ok, offset()} | {error, any()}

Equivalent to resolve_offset(Hosts, Topic, Partition, Time, []).

resolve_offset/5

resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time(), ConnCfg::conn_config()) -> {ok, offset()} | {error, any()}

Resolve semantic offset or timestamp to real offset.

The same as resolve_offset/6 but the timeout is extracted from connection config.

resolve_offset/6

resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time(), ConnCfg::conn_config(), Opts::#{timeout => kpro:int32()}) -> {ok, offset()} | {error, any()}

Resolve semantic offset or timestamp to real offset.

The function returns the offset of the first message with the given timestamp, or of the first message after the given timestamp (in case no message matches the timestamp exactly), or -1 if the timestamp is newer than (>) all messages in the topic.

You can also use two semantic offsets instead of a timestamp: earliest gives you the offset of the first message in the topic and latest gives you the offset of the last message incremented by 1.

If the topic is empty, both earliest and latest return the same value (which is 0 unless some messages were deleted from the topic), and any timestamp returns -1.

An example for illustration:
  Messages:
  offset       0   1   2   3
  timestamp    10  20  20  30
 
  Calls:
  resolve_offset(Endpoints, Topic, Partition, 5) → 0
  resolve_offset(Endpoints, Topic, Partition, 10) → 0
  resolve_offset(Endpoints, Topic, Partition, 13) → 1
  resolve_offset(Endpoints, Topic, Partition, 20) → 1
  resolve_offset(Endpoints, Topic, Partition, 31) → -1
  resolve_offset(Endpoints, Topic, Partition, earliest) → 0
  resolve_offset(Endpoints, Topic, Partition, latest) → 4

start/0

start() -> ok | no_return()

Start brod application.

start/2

start(StartType, StartArgs) -> any()

Application behaviour callback

start_client/1

start_client(BootstrapEndpoints::[endpoint()]) -> ok | {error, any()}

Equivalent to start_client(BootstrapEndpoints, brod_default_client).

start_client/2

start_client(BootstrapEndpoints::[endpoint()], ClientId::client_id()) -> ok | {error, any()}

Equivalent to start_client(BootstrapEndpoints, ClientId, []).

start_client/3

start_client(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::client_config()) -> ok | {error, any()}

Start a client (brod_client).

BootstrapEndpoints: Kafka cluster endpoints, can be any of the brokers in the cluster, which does not necessarily have to be the leader of any partition, e.g. a load-balanced entrypoint to the remote Kafka cluster.

ClientId: Atom to identify the client process.

Config is a proplist, possible values:

Connection options can be added to the same proplist. See kpro_connection.erl in kafka_protocol for the details:

You can read more about clients in the overview.

start_consumer/3

start_consumer(Client::client(), TopicName::topic(), ConsumerConfig::consumer_config()) -> ok | {error, any()}

Dynamically start topic consumer(s) and register it in the client.

A brod_consumer is started for each partition of the given topic. Note that you can have only one consumer per client-topic.

See brod_consumer:start_link/5 for details about consumer config.

You can read more about consumers in the overview.

start_link_client/1

start_link_client(BootstrapEndpoints::[endpoint()]) -> {ok, pid()} | {error, any()}

Equivalent to start_link_client(BootstrapEndpoints, brod_default_client).

start_link_client/2

start_link_client(BootstrapEndpoints::[endpoint()], ClientId::client_id()) -> {ok, pid()} | {error, any()}

Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).

start_link_client/3

start_link_client(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::client_config()) -> {ok, pid()} | {error, any()}

start_link_group_subscriber/7

start_link_group_subscriber(Client::client(), GroupId::group_id(), Topics::[topic()], GroupConfig::group_config(), ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

See also: brod_group_subscriber:start_link/7.

start_link_group_subscriber/8

start_link_group_subscriber(Client::client(), GroupId::group_id(), Topics::[topic()], GroupConfig::group_config(), ConsumerConfig::consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

See also: brod_group_subscriber:start_link/8.

start_link_group_subscriber_v2/1

start_link_group_subscriber_v2(Config::brod_group_subscriber_v2:subscriber_config()) -> {ok, pid()} | {error, any()}

Start group_subscriber_v2.

start_link_topic_subscriber/1

start_link_topic_subscriber(Config::brod_topic_subscriber:topic_subscriber_config()) -> {ok, pid()} | {error, any()}

See also: brod_topic_subscriber:start_link/1.

start_link_topic_subscriber/5

start_link_topic_subscriber(Client::client(), Topic::topic(), ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

Equivalent to start_link_topic_subscriber(Client, Topic, all, ConsumerConfig, CbModule, CbInitArg).

This function is deprecated: Please use start_link_topic_subscriber/1 instead

start_link_topic_subscriber/6

start_link_topic_subscriber(Client::client(), Topic::topic(), Partitions::all | [partition()], ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

Equivalent to start_link_topic_subscriber(Client, Topic, Partitions, ConsumerConfig, message, CbModule, CbInitArg).

This function is deprecated: Please use start_link_topic_subscriber/1 instead

start_link_topic_subscriber/7

start_link_topic_subscriber(Client::client(), Topic::topic(), Partitions::all | [partition()], ConsumerConfig::consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}

This function is deprecated: Please use start_link_topic_subscriber/1 instead

See also: brod_topic_subscriber:start_link/7.

start_producer/3

start_producer(Client::client(), TopicName::topic(), ProducerConfig::producer_config()) -> ok | {error, any()}

Dynamically start a per-topic producer and register it in the client.

You have to start a producer for each topic you want to produce messages into, unless you have specified auto_start_producers = true when starting the client (in that case you don't have to call this function at all).

After starting the producer, you can call produce/5 and friends for producing messages.

You can read more about producers in the overview.

A client has to be already started before making this call (e.g. by calling start_client/3).

See brod_producer:start_link/4 for a list of available configuration options.

Example:
  > brod:start_producer(my_client, <<"my_topic">>, [{max_retries, 5}]).
  ok

stop/0

stop() -> ok

Stop brod application.

stop/1

stop(State) -> any()

Application behaviour callback

stop_client/1

stop_client(Client::client()) -> ok

Stop a client.

subscribe/3

subscribe(ConsumerPid::pid(), SubscriberPid::pid(), Options::consumer_config()) -> ok | {error, any()}

Subscribe to a data stream from the given consumer.

See subscribe/5 for more information.

subscribe/5

subscribe(Client::client(), SubscriberPid::pid(), Topic::topic(), Partition::partition(), Options::consumer_config()) -> {ok, pid()} | {error, any()}

Subscribe to a data stream from the given topic-partition.

A client has to be already started (by calling start_client/3, one client per multiple topics is enough) and a corresponding consumer for the topic and partition as well (by calling start_consumer/3), before calling this function.

Caller may specify a set of options extending consumer config. See brod_consumer:subscribe/3 for more info on that.

If {error, Reason} is returned, the caller should perhaps retry later.

{ok, ConsumerPid} is returned on success. The caller may want to monitor the consumer pid and re-subscribe should the ConsumerPid crash.

Upon successful subscription the subscriber process should expect messages of pattern: {ConsumerPid, #kafka_message_set{}} and {ConsumerPid, #kafka_fetch_error{}}.

-include_lib("brod/include/brod.hrl") to access the records.

In case #kafka_fetch_error{} is received the subscriber should re-subscribe itself to resume the data stream.

To provide a mechanism to handle backpressure, brod requires all messages sent to a subscriber to be acked by calling consume_ack/4 after they are processed. If there are too many not-acked messages received by the subscriber, the consumer will stop fetching new ones so the subscriber won't get overwhelmed.

Only one process can be subscribed to a consumer. This means that if you want to read at different places (or at different paces), you have to create separate consumers (and thus also separate clients).

sync_produce_request/1

sync_produce_request(CallRef::call_ref()) -> ok | {error, Reason::any()}

Equivalent to sync_produce_request(CallRef, infinity).

sync_produce_request/2

sync_produce_request(CallRef::call_ref(), Timeout::timeout()) -> ok | {error, Reason::any()}

Block and wait for a previously sent produce request to be acked by Kafka.

This way, you can turn asynchronous requests, made by produce/5 and friends, into synchronous ones.

Example:
  {ok, CallRef} = brod:produce(
    brod_client_1, <<"my_topic">>, 0, <<"some-key">>, <<"some-value">>
  ). % returns immediately
  % the following call waits and returns after the ack is received or timed out
  brod:sync_produce_request(CallRef, 5_000).

sync_produce_request_offset/1

sync_produce_request_offset(CallRef::call_ref()) -> {ok, offset()} | {error, Reason::any()}

Equivalent to sync_produce_request_offset(CallRef, infinity).

sync_produce_request_offset/2

sync_produce_request_offset(CallRef::call_ref(), Timeout::timeout()) -> {ok, offset()} | {error, Reason::any()}

As sync_produce_request/2, but also returning assigned offset.

See also: produce_sync_offset/5.

transaction/3

transaction(Client::client(), TxnId::transactional_id(), Config::transaction_config()) -> {ok, transaction()}

Equivalent to brod_transaction:start_link/3.

Start a new transaction; TxnId will be the id of the transaction.

txn_add_offsets/3

txn_add_offsets(Transaction::transaction(), ConsumerGroup::group_id(), Offsets::offsets_to_commit()) -> ok | {error, any()}

Add the offset consumed by a group to the transaction.

See also: brod_transaction:add_offsets/3.

txn_do/3

txn_do(ProcessFun::txn_function(), Client::client(), Options::txn_do_options()) -> {ok, pid()} | {error, any()}

Execute the function in the context of a fetch-produce cycle with access to an open transaction.

See also: brod_transaction_processor:do/3.

txn_produce/4

txn_produce(Transaction::transaction(), Topic::topic(), Partition::partition(), Batch::batch_input()) -> {ok, offset()} | {error, any()}

Produce the batch of messages to the indicated topic-partition synchronously.

See also: brod_transaction:produce/5.

txn_produce/5

txn_produce(Transaction::transaction(), Topic::topic(), Partition::partition(), Key::key(), Value::value()) -> {ok, offset()} | {error, any()}

Produce the message (key and value) to the indicated topic-partition synchronously.

See also: brod_transaction:produce/5.

unsubscribe/1

unsubscribe(ConsumerPid::pid()) -> ok | {error, any()}

Unsubscribe the current subscriber.

Assuming the subscriber is self().

unsubscribe/2

unsubscribe(ConsumerPid::pid(), SubscriberPid::pid()) -> ok | {error, any()}

Unsubscribe the current subscriber.

unsubscribe/3

unsubscribe(Client::client(), Topic::topic(), Partition::partition()) -> ok | {error, any()}

Unsubscribe the current subscriber.

Assuming the subscriber is self().

unsubscribe/4

unsubscribe(Client::client(), Topic::topic(), Partition::partition(), SubscriberPid::pid()) -> ok | {error, any()}

Unsubscribe the current subscriber.


Generated by EDoc