brod_transaction_processor allows the execution of a function in the context
of a transaction.
brod_transaction_processor allows the execution of a function in the context
of a transaction. It abstracts the usage of a group subscriber reading and writing
using a transaction in each fetch cycle.
For example, the following snippets are equivalent
-------------------------------------------------
function_that_does_something(Messages, ...) -> write_some_messages_into_some_topic(Messages, ...), write_some_other_messages_into_yet_another_topic(Messages, ...).
handle_message(Topic, Partition, Messages, State) -> {ok, Tx} = brod:transaction(...) % opens a transaction function_that_does_something(Messages, ...) % adds the writes to the transaction ok = brod:txn_add_offsets(...) % add offsets to the transsaction ok = btrod:commit(Tx) % commit {ok, ack_no_commit, State}
-------------------------------------------------
brod_transaction_processor:do( fun(Transaction, Messages) -> write_some_messages_into_some_topic(Messages, ...), write_some_other_messages_into_yet_another_topic(Messages, ...) end, ...)
-------------------------------------------------client() = client_id() | pid()
client_id() = atom()
do_options() = #{group_config => proplists:proplist(), consumer_config => proplists:proplist(), transaction_config => proplists:proplist(), group_id => binary(), topics => [binary()]}
message_set() = #kafka_message_set{topic = brod:topic(), partition = brod:partition(), high_wm_offset = integer(), messages = [brod:message()] | kpro:incomplete_batch()}
process_function() = fun((transaction(), message_set()) -> ok | {error, any()})
transaction() = brod_transaction:transaction()
| do/3 | executes the ProcessFunction within the context of a transaction. |
| get_committed_offsets/3 | |
| handle_message/4 | |
| init/2 |
do(ProcessFun::process_function(), Client::client(), Opts::do_options()) -> {ok, pid()} | {error, any()}
executes the ProcessFunction within the context of a transaction.
Options is a map that can include
group_config as the configuration for the group suscriber.
consumer_config as the configuration for the consumer suscriber.
transaction_config transacction config.
group_id as the subscriber group id.
topics topics to fetch from.
FizzBuzz sample:
fizz_buzz(N) when (N rem 15) == 0 -> "FizzBuzz" fizz_buzz(N) when (N rem 3) == 0 -> "Fizz" fizz_buzz(N) when (N rem 5) == 0 -> "Buzz"; fizz_buzz(N) -> N end.
brod_transaction_processor:do( fun(Transaction, #kafka_message_set{ topic = _Topic , partition = Partition , messages = Messages} = _MessageSet) -> FizzBuzzed = lists:map(fun(#kafka_message{ key = Key , value = Value}) -> #{ key => Key , value => fizz_buzz(Value)} end, Messages),
brod:txn_produce(Transaction, ?OUTPUT_TOPIC, Partition, FizzBuzzed),
ok end, Client, #{ topics => [?INPUT_TOPIC] , group_id => ?PROCESSOR_GROUP_ID}).get_committed_offsets(GroupId, TPs, State) -> any()
handle_message(Topic, Partition, Kafka_message_set, State) -> any()
init(GroupId, Opts) -> any()
Generated by EDoc