00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef _ACTIVE_TRANSPORT_FAILOVER_FAILOVERTRANSPORT_H_
00019 #define _ACTIVE_TRANSPORT_FAILOVER_FAILOVERTRANSPORT_H_
00020
00021 #include <activemq/util/Config.h>
00022
00023 #include <activemq/commands/Command.h>
00024 #include <activemq/commands/ConnectionId.h>
00025 #include <activemq/threads/TaskRunner.h>
00026 #include <activemq/threads/CompositeTaskRunner.h>
00027 #include <activemq/state/ConnectionStateTracker.h>
00028 #include <activemq/transport/CompositeTransport.h>
00029 #include <activemq/wireformat/WireFormat.h>
00030
00031 #include <decaf/util/List.h>
00032 #include <decaf/util/Properties.h>
00033 #include <decaf/net/URI.h>
00034 #include <decaf/io/IOException.h>
00035
00036 namespace activemq {
00037 namespace transport {
00038 namespace failover {
00039
// NOTE(review): this is a using-directive at namespace scope inside a header,
// so every decaf::lang name becomes visible in activemq::transport::failover
// for all includers. The class below uses the unqualified Pointer<> template,
// presumably decaf::lang::Pointer brought in by this directive -- confirm
// before narrowing or removing it.
00040 using namespace decaf::lang;
// Short names for the two command types that appear in the signatures below.
// NOTE(review): no header for Response is included directly in this file; it
// is presumably reached through one of the includes above -- confirm.
00041 using activemq::commands::Command;
00042 using activemq::commands::Response;
00043
// Forward declarations only; none of these types is defined in this header.
// FailoverTransportListener and BackupTransportPool are named as friends of
// FailoverTransport, and FailoverTransportImpl is the type behind its `impl`
// pointer member.
00044 class FailoverTransportListener;
00045 class BackupTransportPool;
00046 class FailoverTransportImpl;
00047
// FailoverTransport
//
// A CompositeTransport that additionally derives from
// activemq::threads::CompositeTask. This header only declares the interface;
// apart from two inline members (setWireFormat and isFaultTolerant) every
// member function is defined elsewhere.
//
// NOTE(review): judging by the class name and its option set (reconnect
// delays, back-off, backup pool, priority URIs) this transport presumably
// re-establishes a connection to one of several URIs when the current one
// fails -- confirm against the implementation file.
00048 class AMQCPP_API FailoverTransport : public CompositeTransport,
00049 public activemq::threads::CompositeTask {
00050 private:
00051
// The listener and the backup pool are granted access to the private and
// protected members of this class.
00052 friend class FailoverTransportListener;
00053 friend class BackupTransportPool;
00054
// Connection state tracker held by value.
// NOTE(review): presumably used to replay connection state onto a newly
// restored transport -- confirm in the implementation.
00055 state::ConnectionStateTracker stateTracker;
00056
// Raw pointer to the forward-declared implementation class. Allocation and
// release are not visible in this header.
// NOTE(review): presumably created in the constructor and deleted in the
// destructor -- confirm.
00057 FailoverTransportImpl* impl;
00058
00059 private:
00060
// Copy construction and copy assignment are declared private and have no
// definition in this header, so outside code cannot copy a FailoverTransport.
00061 FailoverTransport(const FailoverTransport&);
00062 FailoverTransport& operator=(const FailoverTransport&);
00063
00064 public:
00065
00066 FailoverTransport();
00067
// Virtual destructor: instances may be destroyed through a base-class pointer.
00068 virtual ~FailoverTransport();
00069
// Non-virtual overload taking only a flag; a separate virtual
// reconnect(const decaf::net::URI&) is declared further down.
// NOTE(review): the exact meaning of `rebalance` is defined by the
// implementation -- confirm before relying on it.
00077 void reconnect(bool rebalance);
00078
// Adds transport location(s) supplied in string form.
// NOTE(review): whether `uri` may contain several URIs (and with which
// separator) is not visible here -- confirm.
00087 void add(bool rebalance, const std::string& uri);
00088
00089 public:
00090
// URI list management.
// NOTE(review): presumably overrides of CompositeTransport virtuals; the base
// declaration is not visible in this file. Also note that `List` is written
// unqualified here while updateURIs() below spells decaf::util::List, and
// nothing visible in this header brings decaf::util::List into scope -- this
// presumably relies on a declaration in an included header; confirm.
00091 virtual void addURI(bool rebalance, const List<decaf::net::URI>& uris);
00092
00093 virtual void removeURI(bool rebalance, const List<decaf::net::URI>& uris);
00094
00095 public:
00096
// Lifecycle and request/response members. All are virtual.
// NOTE(review): presumably overrides of the Transport interface inherited via
// CompositeTransport (not visible here); exception behavior is defined by
// that interface and the implementation.
00097 virtual void start();
00098
00099 virtual void stop();
00100
00101 virtual void close();
00102
00103 virtual void oneway(const Pointer<Command> command);
00104
00105 virtual Pointer<FutureResponse> asyncRequest(const Pointer<Command> command,
00106 const Pointer<ResponseCallback> responseCallback);
00107
00108 virtual Pointer<Response> request(const Pointer<Command> command);
00109
// Same as request(command) but with an additional timeout argument.
// NOTE(review): the timeout unit is not stated in this header (presumably
// milliseconds) -- confirm.
00110 virtual Pointer<Response> request(const Pointer<Command> command, unsigned int timeout);
00111
00112 virtual Pointer<wireformat::WireFormat> getWireFormat() const;
00113
// Deliberately empty: the supplied wire format is ignored by this class (the
// parameter is marked AMQCPP_UNUSED).
00114 virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat AMQCPP_UNUSED) {}
00115
// The listener is passed and returned as a raw pointer; ownership is not
// expressed by the signature.
00116 virtual void setTransportListener(TransportListener* listener);
00117
00118 virtual TransportListener* getTransportListener() const;
00119
// Always reports true for this transport.
00120 virtual bool isFaultTolerant() const {
00121 return true;
00122 }
00123
00124 virtual bool isConnected() const;
00125
00126 virtual bool isClosed() const;
00127
// Non-virtual accessor pair for an "initialized" flag.
// NOTE(review): what sets the flag and what it gates is defined in the
// implementation -- confirm.
00128 bool isInitialized() const;
00129
00130 void setInitialized(bool value);
00131
// Returns a Transport pointer for the requested type_info.
// NOTE(review): the result for a non-matching type (presumably NULL) is not
// visible here -- confirm.
00132 virtual Transport* narrow(const std::type_info& typeId);
00133
00134 virtual std::string getRemoteAddress() const;
00135
// Virtual overload that takes the target location explicitly.
00136 virtual void reconnect(const decaf::net::URI& uri);
00137
00138 virtual void updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>& uris);
00139
00140 public:
00141
// Task-side interface (the class derives from activemq::threads::CompositeTask).
// NOTE(review): presumably these two override that base's virtuals, with
// iterate() returning whether more work remains -- the base declaration is
// not visible here; confirm.
00146 virtual bool isPending() const;
00147
00155 virtual bool iterate();
00156
00157 public:
00158
// Configuration accessors: plain getter/setter pairs, all non-virtual.
// NOTE(review): units of the long long timeout/delay values are not stated in
// this header (presumably milliseconds), and the meaning of special values
// such as negative attempt counts is defined by the implementation -- confirm.
00159 long long getTimeout() const;
00160
00161 void setTimeout(long long value);
00162
00163 long long getInitialReconnectDelay() const;
00164
00165 void setInitialReconnectDelay(long long value);
00166
00167 long long getMaxReconnectDelay() const;
00168
00169 void setMaxReconnectDelay(long long value);
00170
// Note the multiplier is an integer type (long long), not a floating-point one.
00171 long long getBackOffMultiplier() const;
00172
00173 void setBackOffMultiplier(long long value);
00174
00175 bool isUseExponentialBackOff() const;
00176
00177 void setUseExponentialBackOff(bool value);
00178
00179 bool isRandomize() const;
00180
00181 void setRandomize(bool value);
00182
00183 int getMaxReconnectAttempts() const;
00184
00185 void setMaxReconnectAttempts(int value);
00186
00187 int getStartupMaxReconnectAttempts() const;
00188
00189 void setStartupMaxReconnectAttempts(int value);
00190
00191 long long getReconnectDelay() const;
00192
00193 void setReconnectDelay(long long value);
00194
00195 bool isBackup() const;
00196
00197 void setBackup(bool value);
00198
00199 int getBackupPoolSize() const;
00200
00201 void setBackupPoolSize(int value);
00202
00203 bool isTrackMessages() const;
00204
00205 void setTrackMessages(bool value);
00206
00207 bool isTrackTransactionProducers() const;
00208
00209 void setTrackTransactionProducers(bool value);
00210
00211 int getMaxCacheSize() const;
00212
00213 void setMaxCacheSize(int value);
00214
00215 int getMaxPullCacheSize() const;
00216
00217 void setMaxPullCacheSize(int value);
00218
00219 bool isReconnectSupported() const;
00220
00221 void setReconnectSupported(bool value);
00222
00223 bool isUpdateURIsSupported() const;
00224
00225 void setUpdateURIsSupported(bool value);
00226
00227 bool isRebalanceUpdateURIs() const;
00228
00229 void setRebalanceUpdateURIs(bool rebalanceUpdateURIs);
00230
00231 bool isPriorityBackup() const;
00232
00233 void setPriorityBackup(bool priorityBackup);
00234
// The setter takes the priority URIs as a single string, while the getter
// returns a reference to a list of URI objects.
// NOTE(review): the accepted string format (presumably comma-separated) is
// defined by the implementation -- confirm.
00235 void setPriorityURIs(const std::string& priorityURIs);
00236
00237 const decaf::util::List<decaf::net::URI>& getPriorityURIs() const;
00238
00239 void setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId);
00240
00241 bool isConnectedToPriority() const;
00242
00243 protected:
00244
// Protected, non-virtual hooks. Besides derived classes, the two friend
// classes declared at the top of this class can call them.
// NOTE(review): presumably invoked by the listener/backup pool when a
// transport is (re)established, fails, or delivers a connection-control
// command -- confirm in the implementation.
00254 void restoreTransport(const Pointer<Transport> transport);
00255
00261 void handleTransportFailure(const decaf::lang::Exception& error);
00262
00271 void handleConnectionControl(const Pointer<Command> control);
00272
00273 private:
00274
// Builds a transport for the given location; declared const, so it does not
// modify this object's non-mutable state.
00283 Pointer<Transport> createTransport(const decaf::net::URI& location) const;
00284
// `newTransports` is taken by value, so the callee works on its own copy of
// the string.
00285 void processNewTransports(bool rebalance, std::string newTransports);
00286
00287 void processResponse(const Pointer<Response> response);
00288
00289 };
00290
00291 }}}
00292
00293 #endif