xrootd
XrdClParallelOperation.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4// Michal Simon <michal.simon@cern.ch>
5//------------------------------------------------------------------------------
6// This file is part of the XRootD software suite.
7//
8// XRootD is free software: you can redistribute it and/or modify
9// it under the terms of the GNU Lesser General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// XRootD is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public License
19// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20//
21// In applying this licence, CERN does not waive the privileges and immunities
22// granted to it by virtue of its status as an Intergovernmental Organization
23// or submit itself to any jurisdiction.
24//------------------------------------------------------------------------------
25
26#ifndef __XRD_CL_PARALLELOPERATION_HH__
27#define __XRD_CL_PARALLELOPERATION_HH__
28
31
32#include <atomic>
33
34namespace XrdCl
35{
36
38 {
39
40 };
41
42 //----------------------------------------------------------------------------
48 //----------------------------------------------------------------------------
49 template<bool HasHndl>
50 class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
51 {
52 template<bool> friend class ParallelOperation;
53
54 public:
55
56 //------------------------------------------------------------------------
58 //------------------------------------------------------------------------
59 template<bool from>
61 ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
62 pipelines( std::move( obj.pipelines ) )
63 {
64 }
65
66 //------------------------------------------------------------------------
72 //------------------------------------------------------------------------
73 template<class Container>
74 ParallelOperation( Container &&container )
75 {
76 static_assert( !HasHndl, "Constructor is available only operation without handler");
77
78 pipelines.reserve( container.size() );
79 auto begin = std::make_move_iterator( container.begin() );
80 auto end = std::make_move_iterator( container.end() );
81 std::copy( begin, end, std::back_inserter( pipelines ) );
82 container.clear(); // there's junk inside so we clear it
83 }
84
85 //------------------------------------------------------------------------
87 //------------------------------------------------------------------------
88 std::string ToString()
89 {
90 std::ostringstream oss;
91 oss << "Parallel(";
92 for( size_t i = 0; i < pipelines.size(); i++ )
93 {
94 oss << pipelines[i]->ToString();
95 if( i + 1 != pipelines.size() )
96 {
97 oss << " && ";
98 }
99 }
100 oss << ")";
101 return oss.str();
102 }
103
104 private:
105
106 //------------------------------------------------------------------------
111 //------------------------------------------------------------------------
112 struct Ctx
113 {
114 //----------------------------------------------------------------------
118 //----------------------------------------------------------------------
120 {
121
122 }
123
124 //----------------------------------------------------------------------
126 //----------------------------------------------------------------------
128 {
129 Handle( XRootDStatus() );
130 }
131
132 //----------------------------------------------------------------------
137 //----------------------------------------------------------------------
138 void Handle( const XRootDStatus &st )
139 {
140 PipelineHandler* hdlr = handler.exchange( nullptr );
141 if( hdlr )
142 hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
143 }
144
145 //----------------------------------------------------------------------
147 //----------------------------------------------------------------------
148 std::atomic<PipelineHandler*> handler;
149 };
150
151 //------------------------------------------------------------------------
157 //------------------------------------------------------------------------
159 {
160 std::shared_ptr<Ctx> ctx( new Ctx( this->handler.release() ) );
161
162 try
163 {
164 for( size_t i = 0; i < pipelines.size(); ++i )
165 {
166 pipelines[i].Run( [ctx]( const XRootDStatus &st ){ if( !st.IsOK() ) ctx->Handle( st ); } );
167 }
168 }
169 catch( const PipelineException& ex )
170 {
171 return ex.GetError();
172 }
173 catch( const std::exception& ex )
174 {
175 return XRootDStatus( stError, ex.what() );
176 }
177
178 return XRootDStatus();
179 }
180
181 std::vector<Pipeline> pipelines;
182 };
183
184 //----------------------------------------------------------------------------
186 //----------------------------------------------------------------------------
187 template<class Container>
188 ParallelOperation<false> Parallel( Container &container )
189 {
190 return ParallelOperation<false>( container );
191 }
192
193 //----------------------------------------------------------------------------
195 //----------------------------------------------------------------------------
196 inline void PipesToVec( std::vector<Pipeline>& )
197 {
198 // base case
199 }
200
201 //----------------------------------------------------------------------------
202 // Declare PipesToVec (we need to declare these functions ahead of
203 // their definitions, as they may call each other).
204 //----------------------------------------------------------------------------
205 template<typename ... Others>
206 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
207 Others&... others );
208
209 template<typename ... Others>
210 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
211 Others&... others );
212
213 template<typename ... Others>
214 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
215 Others&... others );
216
217 //----------------------------------------------------------------------------
218 // Define PipesToVec
219 //----------------------------------------------------------------------------
220 template<typename ... Others>
221 void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
222 Others&... others )
223 {
224 v.emplace_back( operation );
225 PipesToVec( v, others... );
226 }
227
228 template<typename ... Others>
229 void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
230 Others&... others )
231 {
232 v.emplace_back( operation );
233 PipesToVec( v, others... );
234 }
235
236 template<typename ... Others>
237 void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
238 Others&... others )
239 {
240 v.emplace_back( std::move( pipeline ) );
241 PipesToVec( v, others... );
242 }
243
244 //----------------------------------------------------------------------------
249 //----------------------------------------------------------------------------
250 template<typename ... Operations>
251 ParallelOperation<false> Parallel( Operations&& ... operations )
252 {
253 constexpr size_t size = sizeof...( operations );
254 std::vector<Pipeline> v;
255 v.reserve( size );
256 PipesToVec( v, operations... );
257 return Parallel( v );
258 }
259}
260
261#endif // __XRD_CL_PARALLELOPERATION_HH__
Definition: XrdClOperations.hh:468
Definition: XrdClOperations.hh:156
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition: XrdClOperations.hh:280
Definition: XrdClParallelOperation.hh:38
Definition: XrdClParallelOperation.hh:51
std::vector< Pipeline > pipelines
Definition: XrdClParallelOperation.hh:181
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in a different state.
Definition: XrdClParallelOperation.hh:60
ParallelOperation(Container &&container)
Definition: XrdClParallelOperation.hh:74
XRootDStatus RunImpl()
Definition: XrdClParallelOperation.hh:158
std::string ToString()
Definition: XrdClParallelOperation.hh:88
Pipeline exception, wraps an XRootDStatus.
Definition: XrdClOperationHandlers.hh:312
const XRootDStatus & GetError() const
Definition: XrdClOperationHandlers.hh:351
const char * what() const noexcept
inherited from std::exception
Definition: XrdClOperationHandlers.hh:343
Definition: XrdClOperations.hh:51
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
Definition: XrdClOperations.hh:296
Request status.
Definition: XrdClXRootDResponses.hh:213
Definition: XrdClAnyObject.hh:26
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
Definition: XrdClParallelOperation.hh:196
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
ParallelOperation< false > Parallel(Container &container)
Factory function for creating parallel operation from a vector.
Definition: XrdClParallelOperation.hh:188
Definition: XrdClParallelOperation.hh:113
void Handle(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:138
Ctx(PipelineHandler *handler)
Definition: XrdClParallelOperation.hh:119
~Ctx()
Destructor.
Definition: XrdClParallelOperation.hh:127
std::atomic< PipelineHandler * > handler
PipelineHandler of the ParallelOperation.
Definition: XrdClParallelOperation.hh:148
Definition: XrdClOperationHandlers.hh:554
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:119