00001 /* 00002 * Licensed to the Apache Software Foundation (ASF) under one or more 00003 * contributor license agreements. See the NOTICE file distributed with 00004 * this work for additional information regarding copyright ownership. 00005 * The ASF licenses this file to You under the Apache License, Version 2.0 00006 * (the "License"); you may not use this file except in compliance with 00007 * the License. You may obtain a copy of the License at 00008 * 00009 * http://www.apache.org/licenses/LICENSE-2.0 00010 * 00011 * Unless required by applicable law or agreed to in writing, software 00012 * distributed under the License is distributed on an "AS IS" BASIS, 00013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 * See the License for the specific language governing permissions and 00015 * limitations under the License. 00016 */ 00017 00018 #ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_ 00019 #define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_ 00020 00021 #include <cms/EnhancedConnection.h> 00022 #include <activemq/util/Config.h> 00023 #include <activemq/core/Dispatcher.h> 00024 #include <activemq/commands/ActiveMQTempDestination.h> 00025 #include <activemq/commands/ConnectionInfo.h> 00026 #include <activemq/commands/ConsumerInfo.h> 00027 #include <activemq/commands/SessionId.h> 00028 #include <activemq/exceptions/ActiveMQException.h> 00029 #include <activemq/transport/Transport.h> 00030 #include <activemq/transport/TransportListener.h> 00031 #include <activemq/threads/Scheduler.h> 00032 #include <activemq/core/kernels/ActiveMQProducerKernel.h> 00033 #include <activemq/core/kernels/ActiveMQSessionKernel.h> 00034 #include <decaf/util/Properties.h> 00035 #include <decaf/util/concurrent/atomic/AtomicBoolean.h> 00036 #include <decaf/util/concurrent/ExecutorService.h> 00037 #include <decaf/lang/exceptions/UnsupportedOperationException.h> 00038 #include <decaf/lang/exceptions/NullPointerException.h> 00039 #include 
<decaf/lang/exceptions/IllegalStateException.h> 00040 00041 #include <string> 00042 #include <memory> 00043 00044 namespace activemq{ 00045 namespace core{ 00046 00047 using decaf::lang::Pointer; 00048 00049 class ActiveMQSession; 00050 class ConnectionConfig; 00051 class PrefetchPolicy; 00052 class RedeliveryPolicy; 00053 00060 class AMQCPP_API ActiveMQConnection : public virtual cms::EnhancedConnection, 00061 public transport::TransportListener { 00062 private: 00063 00064 ConnectionConfig* config; 00065 00069 std::auto_ptr<cms::ConnectionMetaData> connectionMetaData; 00070 00074 decaf::util::concurrent::atomic::AtomicBoolean started; 00075 00080 decaf::util::concurrent::atomic::AtomicBoolean closed; 00081 00086 decaf::util::concurrent::atomic::AtomicBoolean closing; 00087 00091 decaf::util::concurrent::atomic::AtomicBoolean transportFailed; 00092 00093 private: 00094 00095 ActiveMQConnection(const ActiveMQConnection&); 00096 ActiveMQConnection& operator=(const ActiveMQConnection&); 00097 00098 public: 00099 00108 ActiveMQConnection(const Pointer<transport::Transport> transport, 00109 const Pointer<decaf::util::Properties> properties); 00110 00111 virtual ~ActiveMQConnection(); 00112 00121 virtual void addSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session); 00122 00131 virtual void removeSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session); 00132 00141 virtual void addProducer(Pointer<kernels::ActiveMQProducerKernel> producer); 00142 00148 virtual void removeProducer(const Pointer<commands::ProducerId>& producerId); 00149 00156 virtual void addDispatcher(const Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher); 00157 00163 virtual void removeDispatcher(const Pointer<commands::ConsumerId>& consumer); 00164 00175 virtual void sendPullRequest(const commands::ConsumerInfo* consumer, long long timeout); 00176 00181 bool isClosed() const { 00182 return this->closed.get(); 00183 } 00184 00189 bool isStarted() const 
{ 00190 return this->started.get(); 00191 } 00192 00197 bool isTransportFailed() const { 00198 return this->transportFailed.get(); 00199 } 00200 00219 virtual void destroyDestination(const commands::ActiveMQDestination* destination); 00220 00239 virtual void destroyDestination(const cms::Destination* destination); 00240 00251 bool isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message); 00252 00261 void rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message); 00262 00263 public: // Connection Interface Methods 00264 00268 virtual const cms::ConnectionMetaData* getMetaData() const { 00269 return connectionMetaData.get(); 00270 } 00271 00275 virtual cms::Session* createSession(); 00276 00280 virtual std::string getClientID() const; 00281 00285 virtual void setClientID(const std::string& clientID); 00286 00290 virtual cms::Session* createSession(cms::Session::AcknowledgeMode ackMode); 00291 00295 virtual void close(); 00296 00300 virtual void start(); 00301 00305 virtual void stop(); 00306 00310 virtual cms::ExceptionListener* getExceptionListener() const; 00311 00315 virtual void setExceptionListener(cms::ExceptionListener* listener); 00316 00320 virtual void setMessageTransformer(cms::MessageTransformer* transformer); 00321 00325 virtual cms::MessageTransformer* getMessageTransformer() const; 00326 00330 virtual cms::DestinationSource* getDestinationSource(); 00331 00332 public: // Configuration Options 00333 00338 void setUsername(const std::string& username); 00339 00345 const std::string& getUsername() const; 00346 00351 void setPassword(const std::string& password); 00352 00358 const std::string& getPassword() const; 00359 00364 void setDefaultClientId(const std::string& clientId); 00365 00371 void setBrokerURL(const std::string& brokerURL); 00372 00378 const std::string& getBrokerURL() const; 00379 00388 void setPrefetchPolicy(PrefetchPolicy* policy); 00389 00395 PrefetchPolicy* getPrefetchPolicy() const; 00396 00405 void 
setRedeliveryPolicy(RedeliveryPolicy* policy); 00406 00412 RedeliveryPolicy* getRedeliveryPolicy() const; 00413 00417 bool isDispatchAsync() const; 00418 00427 void setDispatchAsync(bool value); 00428 00434 bool isAlwaysSyncSend() const; 00435 00441 void setAlwaysSyncSend(bool value); 00442 00447 bool isUseAsyncSend() const; 00448 00453 void setUseAsyncSend(bool value); 00454 00459 bool isUseCompression() const; 00460 00467 void setUseCompression(bool value); 00468 00478 void setCompressionLevel(int value); 00479 00485 int getCompressionLevel() const; 00486 00491 unsigned int getSendTimeout() const; 00492 00498 void setSendTimeout(unsigned int timeout); 00499 00504 unsigned int getCloseTimeout() const; 00505 00510 void setCloseTimeout(unsigned int timeout); 00511 00519 unsigned int getProducerWindowSize() const; 00520 00527 void setProducerWindowSize(unsigned int windowSize); 00528 00533 bool isMessagePrioritySupported() const; 00534 00542 void setMessagePrioritySupported(bool value); 00543 00548 long long getNextTempDestinationId(); 00549 00554 long long getNextLocalTransactionId(); 00555 00562 bool isWatchTopicAdvisories() const; 00563 00571 void setWatchTopicAdvisories(bool value); 00572 00581 int getAuditDepth() const; 00582 00592 void setAuditDepth(int auditDepth); 00593 00599 int getAuditMaximumProducerNumber() const; 00600 00607 void setAuditMaximumProducerNumber(int auditMaximumProducerNumber); 00608 00621 bool isCheckForDuplicates() const; 00622 00636 void setCheckForDuplicates(bool checkForDuplicates); 00637 00645 bool isTransactedIndividualAck() const; 00646 00655 void setTransactedIndividualAck(bool transactedIndividualAck); 00656 00663 bool isNonBlockingRedelivery() const; 00664 00673 void setNonBlockingRedelivery(bool nonBlockingRedelivery); 00674 00680 long long getConsumerFailoverRedeliveryWaitPeriod() const; 00681 00688 void setConsumerFailoverRedeliveryWaitPeriod(long long value); 00689 00693 bool isOptimizeAcknowledge() const; 00694 00701 void 
setOptimizeAcknowledge(bool optimizeAcknowledge); 00702 00708 long long getOptimizeAcknowledgeTimeOut() const; 00709 00716 void setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut); 00717 00726 long long getOptimizedAckScheduledAckInterval() const; 00727 00737 void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval); 00738 00744 bool isUseRetroactiveConsumer() const; 00745 00754 void setUseRetroactiveConsumer(bool useRetroactiveConsumer); 00755 00761 bool isExclusiveConsumer() const; 00762 00770 void setExclusiveConsumer(bool exclusiveConsumer); 00771 00778 bool isSendAcksAsync() const; 00779 00787 void setSendAcksAsync(bool sendAcksAsync); 00788 00789 public: // TransportListener 00790 00802 void addTransportListener(transport::TransportListener* transportListener); 00803 00812 void removeTransportListener(transport::TransportListener* transportListener); 00813 00819 virtual void onCommand(const Pointer<commands::Command> command); 00820 00825 virtual void onException(const decaf::lang::Exception& ex); 00826 00830 virtual void transportInterrupted(); 00831 00835 virtual void transportResumed(); 00836 00837 public: 00838 00845 const commands::ConnectionInfo& getConnectionInfo() const; 00846 00853 const commands::ConnectionId& getConnectionId() const; 00854 00860 transport::Transport& getTransport() const; 00861 00867 Pointer<threads::Scheduler> getScheduler() const; 00868 00875 std::string getResourceManagerId() const; 00876 00881 void cleanup(); 00882 00893 void oneway(Pointer<commands::Command> command); 00894 00909 Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0); 00910 00923 void asyncRequest(Pointer<commands::Command> command, cms::AsyncCallback* onComplete); 00924 00929 virtual void fire(const exceptions::ActiveMQException& ex); 00930 00935 void setTransportInterruptionProcessingComplete(); 00936 00943 void setFirstFailureError(decaf::lang::Exception* error); 00944 
00950 decaf::lang::Exception* getFirstFailureError() const; 00951 00958 void onAsyncException(const decaf::lang::Exception& ex); 00959 00966 void onClientInternalException(const decaf::lang::Exception& ex); 00967 00973 void checkClosed() const; 00974 00980 void checkClosedOrFailed() const; 00981 00985 void ensureConnectionInfoSent(); 00986 00990 decaf::util::concurrent::ExecutorService* getExecutor() const; 00991 00999 void addTempDestination(Pointer<commands::ActiveMQTempDestination> destination); 01000 01008 void removeTempDestination(Pointer<commands::ActiveMQTempDestination> destination); 01009 01019 void deleteTempDestination(Pointer<commands::ActiveMQTempDestination> destination); 01020 01027 void cleanUpTempDestinations(); 01028 01035 bool isDeleted(Pointer<commands::ActiveMQTempDestination> destination) const; 01036 01037 protected: 01038 01042 virtual Pointer<commands::SessionId> getNextSessionId(); 01043 01044 // Sends a oneway disconnect message to the broker. 01045 void disconnect(long long lastDeliveredSequenceId); 01046 01047 // Waits for all Consumers to handle the Transport Interrupted event. 01048 void waitForTransportInterruptionProcessingToComplete(); 01049 01050 // Marks processing complete for a single caller when interruption processing completes. 01051 void signalInterruptionProcessingComplete(); 01052 01053 // Allow subclasses to access the original Properties object for this connection. 01054 const decaf::util::Properties& getProperties() const; 01055 01056 // Process the ControlCommand command 01057 void onControlCommand(Pointer<commands::Command> command); 01058 01059 // Process the ConnectionControl command 01060 void onConnectionControl(Pointer<commands::Command> command); 01061 01062 // Process the ConsumerControl command 01063 void onConsumerControl(Pointer<commands::Command> command); 01064 01065 }; 01066 01067 }} 01068 01069 #endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_*/
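// 1.6.1 -- NOTE(review): stray trailer from the listing export (looks like a documentation-generator version stamp), not part of the header; confirm and remove.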