2434 std::string checkSumMode;
2435 std::string checkSumType;
2436 std::string checkSumPreset;
2437 std::string zipSource;
2438 uint16_t parallelChunks;
2441 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2442 rmOnBadCksum, continue_, zipappend, doserver;
2443 int32_t nbXcpSources;
2445 long long xRateThreshold;
2447 std::vector<std::string> addcksums;
2478 if( force && continue_ )
2480 "Invalid argument combination: continue + force." );
2482 if( zipappend && ( continue_ || force ) )
2484 "Invalid argument combination: ( continue | force ) + zip-append." );
2489 std::unique_ptr<timer_sec_t> cptimer;
2490 if( cpTimeout ) cptimer.reset(
new timer_sec_t() );
2495 if( rmOnBadCksum ) posc =
true;
2500 if( checkSumType ==
"auto" )
2503 if( checkSumType.empty() )
2506 log->
Info(
UtilityMsg,
"Using inferred checksum type: %s.", checkSumType.c_str() );
2509 if( cptimer && cptimer->elapsed() > cpTimeout )
2515 std::unique_ptr<Source> src;
2517 src.reset(
new XRootDSourceXCp( &
GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2519 src.reset(
new XRootDSourceZip( zipSource, &
GetSource(), chunkSize, parallelChunks,
2520 checkSumType, addcksums , doserver) );
2521 else if(
GetSource().GetProtocol() ==
"stdio" )
2522 src.reset(
new StdInSource( checkSumType, chunkSize, addcksums ) );
2526 src.reset(
new XRootDSourceDynamic( &
GetSource(), chunkSize, checkSumType, addcksums ) );
2528 src.reset(
new XRootDSource( &
GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2532 if( !st.
IsOK() )
return SourceError( st );
2533 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2535 if( cptimer && cptimer->elapsed() > cpTimeout )
2538 std::unique_ptr<Destination> dest;
2541 if(
GetTarget().GetProtocol() ==
"stdio" )
2542 dest.reset(
new StdOutDestination( checkSumType ) );
2543 else if( zipappend )
2546 size_t pos = fn.rfind(
'/' );
2547 if( pos != std::string::npos )
2548 fn = fn.substr( pos + 1 );
2549 int64_t size = src->GetSize();
2550 dest.reset(
new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *
this ) );
2557 if( src->GetSize() >= 0 )
2560 std::ostringstream o; o << src->GetSize();
2561 params[
"oss.asize"] = o.str();
2562 newDestUrl.SetParams( params );
2565 dest.reset(
new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *
this ) );
2568 dest->SetForce( force );
2569 dest->SetPOSC( posc );
2570 dest->SetCoerce( coerce );
2571 dest->SetMakeDir( makeDir );
2572 dest->SetContinue( continue_ );
2573 st = dest->Initialize();
2574 if( !st.
IsOK() )
return DestinationError( st );
2576 if( cptimer && cptimer->elapsed() > cpTimeout )
2584 size -= dest->GetSize();
2586 if( !st.
IsOK() )
return SetResult( st );
2590 uint64_t total_processed = 0;
2591 uint64_t processed = 0;
2593 uint16_t threshold_interval = parallelChunks;
2594 bool threshold_draining =
false;
2595 timer_nsec_t threshold_timer;
2598 st = src->GetChunk( pageInfo );
2600 return SourceError( st);
2605 if( cptimer && cptimer->elapsed() > cpTimeout )
2610 auto elapsed = (
time_nsec() - start ).count();
2611 double transferred = total_processed + pageInfo.
GetLength();
2612 double expected = double( xRate ) /
to_nsec( 1 ) * elapsed;
2618 transferred > expected )
2620 auto nsec = ( transferred / xRate *
to_nsec( 1 ) ) - elapsed;
2625 if( xRateThreshold )
2627 auto elapsed = threshold_timer.elapsed();
2628 double transferred = processed + pageInfo.
GetLength();
2629 double expected = double( xRateThreshold ) /
to_nsec( 1 ) * elapsed;
2635 transferred < expected &&
2636 threshold_interval == 0 )
2638 if( !threshold_draining )
2641 " trying different source!" );
2644 "The transfer rate dropped below "
2645 "requested threshold!" );
2646 threshold_draining =
true;
2652 threshold_timer.reset();
2653 threshold_interval = parallelChunks;
2654 threshold_draining =
false;
2658 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2661 total_processed += pageInfo.
GetLength();
2664 st = dest->PutChunk( std::move( pageInfo ) );
2670 pResults->
Set(
"WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2671 return SetResult( st );
2673 return DestinationError( st );
2686 return DestinationError( st );
2693 std::vector<xattr_t> xattrs;
2694 st = src->GetXAttr( xattrs );
2695 if( !st.
IsOK() )
return SourceError( st );
2696 st = dest->SetXAttr( xattrs );
2697 if( !st.
IsOK() )
return DestinationError( st );
2704 if( src->GetSize() >= 0 && size != total_processed )
2706 log->
Error(
UtilityMsg,
"The declared source size is %llu bytes, but "
2707 "received %llu bytes.", (
unsigned long long) size, (
unsigned long long) total_processed );
2715 st = dest->Finalize();
2717 return DestinationError( st );
2722 if( checkSumMode !=
"none" )
2725 checkSumMode.c_str() );
2726 std::string sourceCheckSum;
2727 std::string targetCheckSum;
2729 if( cptimer && cptimer->elapsed() > cpTimeout )
2735 timeval oStart, oEnd;
2738 if( checkSumMode ==
"end2end" || checkSumMode ==
"source" ||
2739 !checkSumPreset.empty() )
2741 gettimeofday( &oStart, 0 );
2742 if( !checkSumPreset.empty() )
2744 sourceCheckSum = checkSumType +
":";
2750 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2752 gettimeofday( &oEnd, 0 );
2755 return SourceError( st );
2757 pResults->
Set(
"sourceCheckSum", sourceCheckSum );
2760 if( !addcksums.empty() )
2761 pResults->
Set(
"additionalCkeckSum", src->GetAddCks() );
2763 if( cptimer && cptimer->elapsed() > cpTimeout )
2769 timeval tStart, tEnd;
2771 if( checkSumMode ==
"end2end" || checkSumMode ==
"target" )
2773 gettimeofday( &tStart, 0 );
2774 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2776 return DestinationError( st );
2777 gettimeofday( &tEnd, 0 );
2778 pResults->
Set(
"targetCheckSum", targetCheckSum );
2781 if( cptimer && cptimer->elapsed() > cpTimeout )
2787 auto sanitize_cksum = [](
char c )
2790 if( std::isalpha( c ) )
return std::tolower( c, loc );
2794 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2795 sourceCheckSum.begin(), sanitize_cksum );
2797 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2798 targetCheckSum.begin(), sanitize_cksum );
2803 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2806 if( sourceCheckSum == targetCheckSum )
2815 i.
cksum = sourceCheckSum;
2827 st = fs.Rm( newDestUrl.GetPath() );
2831 log->
Info(
UtilityMsg,
"Target file removed due to bad checksum!" );
2834 st = dest->Finalize();
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
PropertyList * pProperties
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
virtual bool ShouldCancel(uint16_t jobNum)
Determine whether the job should be canceled.
static Monitor * GetMonitor()
Get the monitor object.
Send file/filesystem queries to an XRootD cluster.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Info(uint64_t topic, const char *format,...)
Print an informational message.
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
std::map< std::string, std::string > ParamsMap
const std::string & GetPath() const
Get the path.
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static bool HasXAttr(const XrdCl::URL &url)
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
Data is corrupted.
const uint16_t errInvalidArgs
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
uint32_t GetLength() const
Get the data length.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.