00001 /* 00002 * Licensed to the Apache Software Foundation (ASF) under one or more 00003 * contributor license agreements. See the NOTICE file distributed with 00004 * this work for additional information regarding copyright ownership. 00005 * The ASF licenses this file to You under the Apache License, Version 2.0 00006 * (the "License"); you may not use this file except in compliance with 00007 * the License. You may obtain a copy of the License at 00008 * 00009 * http://www.apache.org/licenses/LICENSE-2.0 00010 * 00011 * Unless required by applicable law or agreed to in writing, software 00012 * distributed under the License is distributed on an "AS IS" BASIS, 00013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 * See the License for the specific language governing permissions and 00015 * limitations under the License. 00016 */ 00017 00018 #ifndef ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_ 00019 #define ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_ 00020 00021 #include <activemq/util/Config.h> 00022 #include <activemq/core/MessageDispatchChannel.h> 00023 #include <activemq/commands/ConsumerId.h> 00024 #include <activemq/commands/MessageDispatch.h> 00025 #include <activemq/threads/Task.h> 00026 #include <activemq/threads/TaskRunner.h> 00027 #include <decaf/lang/Pointer.h> 00028 00029 namespace activemq{ 00030 namespace core{ 00031 namespace kernels{ 00032 class ActiveMQSessionKernel; 00033 } 00034 00035 using decaf::lang::Pointer; 00036 using activemq::commands::MessageDispatch; 00037 00038 class ActiveMQConsumer; 00039 00044 class AMQCPP_API ActiveMQSessionExecutor : activemq::threads::Task { 00045 private: 00046 00048 activemq::core::kernels::ActiveMQSessionKernel* session; 00049 00051 Pointer<MessageDispatchChannel> messageQueue; 00052 00054 Pointer<activemq::threads::TaskRunner> taskRunner; 00055 00056 private: 00057 00058 ActiveMQSessionExecutor(const ActiveMQSessionExecutor&); 00059 ActiveMQSessionExecutor& operator=(const ActiveMQSessionExecutor&); 00060 00061 public: 00062 00066 ActiveMQSessionExecutor(activemq::core::kernels::ActiveMQSessionKernel* session); 00067 00071 virtual ~ActiveMQSessionExecutor(); 00072 00078 virtual void execute(const Pointer<MessageDispatch>& data); 00079 00085 virtual void executeFirst(const Pointer<MessageDispatch>& data); 00086 00090 virtual void clearMessagesInProgress() { 00091 this->messageQueue->clear(); 00092 } 00093 00097 virtual bool hasUncomsumedMessages() const { 00098 return !messageQueue->isClosed() && messageQueue->isRunning() && !messageQueue->isEmpty(); 00099 } 00100 00104 virtual void wakeup(); 00105 00109 virtual void start(); 00110 00114 virtual void stop(); 00115 00120 virtual void close() { 00121 this->messageQueue->close(); 00122 } 00123 00127 virtual bool isRunning() const { 00128 return this->messageQueue->isRunning(); 00129 } 00130 00134 virtual bool isEmpty() { 00135 return messageQueue->isEmpty(); 00136 } 00137 00141 virtual void clear() { 00142 this->messageQueue->clear(); 00143 } 00144 00151 virtual bool iterate(); 00152 00157 std::vector< Pointer<MessageDispatch> > getUnconsumedMessages() { 00158 return messageQueue->removeAll(); 00159 } 00160 00161 private: 00162 00167 virtual void dispatch(const Pointer<MessageDispatch>& data); 00168 00169 }; 00170 00171 }} 00172 00173 #endif /*ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_*/
1.6.1