00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef _ACTIVEMQ_STATE_CONNECTIONSTATE_H_
00019 #define _ACTIVEMQ_STATE_CONNECTIONSTATE_H_
00020
00021 #include <activemq/util/Config.h>
00022 #include <activemq/commands/ConnectionInfo.h>
00023 #include <activemq/commands/DestinationInfo.h>
00024 #include <activemq/commands/SessionInfo.h>
00025 #include <activemq/commands/ConsumerId.h>
00026 #include <activemq/commands/ProducerId.h>
00027 #include <activemq/commands/TransactionId.h>
00028 #include <activemq/commands/LocalTransactionId.h>
00029 #include <activemq/state/ConsumerState.h>
00030 #include <activemq/state/ProducerState.h>
00031 #include <activemq/state/SessionState.h>
00032 #include <activemq/state/TransactionState.h>
00033
00034 #include <decaf/util/StlMap.h>
00035 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
00036 #include <decaf/util/concurrent/ConcurrentStlMap.h>
00037 #include <decaf/util/LinkedList.h>
00038 #include <decaf/lang/Pointer.h>
00039
00040 #include <string>
00041
00042 namespace activemq {
00043 namespace state {
00044
00045 using decaf::lang::Pointer;
00046 using namespace decaf::util;
00047 using namespace activemq::commands;
00048
00049 class AMQCPP_API ConnectionState {
00050 private:
00051
00052 Pointer< ConnectionInfo > info;
00053 ConcurrentStlMap< Pointer<LocalTransactionId>,
00054 Pointer<TransactionState>,
00055 LocalTransactionId::COMPARATOR > transactions;
00056 ConcurrentStlMap< Pointer<SessionId>,
00057 Pointer<SessionState>,
00058 SessionId::COMPARATOR > sessions;
00059 LinkedList< Pointer<DestinationInfo> > tempDestinations;
00060 decaf::util::concurrent::atomic::AtomicBoolean disposed;
00061
00062 bool connectionInterruptProcessingComplete;
00063 StlMap< Pointer<ConsumerId>,
00064 Pointer<ConsumerInfo>,
00065 ConsumerId::COMPARATOR > recoveringPullConsumers;
00066
00067 public:
00068
00069 ConnectionState(Pointer<ConnectionInfo> info);
00070
00071 virtual ~ConnectionState();
00072
00073 std::string toString() const;
00074
00075 const Pointer<commands::ConnectionInfo> getInfo() const {
00076 return this->info;
00077 }
00078
00079 void checkShutdown() const;
00080
00081 void shutdown();
00082
00083 void reset(Pointer<ConnectionInfo> info);
00084
00085 void addTempDestination(Pointer<DestinationInfo> info) {
00086 checkShutdown();
00087 tempDestinations.add(info);
00088 }
00089
00090 void removeTempDestination(Pointer<ActiveMQDestination> destination);
00091
00092 void addTransactionState(Pointer<TransactionId> id) {
00093 checkShutdown();
00094 transactions.put(id.dynamicCast<LocalTransactionId>(), Pointer<TransactionState>(new TransactionState(id)));
00095 }
00096
00097 const Pointer<TransactionState>& getTransactionState(Pointer<TransactionId> id) const {
00098 return transactions.get(id.dynamicCast<LocalTransactionId>());
00099 }
00100
00101 const decaf::util::Collection<Pointer<TransactionState> >& getTransactionStates() const {
00102 return transactions.values();
00103 }
00104
00105 Pointer<TransactionState> removeTransactionState(Pointer<TransactionId> id) {
00106 return transactions.remove(id.dynamicCast<LocalTransactionId>());
00107 }
00108
00109 void addSession(Pointer<SessionInfo> info) {
00110 checkShutdown();
00111 sessions.put(info->getSessionId(), Pointer<SessionState>(new SessionState(info)));
00112 }
00113
00114 Pointer<SessionState> removeSession(Pointer<SessionId> id) {
00115 return sessions.remove(id);
00116 }
00117
00118 const Pointer<SessionState> getSessionState(Pointer<SessionId> id) const {
00119 return sessions.get(id);
00120 }
00121
00122 const LinkedList<Pointer<DestinationInfo> >& getTempDesinations() const {
00123 return tempDestinations;
00124 }
00125
00126 const decaf::util::Collection<Pointer<SessionState> >& getSessionStates() const {
00127 return sessions.values();
00128 }
00129
00130 StlMap<Pointer<ConsumerId>, Pointer<ConsumerInfo>, ConsumerId::COMPARATOR>& getRecoveringPullConsumers() {
00131 return recoveringPullConsumers;
00132 }
00133
00134 void setConnectionInterruptProcessingComplete(bool connectionInterruptProcessingComplete) {
00135 this->connectionInterruptProcessingComplete = connectionInterruptProcessingComplete;
00136 }
00137
00138 bool isConnectionInterruptProcessingComplete() {
00139 return this->connectionInterruptProcessingComplete;
00140 }
00141
00142 };
00143
00144 }}
00145
00146 #endif