00001 00018 #ifndef _ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIREFORMAT_H_ 00019 #define _ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIREFORMAT_H_ 00020 00021 #include <activemq/util/Config.h> 00022 #include <activemq/commands/WireFormatInfo.h> 00023 #include <activemq/commands/DataStructure.h> 00024 #include <activemq/wireformat/WireFormat.h> 00025 #include <activemq/wireformat/openwire/utils/BooleanStream.h> 00026 #include <decaf/lang/Pointer.h> 00027 #include <decaf/util/Properties.h> 00028 #include <decaf/util/concurrent/atomic/AtomicBoolean.h> 00029 #include <decaf/lang/exceptions/IllegalStateException.h> 00030 #include <decaf/lang/exceptions/IllegalArgumentException.h> 00031 #include <memory> 00032 00033 namespace activemq { 00034 namespace wireformat { 00035 namespace openwire { 00036 00037 namespace marshal { 00038 class DataStreamMarshaller; 00039 } 00040 00041 using decaf::lang::Pointer; 00042 00043 class AMQCPP_API OpenWireFormat : public wireformat::WireFormat { 00044 protected: 00045 00046 // Declared here to make life easier. 00047 static const unsigned char NULL_TYPE; 00048 00049 // V1 if the default version we start at. 
00050 static const int DEFAULT_VERSION; 00051 00052 // Defines the maximum supported openwire version 00053 static const int MAX_SUPPORTED_VERSION; 00054 00055 private: 00056 00057 // Configuration parameters 00058 decaf::util::Properties properties; 00059 00060 // Preferred WireFormatInfo 00061 Pointer<commands::WireFormatInfo> preferedWireFormatInfo; 00062 00063 // Marshalers 00064 std::vector< marshal::DataStreamMarshaller* > dataMarshallers; 00065 00066 // Uniquely Generated ID, initialize in the Ctor 00067 std::string id; 00068 00069 // Indicates when we are in the doUnmarshal call 00070 decaf::util::concurrent::atomic::AtomicBoolean receiving; 00071 00072 // WireFormat Data 00073 int version; 00074 bool stackTraceEnabled; 00075 bool tcpNoDelayEnabled; 00076 bool cacheEnabled; 00077 int cacheSize; 00078 bool tightEncodingEnabled; 00079 bool sizePrefixDisabled; 00080 long long maxInactivityDuration; 00081 long long maxInactivityDurationInitialDelay; 00082 00083 public: 00084 00089 OpenWireFormat(const decaf::util::Properties& properties); 00090 00091 virtual ~OpenWireFormat(); 00092 00096 virtual bool hasNegotiator() const { 00097 return true; 00098 } 00099 00103 virtual Pointer<transport::Transport> createNegotiator(const Pointer<transport::Transport> transport); 00104 00110 void addMarshaller(marshal::DataStreamMarshaller* marshaler); 00111 00115 virtual void marshal(const Pointer<commands::Command> command, const activemq::transport::Transport* transport, decaf::io::DataOutputStream* out); 00116 00120 virtual Pointer<commands::Command> unmarshal(const activemq::transport::Transport* transport, decaf::io::DataInputStream* in); 00121 00122 public: 00123 00131 virtual int tightMarshalNestedObject1(commands::DataStructure* object, utils::BooleanStream* bs); 00132 00142 void tightMarshalNestedObject2(commands::DataStructure* o, decaf::io::DataOutputStream* ds, utils::BooleanStream* bs); 00143 00153 commands::DataStructure* 
tightUnmarshalNestedObject(decaf::io::DataInputStream* dis, utils::BooleanStream* bs); 00154 00164 commands::DataStructure* looseUnmarshalNestedObject(decaf::io::DataInputStream* dis); 00165 00174 void looseMarshalNestedObject(commands::DataStructure* o, decaf::io::DataOutputStream* dataOut); 00175 00185 void renegotiateWireFormat(const commands::WireFormatInfo& info); 00186 00195 void setPreferedWireFormatInfo(const Pointer<commands::WireFormatInfo> info); 00196 00201 const Pointer<commands::WireFormatInfo>& getPreferedWireFormatInfo() const { 00202 return this->preferedWireFormatInfo; 00203 } 00204 00209 bool isStackTraceEnabled() const { 00210 return stackTraceEnabled; 00211 } 00212 00217 void setStackTraceEnabled(bool stackTraceEnabled) { 00218 this->stackTraceEnabled = stackTraceEnabled; 00219 } 00220 00225 bool isTcpNoDelayEnabled() const { 00226 return tcpNoDelayEnabled; 00227 } 00228 00233 void setTcpNoDelayEnabled(bool tcpNoDelayEnabled) { 00234 this->tcpNoDelayEnabled = tcpNoDelayEnabled; 00235 } 00236 00241 int getVersion() const { 00242 return version; 00243 } 00244 00253 void setVersion(int version); 00254 00260 virtual bool inReceive() const { 00261 return this->receiving.get(); 00262 } 00263 00268 bool isCacheEnabled() const { 00269 return cacheEnabled; 00270 } 00271 00276 void setCacheEnabled(bool cacheEnabled) { 00277 this->cacheEnabled = cacheEnabled; 00278 } 00279 00284 int getCacheSize() const { 00285 return cacheSize; 00286 } 00287 00292 void setCacheSize(int value) { 00293 this->cacheSize = value; 00294 } 00295 00300 bool isTightEncodingEnabled() const { 00301 return tightEncodingEnabled; 00302 } 00303 00308 void setTightEncodingEnabled(bool tightEncodingEnabled) { 00309 this->tightEncodingEnabled = tightEncodingEnabled; 00310 } 00311 00316 bool isSizePrefixDisabled() const { 00317 return sizePrefixDisabled; 00318 } 00319 00324 void setSizePrefixDisabled(bool sizePrefixDisabled) { 00325 this->sizePrefixDisabled = sizePrefixDisabled; 00326 } 
00327 00332 long long getMaxInactivityDuration() const { 00333 return this->maxInactivityDuration; 00334 } 00335 00340 void setMaxInactivityDuration(long long value) { 00341 this->maxInactivityDuration = value; 00342 } 00343 00348 long long getMaxInactivityDurationInitialDelay() const { 00349 return this->maxInactivityDurationInitialDelay; 00350 } 00351 00356 void setMaxInactivityDurationInitialDelay(long long value) { 00357 this->maxInactivityDurationInitialDelay = value; 00358 } 00359 00360 protected: 00361 00375 commands::DataStructure* doUnmarshal(decaf::io::DataInputStream* dis); 00376 00382 void destroyMarshalers(); 00383 00384 }; 00385 00386 }}} 00387 00388 #endif /*_ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIREFORMAT_H_*/
// 1.6.1 (stray footer from the generated listing; presumably the documentation generator's version)