XRootD
XrdClParallelOperation.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_PARALLELOPERATION_HH__
27 #define __XRD_CL_PARALLELOPERATION_HH__
28 
#include "XrdCl/XrdClOperations.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdCl/XrdClPostMaster.hh"
#include "XrdCl/XrdClJobManager.hh"

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <utility>
38 
39 namespace XrdCl
40 {
41 
42  //----------------------------------------------------------------------------
43  // Interface for different execution policies:
44  // - all : all operations need to succeed in order for the parallel
45  // operation to be successful
46  // - any : just one of the operations needs to succeed in order for
47  // the parallel operation to be successful
48  // - some : n (user defined) operations need to succeed in order for
49  // the parallel operation to be successful
50  // - at least : at least n (user defined) operations need to succeed in
51  // order for the parallel operation to be successful (the
52  // user handler will be called only when all operations are
53  // resolved)
54  //
55  // @param status : status returned by one of the aggregated operations
56  //
57  // @return : true if the status should be passed to the user handler,
58  // false otherwise.
59  //----------------------------------------------------------------------------
61  {
62  virtual ~PolicyExecutor()
63  {
64  }
65 
66  virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
67 
68  virtual XRootDStatus Result() = 0;
69  };
70 
71  //----------------------------------------------------------------------------
77  //----------------------------------------------------------------------------
78  template<bool HasHndl>
79  class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
80  {
81  template<bool> friend class ParallelOperation;
82 
83  public:
84 
85  //------------------------------------------------------------------------
87  //------------------------------------------------------------------------
88  template<bool from>
90  ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
91  pipelines( std::move( obj.pipelines ) ),
92  policy( std::move( obj.policy ) )
93  {
94  }
95 
96  //------------------------------------------------------------------------
102  //------------------------------------------------------------------------
103  template<class Container>
104  ParallelOperation( Container &&container )
105  {
106  static_assert( !HasHndl, "Constructor is available only operation without handler");
107 
108  pipelines.reserve( container.size() );
109  auto begin = std::make_move_iterator( container.begin() );
110  auto end = std::make_move_iterator( container.end() );
111  std::copy( begin, end, std::back_inserter( pipelines ) );
112  container.clear(); // there's junk inside so we clear it
113  }
114 
116  {
117  }
118 
119  //------------------------------------------------------------------------
121  //------------------------------------------------------------------------
122  std::string ToString()
123  {
124  std::ostringstream oss;
125  oss << "Parallel(";
126  for( size_t i = 0; i < pipelines.size(); i++ )
127  {
128  oss << pipelines[i]->ToString();
129  if( i + 1 != pipelines.size() )
130  {
131  oss << " && ";
132  }
133  }
134  oss << ")";
135  return oss.str();
136  }
137 
138  //------------------------------------------------------------------------
143  //------------------------------------------------------------------------
145  {
146  policy.reset( new AllPolicy() );
147  return std::move( *this );
148  }
149 
150  //------------------------------------------------------------------------
155  //------------------------------------------------------------------------
157  {
158  policy.reset( new AnyPolicy( pipelines.size() ) );
159  return std::move( *this );
160  }
161 
162  //------------------------------------------------------------------------
163  // Set policy to `Some`
167  //------------------------------------------------------------------------
168  ParallelOperation<HasHndl> Some( size_t threshold )
169  {
170  policy.reset( new SomePolicy( pipelines.size(), threshold ) );
171  return std::move( *this );
172  }
173 
174  //------------------------------------------------------------------------
180  //------------------------------------------------------------------------
182  {
183  policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
184  return std::move( *this );
185  }
186 
187  private:
188 
189  //------------------------------------------------------------------------
194  //------------------------------------------------------------------------
195  struct AllPolicy : public PolicyExecutor
196  {
197  bool Examine( const XrdCl::XRootDStatus &status )
198  {
199  // keep the status in case this is the final result
200  res = status;
201  if( status.IsOK() ) return false;
202  // we require all request to succeed
203  return true;
204  }
205 
206  XRootDStatus Result()
207  {
208  return res;
209  }
210 
211  XRootDStatus res;
212  };
213 
214  //------------------------------------------------------------------------
219  //------------------------------------------------------------------------
220  struct AnyPolicy : public PolicyExecutor
221  {
222  AnyPolicy( size_t size) : cnt( size )
223  {
224  }
225 
226  bool Examine( const XrdCl::XRootDStatus &status )
227  {
228  // keep the status in case this is the final result
229  res = status;
230  // decrement the counter
231  size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
232  // we require just one operation to be successful
233  if( status.IsOK() ) return true;
234  // lets see if this is the last one?
235  if( nb == 1 ) return true;
236  // we still have a chance there will be one that is successful
237  return false;
238  }
239 
240  XRootDStatus Result()
241  {
242  return res;
243  }
244 
245  private:
246  std::atomic<size_t> cnt;
247  XRootDStatus res;
248  };
249 
250  //------------------------------------------------------------------------
255  //------------------------------------------------------------------------
256  struct SomePolicy : PolicyExecutor
257  {
258  SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
259  threshold( threshold ), size( size )
260  {
261  }
262 
263  bool Examine( const XrdCl::XRootDStatus &status )
264  {
265  // keep the status in case this is the final result
266  res = status;
267  if( status.IsOK() )
268  {
269  size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
270  if( s + 1 == threshold ) return true; // we reached the threshold
271  // we are not yet there
272  return false;
273  }
274  size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
275  // did we drop below the threshold
276  if( f == size - threshold ) return true;
277  // we still have a chance there will be enough of successful operations
278  return false;
279  }
280 
281  XRootDStatus Result()
282  {
283  return res;
284  }
285 
286  private:
287  std::atomic<size_t> failed;
288  std::atomic<size_t> succeeded;
289  const size_t threshold;
290  const size_t size;
291  XRootDStatus res;
292  };
293 
294  //------------------------------------------------------------------------
300  //------------------------------------------------------------------------
301  struct AtLeastPolicy : PolicyExecutor
302  {
303  AtLeastPolicy( size_t size, size_t threshold ) : pending_cnt( size ),
304  failed_cnt( 0 ),
305  failed_threshold( size - threshold )
306  {
307  }
308 
309  //----------------------------------------------------------------------
313  //----------------------------------------------------------------------
314  bool Examine( const XrdCl::XRootDStatus &status )
315  {
316  if (!status.IsOK()) {
317  if (failed_cnt.fetch_add(1, std::memory_order_relaxed) == failed_threshold) {
318  res = status;
319  return true;
320  }
321  }
322 
323  return pending_cnt.fetch_sub(1, std::memory_order_relaxed) == 1;
324  }
325 
326  XRootDStatus Result()
327  {
328  return res;
329  }
330 
331  private:
332  std::atomic<size_t> pending_cnt;
333  std::atomic<size_t> failed_cnt;
334  const size_t failed_threshold;
335  XRootDStatus res;
336  };
337 
338  //------------------------------------------------------------------------
340  //------------------------------------------------------------------------
341  struct barrier_t
342  {
343  barrier_t() : on( true ) { }
344 
345  void wait()
346  {
347  std::unique_lock<std::mutex> lck( mtx );
348  if( on ) cv.wait( lck );
349  }
350 
351  void lift()
352  {
353  std::unique_lock<std::mutex> lck( mtx );
354  on = false;
355  cv.notify_all();
356  }
357 
358  private:
359  std::condition_variable cv;
360  std::mutex mtx;
361  bool on;
362  };
363 
364  //------------------------------------------------------------------------
369  //------------------------------------------------------------------------
370  struct Ctx
371  {
372  //----------------------------------------------------------------------
376  //----------------------------------------------------------------------
377  Ctx( PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
378  policy( policy )
379  {
380  }
381 
382  //----------------------------------------------------------------------
384  //----------------------------------------------------------------------
385  ~Ctx()
386  {
387  Handle( XRootDStatus() );
388  }
389 
390  //----------------------------------------------------------------------
395  //----------------------------------------------------------------------
396  inline void Examine( const XRootDStatus &st )
397  {
398  if( policy->Examine( st ) )
399  Handle( policy->Result() );
400  }
401 
402  //----------------------------------------------------------------------
407  //---------------------------------------------------------------------
408  inline void Handle( const XRootDStatus &st )
409  {
410  PipelineHandler* hdlr = handler.exchange( nullptr, std::memory_order_relaxed );
411  if( hdlr )
412  {
413  barrier.wait();
414  hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
415  }
416  }
417 
418  //----------------------------------------------------------------------
420  //----------------------------------------------------------------------
421  std::atomic<PipelineHandler*> handler;
422 
423  //----------------------------------------------------------------------
425  //----------------------------------------------------------------------
426  std::unique_ptr<PolicyExecutor> policy;
427 
428  //----------------------------------------------------------------------
431  //----------------------------------------------------------------------
432  barrier_t barrier;
433  };
434 
435  //------------------------------------------------------------------------
437  //------------------------------------------------------------------------
438  struct PipelineEnd : public Job
439  {
440  //----------------------------------------------------------------------
441  // Constructor
442  //----------------------------------------------------------------------
443  PipelineEnd( std::shared_ptr<Ctx> &ctx,
444  const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
445  {
446  }
447 
448  //----------------------------------------------------------------------
449  // Run Ctx::Examine in the thread-pool
450  //----------------------------------------------------------------------
451  void Run( void* )
452  {
453  ctx->Examine( st );
454  delete this;
455  }
456 
457  private:
458  std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
459  XrdCl::XRootDStatus st; //< final status of the ParallelOperation
460  };
461 
462  //------------------------------------------------------------------------
464  //------------------------------------------------------------------------
465  inline static
466  void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
467  {
469  PipelineEnd *end = new PipelineEnd( ctx, st );
470  mgr->QueueJob( end, nullptr );
471  }
472 
473  //------------------------------------------------------------------------
479  //------------------------------------------------------------------------
480  XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
481  {
482  // make sure we have a valid policy for the parallel operation
483  if( !policy ) policy.reset( new AllPolicy() );
484 
485  std::shared_ptr<Ctx> ctx =
486  std::make_shared<Ctx>( handler, policy.release() );
487 
488  uint16_t timeout = pipelineTimeout < this->timeout ?
489  pipelineTimeout : this->timeout;
490 
491  for( size_t i = 0; i < pipelines.size(); ++i )
492  {
493  if( !pipelines[i] ) continue;
494  pipelines[i].Run( timeout,
495  [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
496  }
497 
498  ctx->barrier.lift();
499  return XRootDStatus();
500  }
501 
502  std::vector<Pipeline> pipelines;
503  std::unique_ptr<PolicyExecutor> policy;
504  };
505 
506  //----------------------------------------------------------------------------
508  //----------------------------------------------------------------------------
509  template<class Container>
510  inline ParallelOperation<false> Parallel( Container &&container )
511  {
512  return ParallelOperation<false>( container );
513  }
514 
515  //----------------------------------------------------------------------------
517  //----------------------------------------------------------------------------
518  inline void PipesToVec( std::vector<Pipeline>& )
519  {
520  // base case
521  }
522 
523  //----------------------------------------------------------------------------
524  // Declare PipesToVec (we need to do declare those functions ahead of
525  // definitions, as they may call each other.
526  //----------------------------------------------------------------------------
527  template<typename ... Others>
528  inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
529  Others&... others );
530 
531  template<typename ... Others>
532  inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
533  Others&... others );
534 
535  template<typename ... Others>
536  inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
537  Others&... others );
538 
539  //----------------------------------------------------------------------------
540  // Define PipesToVec
541  //----------------------------------------------------------------------------
542  template<typename ... Others>
543  void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
544  Others&... others )
545  {
546  v.emplace_back( operation );
547  PipesToVec( v, others... );
548  }
549 
550  template<typename ... Others>
551  void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
552  Others&... others )
553  {
554  v.emplace_back( operation );
555  PipesToVec( v, others... );
556  }
557 
558  template<typename ... Others>
559  void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
560  Others&... others )
561  {
562  v.emplace_back( std::move( pipeline ) );
563  PipesToVec( v, others... );
564  }
565 
566  //----------------------------------------------------------------------------
571  //----------------------------------------------------------------------------
572  template<typename ... Operations>
573  inline ParallelOperation<false> Parallel( Operations&& ... operations )
574  {
575  constexpr size_t size = sizeof...( operations );
576  std::vector<Pipeline> v;
577  v.reserve( size );
578  PipesToVec( v, operations... );
579  return Parallel( v );
580  }
581 }
582 
#endif // __XRD_CL_PARALLELOPERATION_HH__
static PostMaster * GetPostMaster()
Get default post master.
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
friend class PipelineHandler
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
std::unique_ptr< PipelineHandler > handler
Operation handler.
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
ParallelOperation(Container &&container)
ParallelOperation< HasHndl > Some(size_t threshold)
ParallelOperation< HasHndl > All()
ParallelOperation< HasHndl > Any()
ParallelOperation< HasHndl > AtLeast(size_t threshold)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124