brod_client in brod is a gen_server responsible for establishing
and maintaining tcp sockets connecting to kafka brokers.
Behaviours: gen_server.
A brod_client in brod is a gen_server responsible for establishing
and maintaining tcp sockets connecting to kafka brokers. It also manages
per-topic-partition producer and consumer processes under two-level
supervision trees.
client() = brod:client()
client_id() = brod:client_id()
config() = proplists:proplist()
endpoint() = brod:endpoint()
get_consumer_error() = client_down | {client_down, any()} | {consumer_down, any()} | {consumer_not_found, topic()} | {consumer_not_found, topic(), partition()}
get_producer_error() = client_down | {client_down, any()} | {producer_down, any()} | {producer_not_found, topic()} | {producer_not_found, topic(), partition()}
group_id() = brod:group_id()
partition() = brod:partition()
topic() = brod:topic()
transactional_id() = brod:transactional_id()
| deregister_consumer/3 | De-register the consumer for a partition. |
| deregister_producer/3 | De-register the producer for a partition. |
| find_consumer/3 | |
| find_producer/3 | |
| get_bootstrap/1 | |
| get_connection/3 | Get connection to a kafka broker. |
| get_consumer/3 | Get consumer of the given topic-partition. |
| get_group_coordinator/2 | Get broker endpoint and connection config for connecting a group coordinator. |
| get_leader_connection/3 | Get the connection to kafka broker which is a leader for given Topic-Partition. |
| get_metadata/2 | Get topic metadata, if topic is undefined, will fetch ALL metadata. |
| get_metadata_safe/2 | Ensure not topic auto creation even if Kafka has it enabled. |
| get_partitions_count/2 | Get number of partitions for a given topic. |
| get_partitions_count_safe/2 | Get number of partitions for an existing topic. |
| get_producer/3 | Get producer of the given topic-partition. |
| get_transactional_coordinator/2 | Get broker endpoint and connection config for connecting a transactional coordinator. |
| lookup_partitions_count_cache/2 | |
| register_consumer/3 | Register self() as a partition consumer. |
| register_producer/3 | Register self() as a partition producer. |
| start_consumer/3 | Dynamically start a topic consumer. |
| start_link/3 | |
| start_producer/3 | Dynamically start a per-topic producer. |
| stop/1 | |
| stop_consumer/2 | Stop all partition consumers of the given topic. |
| stop_producer/2 | Stop all partition producers of the given topic. |
deregister_consumer(Client::client(), Topic::topic(), Partition::partition()) -> ok
De-register the consumer for a partition.
The partition consumer entry is deleted from the ETS table to allow cleanup of purposefully %% stopped consumers and allow later restart.deregister_producer(Client::client(), Topic::topic(), Partition::partition()) -> ok
De-register the producer for a partition.
The partition producer entry is deleted from the ETS table to allow cleanup of purposefully stopped producers and allow later restart.find_consumer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, get_consumer_error()}
find_producer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, get_producer_error()}
get_bootstrap(Client::client()) -> {ok, brod:bootstrap()} | {error, any()}
get_connection(Client::client(), Host::brod:hostname(), Port::brod:portnum()) -> {ok, pid()} | {error, any()}
Get connection to a kafka broker.
Return already established connection towards the broker, otherwise a new one is established and cached in client state. If the old connection was dead less than a configurable N seconds ago,{error, LastReason} is returned.
get_consumer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, get_consumer_error()}
Get consumer of the given topic-partition.
get_group_coordinator(Client::client(), GroupId::group_id()) -> {ok, {endpoint(), brod:conn_config()}} | {error, any()}
Get broker endpoint and connection config for connecting a group coordinator.
get_leader_connection(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, any()}
Get the connection to kafka broker which is a leader for given Topic-Partition.
Return already established connection towards the leader broker, Otherwise a new one is established and cached in client state. If the old connection was dead less than a configurable N seconds ago,{error, LastReason} is returned.
get_metadata(Client::client(), Topic::all | undefined | topic()) -> {ok, kpro:struct()} | {error, any()}
Get topic metadata, if topic is undefined, will fetch ALL metadata.
get_metadata_safe(Client::client(), Topic::topic()) -> {ok, kpro:struct()} | {error, any()}
Ensure not topic auto creation even if Kafka has it enabled.
Get number of partitions for a given topic.
Get number of partitions for an existing topic. Ensured not to auto create a topic even when Kafka is configured with topic auto creation enabled.
get_producer(Client::client(), Topic::topic(), Partition::partition()) -> {ok, pid()} | {error, get_producer_error()}
Get producer of the given topic-partition.
The producer is started if auto_start_producers is enabled in client config.get_transactional_coordinator(Client::client(), TransactionId::transactional_id()) -> {ok, {endpoint(), brod:conn_config()}} | {error, any()}
Get broker endpoint and connection config for connecting a transactional coordinator.
lookup_partitions_count_cache(Ets::ets:tab(), Topic::undefined | topic()) -> {ok, pos_integer()} | {error, any()} | false
register_consumer(Client::client(), Topic::topic(), Partition::partition()) -> ok
Register self() as a partition consumer.
The pid is registered in an ETS table, then the callers may lookup a consumer pid from the table and make subscribe calls to the process directly.register_producer(Client::client(), Topic::topic(), Partition::partition()) -> ok
Register self() as a partition producer.
The pid is registered in an ETS table, then the callers may lookup a producer pid from the table and make produce requests to the producer process directly.start_consumer(Client::client(), TopicName::topic(), ConsumerConfig::brod:consumer_config()) -> ok | {error, any()}
Dynamically start a topic consumer.
Returns ok if the consumer is already started.start_link(BootstrapEndpoints::[endpoint()], ClientId::client_id(), Config::config()) -> {ok, pid()} | {error, any()}
start_producer(Client::client(), TopicName::topic(), ProducerConfig::brod:producer_config()) -> ok | {error, any()}
Dynamically start a per-topic producer.
Return ok if the producer is already started.stop(Client::client()) -> ok
Stop all partition consumers of the given topic.
Stop all partition producers of the given topic.
Generated by EDoc