Module brod_transaction_processor

brod_transaction_processor allows the execution of a function in the context of a transaction.

Description

brod_transaction_processor allows the execution of a function in the context of a transaction. It abstracts the usage of a group subscriber reading and writing using a transaction in each fetch cycle. For example, the following snippets are equivalent

-------------------------------------------------

function_that_does_something(Messages, ...) -> write_some_messages_into_some_topic(Messages, ...), write_some_other_messages_into_yet_another_topic(Messages, ...).

handle_message(Topic, Partition, Messages, State) -> {ok, Tx} = brod:transaction(...) % opens a transaction function_that_does_something(Messages, ...) % adds the writes to the transaction ok = brod:txn_add_offsets(...) % add offsets to the transsaction ok = btrod:commit(Tx) % commit {ok, ack_no_commit, State}

-------------------------------------------------

brod_transaction_processor:do( fun(Transaction, Messages) -> write_some_messages_into_some_topic(Messages, ...), write_some_other_messages_into_yet_another_topic(Messages, ...) end, ...)

-------------------------------------------------

Data Types

client()

client() = client_id() | pid()

client_id()

client_id() = atom()

do_options()

do_options() = #{group_config => proplists:proplist(), consumer_config => proplists:proplist(), transaction_config => proplists:proplist(), group_id => binary(), topics => [binary()]}

message_set()

message_set() = #kafka_message_set{topic = brod:topic(), partition = brod:partition(), high_wm_offset = integer(), messages = [brod:message()] | kpro:incomplete_batch()}

process_function()

process_function() = fun((transaction(), message_set()) -> ok | {error, any()})

transaction()

transaction() = brod_transaction:transaction()

Function Index

do/3executes the ProcessFunction within the context of a transaction.
get_committed_offsets/3
handle_message/4
init/2

Function Details

do/3

do(ProcessFun::process_function(), Client::client(), Opts::do_options()) -> {ok, pid()} | {error, any()}

executes the ProcessFunction within the context of a transaction. Options is a map that can include group_config as the configuration for the group suscriber. consumer_config as the configuration for the consumer suscriber. transaction_config transacction config. group_id as the subscriber group id. topics topics to fetch from.

FizzBuzz sample:

fizz_buzz(N) when (N rem 15) == 0 -> "FizzBuzz" fizz_buzz(N) when (N rem 3) == 0 -> "Fizz" fizz_buzz(N) when (N rem 5) == 0 -> "Buzz"; fizz_buzz(N) -> N end.

brod_transaction_processor:do( fun(Transaction, #kafka_message_set{ topic = _Topic , partition = Partition , messages = Messages} = _MessageSet) -> FizzBuzzed = lists:map(fun(#kafka_message{ key = Key , value = Value}) -> #{ key => Key , value => fizz_buzz(Value)} end, Messages),

brod:txn_produce(Transaction, ?OUTPUT_TOPIC, Partition, FizzBuzzed),

ok end, Client, #{ topics => [?INPUT_TOPIC] , group_id => ?PROCESSOR_GROUP_ID}).

get_committed_offsets/3

get_committed_offsets(GroupId, TPs, State) -> any()

handle_message/4

handle_message(Topic, Partition, Kafka_message_set, State) -> any()

init/2

init(GroupId, Opts) -> any()


Generated by EDoc