00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
00019 #define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
00020
00021 #include <cms/Session.h>
00022 #include <cms/ExceptionListener.h>
00023
00024 #include <activemq/util/Config.h>
00025 #include <activemq/util/Usage.h>
00026 #include <activemq/exceptions/ActiveMQException.h>
00027 #include <activemq/core/ActiveMQTransactionContext.h>
00028 #include <activemq/core/kernels/ActiveMQConsumerKernel.h>
00029 #include <activemq/core/kernels/ActiveMQProducerKernel.h>
00030 #include <activemq/commands/ActiveMQTempDestination.h>
00031 #include <activemq/commands/Response.h>
00032 #include <activemq/commands/MessageAck.h>
00033 #include <activemq/commands/SessionInfo.h>
00034 #include <activemq/commands/ConsumerInfo.h>
00035 #include <activemq/commands/ConsumerId.h>
00036 #include <activemq/commands/ProducerId.h>
00037 #include <activemq/commands/TransactionId.h>
00038 #include <activemq/core/Dispatcher.h>
00039 #include <activemq/core/MessageDispatchChannel.h>
00040 #include <activemq/util/LongSequenceGenerator.h>
00041 #include <activemq/threads/Scheduler.h>
00042
00043 #include <decaf/lang/Pointer.h>
00044 #include <decaf/util/Properties.h>
00045 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
00046
00047 #include <string>
00048 #include <memory>
00049
00050 namespace activemq {
00051 namespace core {
00052
00053 class ActiveMQConnection; // forward declaration: this header only uses the type through ActiveMQConnection* (member, constructor argument, getConnection())
00054 class ActiveMQConsumer; // forward declaration: not referenced anywhere else in this header
00055 class ActiveMQProducer; // forward declaration: not referenced anywhere else in this header
00056 class ActiveMQSessionExecutor; // forward declaration: befriended by ActiveMQSessionKernel and held there in a std::auto_ptr
00057
00058 namespace kernels {
00059
00060 using decaf::lang::Pointer; // lets the declarations below spell Pointer<T> unqualified; being in a header, the name is visible in activemq::core::kernels for every includer
00061 using decaf::util::concurrent::atomic::AtomicBoolean; // same, for the type of the `closed` member
00062
00063 class SessionConfig; // forward declaration: only used as SessionConfig* (the `config` member); its definition is not in this header
00064
00065 class AMQCPP_API ActiveMQSessionKernel : public virtual cms::Session, public Dispatcher {
      // Session class exported through AMQCPP_API. It derives virtually from
      // cms::Session and also from Dispatcher. Copy construction and copy
      // assignment are declared private (see below), so instances cannot be
      // copied by client code. Only the small inline accessors are defined in
      // this header; every other member function is defined elsewhere.
00066 private:
00067
      // The session executor is granted access to private/protected members.
00068 friend class activemq::core::ActiveMQSessionExecutor;
00069
00070 protected:
00071
      // Pointer to a SessionConfig, a type that is only forward-declared here.
      // NOTE(review): raw pointer - who allocates and frees it is not visible
      // in this header; confirm in the .cpp.
00072 SessionConfig* config;
00073
      // SessionInfo command object; dereferenced by getSessionInfo() and
      // getSessionId() below.
00077 Pointer<commands::SessionInfo> sessionInfo;
00078
      // Transaction context handed out by getTransactionContext().
00082 Pointer<ActiveMQTransactionContext> transaction;
00083
      // Connection pointer returned by getConnection().
      // NOTE(review): presumably the connection passed to the constructor and
      // not owned by the session - confirm.
00087 ActiveMQConnection* connection;
00088
      // Atomic boolean flag. NOTE(review): presumably set once the session has
      // been closed and consulted by checkClosed() - confirm in the .cpp.
00093 AtomicBoolean closed;
00094
      // Session executor, owned through std::auto_ptr (deleted when the
      // auto_ptr is destroyed). NOTE(review): std::auto_ptr is deprecated since
      // C++11 and removed in C++17; this header will not build under -std=c++17
      // with a conforming standard library unless that is addressed.
00098 std::auto_ptr<ActiveMQSessionExecutor> executor;
00099
      // Acknowledge mode; compared against the cms::Session constants by the
      // is*Acknowledge() helpers below.
00103 cms::Session::AcknowledgeMode ackMode;
00104
      // Sequence generator. NOTE(review): presumably backs getNextProducerId().
00108 util::LongSequenceGenerator producerIds;
00109
      // Sequence generator read by getNextProducerSequenceId().
00113 util::LongSequenceGenerator producerSequenceIds;
00114
      // Sequence generator. NOTE(review): presumably backs getNextConsumerId().
00118 util::LongSequenceGenerator consumerIds;
00119
      // Value read and written by get/setLastDeliveredSequenceId(). It is a
      // plain long long and the inline accessors perform no locking.
00123 long long lastDeliveredSequenceId;
00124
00125 private:
00126
      // Copy operations are private so the class is non-copyable from outside
      // (pre-C++11 idiom; presumably left undefined).
00127 ActiveMQSessionKernel(const ActiveMQSessionKernel&);
00128 ActiveMQSessionKernel& operator=(const ActiveMQSessionKernel&);
00129
00130 public:
00131
      // Constructor. Takes the connection pointer, the session id, the
      // acknowledge mode and a Properties object.
      // NOTE(review): how `properties` is consumed is not visible here.
00132 ActiveMQSessionKernel(ActiveMQConnection* connection,
00133 const Pointer<commands::SessionId>& id,
00134 cms::Session::AcknowledgeMode ackMode,
00135 const decaf::util::Properties& properties);
00136
00137 virtual ~ActiveMQSessionKernel();
00138
      // NOTE(review): presumably re-delivers the messages held in the given
      // dispatch channel; definition not in this header.
00143 virtual void redispatch(MessageDispatchChannel& unconsumedMessages);
00144
      // start() / stop() / isStarted(): lifecycle controls, defined elsewhere.
00148 virtual void start();
00149
00153 virtual void stop();
00154
00159 bool isStarted() const;
00160
      // The four predicates below each return true when ackMode equals the
      // named cms::Session acknowledge-mode constant.
00161 virtual bool isAutoAcknowledge() const {
00162 return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
00163 }
00164
00165 virtual bool isDupsOkAcknowledge() const {
00166 return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
00167 }
00168
00169 virtual bool isClientAcknowledge() const {
00170 return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
00171 }
00172
00173 virtual bool isIndividualAcknowledge() const {
00174 return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
00175 }
00176
      // NOTE(review): presumably reports the given exception to the registered
      // exception listener - confirm in the .cpp.
00180 void fire(const exceptions::ActiveMQException& ex);
00181
00182 public:
00183
      // NOTE(review): presumably the Dispatcher entry point (this class derives
      // from Dispatcher) - confirm against activemq/core/Dispatcher.h.
00188 virtual void dispatch(const Pointer<MessageDispatch>& message);
00189
00190 public:
00191
      // The virtual functions from here down to unsubscribe() presumably
      // implement the cms::Session interface - confirm against cms/Session.h.
      // The create* functions return raw pointers.
      // NOTE(review): presumably the caller owns the returned objects - confirm
      // against the CMS API documentation.
00192 virtual void close();
00193
00194 virtual void commit();
00195
00196 virtual void rollback();
00197
00198 virtual void recover();
00199
00200 virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination);
00201
00202 virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
00203 const std::string& selector);
00204
00205 virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
00206 const std::string& selector,
00207 bool noLocal);
00208
      // Note: this virtual function carries a default argument (noLocal =
      // false). Default arguments are bound to the static type of the call
      // expression, so they should match any default declared in the base.
00209 virtual cms::MessageConsumer* createDurableConsumer(const cms::Topic* destination,
00210 const std::string& name,
00211 const std::string& selector,
00212 bool noLocal = false);
00213
00214 virtual cms::MessageProducer* createProducer(const cms::Destination* destination);
00215
00216 virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue);
00217
00218 virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector);
00219
00220 virtual cms::Queue* createQueue(const std::string& queueName);
00221
00222 virtual cms::Topic* createTopic(const std::string& topicName);
00223
00224 virtual cms::TemporaryQueue* createTemporaryQueue();
00225
00226 virtual cms::TemporaryTopic* createTemporaryTopic();
00227
00228 virtual cms::Message* createMessage();
00229
00230 virtual cms::BytesMessage* createBytesMessage();
00231
00232 virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize);
00233
00234 virtual cms::StreamMessage* createStreamMessage();
00235
00236 virtual cms::TextMessage* createTextMessage();
00237
00238 virtual cms::TextMessage* createTextMessage( const std::string& text );
00239
00240 virtual cms::MapMessage* createMapMessage();
00241
00242 virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const;
00243
00244 virtual bool isTransacted() const;
00245
00246 virtual void unsubscribe(const std::string& name);
00247
00248 public:
00249
      // Sends `message` on behalf of `producer` to `destination`, with the
      // given delivery mode, priority and time-to-live.
      // NOTE(review): the roles of producerWindow, sendTimeout and onComplete
      // (and the units of timeToLive / sendTimeout) are not visible in this
      // header - confirm in the .cpp before relying on them.
