Qpid Proton C API  0.17.0
receive.c

Subscribes to the 'example' node and prints the message bodies received.

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <proton/connection.h>
#include <proton/connection_driver.h>
#include <proton/delivery.h>
#include <proton/link.h>
#include <proton/message.h>
#include <proton/proactor.h>
#include <proton/session.h>
#include <proton/transport.h>
#include <proton/url.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Fixed-size character buffer type used for the string fields of app_data_t. */
typedef char str[1024];
/*
 * Application state shared between main() and the event handler.
 * main() zero-initialises the whole struct before filling it in.
 */
typedef struct app_data_t {
/* Node address to receive from: the URL path if one was given, else "example". */
str address;
/* Container name for the connection; main() formats it as "<argv[0]>:<pid>". */
str container_id;
/* Byte buffer whose storage is released with free() at the end of main(). */
pn_rwbytes_t message_buffer;
/* Number of messages to receive before closing; 0 means receive forever. */
int message_count;
/* Count of messages received so far (only advanced when message_count != 0). */
int received;
/* Proactor that delivers the events processed by handle(). */
pn_proactor_t *proactor;
/* Set to true by handle() to terminate the event loop in main(). */
bool finished;
} app_data_t;
/*
 * Credit granted to the link when message_count is 0; the handler tops the
 * credit back up to this value once it drops below half of it.
 */
static const int BATCH = 100; /* Batch size for unlimited receive */
/* Process exit status returned by main(); check_condition() sets it to 1. */
static int exit_code = 0;
/*
 * Report an error condition attached to an event.
 *
 * If `cond` is set, records a failing exit status in `exit_code` and prints
 * "<event type>: <condition name>: <condition description>" to stderr.
 * Does nothing when the condition is not set.
 *
 * e:    the event the condition was obtained from (used for its type name).
 * cond: the condition to inspect.
 */
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
  if (!pn_condition_is_set(cond)) {
    return;
  }
  exit_code = 1;

  const char *name = pn_condition_get_name(cond);
  const char *description = pn_condition_get_description(cond);
  /* Passing NULL to %s is undefined behaviour, so substitute an empty string. */
  fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
          name ? name : "", description ? description : "");
}
/* Size of the static receive buffer; larger messages are not decoded. */
#define MAX_SIZE 1024

/*
 * Read the body of a complete delivery and print it to stdout.
 *
 * The raw bytes are read from the delivery's link into a static buffer,
 * decoded into a temporary pn_message_t, and the inspected body is printed
 * followed by a newline. Nothing is printed when the pending data does not
 * fit in the buffer, when nothing could be read, or when decoding fails.
 *
 * dlv: the delivery to read; it is neither updated nor settled here.
 */
static void decode_message(pn_delivery_t *dlv) {
  static char buffer[MAX_SIZE];

  /* Only attempt to decode messages that fit in the buffer. */
  if (pn_delivery_pending(dlv) >= MAX_SIZE) {
    return;
  }

  /* Read in the raw data. */
  ssize_t len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
  if (len <= 0) {
    return;
  }

  /* Decode it into a proton message. */
  pn_message_t *m = pn_message();
  if (m == NULL) {
    return;
  }
  if (PN_OK == pn_message_decode(m, buffer, (size_t)len)) {
    pn_string_t *s = pn_string(NULL);
    pn_inspect(pn_message_body(m), s);
    printf("%s\n", pn_string_get(s));
    pn_free(s);
  }
  /* The message was constructed here, so it must be released here too. */
  pn_message_free(m);
}
/*
 * Handle a single event from the proactor.
 *
 * app:   application state (address, container id, message counters, and the
 *        `finished` flag that stops the loop in main()).
 * event: the event to process.
 *
 * - PN_CONNECTION_INIT: opens the connection, a session and a receiving link
 *   on app->address, then grants initial credit.
 * - PN_DELIVERY: prints each complete message, accepts and settles it, and
 *   either replenishes credit (message_count == 0) or closes the endpoints
 *   once message_count messages have arrived.
 * - Transport error / remote close events: report any error condition via
 *   check_condition(); remote closes also close the local connection.
 * - PN_PROACTOR_INACTIVE: marks the application as finished.
 */
