53 #include <sys/types.h>
59 #if __cplusplus < 201103L
68 template<
typename U = std::ratio<1, 1>>
72 mytimer_t() : start( clock_t::now() ){ }
73 void reset(){ start = clock_t::now(); }
74 uint64_t elapsed()
const
76 return std::chrono::duration_cast<unit_t>( clock_t::now() - start ).count();
79 typedef std::chrono::high_resolution_clock clock_t;
80 typedef std::chrono::duration<uint64_t, U> unit_t;
81 std::chrono::time_point<clock_t> start;
84 using timer_sec_t = mytimer_t<>;
85 using timer_nsec_t = mytimer_t<std::nano>;
89 std::vector<XrdCl::xattr_t> &out )
91 std::vector<XrdCl::xattr_t> ret;
92 ret.reserve( in.size() );
93 std::vector<XrdCl::XAttr>::iterator itr = in.begin();
94 for( ; itr != in.end() ; ++itr )
96 if( !itr->status.IsOK() )
return itr->
status;
98 ret.push_back( std::move( xa ) );
108 std::vector<XrdCl::xattr_t> &xattrs )
110 std::vector<XrdCl::XAttr> rsp;
112 if( !st.
IsOK() )
return st;
113 return Translate( rsp, xattrs );
120 std::vector<XrdCl::xattr_t> &xattrs )
124 std::vector<XrdCl::XAttr> rsp;
126 if( !st.
IsOK() )
return st;
127 return Translate( rsp, xattrs );
131 const std::vector<XrdCl::xattr_t> &xattrs )
133 std::vector<XrdCl::XAttrStatus> rsp;
135 std::vector<XrdCl::XAttrStatus>::iterator itr = rsp.begin();
136 for( ; itr != rsp.end() ; ++itr )
137 if( !itr->status.IsOK() )
return itr->
status;
150 Source(
const std::string &checkSumType =
"",
151 const std::vector<std::string> &addcks = std::vector<std::string>() ) :
155 if( !checkSumType.empty() )
158 for(
auto &type : addcks )
165 for(
auto ptr : pAddCksHelpers )
177 virtual int64_t GetSize() = 0;
198 std::string &checkSumType ) = 0;
203 virtual std::vector<std::string> GetAddCks() = 0;
221 std::vector<XrdCl::CheckSumHelper*> pAddCksHelpers;
234 Destination(
const std::string &checkSumType =
"" ):
235 pPosc( false ), pForce( false ), pCoerce( false ), pMakeDir( false ),
236 pContinue( false ), pCkSumHelper( 0 )
238 if( !checkSumType.empty() )
245 virtual ~Destination()
277 std::string &checkSumType ) = 0;
287 virtual int64_t GetSize() = 0;
292 void SetPOSC(
bool posc )
300 void SetForce(
bool force )
308 void SetContinue(
bool continue_ )
310 pContinue = continue_;
316 void SetCoerce(
bool coerce )
324 void SetMakeDir(
bool makedir )
332 virtual const std::string& GetLastURL()
const
334 static const std::string empty;
341 virtual const std::string& GetWrtRecoveryRedir()
const
343 static const std::string empty;
360 class StdInSource:
public Source
366 StdInSource(
const std::string &ckSumType, uint32_t chunkSize,
const std::vector<std::string> &addcks ):
367 Source( ckSumType, addcks ),
369 pChunkSize( chunkSize )
377 virtual ~StdInSource()
389 auto st = pCkSumHelper->Initialize();
390 if( !st.
IsOK() )
return st;
391 for(
auto cksHelper : pAddCksHelpers )
393 st = cksHelper->Initialize();
394 if( !st.
IsOK() )
return st;
403 virtual int64_t GetSize()
414 "Cannot continue from stdin!" );
422 using namespace XrdCl;
423 Log *log = DefaultEnv::GetLog();
425 uint32_t toRead = pChunkSize;
426 char *buffer =
new char[toRead];
428 int64_t bytesRead = 0;
432 int64_t bRead =
read( 0, buffer+offset, toRead );
456 pCkSumHelper->Update( buffer, bytesRead );
458 for(
auto cksHelper : pAddCksHelpers )
459 cksHelper->Update( buffer, bytesRead );
462 pCurrentOffset += bytesRead;
470 std::string &checkSum,
471 std::string &checkSumType )
473 using namespace XrdCl;
475 return cksHelper->
GetCheckSum( checkSum, checkSumType );
483 std::string &checkSumType )
485 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
491 std::vector<std::string> GetAddCks()
493 std::vector<std::string> ret;
494 for(
auto cksHelper : pAddCksHelpers )
496 std::string type = cksHelper->
GetType();
498 GetCheckSumImpl( cksHelper, cks, type );
499 ret.push_back( type +
":" + cks );
513 StdInSource(
const StdInSource &other);
514 StdInSource &operator = (
const StdInSource &other);
516 uint64_t pCurrentOffset;
523 class XRootDSource:
public Source
527 virtual void Cancel() = 0;
536 template<
typename READER>
537 struct OnConnJob :
public CancellableJob
539 OnConnJob( XRootDSource *
self, READER *reader ) : self( self ), reader( reader )
545 std::unique_lock<std::mutex> lck( mtx );
546 if( !
self || !reader )
return;
548 if( self->pNbConn < self->pMaxNbConn )
549 self->FillQueue( reader );
554 std::unique_lock<std::mutex> lck( mtx );
572 return pFile->TryOtherServer();
580 uint8_t parallelChunks,
581 const std::string &ckSumType,
582 const std::vector<std::string> &addcks,
584 Source( ckSumType, addcks ),
585 pUrl( url ), pFile( new
XrdCl::
File() ), pSize( -1 ),
586 pCurrentOffset( 0 ), pChunkSize( chunkSize ),
587 pParallel( parallelChunks ),
588 pNbConn( 0 ), pUsePgRead( false ),
589 pDoServer( doserver )
593 pMaxNbConn = val - 1;
599 virtual ~XRootDSource()
602 pDataConnCB->Cancel();
605 if( pFile->IsOpen() )
615 using namespace XrdCl;
616 Log *log = DefaultEnv::GetLog();
618 pUrl->GetObfuscatedURL().c_str() );
621 DefaultEnv::GetEnv()->GetString(
"ReadRecovery", value );
622 pFile->SetProperty(
"ReadRecovery", value );
629 st = pFile->Stat(
false, statInfo );
636 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
638 st = pCkSumHelper->Initialize();
639 if( !st.
IsOK() )
return st;
641 for(
auto cksHelper : pAddCksHelpers )
644 if( !st.
IsOK() )
return st;
651 if( !pUrl->IsLocalFile() ||
652 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
654 pFile->GetProperty(
"LastURL", pDataServer );
658 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
659 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
673 if( pDoServer && !pUrl->IsLocalFile() )
676 DefaultEnv::GetPostMaster()->QueryTransport( pDataServer, StreamQuery::IpStack, obj );
677 std::string *ipstack =
nullptr;
679 std::cerr <<
"!-!" << *ipstack << std::endl;
683 SetOnDataConnectHandler( pFile );
691 virtual int64_t GetSize()
701 pCurrentOffset = offset;
716 return GetChunkImpl( pFile, ci );
732 while( !pChunks.empty() )
737 delete [] (
char *)ch->chunk.GetBuffer();
746 std::string &checkSumType )
748 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
752 std::string &checkSum,
753 std::string &checkSumType )
755 if( pUrl->IsMetalink() )
759 checkSum = redirector->
GetCheckSum( checkSumType );
763 if( pUrl->IsLocalFile() )
770 return cksHelper->
GetCheckSum( checkSum, checkSumType );
775 std::string dataServer; pFile->GetProperty(
"DataServer", dataServer );
776 std::string lastUrl; pFile->GetProperty(
"LastURL", lastUrl );
783 std::vector<std::string> GetAddCks()
785 std::vector<std::string> ret;
786 for(
auto cksHelper : pAddCksHelpers )
788 std::string type = cksHelper->
GetType();
790 GetCheckSumImpl( cksHelper, cks, type );
791 ret.push_back( cks );
797 XRootDSource(
const XRootDSource &other);
798 XRootDSource &operator = (
const XRootDSource &other);
805 template<
typename READER>
806 inline void FillQueue( READER *reader )
811 uint16_t parallel = pParallel;
812 if( pNbConn < pMaxNbConn )
815 NbConnectedStrm( pDataServer );
817 if( pNbConn ) parallel *= pNbConn;
819 while( pChunks.size() < parallel && pCurrentOffset < pSize )
821 uint64_t chunkSize = pChunkSize;
822 if( pCurrentOffset + chunkSize > (uint64_t)pSize )
823 chunkSize = pSize - pCurrentOffset;
825 char *buffer =
new char[chunkSize];
828 ? reader->PgRead( pCurrentOffset, chunkSize, buffer, ch )
829 : reader->Read( pCurrentOffset, chunkSize, buffer, ch );
831 pCurrentOffset += chunkSize;
844 template<
typename READER>
845 void SetOnDataConnectHandler( READER *reader )
848 pDataConnCB.reset(
new OnConnJob<READER>(
this, reader ) );
851 if( pDataServer.empty() )
return;
865 template<
typename READER>
871 using namespace XrdCl;
872 Log *log = DefaultEnv::GetLog();
877 std::unique_lock<std::mutex> lck( pDataConnCB->mtx );
883 if( pChunks.empty() )
886 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
892 if( !ch->status.IsOK() )
895 ch->chunk.GetLength(), (
unsigned long long) ch->chunk.GetOffset(),
896 pUrl->GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
897 delete [] (
char *)ch->chunk.GetBuffer();
902 ci = std::move( ch->chunk );
904 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
909 for(
auto cksHelper : pAddCksHelpers )
927 this->status = *statusval;
931 chunk = ToChunk( response );
942 response->
Get( resp );
943 return std::move( *resp );
948 response->
Get( resp );
962 int64_t pCurrentOffset;
965 std::queue<ChunkHandler*> pChunks;
966 std::string pDataServer;
972 std::shared_ptr<CancellableJob> pDataConnCB;
978 class XRootDSourceZip:
public XRootDSource
984 XRootDSourceZip(
const std::string &filename,
987 uint8_t parallelChunks,
988 const std::string &ckSumType,
989 const std::vector<std::string> &addcks,
991 XRootDSource( archive, chunkSize, parallelChunks, ckSumType,
993 pFilename( filename ),
1001 virtual ~XRootDSourceZip()
1014 using namespace XrdCl;
1015 Log *log = DefaultEnv::GetLog();
1017 pUrl->GetObfuscatedURL().c_str() );
1020 DefaultEnv::GetEnv()->GetString(
"ReadRecovery", value );
1021 pZipArchive->SetProperty(
"ReadRecovery", value );
1027 st = pZipArchive->OpenFile( pFilename );
1032 st = pZipArchive->Stat( info );
1041 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper )
1043 auto st = pCkSumHelper->Initialize();
1044 if( !st.
IsOK() )
return st;
1045 for(
auto cksHelper : pAddCksHelpers )
1048 if( !st.
IsOK() )
return st;
1052 if( ( !pUrl->IsLocalFile() && !pZipArchive->IsSecure() ) ||
1053 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1055 pZipArchive->GetProperty(
"DataServer", pDataServer );
1064 SetOnDataConnectHandler( pZipArchive );
1080 return GetChunkImpl( pZipArchive, ci );
1087 std::string &checkSumType )
1089 return GetCheckSumImpl( checkSum, checkSumType, pCkSumHelper );
1096 std::string &checkSumType,
1100 if( checkSumType ==
"zcrc32" )
1103 auto st = pZipArchive->GetCRC32( pFilename, cksum );
1104 if( !st.
IsOK() )
return st;
1107 ckSum.
Set(
"zcrc32" );
1108 ckSum.
Set(
reinterpret_cast<void*
>( &cksum ),
sizeof( uint32_t ) );
1109 char cksBuffer[265];
1110 ckSum.
Get( cksBuffer, 256 );
1111 checkSum =
"zcrc32:";
1118 env->
GetInt(
"ZipMtlnCksum", useMtlnCksum );
1119 if( useMtlnCksum && pUrl->IsMetalink() )
1123 checkSum = redirector->
GetCheckSum( checkSumType );
1128 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && cksHelper && !pContinue )
1129 return cksHelper->
GetCheckSum( checkSum, checkSumType );
1138 std::vector<std::string> GetAddCks()
1140 std::vector<std::string> ret;
1141 for(
auto cksHelper : pAddCksHelpers )
1143 std::string type = cksHelper->
GetType();
1145 GetCheckSumImpl( cks, type, cksHelper );
1146 ret.push_back( cks );
1161 XRootDSourceZip(
const XRootDSourceZip &other);
1162 XRootDSourceZip &operator = (
const XRootDSourceZip &other);
1164 const std::string pFilename;
1171 class XRootDSourceDynamic:
public Source
1180 return pFile->TryOtherServer();
1188 const std::string &ckSumType,
1189 const std::vector<std::string> &addcks ):
1190 Source( ckSumType, addcks ),
1191 pUrl( url ), pFile( new
XrdCl::
File() ), pCurrentOffset( 0 ),
1192 pChunkSize( chunkSize ), pDone( false ), pUsePgRead( false )
1199 virtual ~XRootDSourceDynamic()
1210 using namespace XrdCl;
1211 Log *log = DefaultEnv::GetLog();
1213 pUrl->GetObfuscatedURL().c_str() );
1216 DefaultEnv::GetEnv()->GetString(
"ReadRecovery", value );
1217 pFile->SetProperty(
"ReadRecovery", value );
1223 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
1225 auto st = pCkSumHelper->Initialize();
1226 if( !st.
IsOK() )
return st;
1227 for(
auto cksHelper : pAddCksHelpers )
1230 if( !st.
IsOK() )
return st;
1234 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
1235 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1237 std::string datasrv;
1238 pFile->GetProperty(
"DataServer", datasrv );
1253 virtual int64_t GetSize()
1263 pCurrentOffset = offset;
1282 using namespace XrdCl;
1290 char *buffer =
new char[pChunkSize];
1291 uint32_t bytesRead = 0;
1293 std::vector<uint32_t> cksums;
1295 ? pFile->PgRead( pCurrentOffset, pChunkSize, buffer, cksums, bytesRead )
1296 : pFile->Read( pCurrentOffset, pChunkSize, buffer, bytesRead );
1310 if( bytesRead < pChunkSize )
1314 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
1317 pCkSumHelper->Update( buffer, bytesRead );
1319 for(
auto cksHelper : pAddCksHelpers )
1320 cksHelper->
Update( buffer, bytesRead );
1324 pCurrentOffset += bytesRead;
1333 std::string &checkSumType )
1335 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
1339 std::string &checkSum,
1340 std::string &checkSumType )
1342 if( pUrl->IsMetalink() )
1346 checkSum = redirector->
GetCheckSum( checkSumType );
1350 if( pUrl->IsLocalFile() )
1357 return cksHelper->
GetCheckSum( checkSum, checkSumType );
1362 std::string dataServer; pFile->GetProperty(
"DataServer", dataServer );
1363 std::string lastUrl; pFile->GetProperty(
"LastURL", lastUrl );
1370 std::vector<std::string> GetAddCks()
1372 std::vector<std::string> ret;
1373 for(
auto cksHelper : pAddCksHelpers )
1375 std::string type = cksHelper->
GetType();
1377 GetCheckSumImpl( cksHelper, cks, type );
1378 ret.push_back( cks );
1392 XRootDSourceDynamic(
const XRootDSourceDynamic &other);
1393 XRootDSourceDynamic &operator = (
const XRootDSourceDynamic &other);
1396 int64_t pCurrentOffset;
1397 uint32_t pChunkSize;
1405 class XRootDSourceXCp:
public Source
1411 XRootDSourceXCp(
const XrdCl::URL* url, uint32_t chunkSize, uint16_t parallelChunks, int32_t nbSrc, uint64_t blockSize ):
1412 pXCpCtx( 0 ), pUrl( url ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), pNbSrc( nbSrc ), pBlockSize( blockSize )
1428 int64_t fileSize = -1;
1430 if( pUrl->IsMetalink() )
1434 fileSize = redirector->
GetSize();
1442 if( !st.
IsOK() )
return st;
1445 for( itr = li->
Begin(); itr != li->
End(); ++itr)
1447 std::string url =
"root://" + itr->GetAddress() +
"/" + pUrl->
GetPath();
1448 pReplicas.push_back( url );
1454 std::stringstream ss;
1455 ss <<
"XCp sources: ";
1457 std::vector<std::string>::iterator itr;
1458 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1464 pXCpCtx =
new XrdCl::XCpCtx( pReplicas, pBlockSize, pNbSrc, pChunkSize, pParallelChunks, fileSize );
1466 return pXCpCtx->Initialize();
1472 virtual int64_t GetSize()
1474 return pXCpCtx->GetSize();
1499 st = pXCpCtx->GetChunk( ci );
1509 std::string &checkSumType )
1511 if( pUrl->IsMetalink() )
1515 checkSum = redirector->
GetCheckSum( checkSumType );
1519 std::vector<std::string>::iterator itr;
1520 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1524 checkSumType, url );
1525 if( st.
IsOK() )
return st;
1534 std::vector<std::string> GetAddCks()
1536 return std::vector<std::string>();
1545 std::vector<std::string>::iterator itr = pReplicas.begin();
1546 for( ; itr < pReplicas.end() ; ++itr )
1549 if( st.
IsOK() )
return st;
1559 std::vector<std::string> pReplicas;
1560 uint32_t pChunkSize;
1561 uint16_t pParallelChunks;
1563 uint64_t pBlockSize;
1569 class StdOutDestination:
public Destination
1575 StdOutDestination(
const std::string &ckSumType ):
1576 Destination( ckSumType ), pCurrentOffset(0)
1583 virtual ~StdOutDestination()
1594 ENOTSUP,
"Cannot continue to stdout." );
1597 return pCkSumHelper->Initialize();
1617 using namespace XrdCl;
1618 Log *log = DefaultEnv::GetLog();
1623 " %llu, got %llu", (
unsigned long long) pCurrentOffset, (
unsigned long long) ci.
GetOffset() );
1632 wr =
write( 1, cursor, length );
1640 pCurrentOffset += wr;
1664 std::string &checkSumType )
1667 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1682 virtual int64_t GetSize()
1688 StdOutDestination(
const StdOutDestination &other);
1689 StdOutDestination &operator = (
const StdOutDestination &other);
1690 uint64_t pCurrentOffset;
1696 class XRootDDestination:
public Destination
1702 XRootDDestination(
const XrdCl::URL &url, uint8_t parallelChunks,
1704 Destination( ckSumType ),
1706 pParallel( parallelChunks ), pSize( -1 ), pUsePgWrt( false ), cpjob( cpjob )
1713 virtual ~XRootDDestination()
1725 if( !cptarget.empty() )
1738 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
1744 " on failure: %s", st.
ToString().c_str() );
1753 using namespace XrdCl;
1754 Log *log = DefaultEnv::GetLog();
1756 pUrl.GetObfuscatedURL().c_str() );
1759 DefaultEnv::GetEnv()->GetString(
"WriteRecovery", value );
1760 pFile->SetProperty(
"WriteRecovery", value );
1764 flags |= OpenFlags::Delete;
1765 else if( !pContinue )
1766 flags |= OpenFlags::New;
1769 flags |= OpenFlags::POSC;
1775 flags |= OpenFlags::MakePath;
1777 Access::Mode mode = Access::UR|Access::UW|Access::GR|Access::OR;
1783 if( ( !pUrl.IsLocalFile() && !pFile->IsSecure() ) ||
1784 ( pUrl.IsLocalFile() && pUrl.IsMetalink() ) )
1786 std::string datasrv;
1787 pFile->GetProperty(
"DataServer", datasrv );
1798 if( !cptarget.empty() )
1800 std::string targeturl;
1801 pFile->GetProperty(
"LastURL", targeturl );
1803 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
1808 cptarget.c_str(), targeturl.c_str() );
1812 st = pFile->Stat(
false, info );
1818 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1819 return pCkSumHelper->Initialize();
1829 return pFile->Close();
1840 using namespace XrdCl;
1841 if( !pFile->IsOpen() )
1850 if( pChunks.size() < pParallel )
1851 return QueueChunk( std::move( ci ) );
1857 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
1860 delete [] (
char*)ch->chunk.GetBuffer();
1861 if( !ch->status.IsOK() )
1863 Log *log = DefaultEnv::GetLog();
1865 ch->chunk.GetLength(), (
unsigned long long) ch->chunk.GetOffset(),
1866 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
1874 return CheckIfRetriable( ch->status );
1877 return QueueChunk( std::move( ci ) );
1883 virtual int64_t GetSize()
1891 void CleanUpChunks()
1893 while( !pChunks.empty() )
1898 delete [] (
char *)ch->chunk.GetBuffer();
1910 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1916 ? pFile->PgWrite(ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch->chunk.GetCksums(), ch)
1917 : pFile->Write( ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
1921 delete [] (
char*)ch->chunk.GetBuffer();
1935 while( !pChunks.empty() )
1940 if( !ch->status.IsOK() )
1946 st = CheckIfRetriable( ch->status );
1948 delete [] (
char *)ch->chunk.GetBuffer();
1958 std::string &checkSumType )
1960 if( pUrl.IsLocalFile() )
1967 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1972 std::string lastUrl; pFile->GetProperty(
"LastURL", lastUrl );
1988 const std::string& GetLastURL()
const
1996 const std::string& GetWrtRecoveryRedir()
const
1998 return pWrtRecoveryRedir;
2002 XRootDDestination(
const XRootDDestination &other);
2003 XRootDDestination &operator = (
const XRootDDestination &other);
2013 chunk(std::move( ci ) ) {}
2018 this->status = *statusval;
2030 if( status.
IsOK() )
return status;
2037 if( pFile->GetProperty(
"WrtRecoveryRedir", value ) )
2039 pWrtRecoveryRedir = value;
2040 if( pFile->GetProperty(
"LastURL", value ) ) pLastURL = value;
2050 std::queue<ChunkHandler *> pChunks;
2053 std::string pWrtRecoveryRedir;
2054 std::string pLastURL;
2062 class XRootDZipDestination:
public Destination
2068 XRootDZipDestination(
const XrdCl::URL &url,
const std::string &fn,
2070 Destination(
"zcrc32" ),
2072 pParallel( parallelChunks ), pSize( size ), cpjob( cpjob )
2079 virtual ~XRootDZipDestination()
2088 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
2096 " on failure: %s", st.
ToString().c_str() );
2106 using namespace XrdCl;
2107 Log *log = DefaultEnv::GetLog();
2109 pUrl.GetObfuscatedURL().c_str() );
2112 DefaultEnv::GetEnv()->GetString(
"WriteRecovery", value );
2113 pZip->SetProperty(
"WriteRecovery", value );
2119 auto st = fs.Stat( pUrl.GetPath(), info );
2121 flags |= OpenFlags::New;
2124 flags |= OpenFlags::POSC;
2130 flags |= OpenFlags::MakePath;
2138 if( !cptarget.empty() )
2140 std::string targeturl;
2141 pZip->GetProperty(
"LastURL", targeturl );
2142 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
2151 return pCkSumHelper->Initialize();
2160 auto st = pCkSumHelper->GetRawCheckSum(
"zcrc32", crc32 );
2161 if( !st.
IsOK() )
return st;
2162 pZip->UpdateMetadata( crc32 );
2175 using namespace XrdCl;
2180 if( pChunks.size() < pParallel )
2181 return QueueChunk( std::move( ci ) );
2187 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
2190 delete [] (
char*)ch->chunk.GetBuffer();
2191 if( !ch->status.IsOK() )
2193 Log *log = DefaultEnv::GetLog();
2195 ch->chunk.GetLength(), (
unsigned long long) ch->chunk.GetOffset(),
2196 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
2203 return CheckIfRetriable( ch->status );
2206 return QueueChunk( std::move( ci ) );
2212 virtual int64_t GetSize()
2220 void CleanUpChunks()
2222 while( !pChunks.empty() )
2227 delete [] (
char *)ch->chunk.GetBuffer();
2249 st = pZip->Write( ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
2253 delete [] (
char*)ch->chunk.GetBuffer();
2267 while( !pChunks.empty() )
2272 if( !ch->status.IsOK() )
2278 st = CheckIfRetriable( ch->status );
2280 delete [] (
char *)ch->chunk.GetBuffer();
2290 std::string &checkSumType )
2306 const std::string& GetLastURL()
const
2314 const std::string& GetWrtRecoveryRedir()
const
2316 return pWrtRecoveryRedir;
2320 XRootDZipDestination(
const XRootDDestination &other);
2321 XRootDZipDestination &operator = (
const XRootDDestination &other);
2331 chunk( std::move( ci ) ) {}
2336 this->status = *statusval;
2348 if( status.
IsOK() )
return status;
2355 if( pZip->GetProperty(
"WrtRecoveryRedir", value ) )
2357 pWrtRecoveryRedir = value;
2358 if( pZip->GetProperty(
"LastURL", value ) ) pLastURL = value;
2366 std::string pFilename;
2369 std::queue<ChunkHandler *> pChunks;
2372 std::string pWrtRecoveryRedir;
2373 std::string pLastURL;
2383 using namespace std::chrono;
2384 auto since_epoch = high_resolution_clock::now().time_since_epoch();
2385 return duration_cast<nanoseconds>( since_epoch );
2393 return sec * 1000000000;
2401 #if __cplusplus >= 201103L
2402 using namespace std::chrono;
2403 std::this_thread::sleep_for( nanoseconds( nsec ) );
2406 req.tv_sec = nsec /
to_nsec( 1 );
2407 req.tv_nsec = nsec %
to_nsec( 1 );
2408 nanosleep( &req, 0 );
2417 ClassicCopyJob::ClassicCopyJob( uint16_t jobId,
2420 CopyJob( jobId, jobProperties, jobResults )
2434 std::string checkSumMode;
2435 std::string checkSumType;
2436 std::string checkSumPreset;
2437 std::string zipSource;
2438 uint16_t parallelChunks;
2441 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2442 rmOnBadCksum, continue_, zipappend, doserver;
2443 int32_t nbXcpSources;
2445 long long xRateThreshold;
2447 std::vector<std::string> addcksums;
2478 if( force && continue_ )
2480 "Invalid argument combination: continue + force." );
2482 if( zipappend && ( continue_ || force ) )
2484 "Invalid argument combination: ( continue | force ) + zip-append." );
2489 std::unique_ptr<timer_sec_t> cptimer;
2490 if( cpTimeout ) cptimer.reset(
new timer_sec_t() );
2495 if( rmOnBadCksum ) posc =
true;
2500 if( checkSumType ==
"auto" )
2503 if( checkSumType.empty() )
2506 log->
Info(
UtilityMsg,
"Using inferred checksum type: %s.", checkSumType.c_str() );
2509 if( cptimer && cptimer->elapsed() > cpTimeout )
2515 std::unique_ptr<Source> src;
2517 src.reset(
new XRootDSourceXCp( &
GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2519 src.reset(
new XRootDSourceZip( zipSource, &
GetSource(), chunkSize, parallelChunks,
2520 checkSumType, addcksums , doserver) );
2521 else if(
GetSource().GetProtocol() ==
"stdio" )
2522 src.reset(
new StdInSource( checkSumType, chunkSize, addcksums ) );
2526 src.reset(
new XRootDSourceDynamic( &
GetSource(), chunkSize, checkSumType, addcksums ) );
2528 src.reset(
new XRootDSource( &
GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2532 if( !st.
IsOK() )
return SourceError( st );
2533 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2535 if( cptimer && cptimer->elapsed() > cpTimeout )
2538 std::unique_ptr<Destination> dest;
2541 if(
GetTarget().GetProtocol() ==
"stdio" )
2542 dest.reset(
new StdOutDestination( checkSumType ) );
2543 else if( zipappend )
2546 size_t pos = fn.rfind(
'/' );
2547 if( pos != std::string::npos )
2548 fn = fn.substr( pos + 1 );
2549 int64_t size = src->GetSize();
2550 dest.reset(
new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *
this ) );
2557 if( src->GetSize() >= 0 )
2560 std::ostringstream o; o << src->GetSize();
2561 params[
"oss.asize"] = o.str();
2565 dest.reset(
new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *
this ) );
2568 dest->SetForce( force );
2569 dest->SetPOSC( posc );
2570 dest->SetCoerce( coerce );
2571 dest->SetMakeDir( makeDir );
2572 dest->SetContinue( continue_ );
2573 st = dest->Initialize();
2574 if( !st.
IsOK() )
return DestinationError( st );
2576 if( cptimer && cptimer->elapsed() > cpTimeout )
2584 size -= dest->GetSize();
2586 if( !st.
IsOK() )
return SetResult( st );
2590 uint64_t total_processed = 0;
2591 uint64_t processed = 0;
2593 uint16_t threshold_interval = parallelChunks;
2594 bool threshold_draining =
false;
2595 timer_nsec_t threshold_timer;
2598 st = src->GetChunk( pageInfo );
2600 return SourceError( st);
2605 if( cptimer && cptimer->elapsed() > cpTimeout )
2610 auto elapsed = (
time_nsec() - start ).count();
2611 double transferred = total_processed + pageInfo.
GetLength();
2612 double expected = double( xRate ) /
to_nsec( 1 ) * elapsed;
2618 transferred > expected )
2620 auto nsec = ( transferred / xRate *
to_nsec( 1 ) ) - elapsed;
2625 if( xRateThreshold )
2627 auto elapsed = threshold_timer.elapsed();
2628 double transferred = processed + pageInfo.
GetLength();
2629 double expected = double( xRateThreshold ) /
to_nsec( 1 ) * elapsed;
2635 transferred < expected &&
2636 threshold_interval == 0 )
2638 if( !threshold_draining )
2641 " trying different source!" );
2644 "The transfer rate dropped below "
2645 "requested threshold!" );
2646 threshold_draining =
true;
2652 threshold_timer.reset();
2653 threshold_interval = parallelChunks;
2654 threshold_draining =
false;
2658 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2661 total_processed += pageInfo.
GetLength();
2664 st = dest->PutChunk( std::move( pageInfo ) );
2670 pResults->
Set(
"WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2671 return SetResult( st );
2673 return DestinationError( st );
2686 return DestinationError( st );
2693 std::vector<xattr_t> xattrs;
2694 st = src->GetXAttr( xattrs );
2695 if( !st.
IsOK() )
return SourceError( st );
2696 st = dest->SetXAttr( xattrs );
2697 if( !st.
IsOK() )
return DestinationError( st );
2704 if( src->GetSize() >= 0 && size != total_processed )
2706 log->
Error(
UtilityMsg,
"The declared source size is %llu bytes, but "
2707 "received %llu bytes.", (
unsigned long long) size, (
unsigned long long) total_processed );
2715 st = dest->Finalize();
2717 return DestinationError( st );
2722 if( checkSumMode !=
"none" )
2725 checkSumMode.c_str() );
2726 std::string sourceCheckSum;
2727 std::string targetCheckSum;
2729 if( cptimer && cptimer->elapsed() > cpTimeout )
2735 timeval oStart, oEnd;
2738 if( checkSumMode ==
"end2end" || checkSumMode ==
"source" ||
2739 !checkSumPreset.empty() )
2741 gettimeofday( &oStart, 0 );
2742 if( !checkSumPreset.empty() )
2744 sourceCheckSum = checkSumType +
":";
2750 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2752 gettimeofday( &oEnd, 0 );
2755 return SourceError( st );
2757 pResults->
Set(
"sourceCheckSum", sourceCheckSum );
2760 if( !addcksums.empty() )
2761 pResults->
Set(
"additionalCkeckSum", src->GetAddCks() );
2763 if( cptimer && cptimer->elapsed() > cpTimeout )
2769 timeval tStart, tEnd;
2771 if( checkSumMode ==
"end2end" || checkSumMode ==
"target" )
2773 gettimeofday( &tStart, 0 );
2774 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2776 return DestinationError( st );
2777 gettimeofday( &tEnd, 0 );
2778 pResults->
Set(
"targetCheckSum", targetCheckSum );
2781 if( cptimer && cptimer->elapsed() > cpTimeout )
2787 auto sanitize_cksum = [](
char c )
2790 if( std::isalpha( c ) )
return std::tolower( c, loc );
2794 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2795 sourceCheckSum.begin(), sanitize_cksum );
2797 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2798 targetCheckSum.begin(), sanitize_cksum );
2803 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2806 if( sourceCheckSum == targetCheckSum )
2815 i.
cksum = sourceCheckSum;
2831 log->
Info(
UtilityMsg,
"Target file removed due to bad checksum!" );
2834 st = dest->Finalize();
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
ssize_t write(int fildes, const void *buf, size_t nbyte)
ssize_t read(int fildes, void *buf, size_t nbyte)
const char * XrdSysE2T(int errcode)
int Set(const char *csName)
int Get(char *Buff, int Blen)
void Get(Type &object)
Retrieve the object being held.
Check sum helper for stdio.
XRootDStatus Initialize()
Initialize.
XRootDStatus GetCheckSum(std::string &checkSum, std::string &checkSumType)
const std::string & GetType()
void Update(const void *buffer, uint32_t size)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
const URL & GetSource() const
Get source.
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
virtual bool ShouldCancel(uint16_t jobNum)
Determine whether the job should be canceled.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
Send file/filesystem queries to an XRootD cluster.
XRootDStatus Rm(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Interface for a job to be run by the job manager.
Iterator Begin()
Get the location begin iterator.
LocationList::iterator Iterator
Iterator over locations.
Iterator End()
Get the location end iterator.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
uint64_t GetSize() const
Get size (in bytes)
std::map< std::string, std::string > ParamsMap
void SetParams(const std::string &params)
Set params.
std::string GetLocation() const
Get location (protocol://host:port/path)
const ParamsMap & GetParams() const
Get the URL params.
const std::string & GetPath() const
Get the path.
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static XRootDStatus GetLocalCheckSum(std::string &checkSum, const std::string &checkSumType, const std::string &path)
Get a checksum from local file.
static bool HasXAttr(const XrdCl::URL &url)
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
static bool HasPgRW(const XrdCl::URL &url)
An interface for metadata redirectors.
virtual long long GetSize() const =0
virtual const std::vector< std::string > & GetReplicas()=0
Returns a vector with replicas as given in the metalink file.
virtual std::string GetCheckSum(const std::string &type) const =0
const uint16_t errUninitialized
const uint16_t errErrorResponse
const char *const DefaultCpTarget
const uint16_t errOperationExpired
const uint16_t errNotImplemented
Operation is not implemented.
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
const uint16_t stError
An error occurred that could potentially be retried.
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
const uint16_t stOK
Everything went OK.
const int DefaultSubStreamsPerChannel
const int DefaultCpUsePgWrtRd
const uint16_t errOSError
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
std::tuple< std::string, std::string > xattr_t
Extended attribute key - value pair.
const uint16_t errNotSupported
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
const uint16_t suContinue
const uint16_t errNoMoreReplicas
No more replicas to try.
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
const int DefaultZipMtlnCksum
Describe a data chunk for vector read.
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
uint32_t errNo
Errno, if any.