00277 void send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
00278 cms::Message* message, int deliveryMode, int priority, long long timeToLive,
00279 util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete);
00280
00289 cms::ExceptionListener* getExceptionListener();
00290
      // Setter / getter for a cms::MessageTransformer pointer.
      // NOTE(review): ownership of the transformer is not visible here.
00298 virtual void setMessageTransformer(cms::MessageTransformer* transformer);
00299
00305 virtual cms::MessageTransformer* getMessageTransformer() const;
00306
      // Calls checkClosed() first, then returns a reference to the SessionInfo
      // object held by sessionInfo.
      // NOTE(review): checkClosed() presumably throws when the session is
      // closed - confirm.
00312 const commands::SessionInfo& getSessionInfo() const {
00313 this->checkClosed();
00314 return *( this->sessionInfo );
00315 }
00316
      // Calls checkClosed() first, then returns a reference to the SessionId
      // obtained from sessionInfo->getSessionId().
00322 const commands::SessionId& getSessionId() const {
00323 this->checkClosed();
00324 return *( this->sessionInfo->getSessionId() );
00325 }
00326
      // Returns the stored connection pointer (no closed-check is performed).
00330 ActiveMQConnection* getConnection() const {
00331 return this->connection;
00332 }
00333
00337 Pointer<threads::Scheduler> getScheduler() const;
00338
      // Returns the stored lastDeliveredSequenceId value.
