activemq-cpp-3.9.5
ActiveMQSessionKernel.h
Go to the documentation of this file.
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
19#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
20
21#include <cms/Session.h>
23
25#include <activemq/util/Usage.h>
42
43#include <decaf/lang/Pointer.h>
48
49#include <string>
50#include <memory>
51
52namespace activemq {
53namespace core {
54
55 class ActiveMQConnection;
56 class ActiveMQConsumer;
57 class ActiveMQProducer;
58 class ActiveMQSessionExecutor;
59
60namespace kernels {
61
64
65 class SessionConfig;
66
68 private:
69
71
72 protected:
73
74 SessionConfig* config;
75
80
85
90
96
100 std::auto_ptr<ActiveMQSessionExecutor> executor;
101
106
111
116
121
126
127 private:
128
131
132 public:
133
137 const decaf::util::Properties& properties);
138
140
145 virtual void redispatch(MessageDispatchChannel& unconsumedMessages);
146
150 virtual void start();
151
155 virtual void stop();
156
161 bool isStarted() const;
162
// Returns true when this session's ackMode equals cms::Session::AUTO_ACKNOWLEDGE.
163 virtual bool isAutoAcknowledge() const {
164 return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
165 }
166
// Returns true when this session's ackMode equals cms::Session::DUPS_OK_ACKNOWLEDGE.
167 virtual bool isDupsOkAcknowledge() const {
168 return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
169 }
170
// Returns true when this session's ackMode equals cms::Session::CLIENT_ACKNOWLEDGE.
171 virtual bool isClientAcknowledge() const {
172 return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
173 }
174
// Returns true when this session's ackMode equals cms::Session::INDIVIDUAL_ACKNOWLEDGE.
175 virtual bool isIndividualAcknowledge() const {
176 return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
177 }
178
183
184 public: // Methods from ActiveMQMessageDispatcher
185
190 virtual void dispatch(const Pointer<MessageDispatch>& message);
191
192 public: // Implements Methods
193
194 virtual void close();
195
196 virtual void commit();
197
198 virtual void rollback();
199
200 virtual void recover();
201
203
205 const std::string& selector);
206
208 const std::string& selector,
209 bool noLocal);
210
212 const std::string& name,
213 const std::string& selector,
214 bool noLocal = false);
215
217
219
220 virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector);
221
222 virtual cms::Queue* createQueue(const std::string& queueName);
223
224 virtual cms::Topic* createTopic(const std::string& topicName);
225
227
229
231
233
234 virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize);
235
237
239
240 virtual cms::TextMessage* createTextMessage( const std::string& text );
241
243
245
246 virtual bool isTransacted() const;
247
248 virtual void unsubscribe(const std::string& name);
249
250 public: // ActiveMQSessionKernel specific Methods
251
280 cms::Message* message, int deliveryMode, int priority, long long timeToLive,
281 util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete);
282
292
301
308
315 this->checkClosed();
316 return *( this->sessionInfo );
317 }
318
325 this->checkClosed();
326 return *( this->sessionInfo->getSessionId() );
327 }
328
333 return this->connection;
334 }
335
340
// Returns the current value of the lastDeliveredSequenceId member.
346 long long getLastDeliveredSequenceId() const {
347 return this->lastDeliveredSequenceId;
348 }
349
// Stores the given value in the lastDeliveredSequenceId member.
// @param value - the new Last Delivered Sequence Id
356 void setLastDeliveredSequenceId(long long value) {
357 this->lastDeliveredSequenceId = value;
358 }
359
370
386
398
409
421
432
440 virtual void doStartTransaction();
441
448 return this->transaction;
449 }
450
456
462
469
473 void wakeup();
474
480
486
493 void doClose();
494
501 void dispose();
502
513
521
528
533
538
548
555
561 virtual int getHashCode() const;
562
573
580
586 void setSessionAsyncDispatch(bool sessionAsyncDispatch);
587
596
597 private:
598
// Returns the next value from the producerSequenceIds generator
// (forwards to its getNextSequenceId()).
603 long long getNextProducerSequenceId() {
604 return this->producerSequenceIds.getNextSequenceId();
605 }
606
607 // Checks for the closed state and throws if so.
608 void checkClosed() const;
609
610 // Send the Destination Creation Request to the Broker, alerting it
611 // that we've created a new Temporary Destination.
612 // @param tempDestination - The new Temporary Destination
613 void createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
614
615 // Send the Destination Destruction Request to the Broker, alerting
616 // it that we've removed an existing Temporary Destination.
617 // @param tempDestination - The Temporary Destination to remove
618 void destroyTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
619
620 // Creates a new Temporary Destination name using the connection id
621 // and a rolling count.
622 // @return a unique Temporary Destination name
623 std::string createTemporaryDestinationName();
624
625 };
626
627}}}
628
629#endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_ */
#define AMQCPP_API
Definition: Config.h:30
Definition: ActiveMQTempDestination.h:36
Definition: SessionId.h:51
Definition: SessionInfo.h:48
Concrete connection used for all connectors to the ActiveMQ broker.
Definition: ActiveMQConnection.h:62
Delegate dispatcher for a single session.
Definition: ActiveMQSessionExecutor.h:44
Interface for an object responsible for dispatching messages to consumers.
Definition: Dispatcher.h:32
Definition: MessageDispatchChannel.h:34
Definition: ActiveMQProducerKernel.h:44
Definition: ActiveMQSessionKernel.h:67
SessionConfig * config
Definition: ActiveMQSessionKernel.h:74
util::LongSequenceGenerator consumerIds
Next available Consumer Id.
Definition: ActiveMQSessionKernel.h:120
void send(kernels::ActiveMQProducerKernel *producer, Pointer< commands::ActiveMQDestination > destination, cms::Message *message, int deliveryMode, int priority, long long timeToLive, util::MemoryUsage *producerWindow, long long sendTimeout, cms::AsyncCallback *onComplete)
Sends a message from the specified Producer using this session's connection; the message will be sent ...
virtual cms::MessageTransformer * getMessageTransformer() const
Gets the currently configured MessageTransformer for this Session.
virtual void redispatch(MessageDispatchChannel &unconsumedMessages)
Redispatches the given set of unconsumed messages to the consumers.
virtual void doStartTransaction()
Starts a Transaction for this Session if one has not already been started.
void oneway(Pointer< commands::Command > command)
Sends a Command to the broker without requesting any Response be returned.
virtual cms::TextMessage * createTextMessage()
Creates a new TextMessage.
virtual cms::BytesMessage * createBytesMessage(const unsigned char *bytes, int bytesSize)
Creates a BytesMessage and sets the payload to the passed value.
long long lastDeliveredSequenceId
Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:125
virtual bool isAutoAcknowledge() const
Definition: ActiveMQSessionKernel.h:163
void setLastDeliveredSequenceId(long long value)
Sets the value of the Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:356
Pointer< commands::ConsumerId > getNextConsumerId()
Get the Next available Consumer Id.
void fire(const exceptions::ActiveMQException &ex)
Fires the given exception to the exception listener of the connection.
virtual void commit()
Commits all messages done in this transaction and releases any locks currently held.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination)
Creates a MessageConsumer for the specified destination.
virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const
Returns the acknowledgment mode of the session.
void acknowledge()
Request that the Session inform all its consumers to Acknowledge all Messages that have been receive...
std::auto_ptr< ActiveMQSessionExecutor > executor
Sends incoming messages to the registered consumers.
Definition: ActiveMQSessionKernel.h:100
virtual cms::StreamMessage * createStreamMessage()
Creates a new StreamMessage.
virtual int getHashCode() const
Returns a Hash Code for this Session based on its SessionId.
const commands::SessionInfo & getSessionInfo() const
Gets the Session Information object for this session, if the session is closed then this method throw...
Definition: ActiveMQSessionKernel.h:314
void sendAck(decaf::lang::Pointer< commands::MessageAck > ack, bool async=false)
Sends the given MessageAck command to the Broker either via Synchronous call or an Asynchronous call ...
virtual void recover()
Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged ...
virtual void stop()
Stops asynchronous message delivery.
bool isStarted() const
Indicates whether or not the session is currently in the started state.
virtual bool isDupsOkAcknowledge() const
Definition: ActiveMQSessionKernel.h:167
Pointer< threads::Scheduler > getScheduler() const
Gets a Pointer to this Session's Scheduler instance.
virtual cms::TemporaryQueue * createTemporaryQueue()
Creates a TemporaryQueue object.
virtual cms::Topic * createTopic(const std::string &topicName)
Creates a topic identity given a Topic name.
void checkMessageListener() const
Checks if any MessageConsumer owned by this Session has a set MessageListener and throws an exception...
virtual cms::MessageConsumer * createDurableConsumer(const cms::Topic *destination, const std::string &name, const std::string &selector, bool noLocal=false)
Creates a durable subscriber to the specified topic, using a Message selector.
virtual cms::QueueBrowser * createBrowser(const cms::Queue *queue, const std::string &selector)
Creates a new QueueBrowser to peek at Messages on the given Queue.
void setSessionAsyncDispatch(bool sessionAsyncDispatch)
Configures asynchronous message dispatch to this session's consumers.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination, const std::string &selector, bool noLocal)
Creates a MessageConsumer for the specified destination, using a message selector.
void dispose()
Cleans up the Session object's resources without attempting to send the Remove command to the broker,...
virtual cms::TemporaryTopic * createTemporaryTopic()
Creates a TemporaryTopic object.
AtomicBoolean closed
Indicates that this session has been closed, it is no longer usable after this becomes true.
Definition: ActiveMQSessionKernel.h:95
ActiveMQConnection * getConnection() const
Gets the ActiveMQConnection that is associated with this session.
Definition: ActiveMQSessionKernel.h:332
decaf::util::ArrayList< Pointer< ActiveMQConsumerKernel > > getConsumers() const
Returns an ArrayList containing a copy of all consumers currently in use on this Session.
Pointer< commands::ProducerId > getNextProducerId()
Get the Next available Producer Id.
void removeConsumer(Pointer< ActiveMQConsumerKernel > consumer)
Dispose of a MessageConsumer from this session.
void addConsumer(Pointer< ActiveMQConsumerKernel > consumer)
Adds a MessageConsumerKernel to this session registering it with the Connection and store a reference...
Pointer< ActiveMQProducerKernel > lookupProducerKernel(Pointer< commands::ProducerId > id)
virtual bool isTransacted() const
Returns whether this Session is a transacted Session.
virtual void start()
Starts asynchronous message delivery.
virtual cms::QueueBrowser * createBrowser(const cms::Queue *queue)
Creates a new QueueBrowser to peek at Messages on the given Queue.
cms::ExceptionListener * getExceptionListener()
This method gets any registered exception listener of this sessions connection and returns it.
virtual bool isClientAcknowledge() const
Definition: ActiveMQSessionKernel.h:171
virtual cms::MessageProducer * createProducer(const cms::Destination *destination)
Creates a MessageProducer to send messages to the specified destination.
virtual cms::Queue * createQueue(const std::string &queueName)
Creates a queue identity given a Queue name.
virtual void dispatch(const Pointer< MessageDispatch > &message)
Dispatches a message to a particular consumer.
void clearMessagesInProgress(decaf::lang::Pointer< decaf::util::concurrent::atomic::AtomicInteger > transportsInterrupted)
Request that this Session inform all of its consumers to clear all messages that are currently in pro...
util::LongSequenceGenerator producerSequenceIds
Next available Producer Sequence Id.
Definition: ActiveMQSessionKernel.h:115
virtual void setMessageTransformer(cms::MessageTransformer *transformer)
Set a MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer objec...
void close(Pointer< commands::ConsumerId > id)
Close the specified consumer if present in this Session.
virtual cms::Message * createMessage()
Creates a new Message.
virtual void rollback()
Rolls back all messages done in this transaction and releases any locks currently held.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination, const std::string &selector)
Creates a MessageConsumer for the specified destination, using a message selector.
void addProducer(Pointer< ActiveMQProducerKernel > producer)
Adds a MessageProducer to this session registering it with the Connection and store a reference to it...
util::LongSequenceGenerator producerIds
Next available Producer Id.
Definition: ActiveMQSessionKernel.h:110
virtual void unsubscribe(const std::string &name)
Unsubscribes a durable subscription that has been created by a client.
bool isInUse(Pointer< commands::ActiveMQDestination > destination)
Checks if the given destination is currently in use by any consumers in this Session.
const commands::SessionId & getSessionId() const
Gets the Session Id object for this session, if the session is closed then this method throws an exce...
Definition: ActiveMQSessionKernel.h:324
void doClose()
Performs the actual Session close operations.
virtual cms::MapMessage * createMapMessage()
Creates a new MapMessage.
ActiveMQConnection * connection
Connection.
Definition: ActiveMQSessionKernel.h:89
Pointer< commands::Response > syncRequest(Pointer< commands::Command > command, unsigned int timeout=0)
Sends a synchronous request and returns the response from the broker.
Pointer< commands::SessionInfo > sessionInfo
SessionInfo for this Session.
Definition: ActiveMQSessionKernel.h:79
Pointer< ActiveMQTransactionContext > transaction
Transaction Management object.
Definition: ActiveMQSessionKernel.h:84
void removeProducer(Pointer< ActiveMQProducerKernel > producer)
Dispose of a MessageProducer from this session.
virtual cms::TextMessage * createTextMessage(const std::string &text)
Creates a new TextMessage and set the text to the value given.
void deliverAcks()
Request that this Session inform all of its consumers to deliver their pending acks.
Pointer< ActiveMQTransactionContext > getTransactionContext()
Gets the Pointer to this Session's TransactionContext.
Definition: ActiveMQSessionKernel.h:447
long long getLastDeliveredSequenceId() const
Gets the currently set Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:346
cms::Session::AcknowledgeMode ackMode
This Sessions Acknowledgment mode.
Definition: ActiveMQSessionKernel.h:105
void wakeup()
Causes the Session to wake up its executor and ensure all messages are dispatched.
Pointer< ActiveMQConsumerKernel > lookupConsumerKernel(Pointer< commands::ConsumerId > id)
virtual cms::BytesMessage * createBytesMessage()
Creates a BytesMessage.
ActiveMQSessionKernel(ActiveMQConnection *connection, const Pointer< commands::SessionId > &id, cms::Session::AcknowledgeMode ackMode, const decaf::util::Properties &properties)
void setPrefetchSize(Pointer< commands::ConsumerId > id, int prefetch)
Set the prefetch level for the given consumer if it exists in this Session to the value specified.
bool isSessionAsyncDispatch() const
Returns true if this session is dispatching messages to its consumers asynchronously.
bool iterateConsumers()
Gives each consumer a chance to dispatch messages that have been enqueued by calling each consumers i...
virtual void close()
Closes this session as well as any active child consumers or producers.
virtual bool isIndividualAcknowledge() const
Definition: ActiveMQSessionKernel.h:175
Definition: ActiveMQException.h:35
This class is used to generate a sequence of long long values that are incremented each time a new va...
Definition: LongSequenceGenerator.h:32
Definition: MemoryUsage.h:28
Asynchronous event interface for CMS asynchronous operations.
Definition: AsyncCallback.h:37
A BytesMessage object is used to send a message containing a stream of unsigned bytes.
Definition: BytesMessage.h:66
A Destination object encapsulates a provider-specific address.
Definition: Destination.h:39
If a CMS provider detects a serious problem, it notifies the client application through an ExceptionL...
Definition: ExceptionListener.h:37
A MapMessage object is used to send a set of name-value pairs.
Definition: MapMessage.h:71
A client uses a MessageConsumer to receive messages from a destination.
Definition: MessageConsumer.h:63
Root of all messages.
Definition: Message.h:88
A client uses a MessageProducer object to send messages to a Destination.
Definition: MessageProducer.h:60
Provides an interface for clients to transform cms::Message objects inside the CMS MessageProducer an...
Definition: MessageTransformer.h:37
This class implements an interface for browsing the messages in a Queue without removing them.
Definition: QueueBrowser.h:53
An interface encapsulating a provider-specific queue name.
Definition: Queue.h:37
A Session object is a single-threaded context for producing and consuming messages.
Definition: Session.h:105
AcknowledgeMode
Definition: Session.h:108
@ INDIVIDUAL_ACKNOWLEDGE
Message will be acknowledged individually.
Definition: Session.h:146
@ AUTO_ACKNOWLEDGE
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition: Session.h:117
@ DUPS_OK_ACKNOWLEDGE
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition: Session.h:128
@ CLIENT_ACKNOWLEDGE
With this acknowledgment mode, the client acknowledges a consumed message by calling the message's ac...
Definition: Session.h:134
Interface for a StreamMessage.
Definition: StreamMessage.h:61
Defines a Temporary Queue based Destination.
Definition: TemporaryQueue.h:39
Defines a Temporary Topic based Destination.
Definition: TemporaryTopic.h:39
Interface for a text message.
Definition: TextMessage.h:41
An interface encapsulating a provider-specific topic name.
Definition: Topic.h:36
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
Definition: ArrayList.h:39
Java-like properties class for mapping string names to string values.
Definition: Properties.h:53
A boolean value that may be updated atomically.
Definition: AtomicBoolean.h:34
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition: CachedConsumer.h:24