Behaviours: application.
batch_input() = [msg_input()]
bootstrap() = [endpoint()] | {[endpoint()], client_config()}
call_ref() = #brod_call_ref{caller = undefined | pid(), callee = undefined | pid(), ref = undefined | reference()}
A record with caller, callee, and ref.
cg() = #brod_cg{id = brod:group_id(), protocol_type = brod:cg_protocol_type()}
cg_protocol_type() = binary()
client() = client_id() | pid()
client_config() = brod_client:config()
client_id() = atom()
compression() = no_compression | gzip | snappy
conn_config() = [{atom(), term()}] | kpro:conn_config()
Connection configuration that will be passed to kpro calls.
See also the kpro_connection:config() type.
connection() = kpro:connection()
consumer_config() = [{begin_offset, offset_time()} | {min_bytes, non_neg_integer()} | {max_bytes, non_neg_integer()} | {max_wait_time, integer()} | {sleep_timeout, integer()} | {prefetch_count, integer()} | {prefetch_bytes, non_neg_integer()} | {offset_reset_policy, brod_consumer:offset_reset_policy()} | {size_stat_window, non_neg_integer()} | {isolation_level, brod_consumer:isolation_level()} | {share_leader_conn, boolean()}]
Consumer configuration.
The meaning of the options is documented at brod_consumer:start_link/5.
endpoint() = {hostname(), portnum()}
error_code() = kpro:error_code()
fetch_opts() = kpro:fetch_opts()
fold_acc() = term()
fold_fun(Acc) = fun((message(), Acc) -> {ok, Acc} | {error, any()})
fold always returns when it reaches the high watermark offset. fold
also returns when any of the limits is hit.
fold_limits() = #{message_count => pos_integer(), reach_offset => offset()}
fold_result() = {fold_acc(), OffsetToContinue::offset(), fold_stop_reason()}
fold_stop_reason() = reached_end_of_partition | reached_message_count_limit | reached_target_offset | {error, any()}
OffsetToContinue: begin offset for the next fold call
group_config() = proplists:proplist()
group_generation_id() = non_neg_integer()
group_id() = kpro:group_id()
group_member() = {group_member_id(), #kafka_group_member_metadata{version = non_neg_integer(), topics = [brod:topic()], user_data = binary()}}
group_member_id() = binary()
hostname() = kpro:hostname()
key() = undefined | binary()
message() = kpro:message()
A record with offset, key, value, ts_type, ts, and headers.
message_set() = #kafka_message_set{topic = brod:topic(), partition = brod:partition(), high_wm_offset = integer(), messages = [brod:message()] | kpro:incomplete_batch()}
A record with topic, partition, high_wm_offset (max offset of the partition), and messages.
See the definition for more information.
msg_input() = kpro:msg_input()
msg_ts() = kpro:msg_ts()
Unix time in milliseconds
offset() = kpro:offset()
Physical offset (an integer)
offset_time() = msg_ts() | earliest | latest
offsets_to_commit() = kpro:offsets_to_commit()
partition() = kpro:partition()
partition_assignment() = {topic(), [partition()]}
partition_fun() = fun((topic(), pos_integer(), key(), value()) -> {ok, partition()})
partitioner() = partition_fun() | random | hash
portnum() = pos_integer()
produce_ack_cb() = fun((partition(), offset()) -> term())
produce_reply() = #brod_produce_reply{call_ref = brod:call_ref(), base_offset = undefined | brod:offset(), result = brod:produce_result()}
A record with call_ref, base_offset, and result.
See the definition for more information.
produce_result() = brod_produce_req_buffered | brod_produce_req_acked
producer_config() = brod_producer:config()
received_assignments() = [#brod_received_assignment{topic = brod:topic(), partition = brod:partition(), begin_offset = undefined | brod:offset() | {begin_offset, brod:offset_time()}}]
topic() = kpro:topic()
topic_config() = kpro:struct()
topic_partition() = {topic(), partition()}
transaction() = brod_transaction:transaction()
transaction_config() = brod_transaction:transaction_config()
transactional_id() = brod_transaction:transactional_id()
txn_do_options() = brod_transaction_processor:do_options()
txn_function() = brod_transaction_processor:process_function()
value() = undefined | iodata() | {msg_ts(), binary()} | [{key(), value()}] | [{msg_ts(), key(), value()}] | kpro:msg_input() | kpro:batch_input()
| abort/1 | Abort the transaction. |
| commit/1 | Commit the transaction. |
| connect_group_coordinator/3 | Connect to consumer group coordinator broker. |
| connect_leader/4 | Connect partition leader. |
| consume_ack/2 | See consume_ack/4 for more information. |
| consume_ack/4 | Acknowledge that one or more messages have been processed. |
| create_topics/3 | Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).
|
| create_topics/4 | Create topic(s) in kafka. |
| delete_topics/3 | Equivalent to delete_topics(Hosts, Topics, Timeout, []).
|
| delete_topics/4 | Delete topic(s) from kafka. |
| describe_groups/3 | Describe consumer groups. |
| fetch/4 | Fetch a single message set from the given topic-partition. |
| fetch/5 | Fetch a single message set from the given topic-partition. |
| fetch/7 | (Deprecated.) Equivalent to fetch(Hosts,
Topic,
Partition,
Offset,
Wait,
MinBytes,
MaxBytes,
[]).
|
| fetch/8 | (Deprecated.) Fetch a single message set from the given topic-partition. |
| fetch_committed_offsets/2 | Same as fetch_committed_offsets/3,
but works with a started brod_client |
| fetch_committed_offsets/3 | Fetch committed offsets for ALL topics in the given consumer group. |
| fold/8 | Fold through messages in a partition. |
| get_consumer/3 | |
| get_metadata/1 | Fetch broker metadata for all topics. |
| get_metadata/2 | Fetch broker metadata for the given topics. |
| get_metadata/3 | Fetch broker metadata for the given topics using the given connection options. |
| get_partitions_count/2 | Get number of partitions for a given topic. |
| get_partitions_count_safe/2 | The same as get_partitions_count(Client, Topic)
but ensured not to auto-create topics in Kafka even
when Kafka has topic auto-creation configured. |
| get_producer/3 | Equivalent to brod_client:get_producer(Client, Topic, Partition).
|
| list_all_groups/2 | List ALL consumer groups in the given kafka cluster. |
| list_groups/2 | List consumer groups in the given group coordinator broker. |
| produce/2 | Equivalent to produce(Pid, <<>>, Value).
|
| produce/3 | Produce one or more messages. |
| produce/5 | Produce one or more messages. |
| produce_cb/4 | Same as produce/3, only the ack is not delivered as a message,
instead, the callback is evaluated by producer worker when ack is received
from kafka (see the produce_ack_cb() type). |
| produce_cb/6 | Same as produce/5 only the ack is not delivered as a message,
instead, the callback is evaluated by producer worker when ack is received
from kafka (see the produce_ack_cb() type). |
| produce_no_ack/5 | Find the partition worker and send message without any ack. |
| produce_sync/2 | Equivalent to produce_sync(Pid, <<>>, Value).
|
| produce_sync/3 | Sync version of produce/3. |
| produce_sync/5 | Sync version of produce/5. |
| produce_sync_offset/5 | Version of produce_sync/5 that returns the offset assigned by Kafka. |
| resolve_offset/3 | Equivalent to resolve_offset(Hosts, Topic, Partition, latest, []).
|
| resolve_offset/4 | Equivalent to resolve_offset(Hosts, Topic, Partition, Time, []).
|
| resolve_offset/5 | Resolve semantic offset or timestamp to real offset. |
| resolve_offset/6 | Resolve semantic offset or timestamp to real offset. |
| start/0 | Start brod application. |
| start/2 | Application behaviour callback. |
| start_client/1 | Equivalent to start_client(BootstrapEndpoints, brod_default_client).
|
| start_client/2 | Equivalent to start_client(BootstrapEndpoints, ClientId, []).
|
| start_client/3 | Start a client (brod_client). |
| start_consumer/3 | Dynamically start topic consumer(s) and register it in the client. |
| start_link_client/1 | Equivalent to start_link_client(BootstrapEndpoints,
brod_default_client).
|
| start_link_client/2 | Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).
|
| start_link_client/3 | |
| start_link_group_subscriber/7 | |
| start_link_group_subscriber/8 | |
| start_link_group_subscriber_v2/1 | Start group_subscriber_v2. |
| start_link_topic_subscriber/1 | |
| start_link_topic_subscriber/5 | (Deprecated.) Equivalent to start_link_topic_subscriber(Client,
Topic,
all,
ConsumerConfig,
CbModule,
CbInitArg).
|
| start_link_topic_subscriber/6 | (Deprecated.) Equivalent to start_link_topic_subscriber(Client,
Topic,
Partitions,
ConsumerConfig,
message,
CbModule,
CbInitArg).
|
| start_link_topic_subscriber/7 | (Deprecated.) |
| start_producer/3 | Dynamically start a per-topic producer and register it in the client. |
| stop/0 | Stop brod application. |
| stop/1 | Application behaviour callback. |
| stop_client/1 | Stop a client. |
| subscribe/3 | Subscribe to a data stream from the given consumer. |
| subscribe/5 | Subscribe to a data stream from the given topic-partition. |
| sync_produce_request/1 | Equivalent to sync_produce_request(CallRef, infinity).
|
| sync_produce_request/2 | Block wait for sent produced request to be acked by kafka. |
| sync_produce_request_offset/1 | Equivalent to sync_produce_request_offset(CallRef, infinity).
|
| sync_produce_request_offset/2 | As sync_produce_request/2, but also returning assigned offset. |
| transaction/3 | Start a new transaction, TxId will be the id of the transaction. |
| txn_add_offsets/3 | Add the offset consumed by a group to the transaction. |
| txn_do/3 | Execute the function in the context of a fetch-produce cycle with access to an open transaction. |
| txn_produce/4 | Produce the batch of messages to the indicated topic-partition synchronously. |
| txn_produce/5 | Produce the message (key and value) to the indicated topic-partition synchronously. |
| unsubscribe/1 | Unsubscribe the current subscriber. |
| unsubscribe/2 | Unsubscribe the current subscriber. |
| unsubscribe/3 | Unsubscribe the current subscriber. |
| unsubscribe/4 | Unsubscribe the current subscriber. |
abort(Transaction::transaction()) -> ok | {error, any()}
Abort the transaction
See also: brod_transaction:abort/1.
commit(Transaction::transaction()) -> ok | {error, any()}
Commit the transaction
See also: brod_transaction:commit/1.
connect_group_coordinator(BootstrapEndpoints::[endpoint()], ConnCfg::conn_config(), GroupId::group_id()) -> {ok, pid()} | {error, any()}
Connect to consumer group coordinator broker.
Done in steps: first connect to any of the bootstrap endpoints, then query for and connect to the group coordinator broker.
connect_leader(Hosts::[endpoint()], Topic::topic(), Partition::partition(), ConnConfig::conn_config()) -> {ok, pid()}
Connect partition leader.
consume_ack(ConsumerPid::pid(), Offset::offset()) -> ok | {error, any()}
Equivalent to brod_consumer:ack(ConsumerPid, Offset).
See consume_ack/4 for more information.
consume_ack(Client::client(), Topic::topic(), Partition::partition(), Offset::offset()) -> ok | {error, any()}
Acknowledge that one or more messages have been processed.
brod_consumer sends message-sets to the subscriber process, and keep
the messages in a 'pending' queue.
The subscriber may choose to ack any received offset.
Acknowledging a greater offset will automatically acknowledge
the messages before this offset.
For example, if message [1, 2, 3, 4] have been sent to (as one or more message-sets)
to the subscriber, the subscriber may acknowledge with offset 3 to indicate that
the first three messages are successfully processed, leaving behind only message 4
pending.
The 'pending' queue has a size limit (see prefetch_count consumer config)
which is to provide a mechanism to handle back-pressure.
If there are too many messages pending on ack, the consumer will stop
fetching new ones so the subscriber won't get overwhelmed.
Note, there is no range check done for the acknowledging offset, meaning if offset [M, N]
are pending to be acknowledged, acknowledging with Offset > N will cause all offsets to be
removed from the pending queue, and acknowledging with Offset < M has no effect.
NOTE: This API is for the case when the consumer is managed manually (i.e. subscribed using subscribe/5). Behaviours like
brod_topic_subscriber have their own way how to ack messages.
create_topics(Hosts::[endpoint()], TopicConfigs::[topic_config()], RequestConfigs::#{timeout => kpro:int32()}) -> ok | {error, any()}
Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).
create_topics(Hosts::[endpoint()], TopicConfigs::[topic_config()], RequestConfigs::#{timeout => kpro:int32()}, Options::conn_config()) -> ok | {error, any()}
Create topic(s) in kafka.
TopicConfigs is a list of topic configurations.
A topic configuration is a map (or tuple list for backward compatibility)
with the following keys (all of them are required):
name
num_partitions
replication_factor
assignments
partition_index and broker_ids (a list of brokers to
place the partition on).
configs
name and value. You can find possible
options in the Kafka documentation. > TopicConfigs = [
#{
name => <<"my_topic">>,
num_partitions => 1,
replication_factor => 1,
assignments => [],
configs => [ #{name => <<"cleanup.policy">>, value => "compact"}]
}
].
> brod:create_topics([{"localhost", 9092}], TopicConfigs, #{timeout => 1000}, []).
ok
delete_topics(Hosts::[endpoint()], Topics::[topic()], Timeout::pos_integer()) -> ok | {error, any()}
Equivalent to delete_topics(Hosts, Topics, Timeout, []).
delete_topics(Hosts::[endpoint()], Topics::[topic()], Timeout::pos_integer(), Options::conn_config()) -> ok | {error, any()}
Delete topic(s) from kafka.
Example: > brod:delete_topics([{"localhost", 9092}], ["my_topic"], 5000, []).
ok
describe_groups(CoordinatorEndpoint::endpoint(), ConnCfg::conn_config(), IDs::[group_id()]) -> {ok, [kpro:struct()]} | {error, any()}
Describe consumer groups.
The given consumer group IDs should be all managed by the coordinator-broker running at the given endpoint. Otherwise error codes will be returned in the result structs. Returns the describe_groups response body field named groups.
See kpro_schema.erl for struct details.
fetch(ConnOrBootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::integer()) -> {ok, {HwOffset::offset(), [message()]}} | {error, any()}
Fetch a single message set from the given topic-partition.
Calls fetch/5 with the default options: max_wait_time = 1 second,
min_bytes = 1 B, and max_bytes = 2^20 B (1 MB).
fetch/5 for more information.
fetch(ConnOrBootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts()) -> {ok, {HwOffset::offset(), [message()]}} | {error, any()}
Fetch a single message set from the given topic-partition.
The first arg can either be an already established connection to leader,
or {Endpoints, ConnConfig} (or just Endpoints) so to establish a new
connection before fetch.
The fourth argument is the start offset of the query. Messages with offset greater or equal will be fetched.
You can also pass options for the fetch query.
See the kpro_req_lib:fetch_opts() type for their documentation.
Only max_wait_time, min_bytes, max_bytes, and isolation_level
options are currently supported. The defaults are the same as documented
in the linked type, except for min_bytes which defaults to 1 in brod.
Note that max_bytes will be rounded up so that full messages are
retrieved. For example, if you specify max_bytes = 42 and there
are three messages of size 40 bytes, two of them will be fetched.
On success, the function returns the messages along with the last stable
offset (when using read_committed mode, the last committed offset) or the
high watermark offset (offset of the last message that was successfully
copied to all replicas, incremented by 1), whichever is lower. In essence, this
is the offset up to which it was possible to read the messages at the time of
fetching. This is similar to what resolve_offset/6 with latest
returns. You can use this information to determine how far from the end of the
topic you currently are. Note that when you use this offset as the start offset
for a subsequent call, an empty list of messages will be returned (assuming the
topic hasn't changed, e.g. no new message arrived). Only when you use an offset
greater than this one, {error, offset_out_of_range} will be returned.
Note also that Kafka batches messages in a message set only up to the end of
a topic segment in which the first retrieved message is, so there may actually
be more messages behind the last fetched offset even if the fetched size is
significantly less than max_bytes provided in fetch_opts().
See this issue
for more details.
> brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 0, #{max_bytes => 1024}).
{ok,{2,
[{kafka_message,0,<<"some_key">>,<<"Hello world!">>,
create,1663940976473,[]},
{kafka_message,1,<<"another_key">>,<<"This is a message with offset 1.">>,
create,1663940996335,[]}]}}
> brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 2, #{max_bytes => 1024}).
{ok,{2,[]}}
> brod:fetch([{"localhost", 9092}], <<"my_topic">>, 0, 3, #{max_bytes => 1024}).
{error,offset_out_of_range}
fetch(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Offset::offset(), MaxWaitTime::non_neg_integer(), MinBytes::non_neg_integer(), MaxBytes::pos_integer()) -> {ok, [message()]} | {error, any()}
Equivalent to fetch(Hosts,
Topic,
Partition,
Offset,
Wait,
MinBytes,
MaxBytes,
[]).
This function is deprecated: Please use fetch/5 instead
fetch(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Offset::offset(), MaxWaitTime::non_neg_integer(), MinBytes::non_neg_integer(), MaxBytes::pos_integer(), ConnConfig::conn_config()) -> {ok, [message()]} | {error, any()}
This function is deprecated: Please use fetch/5 instead
Fetch a single message set from the given topic-partition.
fetch_committed_offsets(Client::client(), GroupId::group_id()) -> {ok, [kpro:struct()]} | {error, any()}
Same as fetch_committed_offsets/3,
but works with a started brod_client.
fetch_committed_offsets(BootstrapEndpoints::[endpoint()], ConnCfg::conn_config(), GroupId::group_id()) -> {ok, [kpro:struct()]} | {error, any()}
Fetch committed offsets for ALL topics in the given consumer group.
Return the responses field of the offset_fetch response.
See kpro_schema.erl for struct details.
fold(Bootstrap::connection() | client_id() | bootstrap(), Topic::topic(), Partition::partition(), Offset::offset(), Opts::fetch_opts(), Acc, Fun::fold_fun(Acc), Limits::fold_limits()) -> fold_result()
Acc = fold_acc()
Fold through messages in a partition.
Works like lists:foldl/2 but with the below stop conditions:
reaches the high watermark offset; FoldFun returns an {error, Reason} tuple; any of the limits is hit. NOTE: Exceptions thrown from FoldFun are not caught.
get_consumer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, Reason}
Reason = client_down | {client_down, any()} | {consumer_down, any()} | {consumer_not_found, topic()} | {consumer_not_found, topic(), partition()}
get_metadata(Hosts::[endpoint()]) -> {ok, kpro:struct()} | {error, any()}
Fetch broker metadata for all topics.
See get_metadata/3 for more information.
get_metadata(Hosts::[endpoint()], Topics::all | [topic()]) -> {ok, kpro:struct()} | {error, any()}
Fetch broker metadata for the given topics.
See get_metadata/3 for more information.
get_metadata(Hosts::[endpoint()], Topics::all | [topic()], Options::conn_config()) -> {ok, kpro:struct()} | {error, any()}
Fetch broker metadata for the given topics using the given connection options.
The response differs in each version of the Metadata API call.
The last supported Metadata API version is 2, so this will be
probably used (if your Kafka supports it too). See
kafka.bnf
(search for MetadataResponseV2) for response schema with comments.
Beware that when auto.create.topics.enable is set to true in
the broker configuration, fetching metadata with a concrete
topic specified (in the Topics parameter) may cause creation of
the topic when it does not exist. If you want a safe get_metadata
call, always pass all as Topics and then filter them.
> brod:get_metadata([{"localhost", 9092}], [<<"my_topic">>], []).
{ok,#{brokers =>
[#{host => <<"localhost">>,node_id => 1,port => 9092,
rack => <<>>}],
cluster_id => <<"jTb2faMLRf6p21yD1y3v-A">>,
controller_id => 1,
topics =>
[#{error_code => no_error,is_internal => false,
name => <<"my_topic">>,
partitions =>
[#{error_code => no_error,
isr_nodes => [1],
leader_id => 1,partition_index => 1,
replica_nodes => [1]},
#{error_code => no_error,
isr_nodes => [1],
leader_id => 1,partition_index => 0,
replica_nodes => [1]}]}]}}
Get number of partitions for a given topic.
The higher level producers may need the partition numbers to find the partition producer pid – if the number of partitions is not statically configured for them. It is up to the callers how they want to distribute their data (e.g. random, roundrobin or consistent-hashing) to the partitions. NOTE: The partitions count is cached.
The same as get_partitions_count(Client, Topic)
but ensured not to auto-create topics in Kafka even
when Kafka has topic auto-creation configured.
get_producer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, Reason}
Reason = client_down | {client_down, any()} | {producer_down, any()} | {producer_not_found, topic()} | {producer_not_found, topic(), partition()}
Equivalent to brod_client:get_producer(Client, Topic, Partition).
list_all_groups(Endpoints::[endpoint()], ConnCfg::conn_config()) -> [{endpoint(), [cg()] | {error, any()}}]
List ALL consumer groups in the given kafka cluster.
NOTE: Exception if failed to connect any of the coordinator brokers.
list_groups(CoordinatorEndpoint::endpoint(), ConnCfg::conn_config()) -> {ok, [cg()]} | {error, any()}
List consumer groups in the given group coordinator broker.
produce(Pid::pid(), Value::value()) -> {ok, call_ref()} | {error, any()}
Equivalent to produce(Pid, <<>>, Value).
produce(ProducerPid::pid(), Key::key(), Value::value()) -> {ok, call_ref()} | {error, any()}
Produce one or more messages.
See produce/5 for information about possible shapes
of Value.
The pid should be a partition producer pid, NOT client pid.
The return value is a call reference of typecall_ref(),
so the caller can use it to expect (match)
a #brod_produce_reply{result = brod_produce_req_acked}
message after the produce request has been acked by Kafka.
produce(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> {ok, call_ref()} | {error, any()}
Produce one or more messages.
Value can have many different forms:
binary(): Single message with key from the Key argument
{brod:msg_ts(), binary()}: Single message with its create-time timestamp and key from Key
#{ts => brod:msg_ts(), value => binary(), headers => [{_, _}]}: Single message; if this map does not have a key field, Key is used instead
[{K, V} | {T, K, V}]: A batch, where V could be a nested list of such representation
[#{key => K, value => V, ts => T, headers => [{_, _}]}]: A batch
When Value is a batch, the Key argument is only used
as partitioner input and all messages are written on the
same partition.
ts field is dropped for kafka prior to version 0.10
(produce API version 0, magic version 0). headers field
is dropped for kafka prior to version 0.11 (produce API
version 0-2, magic version 0-1).
Partition may be either a concrete partition (an integer)
or a partitioner (see partitioner() for more info).
A producer for the particular topic has to be already started
(by calling start_producer/3), unless you have specified
auto_start_producers = true when starting the client.
This function first looks up the producer pid, then calls produce/3
to do the real work.
The return value is a call reference of type call_ref(), so the caller
can use it to expect (match)
a #brod_produce_reply{result = brod_produce_req_acked}
(see the produce_reply() type) message after the
produce request has been acked by Kafka.
> brod:produce(my_client, <<"my_topic">>, 0, "key", <<"Hello from erlang!">>).
{ok,{brod_call_ref,<0.83.0>,<0.133.0>,#Ref<0.3024768151.2556690436.92841>}}
> flush().
Shell got {brod_produce_reply,
{brod_call_ref,<0.83.0>,<0.133.0>,
#Ref<0.3024768151.2556690436.92841>},
12,brod_produce_req_acked}
produce_cb(ProducerPid::pid(), Key::key(), Value::value(), AckCb::produce_ack_cb()) -> ok | {error, any()}
Same as produce/3, only the ack is not delivered as a message,
instead, the callback is evaluated by producer worker when ack is received
from kafka (see the produce_ack_cb() type).
produce_cb(Client::client(), Topic::topic(), Part::partition() | partitioner(), Key::key(), Value::value(), AckCb::produce_ack_cb()) -> ok | {ok, partition()} | {error, any()}
Same as produce/5 only the ack is not delivered as a message,
instead, the callback is evaluated by producer worker when ack is received
from kafka (see the produce_ack_cb() type).
Returns {ok, Partition} for the caller
to correlate the callback when the 3rd arg is not a partition number.
produce_no_ack(Client::client(), Topic::topic(), Part::partition() | partitioner(), Key::key(), Value::value()) -> ok | {error, any()}
Find the partition worker and send message without any ack.
NOTE: This call has no back-pressure to the caller, excessive usage may cause BEAM to run out of memory.
produce_sync(Pid::pid(), Value::value()) -> ok | {error, any()}
Equivalent to produce_sync(Pid, <<>>, Value).
Sync version of produce/3.
If required_acks is set to 0,
this function will return once the messages are buffered in the
producer process.
produce_sync(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> ok | {error, any()}
Sync version of produce/5.
If required_acks is set to 0, this function
will return once the messages are buffered in the producer process.
produce_sync_offset(Client::client(), Topic::topic(), Partition::partition() | partitioner(), Key::key(), Value::value()) -> {ok, offset()} | {error, any()}
Version of produce_sync/5 that returns the offset assigned by Kafka.
If required_acks is set to 0, the offset will be
?BROD_PRODUCE_UNKNOWN_OFFSET.
resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition()) -> {ok, offset()} | {error, any()}
Equivalent to resolve_offset(Hosts, Topic, Partition, latest, []).
resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time()) -> {ok, offset()} | {error, any()}
Equivalent to resolve_offset(Hosts, Topic, Partition, Time, []).
resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time(), ConnCfg::conn_config()) -> {ok, offset()} | {error, any()}
Resolve semantic offset or timestamp to real offset.
The same as resolve_offset/6 but the timeout is
extracted from connection config.
resolve_offset(Hosts::[endpoint()], Topic::topic(), Partition::partition(), Time::offset_time(), ConnCfg::conn_config(), Opts::#{timeout => kpro:int32()}) -> {ok, offset()} | {error, any()}
Resolve semantic offset or timestamp to real offset.
The function returns the offset of the first message with the given timestamp, or of the first message after the given timestamp (in case no message matches the timestamp exactly), or -1 if the timestamp is newer than (>) all messages in the topic.
You can also use two semantic offsets instead of
a timestamp: earliest gives you the offset of the
first message in the topic and latest gives you
the offset of the last message incremented by 1.
If the topic is empty, both earliest and latest
return the same value (which is 0 unless some messages
were deleted from the topic), and any timestamp returns
-1.
Example:
  Messages:
    offset     0   1   2   3
    timestamp 10  20  20  30
  Calls:
    resolve_offset(Endpoints, Topic, Partition, 5)        → 0
    resolve_offset(Endpoints, Topic, Partition, 10)       → 0
    resolve_offset(Endpoints, Topic, Partition, 13)       → 1
    resolve_offset(Endpoints, Topic, Partition, 20)       → 1
    resolve_offset(Endpoints, Topic, Partition, 31)       → -1
    resolve_offset(Endpoints, Topic, Partition, earliest) → 0
    resolve_offset(Endpoints, Topic, Partition, latest)   → 4
start() -> ok | no_return()
Start brod application.
start(StartType, StartArgs) -> any()
Application behaviour callback
start_client(BootstrapEndpoints::[endpoint()]) -> ok | {error, any()}
Equivalent to start_client(BootstrapEndpoints, brod_default_client).
start_client(BootstrapEndpoints::[endpoint()], ClientId::client_id()) -> ok | {error, any()}
Equivalent to start_client(BootstrapEndpoints, ClientId, []).
start_client(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::client_config()) -> ok | {error, any()}
Start a client (brod_client).
BootstrapEndpoints:
Kafka cluster endpoints, can be any of the brokers in the cluster,
which does not necessarily have to be the leader of any partition,
e.g. a load-balanced entrypoint to the remote Kafka cluster.
ClientId: Atom to identify the client process.
Config is a proplist, possible values:
restart_delay_seconds (optional, default=10)
get_metadata_timeout_seconds (optional, default=5)
{error, timeout} from brod_client:get_xxx calls if
responses for APIs such as metadata, find_coordinator
are not received in time.
reconnect_cool_down_seconds (optional, default=1)
allow_topic_auto_creation (optional, default=true)
auto.create.topics.enable is set in the broker configuration.
However if allow_topic_auto_creation is set to false in
client config, brod will avoid sending metadata requests that
may cause an auto-creation of the topic regardless of what
broker config is.
auto_start_producers (optional, default=false)
produce but did not call brod:start_producer
explicitly. Can be useful for applications which don't know beforehand
which topics they will be working with.
default_producer_config (optional, default=[])
See brod_producer:start_link/4 for details about producer config.
unknown_topic_cache_ttl (optional, default=120000)
Connection options can be added to the same proplist. See
kpro_connection.erl in kafka_protocol for the details:
ssl (optional, default=false)
true | false | ssl:ssl_option()
true is translated to [] as ssl:ssl_option() i.e. all default.
sasl (optional, default=undefined)
{mechanism(), Filename} or {mechanism(), UserName, Password}
where mechanism can be atoms: plain (for "PLAIN"), scram_sha_256
(for "SCRAM-SHA-256") or scram_sha_512 (for "SCRAM-SHA-512").
Filename should be a file consisting of two lines, first line
is the username and the second line is the password.
Both Username and Password should be string() | binary().
connect_timeout (optional, default=5000)
request_timeout (optional, default=240000, constraint: >= 1000)
query_api_versions (optional, default=true)
true, at connection start, brod will send a query request
to get the broker supported API version ranges.
When set to 'false', brod will always use the lowest supported API version
when sending requests to kafka.
Supported API version ranges can be found in:
brod_kafka_apis:supported_versions/1
extra_sock_opts (optional, default=[])
e.g. [{sndbuf, 1 bsl 20}].
More info
start_consumer(Client::client(), TopicName::topic(), ConsumerConfig::consumer_config()) -> ok | {error, any()}
Dynamically start topic consumer(s) and register it in the client.
A brod_consumer is started for each partition of the given topic.
Note that you can have only one consumer per client-topic.
See brod_consumer:start_link/5 for details about consumer config.
start_link_client(BootstrapEndpoints::[endpoint()]) -> {ok, pid()} | {error, any()}
Equivalent to start_link_client(BootstrapEndpoints,
brod_default_client).
start_link_client(BootstrapEndpoints::[endpoint()], ClientId::client_id()) -> {ok, pid()} | {error, any()}
Equivalent to start_link_client(BootstrapEndpoints, ClientId, []).
start_link_client(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::client_config()) -> {ok, pid()} | {error, any()}
start_link_group_subscriber(Client::client(), GroupId::group_id(), Topics::[topic()], GroupConfig::group_config(), ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
See also: brod_group_subscriber:start_link/7.
start_link_group_subscriber(Client::client(), GroupId::group_id(), Topics::[topic()], GroupConfig::group_config(), ConsumerConfig::consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
See also: brod_group_subscriber:start_link/8.
start_link_group_subscriber_v2(Config::brod_group_subscriber_v2:subscriber_config()) -> {ok, pid()} | {error, any()}
Start group_subscriber_v2.
start_link_topic_subscriber(Config::brod_topic_subscriber:topic_subscriber_config()) -> {ok, pid()} | {error, any()}
See also: brod_topic_subscriber:start_link/1.
start_link_topic_subscriber(Client::client(), Topic::topic(), ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
Equivalent to start_link_topic_subscriber(Client,
Topic,
all,
ConsumerConfig,
CbModule,
CbInitArg).
This function is deprecated: Please use start_link_topic_subscriber/1 instead
start_link_topic_subscriber(Client::client(), Topic::topic(), Partitions::all | [partition()], ConsumerConfig::consumer_config(), CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
Equivalent to start_link_topic_subscriber(Client,
Topic,
Partitions,
ConsumerConfig,
message,
CbModule,
CbInitArg).
This function is deprecated: Please use start_link_topic_subscriber/1 instead
start_link_topic_subscriber(Client::client(), Topic::topic(), Partitions::all | [partition()], ConsumerConfig::consumer_config(), MessageType::message | message_set, CbModule::module(), CbInitArg::term()) -> {ok, pid()} | {error, any()}
This function is deprecated: Please use start_link_topic_subscriber/1 instead
See also: brod_topic_subscriber:start_link/7.
start_producer(Client::client(), TopicName::topic(), ProducerConfig::producer_config()) -> ok | {error, any()}
Dynamically start a per-topic producer and register it in the client.
You have to start a producer for each topic you want to produce messages
into, unless you have specified auto_start_producers = true when starting
the client (in that case you don't have to call this function at all).
After starting the producer, you can call produce/5 and friends
for producing messages.
You can read more about producers in the overview.
A client has to be already started before making this call (e.g. by calling
start_client/3).
See brod_producer:start_link/4 for a list of available configuration
options.
> brod:start_producer(my_client, <<"my_topic">>, [{max_retries, 5}]).
ok
stop() -> ok
Stop brod application.
stop(State) -> any()
Application behaviour callback
stop_client(Client::client()) -> ok
Stop a client.
subscribe(ConsumerPid::pid(), SubscriberPid::pid(), Options::consumer_config()) -> ok | {error, any()}
Subscribe to a data stream from the given consumer.
See subscribe/5 for more information.
subscribe(Client::client(), SubscriberPid::pid(), Topic::topic(), Partition::partition(), Options::consumer_config()) -> {ok, pid()} | {error, any()}
Subscribe to a data stream from the given topic-partition.
A client has to be already started (by calling start_client/3,
one client per multiple topics is enough) and a corresponding consumer
for the topic and partition as well (by calling start_consumer/3),
before calling this function.
Caller may specify a set of options extending consumer config.
See brod_consumer:subscribe/3 for more info on that.
If {error, Reason} is returned, the caller should perhaps retry later.
{ok, ConsumerPid} is returned on success. The caller may want to
monitor the consumer pid and re-subscribe should the ConsumerPid crash.
Upon successful subscription the subscriber process should expect messages
of pattern:
{ConsumerPid, #kafka_message_set{}} and
{ConsumerPid, #kafka_fetch_error{}}.
-include_lib("brod/include/brod.hrl") to access the records.
In case #kafka_fetch_error{} is received the subscriber should
re-subscribe itself to resume the data stream.
To provide a mechanism to handle backpressure, brod requires all messages
sent to a subscriber to be acked by calling consume_ack/4 after
they are processed. If there are too many not-acked messages received by
the subscriber, the consumer will stop fetching new ones so the subscriber
won't get overwhelmed.
sync_produce_request(CallRef::call_ref()) -> ok | {error, Reason::any()}
Equivalent to sync_produce_request(CallRef, infinity).
sync_produce_request(CallRef::call_ref(), Timeout::timeout()) -> ok | {error, Reason::any()}
Block and wait for a previously sent produce request to be acked by kafka.
This way, you can turn asynchronous requests, made by produce/5
and friends, into synchronous ones.
{ok, CallRef} = brod:produce(
brod_client_1, <<"my_topic">>, 0, <<"some-key">>, <<"some-value">>)
). % returns immediately
% the following call waits and returns after the ack is received or timed out
brod:sync_produce_request(CallRef, 5_000).
sync_produce_request_offset(CallRef::call_ref()) -> {ok, offset()} | {error, Reason::any()}
Equivalent to sync_produce_request_offset(CallRef, infinity).
sync_produce_request_offset(CallRef::call_ref(), Timeout::timeout()) -> {ok, offset()} | {error, Reason::any()}
As sync_produce_request/2, but also returning assigned offset.
transaction(Client::client(), TxnId::transactional_id(), Config::transaction_config()) -> {ok, transaction()}
Equivalent to brod_transaction:start_link/3.
Start a new transaction. TxnId will be the id of the transaction.
txn_add_offsets(Transaction::transaction(), ConsumerGroup::group_id(), Offsets::offsets_to_commit()) -> ok | {error, any()}
Add the offset consumed by a group to the transaction.
See also: brod_transaction:add_offsets/3.
txn_do(ProcessFun::txn_function(), Client::client(), Options::txn_do_options()) -> {ok, pid()} | {error, any()}
Execute the function in the context of a fetch-produce cycle with access to an open transaction.
See also: brod_transaction_processor:do/3.
txn_produce(Transaction::transaction(), Topic::topic(), Partition::partition(), Batch::batch_input()) -> {ok, offset()} | {error, any()}
Produce the batch of messages to the indicated topic-partition synchronously.
See also: brod_transaction:produce/5.
txn_produce(Transaction::transaction(), Topic::topic(), Partition::partition(), Key::key(), Value::value()) -> {ok, offset()} | {error, any()}
Produce the message (key and value) to the indicated topic-partition synchronously.
See also: brod_transaction:produce/5.
unsubscribe(ConsumerPid::pid()) -> ok | {error, any()}
Unsubscribe the current subscriber.
Assuming the subscriber is self().
unsubscribe(ConsumerPid::pid(), SubscriberPid::pid()) -> ok | {error, any()}
Unsubscribe the current subscriber.
unsubscribe(Client::client(), Topic::topic(), Partition::partition()) -> ok | {error, any()}
Unsubscribe the current subscriber.
Assuming the subscriber is self().
unsubscribe(Client::client(), Topic::topic(), Partition::partition(), SubscriberPid::pid()) -> ok | {error, any()}
Unsubscribe the current subscriber.
Generated by EDoc