00344 long long getLastDeliveredSequenceId() const {
00345 return this->lastDeliveredSequenceId;
00346 }
00347
      // Overwrites lastDeliveredSequenceId with `value`.
00354 void setLastDeliveredSequenceId(long long value) {
00355 this->lastDeliveredSequenceId = value;
00356 }
00357
      // NOTE(review): presumably sends `command` without waiting for a
      // response - confirm.
00367 void oneway(Pointer<commands::Command> command);
00368
      // Sends `command` and returns a Response; `timeout` defaults to 0.
      // NOTE(review): the unit of `timeout` and the meaning of 0 are not
      // visible here - confirm.
00383 Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
00384
      // add/remove pairs for consumer and producer kernels.
      // NOTE(review): presumably maintain the session's registries that
      // lookupConsumerKernel()/lookupProducerKernel() search - confirm.
00395 void addConsumer(Pointer<ActiveMQConsumerKernel> consumer);
00396
00406 void removeConsumer(Pointer<ActiveMQConsumerKernel> consumer);
00407
00418 void addProducer(Pointer<ActiveMQProducerKernel> producer);
00419
00429 void removeProducer(Pointer<ActiveMQProducerKernel> producer);
00430
00438 virtual void doStartTransaction();
00439
      // Returns a copy of the Pointer to the transaction context.
00445 Pointer<ActiveMQTransactionContext> getTransactionContext() {
00446 return this->transaction;
00447 }
00448
00453 void acknowledge();
00454
00459 void deliverAcks();
00460
00465 void clearMessagesInProgress();
00466
00470 void wakeup();
00471
00476 Pointer<commands::ConsumerId> getNextConsumerId();
00477
00482 Pointer<commands::ProducerId> getNextProducerId();
00483
      // NOTE(review): doClose() and dispose() are separate from the virtual
      // close(); how the three divide the shutdown work is not visible in this
      // header - read the .cpp before calling them directly.
00490 void doClose();
00491
00498 void dispose();
00499
00509 void setPrefetchSize(Pointer<commands::ConsumerId> id, int prefetch);
00510
      // Overload of close() taking a consumer id; distinct from the
      // no-argument virtual close() declared above.
00517 void close(Pointer<commands::ConsumerId> id);
00518
00524 bool isInUse(Pointer<commands::ActiveMQDestination> destination);
00525
00529 Pointer<ActiveMQProducerKernel> lookupProducerKernel(Pointer<commands::ProducerId> id);
00530
00534 Pointer<ActiveMQConsumerKernel> lookupConsumerKernel(Pointer<commands::ConsumerId> id);
00535
      // NOTE(review): the meaning of the bool result is not visible here.
00544 bool iterateConsumers();
00545
00551 void checkMessageListener() const;
00552
00558 virtual int getHashCode() const;
00559
      // Sends the given MessageAck; `async` defaults to false.
      // NOTE(review): presumably async selects a non-blocking send - confirm.
00569 void sendAck(decaf::lang::Pointer<commands::MessageAck> ack, bool async = false);
00570
00571 private:
00572
      // Returns the next value produced by the producerSequenceIds generator.
00577 long long getNextProducerSequenceId() {
00578 return this->producerSequenceIds.getNextSequenceId();
00579 }
00580
00581
      // Called at the top of getSessionInfo() and getSessionId(); const, so it
      // can be used from const accessors.
      // NOTE(review): presumably throws if the session has been closed.
00582 void checkClosed() const;
00583
00584
00585
00586
      // Private helpers for temporary destinations; they take raw
      // ActiveMQTempDestination pointers.
      // NOTE(review): presumably used by createTemporaryQueue() and
      // createTemporaryTopic() - confirm.
00587 void createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
00588
00589
00590
00591
00592 void destroyTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
00593
00594
00595
00596
      // Returns a std::string name for a new temporary destination.
00597 std::string createTemporaryDestinationName();
00598
00599 };
00600
00601 }}}
00602
00603 #endif