xrootd
XrdClStream.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef __XRD_CL_STREAM_HH__
20#define __XRD_CL_STREAM_HH__
21
22#include "XrdCl/XrdClPoller.hh"
23#include "XrdCl/XrdClStatus.hh"
24#include "XrdCl/XrdClURL.hh"
28#include "XrdCl/XrdClInQueue.hh"
29#include "XrdCl/XrdClUtils.hh"
30
32#include "XrdNet/XrdNetAddr.hh"
33#include <list>
34#include <vector>
35#include <functional>
36
37namespace XrdCl
38{
39 class Message;
40 class Channel;
41 class TransportHandler;
42 class TaskManager;
43 struct SubStreamData;
44
45 //----------------------------------------------------------------------------
47 //----------------------------------------------------------------------------
48 class Stream
49 {
50 public:
51 //------------------------------------------------------------------------
53 //------------------------------------------------------------------------
55 {
59 Error = 3
60 };
61
62 //------------------------------------------------------------------------
64 //------------------------------------------------------------------------
65 Stream( const URL *url, uint16_t streamNum );
66
67 //------------------------------------------------------------------------
69 //------------------------------------------------------------------------
71
72 //------------------------------------------------------------------------
74 //------------------------------------------------------------------------
76
77 //------------------------------------------------------------------------
79 //------------------------------------------------------------------------
81 OutgoingMsgHandler *handler,
82 bool stateful,
83 time_t expires );
84
85 //------------------------------------------------------------------------
87 //------------------------------------------------------------------------
88 void SetTransport( TransportHandler *transport )
89 {
90 pTransport = transport;
91 }
92
93 //------------------------------------------------------------------------
95 //------------------------------------------------------------------------
96 void SetPoller( Poller *poller )
97 {
98 pPoller = poller;
99 }
100
101 //------------------------------------------------------------------------
103 //------------------------------------------------------------------------
104 void SetIncomingQueue( InQueue *incomingQueue )
105 {
106 pIncomingQueue = incomingQueue;
107 delete pQueueIncMsgJob;
108 pQueueIncMsgJob = new QueueIncMsgJob( incomingQueue );
109 }
110
111 //------------------------------------------------------------------------
113 //------------------------------------------------------------------------
114 void SetChannelData( AnyObject *channelData )
115 {
116 pChannelData = channelData;
117 }
118
119 //------------------------------------------------------------------------
121 //------------------------------------------------------------------------
122 void SetTaskManager( TaskManager *taskManager )
123 {
124 pTaskManager = taskManager;
125 }
126
127 //------------------------------------------------------------------------
129 //------------------------------------------------------------------------
130 void SetJobManager( JobManager *jobManager )
131 {
132 pJobManager = jobManager;
133 }
134
135 //------------------------------------------------------------------------
139 //------------------------------------------------------------------------
141
142 //------------------------------------------------------------------------
144 //------------------------------------------------------------------------
145 void Disconnect( bool force = false );
146
147 //------------------------------------------------------------------------
150 //------------------------------------------------------------------------
151 void Tick( time_t now );
152
153 //------------------------------------------------------------------------
155 //------------------------------------------------------------------------
156 const URL *GetURL() const
157 {
158 return pUrl;
159 }
160
161 //------------------------------------------------------------------------
163 //------------------------------------------------------------------------
164 uint16_t GetStreamNumber() const
165 {
166 return pStreamNum;
167 }
168
169 //------------------------------------------------------------------------
171 //------------------------------------------------------------------------
173
174 //------------------------------------------------------------------------
176 //------------------------------------------------------------------------
177 const std::string &GetName() const
178 {
179 return pStreamName;
180 }
181
182 //------------------------------------------------------------------------
184 //------------------------------------------------------------------------
185 void DisableIfEmpty( uint16_t subStream );
186
187 //------------------------------------------------------------------------
189 //------------------------------------------------------------------------
190 void OnIncoming( uint16_t subStream,
191 Message *msg,
192 uint32_t bytesReceived );
193
194 //------------------------------------------------------------------------
195 // Call when one of the sockets is ready to accept a new message
196 //------------------------------------------------------------------------
197 std::pair<Message *, OutgoingMsgHandler *>
198 OnReadyToWrite( uint16_t subStream );
199
200 //------------------------------------------------------------------------
201 // Call when a message is written to the socket
202 //------------------------------------------------------------------------
203 void OnMessageSent( uint16_t subStream,
204 Message *msg,
205 uint32_t bytesSent );
206
207 //------------------------------------------------------------------------
209 //------------------------------------------------------------------------
210 void OnConnect( uint16_t subStream );
211
212 //------------------------------------------------------------------------
214 //------------------------------------------------------------------------
215 void OnConnectError( uint16_t subStream, Status status );
216
217 //------------------------------------------------------------------------
219 //------------------------------------------------------------------------
220 void OnError( uint16_t subStream, Status status );
221
222 //------------------------------------------------------------------------
224 //------------------------------------------------------------------------
225 void ForceError( Status status );
226
227 //------------------------------------------------------------------------
229 //------------------------------------------------------------------------
230 void OnReadTimeout( uint16_t subStream, bool &isBroken );
231
232 //------------------------------------------------------------------------
234 //------------------------------------------------------------------------
235 void OnWriteTimeout( uint16_t subStream );
236
237 //------------------------------------------------------------------------
239 //------------------------------------------------------------------------
241
242 //------------------------------------------------------------------------
244 //------------------------------------------------------------------------
246
247 //------------------------------------------------------------------------
256 //------------------------------------------------------------------------
257 std::pair<IncomingMsgHandler *, bool>
258 InstallIncHandler( Message *msg, uint16_t stream );
259
260 //------------------------------------------------------------------------
262 //------------------------------------------------------------------------
263 void SetOnConnectHandler( Job *onConnJob )
264 {
265 delete pOnConnJob;
266 pOnConnJob = onConnJob;
267 }
268
269 private:
270
271 //------------------------------------------------------------------------
272 // Job queuing the incoming messages
273 //------------------------------------------------------------------------
274 class QueueIncMsgJob: public Job
275 {
276 public:
277 QueueIncMsgJob( InQueue *queue ): pQueue( queue ) {};
278 virtual ~QueueIncMsgJob() {};
279 virtual void Run( void *arg )
280 {
281 Message *msg = (Message *)arg;
282 pQueue->AddMessage( msg );
283 }
284 private:
286 };
287
288 //------------------------------------------------------------------------
289 // Job handling the incoming messages
290 //------------------------------------------------------------------------
291 class HandleIncMsgJob: public Job
292 {
293 public:
294 HandleIncMsgJob( IncomingMsgHandler *handler ): pHandler( handler ) {};
295 virtual ~HandleIncMsgJob() {};
296 virtual void Run( void *arg )
297 {
298 Message *msg = (Message *)arg;
299 pHandler->Process( msg );
300 delete this;
301 }
302 private:
304 };
305
306 //------------------------------------------------------------------------
308 //------------------------------------------------------------------------
309 void OnFatalError( uint16_t subStream,
310 Status status,
311 XrdSysMutexHelper &lock );
312
313 //------------------------------------------------------------------------
315 //------------------------------------------------------------------------
317
318 //------------------------------------------------------------------------
320 //------------------------------------------------------------------------
322
323 typedef std::vector<SubStreamData*> SubStreamList;
324
325 //------------------------------------------------------------------------
326 // Data members
327 //------------------------------------------------------------------------
328 const URL *pUrl;
329 uint16_t pStreamNum;
330 std::string pStreamName;
346 std::vector<XrdNetAddr> pAddresses;
349 uint64_t pSessionId;
350
351 //------------------------------------------------------------------------
352 // Jobs
353 //------------------------------------------------------------------------
355
356 //------------------------------------------------------------------------
357 // Monitoring info
358 //------------------------------------------------------------------------
361 uint64_t pBytesSent;
363
364 //------------------------------------------------------------------------
365 // Data stream on-connect handler
366 //------------------------------------------------------------------------
368 };
369}
370
371#endif // __XRD_CL_STREAM_HH__
Definition: XrdClAnyObject.hh:33
Channel event handler.
Definition: XrdClPostMasterInterfaces.hh:221
A helper for handling channel event handlers.
Definition: XrdClChannelHandlerList.hh:34
A synchronize queue for incoming data.
Definition: XrdClInQueue.hh:36
bool AddMessage(Message *msg)
Add a fully reconstructed message to the queue.
Message handler.
Definition: XrdClPostMasterInterfaces.hh:69
virtual void Process(Message *msg)
Definition: XrdClPostMasterInterfaces.hh:126
A synchronized queue.
Definition: XrdClJobManager.hh:51
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:34
The message representation used throughout the system.
Definition: XrdClMessage.hh:30
Message status handler.
Definition: XrdClPostMasterInterfaces.hh:168
Interface for socket pollers.
Definition: XrdClPoller.hh:87
Definition: XrdClStream.hh:292
IncomingMsgHandler * pHandler
Definition: XrdClStream.hh:303
virtual void Run(void *arg)
The job logic.
Definition: XrdClStream.hh:296
HandleIncMsgJob(IncomingMsgHandler *handler)
Definition: XrdClStream.hh:294
virtual ~HandleIncMsgJob()
Definition: XrdClStream.hh:295
Definition: XrdClStream.hh:275
virtual ~QueueIncMsgJob()
Definition: XrdClStream.hh:278
virtual void Run(void *arg)
The job logic.
Definition: XrdClStream.hh:279
QueueIncMsgJob(InQueue *queue)
Definition: XrdClStream.hh:277
InQueue * pQueue
Definition: XrdClStream.hh:285
Stream.
Definition: XrdClStream.hh:49
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:88
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:55
@ Disconnected
Not connected.
Definition: XrdClStream.hh:56
@ Error
Broken.
Definition: XrdClStream.hh:59
@ Connected
Connected.
Definition: XrdClStream.hh:57
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:58
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:104
void OnIncoming(uint16_t subStream, Message *msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
timeval pConnectionStarted
Definition: XrdClStream.hh:359
std::vector< SubStreamData * > SubStreamList
Definition: XrdClStream.hh:323
InQueue * pIncomingQueue
Definition: XrdClStream.hh:336
XrdSysRecMutex pMutex
Definition: XrdClStream.hh:335
Utils::AddressType pAddressType
Definition: XrdClStream.hh:347
Status RequestClose(Message *resp)
Send close after an open request timed out.
void OnFatalError(uint16_t subStream, Status status, XrdSysMutexHelper &lock)
On fatal error - unlocks the stream.
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:96
void ForceConnect()
Force connection.
time_t pConnectionInitTime
Definition: XrdClStream.hh:343
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:122
Poller * pPoller
Definition: XrdClStream.hh:332
uint16_t pStreamNum
Definition: XrdClStream.hh:329
TaskManager * pTaskManager
Definition: XrdClStream.hh:333
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:130
uint16_t GetStreamNumber() const
Get the stream number.
Definition: XrdClStream.hh:164
uint32_t pLastStreamError
Definition: XrdClStream.hh:338
void Disconnect(bool force=false)
Disconnect the stream.
Job * pOnConnJob
Definition: XrdClStream.hh:367
void SetOnConnectHandler(Job *onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:263
uint16_t pConnectionRetry
Definition: XrdClStream.hh:342
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:177
uint64_t pBytesReceived
Definition: XrdClStream.hh:362
std::pair< IncomingMsgHandler *, bool > InstallIncHandler(Message *msg, uint16_t stream)
void Tick(time_t now)
AnyObject * pChannelData
Definition: XrdClStream.hh:337
std::vector< XrdNetAddr > pAddresses
Definition: XrdClStream.hh:346
~Stream()
Destructor.
QueueIncMsgJob * pQueueIncMsgJob
Definition: XrdClStream.hh:354
const URL * pUrl
Definition: XrdClStream.hh:328
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
JobManager * pJobManager
Definition: XrdClStream.hh:334
std::pair< Message *, OutgoingMsgHandler * > OnReadyToWrite(uint16_t subStream)
uint16_t pConnectionCount
Definition: XrdClStream.hh:341
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
void MonitorDisconnection(Status status)
Inform the monitoring about disconnection.
void OnError(uint16_t subStream, Status status)
On error.
std::string pStreamName
Definition: XrdClStream.hh:330
SubStreamList pSubStreams
Definition: XrdClStream.hh:345
ChannelHandlerList pChannelEvHandlers
Definition: XrdClStream.hh:348
uint64_t pBytesSent
Definition: XrdClStream.hh:361
void OnReadTimeout(uint16_t subStream, bool &isBroken)
On read timeout.
uint16_t pConnectionWindow
Definition: XrdClStream.hh:344
TransportHandler * pTransport
Definition: XrdClStream.hh:331
Status pLastFatalError
Definition: XrdClStream.hh:339
Status Initialize()
Initializer.
uint16_t pStreamErrorWindow
Definition: XrdClStream.hh:340
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:156
timeval pConnectionDone
Definition: XrdClStream.hh:360
Status Send(Message *msg, OutgoingMsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
Status EnableLink(PathID &path)
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:114
uint64_t pSessionId
Definition: XrdClStream.hh:349
void ForceError(Status status)
Force error.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Stream(const URL *url, uint16_t streamNum)
Constructor.
void OnConnectError(uint16_t subStream, Status status)
On connect error.
void OnWriteTimeout(uint16_t subStream)
On write timeout.
Definition: XrdClTaskManager.hh:76
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:303
URL representation.
Definition: XrdClURL.hh:31
AddressType
Address type.
Definition: XrdClUtils.hh:94
Definition: XrdSysPthread.hh:261
Definition: XrdSysPthread.hh:240
Definition: XrdClAnyObject.hh:26
Definition: XrdClPostMasterInterfaces.hh:283
Procedure execution status.
Definition: XrdClStatus.hh:110