18#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
19#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
55 class ActiveMQConnection;
56 class ActiveMQConsumer;
57 class ActiveMQProducer;
58 class ActiveMQSessionExecutor;
205 const std::string& selector);
208 const std::string& selector,
212 const std::string& name,
213 const std::string& selector,
214 bool noLocal =
false);
280 cms::Message* message,
int deliveryMode,
int priority,
long long timeToLive,
316 return *( this->sessionInfo );
326 return *( this->sessionInfo->getSessionId() );
333 return this->connection;
347 return this->lastDeliveredSequenceId;
357 this->lastDeliveredSequenceId = value;
448 return this->transaction;
603 long long getNextProducerSequenceId() {
608 void checkClosed()
const;
623 std::string createTemporaryDestinationName();
#define AMQCPP_API
Definition: Config.h:30
Definition: ActiveMQTempDestination.h:36
Definition: SessionId.h:51
Definition: SessionInfo.h:48
Concrete connection used for all connectors to the ActiveMQ broker.
Definition: ActiveMQConnection.h:62
Delegate dispatcher for a single session.
Definition: ActiveMQSessionExecutor.h:44
Interface for an object responsible for dispatching messages to consumers.
Definition: Dispatcher.h:32
Definition: MessageDispatchChannel.h:34
Definition: ActiveMQProducerKernel.h:44
Definition: ActiveMQSessionKernel.h:67
SessionConfig * config
Definition: ActiveMQSessionKernel.h:74
util::LongSequenceGenerator consumerIds
Next available Consumer Id.
Definition: ActiveMQSessionKernel.h:120
void send(kernels::ActiveMQProducerKernel *producer, Pointer< commands::ActiveMQDestination > destination, cms::Message *message, int deliveryMode, int priority, long long timeToLive, util::MemoryUsage *producerWindow, long long sendTimeout, cms::AsyncCallback *onComplete)
Sends a message from the Producer specified using this session's connection the message will be sent ...
virtual cms::MessageTransformer * getMessageTransformer() const
Gets the currently configured MessageTransformer for this Session.
virtual void redispatch(MessageDispatchChannel &unconsumedMessages)
Redispatches the given set of unconsumed messages to the consumers.
virtual void doStartTransaction()
Starts if not already start a Transaction for this Session.
void oneway(Pointer< commands::Command > command)
Sends a Command to the broker without requesting any Response be returned.
virtual cms::TextMessage * createTextMessage()
Creates a new TextMessage.
virtual cms::BytesMessage * createBytesMessage(const unsigned char *bytes, int bytesSize)
Creates a BytesMessage and sets the payload to the passed value.
long long lastDeliveredSequenceId
Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:125
virtual bool isAutoAcknowledge() const
Definition: ActiveMQSessionKernel.h:163
void setLastDeliveredSequenceId(long long value)
Sets the value of the Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:356
Pointer< commands::ConsumerId > getNextConsumerId()
Get the Next available Consumer Id.
void fire(const exceptions::ActiveMQException &ex)
Fires the given exception to the exception listener of the connection.
virtual void commit()
Commits all messages done in this transaction and releases any locks currently held.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination)
Creates a MessageConsumer for the specified destination.
virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const
Returns the acknowledgment mode of the session.
void acknowledge()
Request that the Session inform all its consumers to Acknowledge all Message's that have been receive...
std::auto_ptr< ActiveMQSessionExecutor > executor
Sends incoming messages to the registered consumers.
Definition: ActiveMQSessionKernel.h:100
virtual cms::StreamMessage * createStreamMessage()
Creates a new StreamMessage.
virtual int getHashCode() const
Returns a Hash Code for this Session based on its SessionId.
const commands::SessionInfo & getSessionInfo() const
Gets the Session Information object for this session, if the session is closed than this method throw...
Definition: ActiveMQSessionKernel.h:314
void sendAck(decaf::lang::Pointer< commands::MessageAck > ack, bool async=false)
Sends the given MessageAck command to the Broker either via Synchronous call or an Asynchronous call ...
virtual void recover()
Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged ...
virtual void stop()
Starts asynchronous message delivery.
bool isStarted() const
Indicates whether or not the session is currently in the started state.
virtual bool isDupsOkAcknowledge() const
Definition: ActiveMQSessionKernel.h:167
Pointer< threads::Scheduler > getScheduler() const
Gets a Pointer to this Session's Scheduler instance.
virtual cms::TemporaryQueue * createTemporaryQueue()
Creates a TemporaryQueue object.
virtual cms::Topic * createTopic(const std::string &topicName)
Creates a topic identity given a Queue name.
void checkMessageListener() const
Checks if any MessageConsumer owned by this Session has a set MessageListener and throws an exception...
virtual cms::MessageConsumer * createDurableConsumer(const cms::Topic *destination, const std::string &name, const std::string &selector, bool noLocal=false)
Creates a durable subscriber to the specified topic, using a Message selector.
virtual cms::QueueBrowser * createBrowser(const cms::Queue *queue, const std::string &selector)
Creates a new QueueBrowser to peek at Messages on the given Queue.
void setSessionAsyncDispatch(bool sessionAsyncDispatch)
Configures asynchronous message dispatch to this session's consumers.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination, const std::string &selector, bool noLocal)
Creates a MessageConsumer for the specified destination, using a message selector.
void dispose()
Cleans up the Session object's resources without attempting to send the Remove command to the broker,...
virtual cms::TemporaryTopic * createTemporaryTopic()
Creates a TemporaryTopic object.
AtomicBoolean closed
Indicates that this connection has been closed, it is no longer usable after this becomes true.
Definition: ActiveMQSessionKernel.h:95
ActiveMQConnection * getConnection() const
Gets the ActiveMQConnection that is associated with this session.
Definition: ActiveMQSessionKernel.h:332
decaf::util::ArrayList< Pointer< ActiveMQConsumerKernel > > getConsumers() const
Returns an ArrayList containing a copy of all consumers currently in use on this Session.
Pointer< commands::ProducerId > getNextProducerId()
Get the Next available Producer Id.
void removeConsumer(Pointer< ActiveMQConsumerKernel > consumer)
Dispose of a MessageConsumer from this session.
void addConsumer(Pointer< ActiveMQConsumerKernel > consumer)
Adds a MessageConsumerKernel to this session registering it with the Connection and store a reference...
Pointer< ActiveMQProducerKernel > lookupProducerKernel(Pointer< commands::ProducerId > id)
virtual bool isTransacted() const
Gets if the Sessions is a Transacted Session.
virtual void start()
Stops asynchronous message delivery.
virtual cms::QueueBrowser * createBrowser(const cms::Queue *queue)
Creates a new QueueBrowser to peek at Messages on the given Queue.
cms::ExceptionListener * getExceptionListener()
This method gets any registered exception listener of this sessions connection and returns it.
virtual bool isClientAcknowledge() const
Definition: ActiveMQSessionKernel.h:171
virtual cms::MessageProducer * createProducer(const cms::Destination *destination)
Creates a MessageProducer to send messages to the specified destination.
virtual cms::Queue * createQueue(const std::string &queueName)
Creates a queue identity given a Queue name.
virtual void dispatch(const Pointer< MessageDispatch > &message)
Dispatches a message to a particular consumer.
void clearMessagesInProgress(decaf::lang::Pointer< decaf::util::concurrent::atomic::AtomicInteger > transportsInterrupted)
Request that this Session inform all of its consumers to clear all messages that are currently in pro...
util::LongSequenceGenerator producerSequenceIds
Next available Producer Sequence Id.
Definition: ActiveMQSessionKernel.h:115
virtual void setMessageTransformer(cms::MessageTransformer *transformer)
Set an MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer objec...
void close(Pointer< commands::ConsumerId > id)
Close the specified consumer if present in this Session.
virtual cms::Message * createMessage()
Creates a new Message.
virtual void rollback()
Rolls back all messages done in this transaction and releases any locks currently held.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination, const std::string &selector)
Creates a MessageConsumer for the specified destination, using a message selector.
void addProducer(Pointer< ActiveMQProducerKernel > producer)
Adds a MessageProducer to this session registering it with the Connection and store a reference to it...
virtual ~ActiveMQSessionKernel()
util::LongSequenceGenerator producerIds
Next available Producer Id.
Definition: ActiveMQSessionKernel.h:110
virtual void unsubscribe(const std::string &name)
Unsubscribes a durable subscription that has been created by a client.
bool isInUse(Pointer< commands::ActiveMQDestination > destination)
Checks if the given destination is currently in use by any consumers in this Session.
const commands::SessionId & getSessionId() const
Gets the Session Id object for this session, if the session is closed than this method throws an exce...
Definition: ActiveMQSessionKernel.h:324
void doClose()
Performs the actual Session close operations.
virtual cms::MapMessage * createMapMessage()
Creates a new MapMessage.
ActiveMQConnection * connection
Connection.
Definition: ActiveMQSessionKernel.h:89
Pointer< commands::Response > syncRequest(Pointer< commands::Command > command, unsigned int timeout=0)
Sends a synchronous request and returns the response from the broker.
Pointer< commands::SessionInfo > sessionInfo
SessionInfo for this Session.
Definition: ActiveMQSessionKernel.h:79
Pointer< ActiveMQTransactionContext > transaction
Transaction Management object.
Definition: ActiveMQSessionKernel.h:84
void removeProducer(Pointer< ActiveMQProducerKernel > producer)
Dispose of a MessageProducer from this session.
virtual cms::TextMessage * createTextMessage(const std::string &text)
Creates a new TextMessage and set the text to the value given.
void deliverAcks()
Request that this Session inform all of its consumers to deliver their pending acks.
Pointer< ActiveMQTransactionContext > getTransactionContext()
Gets the Pointer to this Session's TransactionContext.
Definition: ActiveMQSessionKernel.h:447
long long getLastDeliveredSequenceId() const
Gets the currently set Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:346
cms::Session::AcknowledgeMode ackMode
This Sessions Acknowledgment mode.
Definition: ActiveMQSessionKernel.h:105
void wakeup()
Causes the Session to wakeup its executer and ensure all messages are dispatched.
Pointer< ActiveMQConsumerKernel > lookupConsumerKernel(Pointer< commands::ConsumerId > id)
virtual cms::BytesMessage * createBytesMessage()
Creates a BytesMessage.
ActiveMQSessionKernel(ActiveMQConnection *connection, const Pointer< commands::SessionId > &id, cms::Session::AcknowledgeMode ackMode, const decaf::util::Properties &properties)
void setPrefetchSize(Pointer< commands::ConsumerId > id, int prefetch)
Set the prefetch level for the given consumer if it exists in this Session to the value specified.
bool isSessionAsyncDispatch() const
Returns true if this session is dispatching messages to its consumers asynchronously.
bool iterateConsumers()
Gives each consumer a chance to dispatch messages that have been enqueued by calling each consumers i...
virtual void close()
Closes this session as well as any active child consumers or producers.
virtual bool isIndividualAcknowledge() const
Definition: ActiveMQSessionKernel.h:175
Definition: ActiveMQException.h:35
This class is used to generate a sequence of long long values that are incremented each time a new va...
Definition: LongSequenceGenerator.h:32
long long getNextSequenceId()
Definition: MemoryUsage.h:28
Asynchronous event interface for CMS asynchronous operations.
Definition: AsyncCallback.h:37
A BytesMessage object is used to send a message containing a stream of unsigned bytes.
Definition: BytesMessage.h:66
A Destination object encapsulates a provider-specific address.
Definition: Destination.h:39
If a CMS provider detects a serious problem, it notifies the client application through an ExceptionL...
Definition: ExceptionListener.h:37
A MapMessage object is used to send a set of name-value pairs.
Definition: MapMessage.h:71
A client uses a MessageConsumer to received messages from a destination.
Definition: MessageConsumer.h:63
Root of all messages.
Definition: Message.h:88
A client uses a MessageProducer object to send messages to a Destination.
Definition: MessageProducer.h:60
This class implements in interface for browsing the messages in a Queue without removing them.
Definition: QueueBrowser.h:53
An interface encapsulating a provider-specific queue name.
Definition: Queue.h:37
A Session object is a single-threaded context for producing and consuming messages.
Definition: Session.h:105
AcknowledgeMode
Definition: Session.h:108
@ INDIVIDUAL_ACKNOWLEDGE
Message will be acknowledged individually.
Definition: Session.h:146
@ AUTO_ACKNOWLEDGE
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition: Session.h:117
@ DUPS_OK_ACKNOWLEDGE
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition: Session.h:128
@ CLIENT_ACKNOWLEDGE
With this acknowledgment mode, the client acknowledges a consumed message by calling the message's ac...
Definition: Session.h:134
Interface for a StreamMessage.
Definition: StreamMessage.h:61
Defines a Temporary Queue based Destination.
Definition: TemporaryQueue.h:39
Defines a Temporary Topic based Destination.
Definition: TemporaryTopic.h:39
Interface for a text message.
Definition: TextMessage.h:41
An interface encapsulating a provider-specific topic name.
Definition: Topic.h:36
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
Definition: ArrayList.h:39
Java-like properties class for mapping string names to string values.
Definition: Properties.h:53
A boolean value that may be updated atomically.
Definition: AtomicBoolean.h:34
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition: CachedConsumer.h:24