00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef _ACTIVEMQ_CMSUTIL_CMSTEMPLATE_H_
00019 #define _ACTIVEMQ_CMSUTIL_CMSTEMPLATE_H_
00020
00021 #include <activemq/util/Config.h>
00022 #include <activemq/cmsutil/CmsDestinationAccessor.h>
00023 #include <activemq/cmsutil/SessionCallback.h>
00024 #include <activemq/cmsutil/ProducerCallback.h>
00025 #include <activemq/cmsutil/SessionPool.h>
00026 #include <cms/ConnectionFactory.h>
00027 #include <cms/DeliveryMode.h>
00028 #include <string>
00029
00030 namespace activemq {
00031 namespace cmsutil {
00032
00033
00034 class MessageCreator;
00035
00060 class AMQCPP_API CmsTemplate : public CmsDestinationAccessor {
00061 public:
00062
00067 static const long long RECEIVE_TIMEOUT_NO_WAIT;
00068
00072 static const long long RECEIVE_TIMEOUT_INDEFINITE_WAIT;
00073
00077 static const int DEFAULT_PRIORITY;
00078
00082 static const long long DEFAULT_TIME_TO_LIVE;
00083
00084 public:
00085
00089 class ProducerExecutor;
00090 friend class ProducerExecutor;
00091 class ProducerExecutor : public SessionCallback {
00092 protected:
00093
00094 ProducerCallback* action;
00095 CmsTemplate* parent;
00096 cms::Destination* destination;
00097
00098 private:
00099
00100 ProducerExecutor(const ProducerExecutor&);
00101 ProducerExecutor& operator=(const ProducerExecutor&);
00102
00103 public:
00104
00105 ProducerExecutor(ProducerCallback* action,
00106 CmsTemplate* parent,
00107 cms::Destination* destination)
00108 : SessionCallback(), action(action), parent(parent), destination(destination) {
00109 }
00110
00111 virtual ~ProducerExecutor() {}
00112
00113 virtual void doInCms(cms::Session* session);
00114
00115 virtual cms::Destination* getDestination(cms::Session* session AMQCPP_UNUSED) {
00116 return destination;
00117 }
00118 };
00119
00123 class ResolveProducerExecutor;
00124 friend class ResolveProducerExecutor;
00125 class ResolveProducerExecutor : public ProducerExecutor {
00126 private:
00127
00128 std::string destinationName;
00129
00130 private:
00131
00132 ResolveProducerExecutor(const ResolveProducerExecutor&);
00133 ResolveProducerExecutor& operator=(const ResolveProducerExecutor&);
00134
00135 public:
00136
00137 ResolveProducerExecutor(ProducerCallback* action,
00138 CmsTemplate* parent,
00139 const std::string& destinationName)
00140 : ProducerExecutor(action, parent, NULL), destinationName(destinationName) {
00141 }
00142
00143 virtual ~ResolveProducerExecutor() {}
00144
00145 virtual cms::Destination* getDestination(cms::Session* session);
00146 };
00147
00151 class SendExecutor;
00152 friend class SendExecutor;
00153 class SendExecutor : public ProducerCallback {
00154 private:
00155
00156 MessageCreator* messageCreator;
00157 CmsTemplate* parent;
00158
00159 private:
00160
00161 SendExecutor(const SendExecutor&);
00162 SendExecutor& operator=(const SendExecutor&);
00163
00164 public:
00165
00166 SendExecutor(MessageCreator* messageCreator, CmsTemplate* parent) :
00167 ProducerCallback(), messageCreator(messageCreator), parent(parent) {
00168 }
00169
00170 virtual ~SendExecutor() {
00171 }
00172
00173 virtual void doInCms(cms::Session* session, cms::MessageProducer* producer) {
00174 parent->doSend(session, producer, messageCreator);
00175 }
00176 };
00177
00181 class ReceiveExecutor;
00182 friend class ReceiveExecutor;
00183 class ReceiveExecutor : public SessionCallback {
00184 protected:
00185
00186 cms::Destination* destination;
00187 std::string selector;
00188 bool noLocal;
00189 cms::Message* message;
00190 CmsTemplate* parent;
00191
00192 private:
00193
00194 ReceiveExecutor(const ReceiveExecutor&);
00195 ReceiveExecutor& operator=(const ReceiveExecutor&);
00196
00197 public:
00198
00199 ReceiveExecutor(CmsTemplate* parent, cms::Destination* destination,
00200 const std::string& selector, bool noLocal) :
00201 SessionCallback(), destination(destination), selector(selector), noLocal(noLocal), message(NULL), parent(parent) {
00202 }
00203
00204 virtual ~ReceiveExecutor() {}
00205
00206 virtual void doInCms(cms::Session* session);
00207
00208 virtual cms::Destination* getDestination(cms::Session* session AMQCPP_UNUSED) {
00209 return destination;
00210 }
00211
00212 cms::Message* getMessage() {
00213 return message;
00214 }
00215 };
00216
00220 class ResolveReceiveExecutor;
00221 friend class ResolveReceiveExecutor;
00222 class ResolveReceiveExecutor : public ReceiveExecutor {
00223 private:
00224
00225 std::string destinationName;
00226
00227 private:
00228
00229 ResolveReceiveExecutor(const ResolveReceiveExecutor&);
00230 ResolveReceiveExecutor& operator=(const ResolveReceiveExecutor&);
00231
00232 public:
00233
00234 ResolveReceiveExecutor(CmsTemplate* parent, const std::string& selector,
00235 bool noLocal, const std::string& destinationName) :
00236 ReceiveExecutor(parent, NULL, selector, noLocal), destinationName(destinationName) {
00237 }
00238
00239 virtual ~ResolveReceiveExecutor() {}
00240
00241 virtual cms::Destination* getDestination(cms::Session* session);
00242 };
00243
00244 private:
00245
00246 static const int NUM_SESSION_POOLS = (int)cms::Session::SESSION_TRANSACTED + 1;
00247
00248 cms::Connection* connection;
00249
00250 SessionPool* sessionPools[NUM_SESSION_POOLS];
00251
00252 cms::Destination* defaultDestination;
00253
00254 std::string defaultDestinationName;
00255
00256 bool messageIdEnabled;
00257
00258 bool messageTimestampEnabled;
00259
00260 bool noLocal;
00261
00262 long long receiveTimeout;
00263
00264 bool explicitQosEnabled;
00265
00266 int deliveryMode;
00267
00268 int priority;
00269
00270 long long timeToLive;
00271
00272 bool initialized;
00273
00274 private:
00275
00276 CmsTemplate(const CmsTemplate&);
00277 CmsTemplate& operator=(const CmsTemplate&);
00278
00279 public:
00280
00281 CmsTemplate();
00282 CmsTemplate(cms::ConnectionFactory* connectionFactory);
00283
00284 virtual ~CmsTemplate();
00285
00295 virtual void setDefaultDestination(cms::Destination* defaultDestination) {
00296 this->defaultDestination = defaultDestination;
00297 }
00298
00303 virtual const cms::Destination* getDefaultDestination() const {
00304 return this->defaultDestination;
00305 }
00306
00311 virtual cms::Destination* getDefaultDestination() {
00312 return this->defaultDestination;
00313 }
00314
00324 virtual void setDefaultDestinationName(const std::string& defaultDestinationName) {
00325 if (defaultDestinationName != this->defaultDestinationName) {
00326 this->defaultDestination = NULL;
00327 this->defaultDestinationName = defaultDestinationName;
00328 }
00329 }
00330
00338 virtual const std::string getDefaultDestinationName() const {
00339 return this->defaultDestinationName;
00340 }
00341
00349 virtual void setPubSubDomain(bool pubSubDomain) {
00350 if (pubSubDomain != isPubSubDomain()) {
00351 this->defaultDestination = NULL;
00352 CmsDestinationAccessor::setPubSubDomain(pubSubDomain);
00353 }
00354 }
00355
00356 virtual void setMessageIdEnabled(bool messageIdEnabled) {
00357 this->messageIdEnabled = messageIdEnabled;
00358 }
00359
00360 virtual bool isMessageIdEnabled() const {
00361 return this->messageIdEnabled;
00362 }
00363
00364 virtual void setMessageTimestampEnabled(bool messageTimestampEnabled) {
00365 this->messageTimestampEnabled = messageTimestampEnabled;
00366 }
00367
00368 virtual bool isMessageTimestampEnabled() const {
00369 return this->messageTimestampEnabled;
00370 }
00371
00372 virtual void setNoLocal(bool noLocal) {
00373 this->noLocal = noLocal;
00374 }
00375
00376 virtual bool isNoLocal() const {
00377 return this->noLocal;
00378 }
00379
00380 virtual void setReceiveTimeout(long long receiveTimeout) {
00381 this->receiveTimeout = receiveTimeout;
00382 }
00383
00384 virtual long long getReceiveTimeout() const {
00385 return this->receiveTimeout;
00386 }
00387
00396 virtual void setExplicitQosEnabled(bool explicitQosEnabled) {
00397 this->explicitQosEnabled = explicitQosEnabled;
00398 }
00399
00412 virtual bool isExplicitQosEnabled() const {
00413 return this->explicitQosEnabled;
00414 }
00415
00424 virtual void setDeliveryPersistent(bool deliveryPersistent) {
00425 this->deliveryMode = (deliveryPersistent ? 0 : 1);
00426 }
00427
00436 virtual void setDeliveryMode(int deliveryMode) {
00437 this->deliveryMode = deliveryMode;
00438 }
00439
00443 virtual int getDeliveryMode() const {
00444 return this->deliveryMode;
00445 }
00446
00454 virtual void setPriority(int priority) {
00455 this->priority = priority;
00456 }
00457
00461 virtual int getPriority() const {
00462 return this->priority;
00463 }
00464
00473 virtual void setTimeToLive(long long timeToLive) {
00474 this->timeToLive = timeToLive;
00475 }
00476
00480 virtual long long getTimeToLive() const {
00481 return this->timeToLive;
00482 }
00483
00490 virtual void execute(SessionCallback* action);
00491
00500 virtual void execute(ProducerCallback* action);
00501
00512 virtual void execute(cms::Destination* dest, ProducerCallback* action);
00513
00526 virtual void execute(const std::string& destinationName, ProducerCallback* action);
00527
00535 virtual void send(MessageCreator* messageCreator);
00536
00546 virtual void send(cms::Destination* dest, MessageCreator* messageCreator);
00547
00557 virtual void send(const std::string& destinationName, MessageCreator* messageCreator);
00558
00564 virtual cms::Message* receive();
00565
00573 virtual cms::Message* receive(cms::Destination* destination);
00574
00583 virtual cms::Message* receive(const std::string& destinationName);
00584
00594 virtual cms::Message* receiveSelected(const std::string& selector);
00595
00607 virtual cms::Message* receiveSelected(cms::Destination* destination, const std::string& selector);
00608
00621 virtual cms::Message* receiveSelected(const std::string& destinationName, const std::string& selector);
00622
00623 protected:
00624
00625 void init();
00626
00627 void destroy();
00628
00629 private:
00630
00634 void initDefaults();
00635
00639 void createSessionPools();
00640
00644 void destroySessionPools();
00645
00652 void checkDefaultDestination();
00653
00660 cms::Connection* getConnection();
00661
00668 PooledSession* takeSession();
00669
00676 void returnSession(PooledSession*& session);
00677
00689 cms::MessageProducer* createProducer(cms::Session* session, cms::Destination* dest);
00690
00697 void destroyProducer(cms::MessageProducer*& producer);
00698
00710 cms::MessageConsumer* createConsumer(cms::Session* session, cms::Destination* dest,
00711 const std::string& selector, bool noLocal);
00712
00719 void destroyConsumer(cms::MessageConsumer*& consumer);
00720
00726 void destroyMessage(cms::Message*& message);
00727
00738 void doSend(cms::Session* session, cms::MessageProducer* producer, MessageCreator* messageCreator);
00739
00747 cms::Message* doReceive(cms::MessageConsumer* consumer);
00748
00756 cms::Destination* resolveDefaultDestination(cms::Session* session);
00757
00758 };
00759
00760 }}
00761
00762 #endif