65 o <<
"WaitTask for: 0x" << handler->
GetRequest();
69 virtual time_t
Run( time_t now )
71 pHandler->WaitDone( now );
97 virtual void Run(
void *arg )
99 pHandler->HandleResponse();
111 const int sst = pSendingState.fetch_or( kSawResp );
113 if( !( sst & kSendDone ) && !( sst & kSawResp ) )
119 log->
Dump(
XRootDMsg,
"[%s] Message %s reply received before notification "
120 "that it was sent, assuming it was sent ok.",
131 if( pOksofarAsAnswer )
134 while( pResponse ) pCV.
Wait();
141 log->
Warning(
ExDbgMsg,
"[%s] MsgHandler is examining a response although "
142 "it already owns a response: %p (message: %s ).",
148 if( msg->GetSize() < 8 )
188 pBodyReader->SetDataLength( dlen );
245 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_oksofar response to request "
249 if( !pOksofarAsAnswer )
251 pPartialResps.emplace_back( std::move( pResponse ) );
262 pTimeoutFence.store(
true, std::memory_order_relaxed );
271 pTimeoutFence.store(
true, std::memory_order_relaxed );
280 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_status response to request "
299 pTimeoutFence.store(
true, std::memory_order_relaxed );
366 pPartialResps.push_back( std::move( pResponse ) );
379 pPageReader.reset(
new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
380 pPageReader->SetRsp( rspst );
395 pResponse->GetCursor() )
434 int *qryResponse =
nullptr;
436 qryResult.
Get( qryResponse );
438 pHosts->back().flags = *qryResponse;
440 qryResponse =
nullptr;
443 qryResult.
Get( qryResponse );
445 pHosts->back().protocol = *qryResponse;
471 pAggregatedWaitTime = 0;
480 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_ok response to request %s",
490 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_status response to request %s",
503 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_oksofar response to request %s",
516 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
517 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
518 log->
Dump(
XRootDMsg,
"[%s] Got a kXR_error response to request %s "
542 char *urlInfoBuff =
new char[rsp->
hdr.
dlen-3];
543 urlInfoBuff[rsp->
hdr.
dlen-4] = 0;
544 memcpy( urlInfoBuff, rsp->
body.redirect.host, rsp->
hdr.
dlen-4 );
545 std::string urlInfo = urlInfoBuff;
546 delete [] urlInfoBuff;
548 "message %s: %s, port %d", pUrl.
GetHostId().c_str(),
550 rsp->
body.redirect.port );
555 if( !pRedirectCounter )
558 "message %s, the last known error is: %s",
574 uint32_t flags = pHosts->back().flags;
575 if( !pHasLoadBalancer )
586 pLoadBalancer = pHosts->back();
587 log->
Dump(
XRootDMsg,
"[%s] Current server has been assigned "
588 "as a load-balancer for message %s",
591 HostList::iterator it;
592 for( it = pHosts->begin(); it != pHosts->end(); ++it )
593 it->loadBalancer =
false;
594 pHosts->back().loadBalancer =
true;
605 pEffectiveDataServerUrl =
new URL( pHosts->back().url );
610 std::vector<std::string> urlComponents;
614 std::ostringstream o;
616 o << urlComponents[0];
617 if( rsp->
body.redirect.port > 0 )
618 o <<
":" << rsp->
body.redirect.port <<
"/";
619 else if( rsp->
body.redirect.port < 0 )
634 std::string url( rsp->
body.redirect.host, rsp->
hdr.
dlen-4 );
640 std::string url( rsp->
body.redirect.host, rsp->
hdr.
dlen-4 );
642 pRedirectAsAnswer =
true;
646 URL newUrl =
URL( o.str() );
651 pUrl.
GetHostId().c_str(), urlInfo.c_str() );
668 std::ostringstream ossXrd;
671 for(URL::ParamsMap::const_iterator it = urlParams.begin();
672 it != urlParams.end(); ++it )
674 if( it->first.compare( 0, 4,
"xrd." ) &&
675 it->first.compare( 0, 6,
"xrdcl." ) )
678 ossXrd << it->first <<
'=' << it->second <<
'&';
681 std::string xrdCgi = ossXrd.str();
682 pRedirectUrl = newUrl.
GetURL();
685 if( urlComponents.size() > 1 )
688 pRedirectUrl += urlComponents[1];
689 std::ostringstream o;
690 o <<
"fake://fake:111//fake?";
691 o << urlComponents[1];
693 if( urlComponents.size() == 3 )
694 o <<
'?' << urlComponents[2];
700 pRedirectUrl += xrdCgi;
703 cgiURL =
URL( o.str() );
708 std::ostringstream o;
709 o <<
"fake://fake:111//fake?";
711 cgiURL =
URL( o.str() );
713 pRedirectUrl += xrdCgi;
723 pRedirectAsAnswer =
true;
725 if( pRedirectAsAnswer )
736 Status st = RewriteRequestRedirect( newUrl );
763 uint32_t waitSeconds = 0;
767 char *infoMsg =
new char[rsp->
hdr.
dlen-3];
769 memcpy( infoMsg, rsp->
body.wait.infomsg, rsp->
hdr.
dlen-4 );
770 log->
Dump(
XRootDMsg,
"[%s] Got kXR_wait response of %d seconds to "
771 "message %s: %s", pUrl.
GetHostId().c_str(),
775 waitSeconds = rsp->
body.wait.seconds;
779 log->
Dump(
XRootDMsg,
"[%s] Got kXR_wait response of 0 seconds to "
784 pAggregatedWaitTime += waitSeconds;
789 if( OmitWait( *pRequest, pLoadBalancer.
url ) )
793 if( pAggregatedWaitTime > maxWait )
805 Status st = RewriteRequestWait();
817 time_t resendTime = ::time(0)+waitSeconds;
819 if( resendTime < pExpiration )
821 log->
Debug(
ExDbgMsg,
"[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
826 taskMgr->
RegisterTask(
new WaitTask(
this ), resendTime );
830 log->
Debug(
XRootDMsg,
"[%s] Wait time is too long, timing out %s",
855 log->
Dump(
XRootDMsg,
"[%s] Got kXR_waitresp response of %d seconds to "
857 rsp->
body.waitresp.seconds,
867 log->
Dump(
XRootDMsg,
"[%s] Got unrecognized response %d to "
886 log->
Dump(
XRootDMsg,
"[%s] Stream event reported for msg %s",
892 if( pTimeoutFence.load( std::memory_order_relaxed ) )
895 HandleError( status );
904 uint32_t &bytesRead )
910 return pPageReader->Read( *socket, bytesRead );
912 return pBodyReader->Read( *socket, bytesRead );
926 const int sst = pSendingState.fetch_or( kSendDone );
929 if( status.
IsOK() && ( sst & kSendDone ) )
return;
933 if( !status.
IsOK() && ( ( sst & kFinalResp ) || ( sst & kSawResp ) ) )
935 log->
Error(
XRootDMsg,
"[%s] Unexpected error for message %s. Trying to "
938 HandleError( status );
942 if( sst & kFinalResp )
944 log->
Dump(
XRootDMsg,
"[%s] Got late notification that outgoing message %s was "
945 "sent, already have final response, queuing handler callback.",
953 log->
Dump(
XRootDMsg,
"[%s] Got late notification that message %s has "
954 "been successfully sent.",
964 log->
Dump(
XRootDMsg,
"[%s] Message %s has been successfully sent.",
974 log->
Error(
XRootDMsg,
"[%s] Impossible to send message %s. Trying to "
977 HandleError( status );
1004 uint32_t &bytesWritten )
1009 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
1019 int fLen = 0, lLen = 0;
1027 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1028 memcpy( pPgWrtCksumBuff.
GetBuffer(), &digest,
sizeof( uint32_t ) );
1031 uint32_t btsLeft = chunk.
length - pAsyncOffset;
1032 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen :
XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1033 if( pglen > btsLeft ) pglen = btsLeft;
1034 char* pgbuf =
static_cast<char*
>( chunk.
buffer ) + pAsyncOffset;
1036 while( btsLeft > 0 )
1039 while( pPgWrtCksumBuff.
GetCursor() <
sizeof( uint32_t ) )
1041 uint32_t dgstlen =
sizeof( uint32_t ) - pPgWrtCksumBuff.
GetCursor();
1044 Status st = socket->
Send( dgstbuf, dgstlen, btswrt );
1045 if( !st.
IsOK() )
return st;
1046 bytesWritten += btswrt;
1052 Status st = socket->
Send( pgbuf, pglen, btswrt );
1053 if( !st.
IsOK() )
return st;
1057 bytesWritten += btswrt;
1058 pAsyncOffset += btswrt;
1064 ++pPgWrtCurrentPageNb;
1065 if( pPgWrtCurrentPageNb < nbpgs )
1069 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1070 memcpy( pPgWrtCksumBuff.
GetBuffer(), &digest,
sizeof( uint32_t ) );
1074 if( pglen > btsLeft ) pglen = btsLeft;
1076 pPgWrtCurrentPageOffset = 0;
1080 pPgWrtCurrentPageOffset += btswrt;
1084 else if( !pChunkList->empty() )
1086 size_t size = pChunkList->size();
1087 for(
size_t i = pAsyncChunkIndex ; i < size; ++i )
1089 char *buffer = (
char*)(*pChunkList)[i].buffer;
1090 uint32_t size = (*pChunkList)[i].length;
1091 size_t leftToBeWritten = size - pAsyncOffset;
1093 while( leftToBeWritten )
1096 Status st = socket->
Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1097 bytesWritten += btswrt;
1099 pAsyncOffset += btswrt;
1100 leftToBeWritten -= btswrt;
1120 log->
Debug(
XRootDMsg,
"[%s] Channel is encrypted: cannot use kernel buffer.",
1126 pChunkList->push_back(
ChunkInfo( 0, ret, ubuff ) );
1133 while( !pKBuff->
Empty() )
1137 bytesWritten += btswrt;
1141 log->
Debug(
XRootDMsg,
"[%s] Request %s payload (kernel buffer) transferred to socket.",
1162 pTimeoutFence.store(
false, std::memory_order_relaxed );
1168 void XRootDMsgHandler::HandleResponse()
1179 const int sst = pSendingState.fetch_or( kFinalResp );
1180 if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
1188 XRootDStatus *status = ProcessStatus();
1189 AnyObject *response = 0;
1192 log->Debug(
ExDbgMsg,
"[%s] Calling MsgHandler: %p (message: %s ) "
1196 status->ToString().c_str() );
1198 if( status->IsOK() )
1200 Status st = ParseResponse( response );
1205 status =
new XRootDStatus( st );
1215 pRdirEntry->status = *status;
1216 pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1222 if( pSidMgr && finalrsp )
1225 if( status->IsOK() || !pMsgInFly ||
1230 HostList *hosts = pHosts.release();
1232 pHosts.reset(
new HostList( *hosts ) );
1249 pTimeoutFence.store(
false, std::memory_order_relaxed );
1258 XRootDStatus *XRootDMsgHandler::ProcessStatus()
1260 XRootDStatus *st =
new XRootDStatus( pStatus );
1265 if( !pStatus.
IsOK() && rsp )
1269 st->errNo = rsp->
body.error.errnum;
1272 std::string errmsg( rsp->
body.error.errmsg, rsp->
hdr.
dlen-5 );
1274 errmsg +=
" Last seen error: " + pLastError.
ToString();
1275 st->SetErrorMessage( errmsg );
1278 st->SetErrorMessage( pRedirectUrl );
1287 Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1301 log->Error(
XRootDMsg,
"Internal Error: unable to process redirect" );
1306 uint32_t length = 0;
1312 if( pPartialResps.empty() )
1314 buffer = rsp->
body.buffer.data;
1323 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1330 buff.Allocate( length );
1331 uint32_t offset = 0;
1332 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1335 buff.Append( part->
body.buffer.data, part->
hdr.
dlen, offset );
1338 buff.Append( rsp->
body.buffer.data, rsp->
hdr.
dlen, offset );
1339 buffer = buff.GetBuffer();
1370 AnyObject *obj =
new AnyObject();
1372 char *nullBuffer =
new char[length+1];
1373 nullBuffer[length] = 0;
1374 memcpy( nullBuffer, buffer, length );
1376 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as "
1377 "LocateInfo: %s", pUrl.
GetHostId().c_str(),
1379 LocationInfo *data =
new LocationInfo();
1381 if( data->ParseServerResponse( nullBuffer ) ==
false )
1385 delete [] nullBuffer;
1388 delete [] nullBuffer;
1400 AnyObject *obj =
new AnyObject();
1407 StatInfoVFS *data =
new StatInfoVFS();
1409 char *nullBuffer =
new char[length+1];
1410 nullBuffer[length] = 0;
1411 memcpy( nullBuffer, buffer, length );
1413 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as "
1414 "StatInfoVFS: %s", pUrl.
GetHostId().c_str(),
1417 if( data->ParseServerResponse( nullBuffer ) ==
false )
1421 delete [] nullBuffer;
1424 delete [] nullBuffer;
1433 StatInfo *data =
new StatInfo();
1435 char *nullBuffer =
new char[length+1];
1436 nullBuffer[length] = 0;
1437 memcpy( nullBuffer, buffer, length );
1439 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as StatInfo: "
1443 if( data->ParseServerResponse( nullBuffer ) ==
false )
1447 delete [] nullBuffer;
1450 delete [] nullBuffer;
1463 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as ProtocolInfo",
1469 log->Error(
XRootDMsg,
"[%s] Got invalid redirect response.",
1474 AnyObject *obj =
new AnyObject();
1475 ProtocolInfo *data =
new ProtocolInfo( rsp->
body.protocol.pval,
1476 rsp->
body.protocol.flags );
1487 AnyObject *obj =
new AnyObject();
1488 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as "
1489 "DirectoryList", pUrl.
GetHostId().c_str(),
1496 DirectoryList *data =
new DirectoryList();
1497 data->SetParentName( path );
1500 char *nullBuffer =
new char[length+1];
1501 nullBuffer[length] = 0;
1502 memcpy( nullBuffer, buffer, length );
1504 bool invalidrsp =
false;
1506 if( !pDirListStarted )
1509 pDirListStarted =
true;
1511 invalidrsp = !data->ParseServerResponse( pUrl.
GetHostId(), nullBuffer );
1514 invalidrsp = !data->ParseServerResponse( pUrl.
GetHostId(), nullBuffer, pDirListWithStat );
1520 delete [] nullBuffer;
1524 delete [] nullBuffer;
1535 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as OpenInfo",
1541 log->Error(
XRootDMsg,
"[%s] Got invalid open response.",
1546 AnyObject *obj =
new AnyObject();
1547 StatInfo *statInfo = 0;
1554 log->Dump(
XRootDMsg,
"[%s] Parsing StatInfo in response to %s",
1560 char *nullBuffer =
new char[rsp->
hdr.
dlen-11];
1561 nullBuffer[rsp->
hdr.
dlen-12] = 0;
1562 memcpy( nullBuffer, buffer+12, rsp->
hdr.
dlen-12 );
1564 statInfo =
new StatInfo();
1565 if( statInfo->ParseServerResponse( nullBuffer ) ==
false )
1570 delete [] nullBuffer;
1573 if( rsp->
hdr.
dlen < 12 || !statInfo )
1575 log->Error(
XRootDMsg,
"[%s] Unable to parse StatInfo in response "
1583 OpenInfo *data =
new OpenInfo( (uint8_t*)buffer,
1584 pResponse->GetSessionId(),
1596 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as ChunkInfo",
1600 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1606 if( pPartialResps[i]->GetSize() > 8 )
1613 if( pResponse->GetSize() > 8 )
1618 return pBodyReader->GetResponse( response );
1626 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as PageInfo",
1633 ChunkInfo chunk = pChunkList->front();
1634 bool sizeMismatch =
false;
1635 uint32_t currentOffset = 0;
1636 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1646 if( currentOffset + datalen > chunk.length )
1648 sizeMismatch =
true;
1652 currentOffset += datalen;
1656 size_t datalen = rspst->
status.
bdy.
dlen - NbPgPerRsp( rspst->
info.pgread.offset,
1658 if( currentOffset + datalen <= chunk.length )
1659 currentOffset += datalen;
1661 sizeMismatch =
true;
1666 if( pChunkStatus.front().sizeError || sizeMismatch )
1668 log->Error(
XRootDMsg,
"[%s] Handling response to %s: user supplied "
1669 "buffer is too small for the received data.",
1675 AnyObject *obj =
new AnyObject();
1676 PageInfo *pgInfo =
new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1677 std::move( pCrc32cDigests) );
1689 std::vector<std::tuple<uint64_t, uint32_t>> retries;
1696 retries.reserve( pgcnt );
1700 for(
size_t i = 0; i < pgcnt; ++i )
1703 if( i == 0 ) len = cse->
dlFirst;
1704 else if( i == pgcnt - 1 ) len = cse->
dlLast;
1705 retries.push_back( std::make_tuple( pgoffs[i], len ) );
1709 RetryInfo *info =
new RetryInfo( std::move( retries ) );
1710 AnyObject *obj =
new AnyObject();
1723 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as "
1724 "VectorReadInfo", pUrl.
GetHostId().c_str(),
1727 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1733 if( pPartialResps[i]->GetSize() > 8 )
1740 if( pResponse->GetSize() > 8 )
1745 return pBodyReader->GetResponse( response );
1753 int len = rsp->hdr.dlen;
1754 char* data = rsp->body.buffer.data;
1756 return ParseXAttrResponse( data, len, response );
1767 AnyObject *obj =
new AnyObject();
1768 log->Dump(
XRootDMsg,
"[%s] Parsing the response to %s as BinaryData",
1773 data->Allocate( length );
1774 data->Append( buffer, length );
1787 Status XRootDMsgHandler::ParseXAttrResponse(
char *data,
size_t len,
1788 AnyObject *&response )
1801 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1805 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1808 std::vector<XAttrStatus> resp;
1810 for(
kXR_char i = 0; i < nattr; ++i )
1813 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1821 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1826 resp.push_back( XAttrStatus( name, st ) );
1833 response =
new AnyObject();
1834 response->Set(
new std::vector<XAttrStatus>( std::move( resp ) ) );
1844 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1848 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1851 std::vector<XAttr> resp;
1852 resp.reserve( nattr );
1855 for(
kXR_char i = 0; i < nattr; ++i )
1858 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1866 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1871 resp.push_back( XAttr( name, st ) );
1875 for(
kXR_char i = 0; i < nattr; ++i )
1878 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1880 vlen = ntohl( vlen );
1883 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1886 resp[i].value.swap( value );
1893 response =
new AnyObject();
1894 response->Set(
new std::vector<XAttr>( std::move( resp ) ) );
1902 std::vector<XAttr> resp;
1907 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1911 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1913 vlen = ntohl( vlen );
1916 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1919 resp.push_back( XAttr( name, value ) );
1923 response =
new AnyObject();
1924 response->Set(
new std::vector<XAttr>( std::move( resp ) ) );
1938 Status XRootDMsgHandler::RewriteRequestRedirect(
const URL &newUrl )
1946 std::string xrdCgi =
"";
1947 std::ostringstream ossXrd;
1948 for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1950 if( it->first.compare( 0, 4,
"xrd." ) )
1952 ossXrd << it->first <<
'=' << it->second <<
'&';
1955 xrdCgi = ossXrd.str();
1965 std::string surl = newUrl.
GetURL();
1966 (surl.find(
'?') == std::string::npos) ? (surl +=
'?') :
1967 ((*surl.rbegin() !=
'&') ? (surl +=
'&') : (surl +=
""));
1971 std::string surlLog = surl;
1975 log->Error(
XRootDMsg,
"[%s] Failed to build redirection URL from data: %s",
1976 newUrl.GetHostId().c_str(), surl.c_str());
1993 Status XRootDMsgHandler::RewriteRequestWait()
2028 void XRootDMsgHandler::HandleError( XRootDStatus status )
2036 if( pSidMgr && pMsgInFly && (
2047 if( !noreplicas ) pLastError = status;
2050 log->Debug(
XRootDMsg,
"[%s] Handling error while processing %s: %s.",
2052 status.ToString().c_str() );
2058 if( status.IsFatal() && status.code ==
errTlsError && status.errNo == EAGAIN )
2060 if( pSslErrCnt < MaxSslErrRetry )
2080 if( RetriableErrorResponse( status ) )
2082 UpdateTriedCGI(status.errNo);
2084 SwitchOnRefreshFlag();
2105 log->Error(
XRootDMsg,
"[%s] Unable to get the response to request %s",
2127 if( !status.IsFatal() && IsRetriable() )
2129 log->Info(
XRootDMsg,
"[%s] Retrying request: %s.",
2158 if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2159 pRdirEntry.reset(
new RedirectEntry( pUrl.
GetLocation(), url.GetLocation(), entryType ) );
2163 pHosts->push_back( url );
2175 pSidMgr->ReleaseSID( req->
streamid );
2182 if( !url.IsLocalFile() )
2185 Status st = pSidMgr->AllocateSID( req->
streamid );
2188 log->Error(
XRootDMsg,
"[%s] Impossible to send message %s.",
2200 log->Debug(
ExDbgMsg,
"[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2204 return pPostMaster->
Redirect( pUrl, pRequest,
this );
2208 HandleLocalRedirect( &pUrl );
2213 log->Debug(
ExDbgMsg,
"[%s] Retry at server MsgHandler: %p (message: %s ).",
2216 return pPostMaster->
Send( pUrl, pRequest,
this,
true, pExpiration );
2223 void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2233 if( pEffectiveDataServerUrl )
2236 delete pEffectiveDataServerUrl;
2237 pEffectiveDataServerUrl = 0;
2248 {
if (errNo ==
kXR_NotFound) cgi[
"triedrc"] =
"enoent";
2249 else if (errNo ==
kXR_IOError) cgi[
"triedrc"] =
"ioerr";
2250 else if (errNo ==
kXR_FSError) cgi[
"triedrc"] =
"fserr";
2261 HostList::reverse_iterator it;
2262 for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2264 if( it->loadBalancer )
2267 tried +=
"," + it->url.GetHostName();
2274 cgi[
"tried"] = tried;
2283 void XRootDMsgHandler::SwitchOnRefreshFlag()
2309 void XRootDMsgHandler::HandleRspOrQueue()
2320 const int sst = pSendingState.fetch_or( kFinalResp );
2321 if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
2326 if( jobMgr->IsWorker() )
2331 log->Debug(
ExDbgMsg,
"[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2341 void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2344 log->Debug(
ExDbgMsg,
"[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2348 if( !pLFileHandler )
2354 AnyObject *resp = 0;
2356 XRootDStatus st = pLFileHandler->
Open( url, pRequest, resp );
2374 bool XRootDMsgHandler::IsRetriable()
2378 if( value ==
"true" )
return true;
2391 "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2406 bool XRootDMsgHandler::OmitWait( Message &request,
const URL &url )
2409 if( !url.IsMetalink() )
2421 VirtualRedirector *redirector = registry.Get( url );
2425 if( redirector->Count( request ) > 1 )
2434 bool XRootDMsgHandler::RetriableErrorResponse(
const Status &status )
2462 bool ret = pNotAuthorizedCounter < limit;
2463 ++pNotAuthorizedCounter;
2468 "[%s] Reached limit of NotAuthorized retries!",
2489 void XRootDMsgHandler::DumpRedirectTraceBack()
2491 if( pRedirectTraceBack.empty() )
return;
2493 std::stringstream sstrm;
2495 sstrm <<
"Redirect trace-back:\n";
2499 auto itr = pRedirectTraceBack.begin();
2500 sstrm <<
'\t' << counter <<
". " << (*itr)->ToString() <<
'\n';
2506 for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2507 sstrm <<
'\t' << counter <<
". "
2508 << (*itr)->ToString( (*prev)->status.IsOK() ) <<
'\n';
2513 bool warn = !pStatus.
IsOK() &&
2520 log->Warning(
XRootDMsg,
"%s", sstrm.str().c_str() );
2522 log->Debug(
XRootDMsg,
"%s", sstrm.str().c_str() );
2527 template<
typename T>
2528 Status XRootDMsgHandler::ReadFromBuffer(
char *&buffer,
size_t &buflen, T& result )
2532 memcpy(&result, buffer,
sizeof(T));
2534 buffer +=
sizeof( T );
2535 buflen -=
sizeof( T );
2543 Status XRootDMsgHandler::ReadFromBuffer(
char *&buffer,
size_t &buflen, std::string &result )
2550 if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2563 Status XRootDMsgHandler::ReadFromBuffer(
char *&buffer,
size_t &buflen,
2564 size_t size, std::string &result )
2570 result.append( buffer, size );
union ServerResponse::@0 body
struct ClientFattrRequest fattr
#define kXR_collapseRedir
ServerResponseStatus status
struct ClientDirlistRequest dirlist
static const int kXR_ckpXeq
struct ClientOpenRequest open
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
struct ClientChkPointRequest chkpoint
struct ServerResponseHeader hdr
union ServerResponseV2::@1 info
#define kXR_PROTOCOLVERSION
struct ClientStatRequest stat
struct ClientLocateRequest locate
std::string obfuscateAuth(const std::string &input)
void Get(Type &object)
Retrieve the object being held.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
void SetCursor(uint32_t cursor)
Set the cursor.
uint32_t GetCursor() const
Get append cursor.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
void SetHostList(const HostList &hostList)
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
bool IsMetalink() const
Is it a URL to a metalink.
const std::string & GetHostName() const
Get the name of the target host.
std::map< std::string, std::string > ParamsMap
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
void SetPassword(const std::string &password)
Set the password.
const std::string & GetProtocol() const
Get the protocol.
void SetParams(const std::string ¶ms)
Set params.
std::string GetURL() const
Get the URL.
std::string GetLocation() const
Get location (protocol://host:port/path)
const std::string & GetUserName() const
Get the username.
const std::string & GetPassword() const
Get the password.
const ParamsMap & GetParams() const
Get the URL params.
void SetProtocol(const std::string &protocol)
Set protocol.
bool IsValid() const
Is the url valid.
void SetUserName(const std::string &userName)
Set the username.
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Handle/Process/Forward XRootD messages.
const Message * GetRequest() const
Get the request pointer.
virtual uint16_t InspectStatusRsp() override
friend class HandleRspJob
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
void WaitDone(time_t now)
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
virtual bool IsRaw() const override
Are we a raw writer or not?
const std::string & GetErrorMessage() const
Get error message.
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errNotFound
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
const uint16_t errNotSupported
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
const uint16_t suContinue
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
none object for initializing empty Optional
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version