Module brod_transaction

A brod_transaction is a process that orchestates a set of producers to store messages within a transaction, it also supports committing offsets in the same transaction.

Behaviours: gen_server.

Description

A brod_transaction is a process that orchestates a set of producers to store messages within a transaction, it also supports committing offsets in the same transaction.

Simple produce sample:

   {ok, Tx} = brod_transaction:new(Client, TxId, []),
   lists:foreach(fun(Partition) ->
                     Key = rand(), Value = rand(),
                     {ok, _Offset} =
                     brod_transaction:produce(Tx,
                                              Topic,
                                              Partition,
                                              Key,
                                              Value),
                 end, Partitions),
   brod_transaction:commit(Tx),

handle callback of a group subscriber using offset commit within a transaction:

   handle_message(Topic,
                  Partition,
                  #kafka_message{ offset = Offset
                                , key = Key
                                , value = Value},
                  #{ client := Client
                   , group_id := GroupId} =  State) ->
     {ok, Tx} = brod_transaction:new(Client),
     {ok, _ProducedOffset} = brod_transaction:produce(Tx, ?TOPIC_OUTPUT, Partition, Key, Value),
     ok = brod_transaction:txn_add_offsets(Tx, GroupId, #{{Topic, Partition} => Offset}),
     ok = brod_transaction:commit(Tx)
 
     {ok, ack_no_commit, State}.

Data Types

batch_input()

batch_input() = kpro:batch_input()

call_ref()

call_ref() = brod:call_ref()

client()

client() = client_id() | pid()

client_id()

client_id() = atom()

group_id()

group_id() = kpro:group_id()

key()

key() = brod:key()

offset()

offset() = kpro:offset()

offsets_to_commit()

offsets_to_commit() = kpro:offsets_to_commit()

partition()

partition() = kpro:partition()

topic()

topic() = kpro:topic()

transaction()

transaction() = pid()

transaction_config()

transaction_config() = [{timeout, non_neg_integer()} | {backoff_step, non_neg_integer()} | {max_retries, non_neg_integer()}]

transactional_id()

transactional_id() = kpro:transactional_id()

txn_ctx()

txn_ctx() = kpro:txn_ctx()

value()

value() = brod:value()

Function Index

abort/1Abort the transaction, after this, the gen_server will stop.
add_offsets/3Add the offset consumed by a group to the transaction.
commit/1Commit the transaction, after this, the gen_server will stop.
handle_call/3
handle_cast/2
init/1
new/3
produce/4Synchronously produce the batch of messages to the indicated topic-partition.
produce/5Produce the message (key and value) to the indicated topic-partition synchronously.
start_link/3Start a new transaction, TxIdwill be the id of the transaction Config is a proplist, all values are optional: timeout:Connection timeout in millis `backoff_step: after each retry it will sleep for 2^Attempt * backoff_step millis max_retries
stop/1Stop the transaction.
terminate/2

Function Details

abort/1

abort(Transaction::transaction()) -> ok | {error, any()}

Abort the transaction, after this, the gen_server will stop

add_offsets/3

add_offsets(Transaction::transaction(), ConsumerGroup::group_id(), Offsets::offsets_to_commit()) -> ok | {error, any()}

Add the offset consumed by a group to the transaction.

commit/1

commit(Transaction::transaction()) -> ok | {error, any()}

Commit the transaction, after this, the gen_server will stop

handle_call/3

handle_call(Call, From, State) -> any()

handle_cast/2

handle_cast(Cast, State) -> any()

init/1

init(X1) -> any()

new/3

new(Client::client(), TxId::transactional_id(), Config::transaction_config()) -> {ok, transaction()}

See also: start_link/3.

produce/4

produce(Transaction::transaction(), Topic::topic(), Partition::partition(), Batch::batch_input()) -> {ok, offset()} | {error, any()}

Synchronously produce the batch of messages to the indicated topic-partition

produce/5

produce(Transaction::transaction(), Topic::topic(), Partition::partition(), Key::key(), Value::value()) -> {ok, offset()} | {error, any()}

Produce the message (key and value) to the indicated topic-partition synchronously.

start_link/3

start_link(Client::client(), TxId::transactional_id(), Config::transaction_config()) -> {ok, pid()}

Start a new transaction, TxIdwill be the id of the transaction Config is a proplist, all values are optional: timeout:Connection timeout in millis `backoff_step: after each retry it will sleep for 2^Attempt * backoff_step millis max_retries

stop/1

stop(Transaction::transaction()) -> ok | {error, any()}

Stop the transaction.

terminate/2

terminate(Reason, State) -> any()


Generated by EDoc