brod_transaction is a process that orchestates a set of
producers to store messages within a transaction, it also supports
committing offsets in the same transaction.
Behaviours: gen_server.
A brod_transaction is a process that orchestates a set of
producers to store messages within a transaction, it also supports
committing offsets in the same transaction.
Simple produce sample:
{ok, Tx} = brod_transaction:new(Client, TxId, []),
lists:foreach(fun(Partition) ->
Key = rand(), Value = rand(),
{ok, _Offset} =
brod_transaction:produce(Tx,
Topic,
Partition,
Key,
Value),
end, Partitions),
brod_transaction:commit(Tx),
handle callback of a group subscriber using offset commit within a transaction:
handle_message(Topic,
Partition,
#kafka_message{ offset = Offset
, key = Key
, value = Value},
#{ client := Client
, group_id := GroupId} = State) ->
{ok, Tx} = brod_transaction:new(Client),
{ok, _ProducedOffset} = brod_transaction:produce(Tx, ?TOPIC_OUTPUT, Partition, Key, Value),
ok = brod_transaction:txn_add_offsets(Tx, GroupId, #{{Topic, Partition} => Offset}),
ok = brod_transaction:commit(Tx)
{ok, ack_no_commit, State}.
batch_input() = kpro:batch_input()
call_ref() = brod:call_ref()
client() = client_id() | pid()
client_id() = atom()
group_id() = kpro:group_id()
key() = brod:key()
offset() = kpro:offset()
offsets_to_commit() = kpro:offsets_to_commit()
partition() = kpro:partition()
topic() = kpro:topic()
transaction() = pid()
transaction_config() = [{timeout, non_neg_integer()} | {backoff_step, non_neg_integer()} | {max_retries, non_neg_integer()}]
transactional_id() = kpro:transactional_id()
txn_ctx() = kpro:txn_ctx()
value() = brod:value()
| abort/1 | Abort the transaction, after this, the gen_server will stop. |
| add_offsets/3 | Add the offset consumed by a group to the transaction. |
| commit/1 | Commit the transaction, after this, the gen_server will stop. |
| handle_call/3 | |
| handle_cast/2 | |
| init/1 | |
| new/3 | |
| produce/4 | Synchronously produce the batch of messages to the indicated topic-partition. |
| produce/5 | Produce the message (key and value) to the indicated topic-partition synchronously. |
| start_link/3 | Start a new transaction, TxIdwill be the id of the transaction
Config is a proplist, all values are optional:
timeout:Connection timeout in millis
`backoff_step: after each retry it will sleep for 2^Attempt * backoff_step
millis
max_retries |
| stop/1 | Stop the transaction. |
| terminate/2 |
abort(Transaction::transaction()) -> ok | {error, any()}
Abort the transaction, after this, the gen_server will stop
add_offsets(Transaction::transaction(), ConsumerGroup::group_id(), Offsets::offsets_to_commit()) -> ok | {error, any()}
Add the offset consumed by a group to the transaction.
commit(Transaction::transaction()) -> ok | {error, any()}
Commit the transaction, after this, the gen_server will stop
handle_call(Call, From, State) -> any()
handle_cast(Cast, State) -> any()
init(X1) -> any()
new(Client::client(), TxId::transactional_id(), Config::transaction_config()) -> {ok, transaction()}
See also: start_link/3.
produce(Transaction::transaction(), Topic::topic(), Partition::partition(), Batch::batch_input()) -> {ok, offset()} | {error, any()}
Synchronously produce the batch of messages to the indicated topic-partition
produce(Transaction::transaction(), Topic::topic(), Partition::partition(), Key::key(), Value::value()) -> {ok, offset()} | {error, any()}
Produce the message (key and value) to the indicated topic-partition synchronously.
start_link(Client::client(), TxId::transactional_id(), Config::transaction_config()) -> {ok, pid()}
Start a new transaction, TxIdwill be the id of the transaction
Config is a proplist, all values are optional:
timeout:Connection timeout in millis
`backoff_step: after each retry it will sleep for 2^Attempt * backoff_step
millis
max_retries
stop(Transaction::transaction()) -> ok | {error, any()}
Stop the transaction.
terminate(Reason, State) -> any()
Generated by EDoc