static void handle(app_data_t* app, pn_event_t* event) {
  switch (pn_event_type(event)) {

   case PN_CONNECTION_INIT: {
     pn_connection_t *c = pn_event_connection(event);
     pn_connection_set_container(c, app->container_id);
     pn_connection_open(c);
     pn_session_t *s = pn_session(c);
     pn_session_open(s);
     pn_link_t *l = pn_receiver(s, "my_receiver");
     pn_terminus_set_address(pn_link_source(l), app->address);
     pn_link_open(l);
     /* cannot receive without granting credit: */
     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
   } break;

   case PN_DELIVERY: {
     /* A message has been received */
     pn_delivery_t *dlv = pn_event_delivery(event);
     /* Wait until the whole message has arrived before reading it. */
     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
       pn_link_t *link = pn_delivery_link(dlv);
       decode_message(dlv);
       /* Accept the delivery */
       pn_delivery_update(dlv, PN_ACCEPTED);
       /* done with the delivery, move to the next and free it */
       pn_link_advance(link);
       pn_delivery_settle(dlv);  /* dlv is now freed */

       if (app->message_count == 0) {
         /* receive forever - see if more credit is needed */
         if (pn_link_credit(link) < BATCH/2) {
           /* Grant enough credit to bring it up to BATCH: */
           pn_link_flow(link, BATCH - pn_link_credit(link));
         }
       } else if (++app->received >= app->message_count) {
         /* done receiving, close the endpoints */
         printf("%d messages received\n", app->received);
         pn_session_t *ssn = pn_link_session(link);
         pn_link_close(link);
         pn_session_close(ssn);
         pn_connection_close(pn_session_connection(ssn));
       }
     }
   } break;

   case PN_TRANSPORT_ERROR:
    check_condition(event, pn_transport_condition(pn_event_transport(event)));
    break;

   case PN_CONNECTION_REMOTE_CLOSE:
    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
    /* Close our end too so the proactor can become inactive. */
    pn_connection_close(pn_event_connection(event));
    break;

   case PN_SESSION_REMOTE_CLOSE:
    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
    pn_connection_close(pn_event_connection(event));
    break;

   case PN_LINK_REMOTE_CLOSE:
   case PN_LINK_REMOTE_DETACH:
    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
    pn_connection_close(pn_event_connection(event));
    break;

   case PN_PROACTOR_INACTIVE:
    /* Nothing left to process: let main() leave its event loop. */
    app->finished = true;
    break;

   default: break;
  }
}
/*
 * Print the command-line synopsis to stderr and terminate the process
 * with exit status 1. This function does not return.
 *
 * arg0: the program name to show in the synopsis (normally argv[0]).
 */
