Qpid Proton C API  0.17.0
broker.c

A simple multithreaded broker that works with the send and receive examples.Requires C++11

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <proton/engine.h>
#include <proton/sasl.h>
#include <proton/url.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* TODO aconway 2016-10-14: this example does not require libuv IO,
it uses uv.h only for portable mutex and thread functions.
*/
#include <uv.h>
bool enable_debug = false;
void debug(const char* fmt, ...) {
if (enable_debug) {
va_list(ap);
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
fputc('\n', stderr);
fflush(stderr);
}
}
void check(int err, const char* s) {
if (err != 0) {
perror(s);
exit(1);
}
}
void pcheck(int err, const char* s) {
if (err != 0) {
fprintf(stderr, "%s: %s", s, pn_code(err));
exit(1);
}
}
/* Simple re-sizable vector that acts as a queue */
#define VEC(T) struct { T* data; size_t len, cap; }
#define VEC_INIT(V) \
do { \
V.len = 0; \
V.cap = 16; \
void **vp = (void**)&V.data; \
*vp = malloc(V.cap * sizeof(*V.data)); \
} while(0)
#define VEC_FINAL(V) free(V.data)
#define VEC_PUSH(V, X) \
do { \
if (V.len == V.cap) { \
V.cap *= 2; \
void **vp = (void**)&V.data; \
*vp = realloc(V.data, V.cap * sizeof(*V.data)); \
} \
V.data[V.len++] = X; \
} while(0) \
#define VEC_POP(V) \
do { \
if (V.len > 0) \
memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data)); \
} while(0)
/* Simple thread-safe queue implementation */
typedef struct queue_t {
uv_mutex_t lock;
char* name;
VEC(pn_rwbytes_t) messages; /* Messages on the queue_t */
VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
struct queue_t *next; /* Next queue in chain */
size_t sent; /* Count of messages sent, used as delivery tag */
} queue_t;
static void queue_init(queue_t *q, const char* name, queue_t *next) {
debug("created queue %s", name);
uv_mutex_init(&q->lock);
q->name = strdup(name);
VEC_INIT(q->messages);
VEC_INIT(q->waiting);
q->next = next;
q->sent = 0;
}
static void queue_destroy(queue_t *q) {
uv_mutex_destroy(&q->lock);
free(q->name);
for (size_t i = 0; i < q->messages.len; ++i)
free(q->messages.data[i].start);
VEC_FINAL(q->messages);
for (size_t i = 0; i < q->waiting.len; ++i)
pn_decref(q->waiting.data[i]);
VEC_FINAL(q->waiting);
}
/* Send a message on s, or record s as eating if no messages.
Called in s dispatch loop, assumes s has credit.
*/
static void queue_send(queue_t *q, pn_link_t *s) {
pn_rwbytes_t m = { 0 };
size_t tag = 0;
uv_mutex_lock(&q->lock);
if (q->messages.len == 0) { /* Empty, record connection as waiting */
debug("queue is empty %s", q->name);
/* Record connection for wake-up if not already on the list. */
size_t i = 0;
for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
;
if (i == q->waiting.len) {
VEC_PUSH(q->waiting, c);
}
} else {
debug("sending from queue %s", q->name);
m = q->messages.data[0];
VEC_POP(q->messages);
tag = ++q->sent;
}
uv_mutex_unlock(&q->lock);
if (m.start) {
pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
pn_link_send(s, m.start, m.size);
pn_delivery_settle(d); /* Pre-settled: unreliable, there will bea no ack/ */
free(m.start);
}
}
/* Data associated with each broker connection */
typedef struct broker_data_t {
bool check_queues; /* Check senders on the connection for available data in queues. */
} broker_data_t;
/* Use the context pointer as a boolean flag to indicate we need to check queues */
void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
pn_connection_set_context(c, (void*)check);
}
bool pn_connection_get_check_queues(pn_connection_t *c) {
return (bool)pn_connection_get_context(c);
}
/* Put a message on the queue, called in receiver dispatch loop.
If the queue was previously empty, notify waiting senders.
*/
static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
debug("received to queue %s", q->name);
uv_mutex_lock(&q->lock);
VEC_PUSH(q->messages, m);
if (q->messages.len == 1) { /* Was empty, notify waiting connections */
for (size_t i = 0; i < q->waiting.len; ++i) {
pn_connection_t *c = q->waiting.data[i];
pn_connection_set_check_queues(c, true);
pn_connection_wake(c); /* Wake the connection */
}
q->waiting.len = 0;
}
uv_mutex_unlock(&q->lock);
}
/* Thread safe set of queues */
typedef struct queues_t {
uv_mutex_t lock;
queue_t *queues;
size_t sent;
} queues_t;
void queues_init(queues_t *qs) {
uv_mutex_init(&qs->lock);
qs->queues = NULL;
}
void queues_destroy(queues_t *qs) {
for (queue_t *q = qs->queues; q; q = q->next) {
queue_destroy(q);
free(q);
}
uv_mutex_destroy(&qs->lock);
}
queue_t* queues_get(queues_t *qs, const char* name) {
uv_mutex_lock(&qs->lock);
queue_t *q;
for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
;
if (!q) {
q = (queue_t*)malloc(sizeof(queue_t));
queue_init(q, name, qs->queues);
qs->queues = q;
}
uv_mutex_unlock(&qs->lock);
return q;
}
/* The broker implementation */
typedef struct broker_t {
pn_proactor_t *proactor;
queues_t queues;
const char *container_id; /* AMQP container-id */
size_t threads;
pn_millis_t heartbeat;
bool finished;
} broker_t;
void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
memset(b, 0, sizeof(*b));
b->proactor = pn_proactor();
queues_init(&b->queues);
b->container_id = container_id;
b->threads = threads;
b->heartbeat = 0;
}
void broker_stop(broker_t *b) {
/* In this broker an interrupt stops a thread, stopping all threads stops the broker */
for (size_t i = 0; i < b->threads; ++i)
pn_proactor_interrupt(b->proactor);
}
/* Try to send if link is sender and has credit */
static void link_send(broker_t *b, pn_link_t *s) {
if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
const char *qname = pn_terminus_get_address(pn_link_source(s));
queue_t *q = queues_get(&b->queues, qname);
queue_send(q, s);
}
}
static void queue_unsub(queue_t *q, pn_connection_t *c) {
uv_mutex_lock(&q->lock);
for (size_t i = 0; i < q->waiting.len; ++i) {
if (q->waiting.data[i] == c){
q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
VEC_POP(q->waiting);
break;
}
}
uv_mutex_unlock(&q->lock);
}
/* Unsubscribe from the queue of interest to this link. */
static void link_unsub(broker_t *b, pn_link_t *s) {
const char *qname = pn_terminus_get_address(pn_link_source(s));
if (qname) {
queue_t *q = queues_get(&b->queues, qname);
}
}
}
/* Called in connection's event loop when a connection is woken for messages.*/
static void connection_unsub(broker_t *b, pn_connection_t *c) {
for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0))
link_unsub(b, l);
}
static void session_unsub(broker_t *b, pn_session_t *ssn) {
for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) {
if (pn_link_session(l) == ssn)
link_unsub(b, l);
}
}
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
const char *ename = e ? pn_event_type_name(pn_event_type(e)) : "UNKNOWN";
fprintf(stderr, "%s: %s: %s\n", ename,
}
}
const int WINDOW=10; /* Incoming credit window */
static void handle(broker_t* b, pn_event_t* e) {
switch (pn_event_type(e)) {
break;
pn_connection_set_container(c, b->container_id);
break;
/* Turn off security */
pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
pn_transport_set_idle_timeout(t, 2 * b->heartbeat);
}
pn_connection_open(pn_event_connection(e)); /* Complete the open */
break;
}
if (pn_connection_get_check_queues(c)) {
pn_connection_set_check_queues(c, false);
for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
link_send(b, l);
}
break;
}
break;
}
} else {
pn_link_flow(l, WINDOW);
}
break;
}
case PN_LINK_FLOW: {
link_send(b, pn_event_link(e));
break;
}
case PN_DELIVERY: {
{
size_t size = pn_delivery_pending(d);
/* The broker does not decode the message, just forwards it. */
pn_rwbytes_t m = { size, (char*)malloc(size) };
pn_link_recv(r, m.start, m.size);
const char *qname = pn_terminus_get_address(pn_link_target(r));
queue_receive(b->proactor, queues_get(&b->queues, qname), m);
pn_link_flow(r, WINDOW - pn_link_credit(r));
}
break;
}
connection_unsub(b, pn_event_connection(e));
check_condition(e, pn_transport_condition(pn_event_transport(e)));
break;
connection_unsub(b, pn_event_connection(e));
break;
session_unsub(b, pn_event_session(e));
break;
check_condition(e, pn_link_remote_condition(pn_event_link(e)));
link_unsub(b, pn_event_link(e));
break;
check_condition(e, pn_listener_condition(pn_event_listener(e)));
break;
case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
broker_stop(b);
break;
b->finished = true;
break;
default:
break;
}
}
static void broker_thread(void *void_broker) {
broker_t *b = (broker_t*)void_broker;
do {
pn_event_batch_t *events = pn_proactor_wait(b->proactor);
while ((e = pn_event_batch_next(events))) {
handle(b, e);
}
pn_proactor_done(b->proactor, events);
} while(!b->finished);
}
static void usage(const char *arg0) {
fprintf(stderr, "Usage: %s [-d] [-a url] [-t thread-count]\n", arg0);
exit(1);
}
int main(int argc, char **argv) {
/* Command line options */
char *urlstr = NULL;
char container_id[256];
/* Default container-id is program:pid */
snprintf(container_id, sizeof(container_id), "%s:%d", argv[0], getpid());
size_t nthreads = 4;
pn_millis_t heartbeat = 0;
int opt;
while ((opt = getopt(argc, argv, "a:t:dh:c:")) != -1) {
switch (opt) {
case 'a': urlstr = optarg; break;
case 't': nthreads = atoi(optarg); break;
case 'd': enable_debug = true; break;
case 'h': heartbeat = atoi(optarg); break;
case 'c': strncpy(container_id, optarg, sizeof(container_id)); break;
default: usage(argv[0]); break;
}
}
if (optind < argc)
usage(argv[0]);
broker_t b;
broker_init(&b, container_id, nthreads, heartbeat);
/* Parse the URL or use default values */
pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
/* Listen on IPv6 wildcard. On systems that do not set IPV6ONLY by default,
this will also listen for mapped IPv4 on the same port.
*/
const char *host = url ? pn_url_get_host(url) : "::";
const char *port = url ? pn_url_get_port(url) : "amqp";
pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
printf("listening on '%s:%s' %zd threads\n", host, port, b.threads);
if (url) pn_url_free(url);
if (b.threads <= 0) {
fprintf(stderr, "invalid value -t %zu, threads must be > 0\n", b.threads);
exit(1);
}
/* Start n-1 threads and use main thread */
uv_thread_t* threads = (uv_thread_t*)calloc(sizeof(uv_thread_t), b.threads);
for (size_t i = 0; i < b.threads-1; ++i) {
check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create");
}
broker_thread(&b); /* Use the main thread too. */
for (size_t i = 0; i < b.threads-1; ++i) {
check(uv_thread_join(&threads[i]), "pthread_join");
}
pn_proactor_free(b.proactor);
free(threads);
return 0;
}
pn_delivery_settle
void pn_delivery_settle(pn_delivery_t *delivery)
Settle a delivery.
pn_proactor_done
void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events)
Call when done handling a batch of events.
pn_connection_remote_condition
pn_condition_t * pn_connection_remote_condition(pn_connection_t *connection)
Get the remote condition associated with the connection endpoint.
pn_connection_wake
void pn_connection_wake(pn_connection_t *connection)
Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if there are no IO events pendi...
PN_LOCAL_ACTIVE
#define PN_LOCAL_ACTIVE
The local endpoint state is active.
Definition: connection.h:56
pn_proactor_wait
pn_event_batch_t * pn_proactor_wait(pn_proactor_t *proactor)
Wait for events to handle.
pn_connection_transport
pn_transport_t * pn_connection_transport(pn_connection_t *connection)
Get the transport bound to a connection object.
pn_event_t
struct pn_event_t pn_event_t
Notification of a state change in the protocol engine.
Definition: event.h:75
PN_REMOTE_ACTIVE
#define PN_REMOTE_ACTIVE
The remote endpoint state is active.
Definition: connection.h:71
pn_transport_set_idle_timeout
void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout)
Set the idle timeout for a transport.
pn_terminus_set_address
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
Set the address of a terminus object.
PN_DELIVERY
A delivery has been created or updated.
Definition: event.h:254
PN_LINK_REMOTE_CLOSE
The remote endpoint has closed the link.
Definition: event.h:223
pn_connection_set_context
void pn_connection_set_context(pn_connection_t *connection, void *context)
PN_CONNECTION_BOUND
The connection has been bound to a transport.
Definition: event.h:119
PN_TRANSPORT_CLOSED
Indicates that the both the head and tail of the transport are closed.
Definition: event.h:307
pn_condition_get_name
const char * pn_condition_get_name(pn_condition_t *condition)
Returns the name associated with the exceptional condition, or NULL if there is no conditional inform...
pn_listener_accept
int pn_listener_accept(pn_listener_t *, pn_connection_t *connection)
Asynchronously accept a connection using the listener.
PN_PROACTOR_INACTIVE
The proactor becaome inactive: all listeners and connections are closed and their events processed,...
Definition: event.h:353
pn_session_open
void pn_session_open(pn_session_t *session)
Open a session.
pn_transport_t
struct pn_transport_t pn_transport_t
An AMQP Transport object.
Definition: types.h:419
pn_condition_get_description
const char * pn_condition_get_description(pn_condition_t *condition)
Gets the description associated with the exceptional condition.
pn_delivery_update
void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
Update the disposition of a delivery.
pn_url_t
struct pn_url_t pn_url_t
A parsed URL.
Definition: url.h:42
pn_code
const char * pn_code(int code)
Get the name of the error code.
pn_rwbytes_t
A non-const byte buffer.
Definition: types.h:225
pn_listener_condition
pn_condition_t * pn_listener_condition(pn_listener_t *l)
Get the error condition for a listener.
pn_session_close
void pn_session_close(pn_session_t *session)
Close a session.
pn_proactor_t
struct pn_proactor_t pn_proactor_t
The proactor, see pn_proactor()
Definition: proactor.h:63
pn_event_delivery
pn_delivery_t * pn_event_delivery(pn_event_t *event)
Get the delivery associated with an event.
connection_driver.h
PN_CONNECTION_INIT
The connection has been created.
Definition: event.h:113
pn_proactor
pn_proactor_t * pn_proactor(void)
Create a proactor.
PN_LISTENER_ACCEPT
Indicates the listener is ready to call pn_listener_accept() Events of this type point to the pn_list...
Definition: event.h:327
pn_delivery_t
struct pn_delivery_t pn_delivery_t
An AMQP Delivery object.
Definition: types.h:395
pn_event_batch_next
pn_event_t * pn_event_batch_next(pn_event_batch_t *batch)
Experimental - Remove the next event from the batch and return it.
pn_terminus_get_address
const char * pn_terminus_get_address(pn_terminus_t *terminus)
Get the address of a terminus object.
pn_millis_t
uint32_t pn_millis_t
A span of time in milliseconds.
Definition: types.h:139
PN_PROACTOR_INTERRUPT
Indicates pn_proactor_interrupt() was called to interrupt a proactor thread Events of this type point...
Definition: event.h:339
pn_condition_t
struct pn_condition_t pn_condition_t
An AMQP Condition object.
Definition: condition.h:64
pn_connection_get_context
void * pn_connection_get_context(pn_connection_t *connection)
pn_listener
pn_listener_t * pn_listener(void)
Create a listener.
pn_event_batch_t
struct pn_event_batch_t pn_event_batch_t
Experimental - A batch of events to handle.
Definition: event.h:544
pn_proactor_free
void pn_proactor_free(pn_proactor_t *proactor)
Free the proactor.
pn_event_connection
pn_connection_t * pn_event_connection(pn_event_t *event)
Get the connection associated with an event.
PN_CONNECTION_REMOTE_CLOSE
The remote endpoint has closed the connection.
Definition: event.h:149
sasl.h
pn_condition_is_set
bool pn_condition_is_set(pn_condition_t *condition)
Returns true if the condition object is holding some information, i.e.
pn_connection_t
struct pn_connection_t pn_connection_t
An AMQP Connection object.
Definition: types.h:275
PN_SESSION_REMOTE_OPEN
The remote endpoint has opened the session.
Definition: event.h:174
pn_sasl_allowed_mechs
void pn_sasl_allowed_mechs(pn_sasl_t *sasl, const char *mechs)
SASL mechanisms that are to be considered for authentication.
PN_SESSION_REMOTE_CLOSE
The remote endpoint has closed the session.
Definition: event.h:186
PN_CONNECTION_WAKE
pn_connection_wake() was called.
Definition: event.h:321
pn_connection_close
void pn_connection_close(pn_connection_t *connection)
Close a connection.
pn_event_session
pn_session_t * pn_event_session(pn_event_t *event)
Get the session associated with an event.
proactor.h
pn_proactor_listen
int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener, const char *host, const char *port, int backlog)
Start listening with listener.
pn_delivery_partial
bool pn_delivery_partial(pn_delivery_t *delivery)
Check if a delivery only has partial message data.
url.h
pn_connection_open
void pn_connection_open(pn_connection_t *connection)
Open a connection.
PN_CONNECTION_REMOTE_OPEN
The remote endpoint has opened the connection.
Definition: event.h:137
pn_session_free
void pn_session_free(pn_session_t *session)
Free a session object.
PN_ACCEPTED
#define PN_ACCEPTED
The PN_ACCEPTED delivery state is a terminal state indicating that the delivery was successfully proc...
Definition: disposition.h:64
pn_delivery
pn_delivery_t * pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
Create a delivery on a link.
pn_delivery_link
pn_link_t * pn_delivery_link(pn_delivery_t *delivery)
Get the parent link for a delivery object.
pn_connection_set_container
void pn_connection_set_container(pn_connection_t *connection, const char *container)
Set the AMQP Container name advertised by a connection object.
pn_delivery_readable
bool pn_delivery_readable(pn_delivery_t *delivery)
Check if a delivery is readable.
pn_session_remote_condition
pn_condition_t * pn_session_remote_condition(pn_session_t *session)
Get the remote condition associated with the session endpoint.
transport.h
PN_LINK_FLOW
The flow control state for a link has changed.
Definition: event.h:241
pn_session_t
struct pn_session_t pn_session_t
An AMQP Session object.
Definition: types.h:286
pn_url_free
void pn_url_free(pn_url_t *url)
Free a URL.
pn_url_parse
pn_url_t * pn_url_parse(const char *url)
Parse a string URL as a pn_url_t.
pn_connection
pn_connection_t * pn_connection(void)
Factory to construct a new Connection.
PN_LINK_REMOTE_OPEN
The remote endpoint has opened the link.
Definition: event.h:211
pn_transport_condition
pn_condition_t * pn_transport_condition(pn_transport_t *transport)
Get additional information about the condition of the transport.
pn_event_listener
pn_listener_t * pn_event_listener(pn_event_t *event)
Return the listener associated with an event or NULL.
pn_sasl
pn_sasl_t * pn_sasl(pn_transport_t *transport)
Construct an Authentication and Security Layer object.
pn_dtag
pn_delivery_tag_t pn_dtag(const char *bytes, size_t size)
Construct a delivery tag.
pn_event_type
pn_event_type_t pn_event_type(pn_event_t *event)
Get the type of an event.
pn_event_type_name
const char * pn_event_type_name(pn_event_type_t type)
Get a human readable name for an event type.
pn_transport_require_auth
void pn_transport_require_auth(pn_transport_t *transport, bool required)
Set whether a non-authenticated transport connection is allowed.
pn_delivery_pending
size_t pn_delivery_pending(pn_delivery_t *delivery)
Get the amount of pending message data for a delivery.
pn_session_connection
pn_connection_t * pn_session_connection(pn_session_t *session)
Get the parent connection for a session object.
pn_proactor_interrupt
void pn_proactor_interrupt(pn_proactor_t *proactor)
Cause PN_PROACTOR_INTERRUPT to be returned to exactly one call of pn_proactor_wait().
pn_event_transport
pn_transport_t * pn_event_transport(pn_event_t *event)
Get the transport associated with an event.
pn_event_link
pn_link_t * pn_event_link(pn_event_t *event)
Get the link associated with an event.
PN_LISTENER_CLOSE
Indicates the listener has closed.
Definition: event.h:333