XRootD
XrdClXCpCtx.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdCl/XrdClXCpCtx.hh"
26 #include "XrdCl/XrdClXCpSrc.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClDefaultEnv.hh"
29 #include "XrdCl/XrdClConstants.hh"
30 
31 #include <algorithm>
32 
33 namespace XrdCl
34 {
35 
36 XCpCtx::XCpCtx( const std::vector<std::string> &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize ) :
37  pUrls( std::deque<std::string>( urls.begin(), urls.end() ) ), pBlockSize( blockSize ),
38  pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ),
39  pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ),
40  pDoneCV( 0 ), pRefCount( 1 ), pDeleteCV( 0 ), pDelete( false )
41 {
42  SetFileSize( fileSize );
43 }
44 
45 XCpCtx::~XCpCtx()
46 {
47  // at this point there's no concurrency
48  // this object dies as the last one
49  while( !pSink.IsEmpty() )
50  {
51  PageInfo *chunk = pSink.Get();
52  if( chunk )
53  XCpSrc::DeleteChunk( chunk );
54  }
55 }
56 
57 bool XCpCtx::GetNextUrl( std::string & url )
58 {
59  XrdSysMutexHelper lck( pMtx );
60  if( pUrls.empty() ) return false;
61  url = pUrls.front();
62  pUrls.pop();
63  return true;
64 }
65 
67 {
68  uint64_t transferRate = -1; // set transferRate to max uint64 value
69  XCpSrc *ret = 0;
70 
71  std::list<XCpSrc*>::iterator itr;
72  XrdSysMutexHelper lck( pMtx );
73 
74  for( itr = pSources.begin() ; itr != pSources.end() ; ++itr )
75  {
76  XCpSrc *src = *itr;
77  if( src == exclude ) continue;
78  uint64_t tmp = src->TransferRate();
79  if( src->HasData() && tmp < transferRate )
80  {
81  ret = src;
82  transferRate = tmp;
83  }
84  }
85 
86  if( !ret ) return ret;
87  return ret->Self();
88 }
89 
90 void XCpCtx::PutChunk( PageInfo* chunk )
91 {
92  pSink.Put( chunk );
93 }
94 
95 std::pair<uint64_t, uint64_t> XCpCtx::GetBlock()
96 {
97  XrdSysMutexHelper lck( pMtx );
98 
99  uint64_t blkSize = pBlockSize, offset = pOffset;
100  if( pOffset + blkSize > uint64_t( pFileSize ) )
101  blkSize = pFileSize - pOffset;
102  pOffset += blkSize;
103 
104  return std::make_pair( offset, blkSize );
105 }
106 
107 void XCpCtx::SetFileSize( int64_t size )
108 {
109  XrdSysCondVarHelper lckcv( pFileSizeCV );
110  XrdSysMutexHelper lckmtx( pMtx );
111  if( pFileSize < 0 && size >= 0 )
112  {
113  pFileSize = size;
114  pFileSizeCV.Broadcast();
115 
116  if( pBlockSize > uint64_t( pFileSize ) / pParallelSrc )
117  pBlockSize = pFileSize / pParallelSrc;
118 
119  if( pBlockSize < pChunkSize )
120  pBlockSize = pChunkSize;
121  }
122 }
123 
125 {
126  for( uint8_t i = 0; i < pParallelSrc; ++i )
127  {
128  XCpSrc *src = new XCpSrc( pChunkSize, pParallelChunks, pFileSize, this );
129  pSources.push_back( src );
130  }
131 
132  auto scpy = pSources;
133  bool ok = false;
134  for(auto src: scpy) {
135  if( src->Start() )
136  {
137  // src destructor will remove src from pSources
138  src->Delete();
139  }
140  else
141  {
142  ok = true;
143  }
144  }
145 
146  if( !ok )
147  {
148  Log *log = DefaultEnv::GetLog();
149  log->Error( UtilityMsg, "Failed to initialize (failed to create new threads)" );
150  return XRootDStatus( stError, errInternal, EAGAIN, "XCpCtx: failed to create new threads." );
151  }
152 
153  return XRootDStatus();
154 }
155 
157 {
158  // if we received all the data we are done here
159  if( pDataReceived == uint64_t( pFileSize ) )
160  {
161  XrdSysCondVarHelper lck( pDoneCV );
162  pDone = true;
163  pDoneCV.Broadcast();
164  return XRootDStatus( stOK, suDone );
165  }
166 
167  // if we don't have active sources it means we failed
168  if( GetRunning() == 0 )
169  {
170  XrdSysCondVarHelper lck( pDoneCV );
171  pDone = true;
172  pDoneCV.Broadcast();
174  }
175 
176  PageInfo *chunk = pSink.Get();
177  if( chunk )
178  {
179  pDataReceived += chunk->GetLength();
180  ci = std::move( *chunk );
181  delete chunk;
182  return XRootDStatus( stOK, suContinue );
183  }
184 
185  return XRootDStatus( stOK, suRetry );
186 }
187 
189 {
190  pDoneCV.Broadcast();
191 }
192 
194 {
195  XrdSysCondVarHelper lck( pDoneCV );
196 
197  if( !pDone )
198  pDoneCV.Wait( 60 );
199 
200  return pDone;
201 }
202 
203 size_t XCpCtx::GetRunning()
204 {
205  // count active sources
206  size_t nbRunning = 0;
207  std::list<XCpSrc*>::iterator itr;
208  XrdSysMutexHelper lck( pMtx );
209 
210  for( itr = pSources.begin() ; itr != pSources.end() ; ++ itr)
211  if( (*itr)->IsRunning() )
212  ++nbRunning;
213  return nbRunning;
214 }
215 
216 
217 } /* namespace XrdCl */
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void NotifyIdleSrc()
Definition: XrdClXCpCtx.cc:188
bool GetNextUrl(std::string &url)
Definition: XrdClXCpCtx.cc:57
XCpSrc * WeakestLink(XCpSrc *exclude)
Definition: XrdClXCpCtx.cc:66
void PutChunk(PageInfo *chunk)
Definition: XrdClXCpCtx.cc:90
XCpCtx(const std::vector< std::string > &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize)
Definition: XrdClXCpCtx.cc:36
void SetFileSize(int64_t size)
Definition: XrdClXCpCtx.cc:107
std::pair< uint64_t, uint64_t > GetBlock()
Definition: XrdClXCpCtx.cc:95
XRootDStatus Initialize()
Definition: XrdClXCpCtx.cc:124
XRootDStatus GetChunk(XrdCl::PageInfo &ci)
Definition: XrdClXCpCtx.cc:156
static void DeleteChunk(PageInfo *&chunk)
Definition: XrdClXCpSrc.hh:131
XCpSrc * Self()
Definition: XrdClXCpSrc.hh:89
uint64_t TransferRate()
Definition: XrdClXCpSrc.cc:596
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint64_t UtilityMsg
const uint16_t suDone
Definition: XrdClStatus.hh:38
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const uint16_t errNoMoreReplicas
No more replicas to try.
Definition: XrdClStatus.hh:65
uint32_t GetLength() const
Get the data length.