static void usage(const char *arg0)
{
  static const char synopsis[] = "Usage: %s [-a url] [-m message-count]\n";

  fprintf(stderr, synopsis, arg0);
  exit(1);
}
int main(int argc, char **argv) {
/* Default values for application and connection. */
app_data_t app = {{0}};
app.message_count = 100;
const char* urlstr = NULL;
int opt;
while((opt = getopt(argc, argv, "a:m:")) != -1) {
switch(opt) {
case 'a': urlstr = optarg; break;
case 'm': app.message_count = atoi(optarg); break;
default: usage(argv[0]); break;
}
}
if (optind < argc)
usage(argv[0]);
snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
/* Parse the URL or use default values */
pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
const char *host = url ? pn_url_get_host(url) : NULL;
const char *port = url ? pn_url_get_port(url) : "amqp";
strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
/* Create the proactor and connect */
app.proactor = pn_proactor();
pn_proactor_connect(app.proactor, pn_connection(), host, port);
if (url) pn_url_free(url);
do {
pn_event_batch_t *events = pn_proactor_wait(app.proactor);
while ((e = pn_event_batch_next(events))) {
handle(&app, e);
}
pn_proactor_done(app.proactor, events);
} while(!app.finished);
pn_proactor_free(app.proactor);
free(app.message_buffer.start);
return exit_code;
}
pn_delivery_settle
void pn_delivery_settle(pn_delivery_t *delivery)
Settle a delivery.
pn_proactor_done
void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events)
Call when done handling a batch of events.
pn_connection_remote_condition
pn_condition_t * pn_connection_remote_condition(pn_connection_t *connection)
Get the remote condition associated with the connection endpoint.
pn_proactor_wait
pn_event_batch_t * pn_proactor_wait(pn_proactor_t *proactor)
Wait for events to handle.
pn_event_t
struct pn_event_t pn_event_t
Notification of a state change in the protocol engine.
Definition: event.h:75
pn_terminus_set_address
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
Set the address of a terminus object.
pn_message_free
void pn_message_free(pn_message_t *msg)
Free a previously constructed pn_message_t.
PN_DELIVERY
A delivery has been created or updated.
Definition: event.h:254
PN_LINK_REMOTE_CLOSE
The remote endpoint has closed the link.
Definition: event.h:223
pn_condition_get_name
const char * pn_condition_get_name(pn_condition_t *condition)
Returns the name associated with the exceptional condition, or NULL if there is no conditional information set.
PN_PROACTOR_INACTIVE
The proactor became inactive: all listeners and connections are closed and their events processed,...
Definition: event.h:353
pn_session_open
void pn_session_open(pn_session_t *session)
Open a session.
pn_session
pn_session_t * pn_session(pn_connection_t *connection)
Factory for creating a new session on a given connection object.
pn_message_t
struct pn_message_t pn_message_t
An AMQP Message object.
Definition: message.h:51
pn_condition_get_description
const char * pn_condition_get_description(pn_condition_t *condition)
Gets the description associated with the exceptional condition.
pn_delivery_update
void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
Update the disposition of a delivery.
pn_url_t
struct pn_url_t pn_url_t
A parsed URL.
Definition: url.h:42
pn_message
pn_message_t * pn_message(void)
Construct a new pn_message_t.
pn_rwbytes_t
A non-const byte buffer.
Definition: types.h:225
pn_session_close
void pn_session_close(pn_session_t *session)
Close a session.
pn_proactor_t
struct pn_proactor_t pn_proactor_t
The proactor, see pn_proactor()
Definition: proactor.h:63
pn_event_delivery
pn_delivery_t * pn_event_delivery(pn_event_t *event)
Get the delivery associated with an event.
connection_driver.h
message.h
PN_CONNECTION_INIT
The connection has been created.
Definition: event.h:113
pn_proactor
pn_proactor_t * pn_proactor(void)
Create a proactor.
pn_delivery_t
struct pn_delivery_t pn_delivery_t
An AMQP Delivery object.
Definition: types.h:395
pn_event_batch_next
pn_event_t * pn_event_batch_next(pn_event_batch_t *batch)
Experimental - Remove the next event from the batch and return it.
pn_condition_t
struct pn_condition_t pn_condition_t
An AMQP Condition object.
Definition: condition.h:64
pn_event_batch_t
struct pn_event_batch_t pn_event_batch_t
Experimental - A batch of events to handle.
Definition: event.h:544
PN_LINK_REMOTE_DETACH
The remote endpoint has detached the link.
Definition: event.h:235
pn_proactor_connect
int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection, const char *host, const char *port)
Connect connection to host/port.
pn_proactor_free
void pn_proactor_free(pn_proactor_t *proactor)
Free the proactor.
PN_OK
#define PN_OK
No error.
Definition: error.h:46
pn_event_connection
pn_connection_t * pn_event_connection(pn_event_t *event)
Get the connection associated with an event.
PN_CONNECTION_REMOTE_CLOSE
The remote endpoint has closed the connection.
Definition: event.h:149
pn_message_decode
int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
Decode/load message content from AMQP formatted binary data.
pn_condition_is_set
bool pn_condition_is_set(pn_condition_t *condition)
Returns true if the condition object is holding some information, i.e. if the name is set to some non-NULL value.
pn_connection_t
struct pn_connection_t pn_connection_t
An AMQP Connection object.
Definition: types.h:275
PN_SESSION_REMOTE_CLOSE
The remote endpoint has closed the session.
Definition: event.h:186
pn_connection_close
void pn_connection_close(pn_connection_t *connection)
Close a connection.
pn_event_session
pn_session_t * pn_event_session(pn_event_t *event)
Get the session associated with an event.
proactor.h
connection.h
pn_delivery_partial
bool pn_delivery_partial(pn_delivery_t *delivery)
Check if a delivery only has partial message data.
url.h
pn_connection_open
void pn_connection_open(pn_connection_t *connection)
Open a connection.
PN_ACCEPTED
#define PN_ACCEPTED
The PN_ACCEPTED delivery state is a terminal state indicating that the delivery was successfully proc...
Definition: disposition.h:64
pn_delivery_link
pn_link_t * pn_delivery_link(pn_delivery_t *delivery)
Get the parent link for a delivery object.
pn_connection_set_container
void pn_connection_set_container(pn_connection_t *connection, const char *container)
Set the AMQP Container name advertised by a connection object.
pn_delivery_readable
bool pn_delivery_readable(pn_delivery_t *delivery)
Check if a delivery is readable.
pn_session_remote_condition
pn_condition_t * pn_session_remote_condition(pn_session_t *session)
Get the remote condition associated with the session endpoint.
delivery.h
transport.h
session.h
pn_session_t
struct pn_session_t pn_session_t
An AMQP Session object.
Definition: types.h:286
pn_url_free
void pn_url_free(pn_url_t *url)
Free a URL.
pn_url_parse
pn_url_t * pn_url_parse(const char *url)
Parse a string URL as a pn_url_t.
pn_connection
pn_connection_t * pn_connection(void)
Factory to construct a new Connection.
PN_TRANSPORT_ERROR
Indicates that a transport error has occurred.
Definition: event.h:275
pn_message_body
pn_data_t * pn_message_body(pn_message_t *msg)
Get/set the body of a message.
pn_transport_condition
pn_condition_t * pn_transport_condition(pn_transport_t *transport)
Get additional information about the condition of the transport.
pn_event_type
pn_event_type_t pn_event_type(pn_event_t *event)
Get the type of an event.
pn_event_type_name
const char * pn_event_type_name(pn_event_type_t type)
Get a human readable name for an event type.
pn_delivery_pending
size_t pn_delivery_pending(pn_delivery_t *delivery)
Get the amount of pending message data for a delivery.
pn_session_connection
pn_connection_t * pn_session_connection(pn_session_t *session)
Get the parent connection for a session object.
pn_event_transport
pn_transport_t * pn_event_transport(pn_event_t *event)
Get the transport associated with an event.
pn_event_link
pn_link_t * pn_event_link(pn_event_t *event)
Get the link associated with an event.