00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_
00018 #define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_
00019
00020 #include <cms/MessageConsumer.h>
00021 #include <cms/MessageListener.h>
00022 #include <cms/MessageAvailableListener.h>
00023 #include <cms/Message.h>
00024 #include <cms/CMSException.h>
00025
00026 #include <activemq/util/Config.h>
00027 #include <activemq/exceptions/ActiveMQException.h>
00028 #include <activemq/commands/ConsumerInfo.h>
00029 #include <activemq/commands/MessageAck.h>
00030 #include <activemq/commands/MessageDispatch.h>
00031 #include <activemq/core/Dispatcher.h>
00032 #include <activemq/core/RedeliveryPolicy.h>
00033 #include <activemq/core/MessageDispatchChannel.h>
00034
00035 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
00036 #include <decaf/lang/Pointer.h>
00037 #include <decaf/util/concurrent/Mutex.h>
00038
00039 namespace activemq {
00040 namespace core {
00041 namespace kernels {
00042
00043 using decaf::lang::Pointer;
00044 using decaf::util::concurrent::atomic::AtomicBoolean;
00045
00046 class ActiveMQSessionKernel;
00047 class ActiveMQConsumerKernelConfig;
00048
00049 class AMQCPP_API ActiveMQConsumerKernel : public cms::MessageConsumer, public Dispatcher {
00050 private:
00051
00055 ActiveMQConsumerKernelConfig* internal;
00056
00060 ActiveMQSessionKernel* session;
00061
00065 Pointer<commands::ConsumerInfo> consumerInfo;
00066
00067 private:
00068
00069 ActiveMQConsumerKernel(const ActiveMQConsumerKernel&);
00070 ActiveMQConsumerKernel& operator=(const ActiveMQConsumerKernel&);
00071
00072 public:
00073
00074 ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
00075 const Pointer<commands::ConsumerId>& id,
00076 const Pointer<commands::ActiveMQDestination>& destination,
00077 const std::string& name,
00078 const std::string& selector,
00079 int prefetch,
00080 int maxPendingMessageCount,
00081 bool noLocal,
00082 bool browser,
00083 bool dispatchAsync,
00084 cms::MessageListener* listener);
00085
00086 virtual ~ActiveMQConsumerKernel();
00087
00088 public:
00089
00090 virtual void start();
00091
00092 virtual void stop();
00093
00094 virtual void close();
00095
00096 virtual cms::Message* receive();
00097
00098 virtual cms::Message* receive(int millisecs);
00099
00100 virtual cms::Message* receiveNoWait();
00101
00102 virtual void setMessageListener(cms::MessageListener* listener);
00103
00104 virtual cms::MessageListener* getMessageListener() const;
00105
00106 virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener);
00107
00108 virtual cms::MessageAvailableListener* getMessageAvailableListener() const;
00109
00110 virtual std::string getMessageSelector() const;
00111
00112 virtual void setMessageTransformer(cms::MessageTransformer* transformer);
00113
00114 virtual cms::MessageTransformer* getMessageTransformer() const;
00115
00116 public:
00117
00118 virtual void dispatch( const Pointer<MessageDispatch>& message );
00119
00120 virtual int getHashCode() const;
00121
00122 public:
00123
00129 void acknowledge();
00130
00136 void acknowledge(Pointer<commands::MessageDispatch> dispatch);
00137
00143 void acknowledge(Pointer<commands::MessageDispatch> dispatch, int ackType);
00144
00150 void commit();
00151
00157 void rollback();
00158
00164 void doClose();
00165
00171 void dispose();
00172
00177 const Pointer<commands::ConsumerInfo>& getConsumerInfo() const;
00178
00183 const Pointer<commands::ConsumerId>& getConsumerId() const;
00184
00188 bool isClosed() const;
00189
00194 bool isSynchronizationRegistered() const ;
00195
00200 void setSynchronizationRegistered(bool value);
00201
00207 bool iterate();
00208
00214 void deliverAcks();
00215
00219 void clearMessagesInProgress();
00220
00225 void inProgressClearRequired();
00226
00232 long long getLastDeliveredSequenceId() const;
00233
00239 bool isTransactedIndividualAck() const;
00240
00247 void setTransactedIndividualAck(bool value);
00248
00254 long long setFailoverRedeliveryWaitPeriod() const;
00255
00263 void setFailoverRedeliveryWaitPeriod(long long value);
00264
00271 void setLastDeliveredSequenceId(long long value);
00272
00276 int getMessageAvailableCount() const;
00277
00287 void setRedeliveryPolicy(RedeliveryPolicy* policy);
00288
00295 RedeliveryPolicy* getRedeliveryPolicy() const;
00296
00303 void setFailureError(decaf::lang::Exception* error);
00304
00311 decaf::lang::Exception* getFailureError() const;
00312
00317 void setPrefetchSize(int prefetchSize);
00318
00324 bool isInUse(Pointer<commands::ActiveMQDestination> destination) const;
00325
00332 long long getOptimizedAckScheduledAckInterval() const;
00333
00342 void setOptimizedAckScheduledAckInterval(long long value);
00343
00347 bool isOptimizeAcknowledge() const;
00348
00355 void setOptimizeAcknowledge(bool value);
00356
00357 protected:
00358
00374 Pointer<MessageDispatch> dequeue(long long timeout);
00375
00380 void beforeMessageIsConsumed(Pointer<commands::MessageDispatch> dispatch);
00381
00387 void afterMessageIsConsumed(Pointer<commands::MessageDispatch> dispatch, bool messageExpired);
00388
00389 private:
00390
00391 Pointer<cms::Message> createCMSMessage(Pointer<commands::MessageDispatch> dispatch);
00392
00393 void applyDestinationOptions(Pointer<commands::ConsumerInfo> info);
00394
00395 void sendPullRequest(long long timeout);
00396
00397 void checkClosed() const;
00398
00399 void checkMessageListener() const;
00400
00401 void ackLater(Pointer<commands::MessageDispatch> message, int ackType);
00402
00403 void immediateIndividualTransactedAck(Pointer<commands::MessageDispatch> dispatch);
00404
00405 Pointer<commands::MessageAck> makeAckForAllDeliveredMessages(int type);
00406
00407 bool isAutoAcknowledgeEach() const;
00408
00409 bool isAutoAcknowledgeBatch() const;
00410
00411 void registerSync();
00412
00413 void clearDispatchList();
00414
00415 };
00416
00417 }}}
00418
00419 #endif