XRootD
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor. More...
 
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block in write queue. More...
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *, int Options=0)
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *ioP, int opts=0)=0
 Obtain a new IO object that fronts existing XrdOucCacheIO. More...
 
bool blocksize_str2value (const char *from, const char *str, long long &val, long long min, long long max) const
 
void ClearPurgeProtectedSet ()
 
bool Config (const char *config_filename, const char *parameters, XrdOucEnv *env)
 Parse configuration file. More...
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached. More...
 
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
 
void DeRegisterPrefetchFile (File *)
 
long long DetermineFullFileSize (const std::string &cinfo_fname)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
File * GetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStream * GetGStream ()
 
XrdSysError * GetLog () const
 
File * GetNextFileToPrefetch ()
 
XrdOss * GetOss () const
 
PurgePin * GetPurgePin () const
 
XrdSysTrace * GetTrace () const
 
bool is_prefetch_enabled () const
 
bool IsFileActiveOrPurgeProtected (const std::string &) const
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
bool prefetch_str2value (const char *from, const char *str, int &val, int min, int max) const
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk. More...
 
const Configuration & RefConfiguration () const
 Reference XrdPfc configuration. More...
 
ResourceMonitor & RefResMon ()
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction. More...
 
char * RequestRAM (long long size)
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache. More...
 
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
 
long long WritesSinceLastCall ()
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor. More...
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const Configuration & Conf ()
 
static Cache & CreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation. More...
 
static Cache & GetInstance ()
 Singleton access. More...
 
static ResourceMonitor & ResMon ()
 
static const Cache & TheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check. More...
 

Static Public Attributes

static XrdScheduler * schedP = nullptr
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file) More...
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write) More...
 
static const int optRW = 0x0004
 File is read/write (o/w read/only) More...
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache. More...
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc). More...
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 162 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env 
)

Constructor.

Definition at line 158 of file XrdPfc.cc.

158  :
159  XrdOucCache("pfc"),
160  m_env(env),
161  m_log(logger, "XrdPfc_"),
162  m_trace(new XrdSysTrace("XrdPfc", logger)),
163  m_traceID("Cache"),
164  m_oss(0),
165  m_gstream(0),
166  m_purge_pin(0),
167  m_prefetch_condVar(0),
168  m_prefetch_enabled(false),
169  m_RAM_used(0),
170  m_RAM_write_queue(0),
171  m_RAM_std_size(0),
172  m_isClient(false),
173  m_active_cond(0)
174 {
175  // Default log level is Warning.
176  m_trace->What = 2;
177 }
XrdOucCache(const char *ctype)
Definition: XrdOucCache.hh:700

References XrdSysTrace::What.

Referenced by CreateInstance().

+ Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool  from_read 
)

Add downloaded block in write queue.

Definition at line 221 of file XrdPfc.cc.

222 {
223  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
224 
225  {
226  XrdSysMutexHelper lock(&m_RAM_mutex);
227  m_RAM_write_queue += b->get_size();
228  }
229 
230  m_writeQ.condVar.Lock();
231  if (fromRead)
232  m_writeQ.queue.push_back(b);
233  else
234  m_writeQ.queue.push_front(b);
235  m_writeQ.size++;
236  m_writeQ.condVar.Signal();
237  m_writeQ.condVar.UnLock();
238 }
#define TRACE(act, x)
Definition: XrdTrace.hh:63
int get_size() const
Definition: XrdPfcFile.hh:136
File * get_file() const
Definition: XrdPfcFile.hh:140
long long m_offset
Definition: XrdPfcFile.hh:114
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:262

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

+ Here is the call graph for this function:

◆ Attach() [1/2]

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int  Options = 0 
)
virtual

Implements XrdOucCache.

Definition at line 179 of file XrdPfc.cc.

180 {
181  const char* tpfx = "Attach() ";
182 
183  if (Cache::GetInstance().Decide(io))
184  {
185  TRACE(Info, tpfx << obfuscateAuth(io->Path()));
186 
187  IO *cio;
188 
189  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
190  {
191  cio = new IOFileBlock(io, *this);
192  }
193  else
194  {
195  IOFile *iof = new IOFile(io, *this);
196 
197  if ( ! iof->HasFile())
198  {
199  delete iof;
200  // TODO - redirect instead. But this is kind of an awkward place for it.
201  // errno is set during IOFile construction.
202  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
203  return io;
204  }
205 
206  cio = iof;
207  }
208 
209  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
210  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
211 
212  return cio;
213  }
214  else
215  {
216  TRACE(Info, tpfx << "decision decline " << io->Path());
217  }
218  return io;
219 }
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:59
bool Debug
@ Error
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:161
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:215
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:132
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:137
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:39
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:48
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ Attach() [2/2]

virtual XrdOucCacheIO* XrdOucCache::Attach

Obtain a new IO object that fronts existing XrdOucCacheIO.

◆ blocksize_str2value()

bool Cache::blocksize_str2value ( const char *  from,
const char *  str,
long long &  val,
long long  min,
long long  max 
) const

Definition at line 103 of file XrdPfcConfiguration.cc.

105 {
106  if (XrdOuca2x::a2sz(m_log, "Error parsing block-size", str, &val, min, max))
107  return false;
108 
109  if (val & 0xFFF) {
110  val &= ~0x0FFF;
111  val += 0x1000;
112  m_log.Emsg(from, "blocksize must be a multiple of 4 kB. Rounded up.");
113  }
114 
115  return true;
116 }
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95

References XrdOuca2x::a2sz(), and XrdSysError::Emsg().

+ Here is the call graph for this function:

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 685 of file XrdPfc.cc.

686 {
687  XrdSysCondVarHelper lock(&m_active_cond);
688  m_purge_delay_set.clear();
689 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

+ Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 134 of file XrdPfc.cc.

134 { return m_instance->RefConfiguration(); }

References RefConfiguration().

Referenced by XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), XrdPfc::ResourceMonitor::update_vs_and_file_usage_info(), and XrdPfc::DataFsSnapshot::write_json_file().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char *  config_filename,
const char *  parameters,
XrdOucEnv * env 
)

Parse configuration file.

Parameters
config_filename: path to configuration file
parameters: optional parameters to be passed
env: optional environment to use for configuration
Returns
parse status

Definition at line 431 of file XrdPfcConfiguration.cc.

432 {
433  // Indicate whether or not we are a client instance
434  const char *theINS = getenv("XRDINSTANCE");
435  m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
436 
437  // Tell everyone else we are a caching proxy
438  XrdOucEnv::Export("XRDPFC", 1);
439 
440  XrdOucEnv emptyEnv;
441  XrdOucEnv *myEnv = env ? env : &emptyEnv;
442 
443  XrdOucStream Config(&m_log, theINS, myEnv, "=====> ");
444 
445  if (! config_filename || ! *config_filename)
446  {
447  TRACE(Error, "Config() configuration file not specified.");
448  return false;
449  }
450 
451  int fd;
452  if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
453  {
454  TRACE( Error, "Config() can't open configuration file " << config_filename);
455  return false;
456  }
457 
458  Config.Attach(fd);
459  static const char *cvec[] = { "*** pfc plugin config:", 0 };
460  Config.Capture(cvec);
461 
462  // Obtain OFS configurator for OSS plugin.
463  XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
464  &XrdVERSIONINFOVAR(XrdOucGetCache));
465  if (! ofsCfg) return false;
466 
467  TmpConfiguration tmpc;
468 
469  Configuration &CFG = m_configuration;
470 
471  // Adjust default parameters for client/serverless caching
472  if (m_isClient)
473  {
474  m_configuration.m_bufferSize = 128 * 1024; // same as normal.
475  m_configuration.m_wqueue_blocks = 8;
476  m_configuration.m_wqueue_threads = 1;
477  }
478 
479  // If network checksum processing is the default, indicate so.
480  if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
481 
482  // Actual parsing of the config file.
483  bool retval = true, aOK = true;
484  char *var;
485  while ((var = Config.GetMyFirstWord()))
486  {
487  if (! strcmp(var,"pfc.osslib"))
488  {
489  retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
490  }
491  else if (! strcmp(var,"pfc.cschk"))
492  {
493  retval = xcschk(Config);
494  }
495  else if (! strcmp(var,"pfc.decisionlib"))
496  {
497  retval = xdlib(Config);
498  }
499  else if (! strcmp(var,"pfc.purgelib"))
500  {
501  retval = xplib(Config);
502  }
503  else if (! strcmp(var,"pfc.trace"))
504  {
505  retval = xtrace(Config);
506  }
507  else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
508  {
509  m_configuration.m_allow_xrdpfc_command = true;
510  }
511  else if (! strncmp(var,"pfc.", 4))
512  {
513  retval = ConfigParameters(std::string(var+4), Config, tmpc);
514  }
515 
516  if ( ! retval)
517  {
518  TRACE(Error, "Config() error in parsing");
519  aOK = false;
520  }
521  }
522 
523  Config.Close();
524 
525  // Load OSS plugin.
526  auto orig_runmode = myEnv->Get("oss.runmode");
527  myEnv->Put("oss.runmode", "pfc");
528  if (m_configuration.is_cschk_cache())
529  {
530  char csi_conf[128];
531  if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
532  {
533  ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
534  } else {
535  TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
536  return false;
537  }
538  }
539  if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, myEnv))
540  {
541  ofsCfg->Plugin(m_oss);
542  }
543  else
544  {
545  TRACE(Error, "Config() Unable to create an OSS object");
546  return false;
547  }
548  if (orig_runmode) myEnv->Put("oss.runmode", orig_runmode);
549  else myEnv->Put("oss.runmode", "");
550 
551  // Test if OSS is operational, determine optional features.
552  aOK &= test_oss_basics_and_features();
553 
554  // sets default value for disk usage
555  XrdOssVSInfo sP;
556  {
557  if (m_configuration.m_meta_space != m_configuration.m_data_space &&
558  m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
559  {
560  m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
561  return false;
562  }
563  if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
564  {
565  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
566  m_configuration.m_meta_space.c_str());
567  return false;
568  }
569  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
570  {
571  m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
572  return false;
573  }
574  if (sP.Total < 10ll << 20)
575  {
576  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
577  m_configuration.m_data_space.c_str());
578  return false;
579  }
580 
581  m_configuration.m_diskTotalSpace = sP.Total;
582 
583  if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
584  cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
585  {
586  if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
587  m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
588  aOK = false;
589  }
590  }
591  else aOK = false;
592 
593  if ( ! tmpc.m_fileUsageMax.empty())
594  {
595  if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
596  cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
597  cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
598  {
599  if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
600  m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
601  m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
602  {
603  m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
604  aOK = false;
605  }
606 
607 
608  if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
609  {
610  m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
611  aOK = false;
612  }
613  }
614  else aOK = false;
615  }
616  }
617 
618  // sets flush frequency
619  if ( ! tmpc.m_flushRaw.empty())
620  {
621  if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
622  {
623  if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
624  &m_configuration.m_flushCnt,
625  100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
626  {
627  return false;
628  }
629  m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
630  }
631  else
632  {
633  if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
634  &m_configuration.m_flushCnt, 100, 100000))
635  {
636  return false;
637  }
638  }
639  }
640 
641  // get number of available RAM blocks after process configuration
642  if (m_configuration.m_RamAbsAvailable == 0)
643  {
644  m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
645  char buff[1024];
646  snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
647  m_log.Say("Config info: ", buff);
648  }
649  // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
650  m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
651 
652  // Set tracing to debug if this is set in environment
653  char* cenv = getenv("XRDDEBUG");
654  if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
655 
656  if (aOK)
657  {
658 // 000 001 010
659  const char *csc[] = {"off", "cache nonet", "nocache net notls",
660 // 011
661  "cache net notls",
662 // 100 101 110
663  "off", "cache nonet", "nocache net tls",
664 // 111
665  "cache net tls"};
666  char uvk[32];
667  if (m_configuration.m_cs_UVKeep < 0)
668  strcpy(uvk, "lru");
669  else
670  sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
671  float ram_gb = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
672 
673  char urlcgi_blks[64] = "ignore", urlcgi_npref[32] = "ignore";
674  if (CFG.m_cgi_blocksize_allowed)
675  snprintf(urlcgi_blks, sizeof(urlcgi_blks), "%lldk %lldk",
676  CFG.m_cgi_min_bufferSize >> 10, CFG.m_cgi_max_bufferSize >> 10);
677  if (CFG.m_cgi_prefetch_allowed)
678  snprintf(urlcgi_npref, sizeof(urlcgi_npref), "%d %d",
680 
681  char buff[8192];
682  int loff = 0;
683  loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
684  " pfc.cschk %s uvkeep %s\n"
685  " pfc.blocksize %lldk\n"
686  " pfc.prefetch %d\n"
687  " pfc.urlcgi blocksize %s prefetch %s\n"
688  " pfc.ram %.fg\n"
689  " pfc.writequeue %d %d\n"
690  " # Total available disk: %lld\n"
691  " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
692  " pfc.spaces %s %s\n"
693  " pfc.trace %d\n"
694  " pfc.flush %lld\n"
695  " pfc.acchistorysize %d\n"
696  " pfc.onlyIfCachedMinBytes %lld\n"
697  " pfc.onlyIfCachedMinFrac %.2f\n",
698  config_filename,
699  csc[int(m_configuration.m_cs_Chk)], uvk,
700  m_configuration.m_bufferSize >> 10,
701  m_configuration.m_prefetch_max_blocks,
702  urlcgi_blks, urlcgi_npref,
703  ram_gb,
704  m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
705  sP.Total,
706  m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
707  m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
708  m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
709  m_configuration.m_data_space.c_str(),
710  m_configuration.m_meta_space.c_str(),
711  m_trace->What,
712  m_configuration.m_flushCnt,
713  m_configuration.m_accHistorySize,
714  m_configuration.m_onlyIfCachedMinSize,
715  m_configuration.m_onlyIfCachedMinFrac);
716 
717  if (m_configuration.is_dir_stat_reporting_on())
718  {
719  loff += snprintf(buff + loff, sizeof(buff) - loff,
720  " pfc.dirstats interval %d maxdepth %d (internal: size_of_dirlist %d, size_of_globlist %d)\n",
721  m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsStoreDepth,
722  (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
723  loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
724  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
725  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
726  loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
727  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
728  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
729  }
730 
731  if (m_configuration.m_hdfsmode)
732  {
733  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
734  }
735 
736  if (m_configuration.m_username.empty())
737  {
738  char unameBuff[256];
739  XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
740  m_configuration.m_username = unameBuff;
741  }
742  else
743  {
744  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
745  }
746 
747  m_log.Say(buff);
748 
749  m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
750  }
751 
752  // Derived settings
753  m_prefetch_enabled = CFG.m_prefetch_max_blocks > 0 || CFG.m_cgi_max_prefetch_max_blocks > 0;
755 
756  m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
757 
758  m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
759 
760  // Create the ResourceMonitor and get it ready for starting the main thread function.
761  if (aOK)
762  {
763  m_res_mon = new ResourceMonitor(*m_oss);
764  m_res_mon->init_before_main();
765  }
766 
767  m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
768 
769  if (ofsCfg) delete ofsCfg;
770 
771  // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
772  // Building of xrdpfc_print fails when this is enabled.
773 #ifdef XRDPFC_CKSUM_TEST
774  {
775  int xxx = m_configuration.m_cs_Chk;
776 
777  for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
778  {
779  Info::TestCksumStuff();
780  }
781 
782  m_configuration.m_cs_Chk = xxx;
783  }
784 #endif
785 
786  return aOK;
787 }
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:76
#define open
Definition: XrdPosix.hh:76
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition: XrdOssVS.hh:90
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:170
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
char * Get(const char *varname)
Definition: XrdOucEnv.hh:69
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
static size_t s_maxNumAccess
Definition: XrdPfcInfo.hh:311
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
@ CSChk_Both
Definition: XrdPfcTypes.hh:27
@ CSChk_None
Definition: XrdPfcTypes.hh:27
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:121
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:108
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:122
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:115
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition: XrdPfc.hh:100
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:116
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition: XrdPfc.hh:119
int m_wqueue_threads
number of threads writing blocks to disk
Definition: XrdPfc.hh:111
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:94
int m_dirStatsStoreDepth
maximum depth for statistics write out
Definition: XrdPfc.hh:105
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:85
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:93
bool is_cschk_cache() const
Definition: XrdPfc.hh:75
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition: XrdPfc.hh:103
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:112
bool m_cs_ChkTLS
Allow TLS.
Definition: XrdPfc.hh:126
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:95
int m_cs_Chk
Checksum check.
Definition: XrdPfc.hh:125
bool m_hdfsmode
flag for enabling block-level operation
Definition: XrdPfc.hh:84
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:98
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:88
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition: XrdPfc.hh:102
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:92
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:109
long long m_bufferSize
cache block size, default 128 kB
Definition: XrdPfc.hh:107
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:114
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
Definition: XrdPfc.hh:104
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:89
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:110
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:117
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition: XrdPfc.hh:118
bool is_cschk_net() const
Definition: XrdPfc.hh:76
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:129
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:124
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:97
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:128
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:70
std::string m_diskUsageLWM
Definition: XrdPfc.hh:141
std::string m_diskUsageHWM
Definition: XrdPfc.hh:142
std::string m_fileUsageBaseline
Definition: XrdPfc.hh:143
std::string m_fileUsageNominal
Definition: XrdPfc.hh:144
std::string m_flushRaw
Definition: XrdPfc.hh:146
std::string m_fileUsageMax
Definition: XrdPfc.hh:145

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, XrdSysError::Emsg(), Error, XrdOucEnv::Export(), XrdOucEnv::Get(), XrdOucEnv::GetPtr(), XrdPfc::ResourceMonitor::init_before_main(), XrdPfc::Configuration::is_cschk_cache(), XrdPfc::Configuration::is_cschk_net(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_cgi_blocksize_allowed, XrdPfc::Configuration::m_cgi_max_bufferSize, XrdPfc::Configuration::m_cgi_max_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_min_bufferSize, XrdPfc::Configuration::m_cgi_min_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_prefetch_allowed, XrdPfc::Configuration::m_cs_Chk, XrdPfc::Configuration::m_cs_ChkTLS, XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_dirStatsDirGlobs, XrdPfc::Configuration::m_dirStatsDirs, XrdPfc::Configuration::m_dirStatsInterval, XrdPfc::Configuration::m_dirStatsStoreDepth, XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::Configuration::m_flushCnt, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_hdfsbsize, XrdPfc::Configuration::m_hdfsmode, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_prefetch_max_blocks, XrdPfc::Configuration::m_purgeColdFilesAge, XrdPfc::Configuration::m_purgeInterval, 
XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::Configuration::m_RamKeepStdBlocks, XrdPfc::Configuration::m_username, XrdPfc::Configuration::m_wqueue_blocks, XrdPfc::Configuration::m_wqueue_threads, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdSysError::Say(), XrdOss::StatVS(), XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), XrdSysTrace::What, and XrdOucGetCache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char *  curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 1000 of file XrdPfc.cc.

1001 {
1002  static const char* tpfx = "ConsiderCached ";
1003 
1004  TRACE(Debug, tpfx << curl);
1005 
1006  XrdCl::URL url(curl);
1007  std::string f_name = url.GetPath();
1008 
1009  File *file = nullptr;
1010  {
1011  XrdSysCondVarHelper lock(&m_active_cond);
1012  auto it = m_active.find(f_name);
1013  if (it != m_active.end()) {
1014  file = it->second;
1015  // If the file-open is in progress, `file` is a nullptr
1016  // so we cannot increase the reference count. For now,
1017  // simply treat it as if the file open doesn't exist instead
1018  // of trying to wait and see if it succeeds.
1019  if (file) {
1020  inc_ref_cnt(file, false, false);
1021  }
1022  }
1023  }
1024  if (file) {
1025  struct stat sbuff;
1026  int res = file->Fstat(sbuff);
1027  dec_ref_cnt(file, false);
1028  if (res)
1029  return res;
1030  // DecideIfConsideredCached() already called in File::Fstat().
1031  return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1032  }
1033 
1034  struct stat sbuff;
1035  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1036  if (res != XrdOssOK) {
1037  TRACE(Debug, tpfx << curl << " -> " << res);
1038  return res;
1039  }
1040  if (S_ISDIR(sbuff.st_mode))
1041  {
1042  TRACE(Debug, tpfx << curl << " -> EISDIR");
1043  return -EISDIR;
1044  }
1045 
1046  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1047  if (file_size < 0) {
1048  TRACE(Debug, tpfx << curl << " -> " << file_size);
1049  return (int) file_size;
1050  }
1051  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1052 
1053  return is_cached ? 0 : -EREMOTE;
1054 }
#define XrdOssOK
Definition: XrdOss.hh:50
#define stat(a, b)
Definition: XrdPosix.hh:101
URL representation.
Definition: XrdClURL.hh:31
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition: XrdPfc.cc:926
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition: XrdPfc.cc:967
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:632
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by XrdPfcFSctl::FSctl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger logger,
XrdOucEnv env 
)
static

Singleton creation.

Definition at line 125 of file XrdPfc.cc.

126 {
127  assert (m_instance == 0);
128  m_instance = new Cache(logger, env);
129  return *m_instance;
130 }
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:158

References Cache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
io	IO object whose path gives the URL of the file
Returns
decision if IO object will be cached.

Definition at line 137 of file XrdPfc.cc.

138 {
139  if (! m_decisionpoints.empty())
140  {
141  XrdCl::URL url(io->Path());
142  std::string filename = url.GetPath();
143  std::vector<Decision*>::const_iterator it;
144  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
145  {
146  XrdPfc::Decision *d = *it;
147  if (! d) continue;
148  if (! d->Decide(filename, *m_oss))
149  {
150  return false;
151  }
152  }
153  }
154 
155  return true;
156 }
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long  file_size,
long long  bytes_on_disk 
)

Definition at line 967 of file XrdPfc.cc.

968 {
969  if (file_size == 0 || bytes_on_disk >= file_size)
970  return true;
971 
972  double frac_on_disk = (double) bytes_on_disk / file_size;
973 
974  if (file_size <= m_configuration.m_onlyIfCachedMinSize)
975  {
976  if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
977  return true;
978  }
979  else
980  {
981  if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
982  frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
983  return true;
984  }
985  return false;
986 }

References XrdPfc::Configuration::m_onlyIfCachedMinFrac, and XrdPfc::Configuration::m_onlyIfCachedMinSize.

Referenced by ConsiderCached(), and Stat().

+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File file)

Definition at line 711 of file XrdPfc.cc.

712 {
713  // Can be called with other locks held.
714 
715  if ( ! m_prefetch_enabled)
716  {
717  return;
718  }
719 
720  m_prefetch_condVar.Lock();
721  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
722  {
723  if (*it == file)
724  {
725  m_prefetchList.erase(it);
726  break;
727  }
728  }
729  m_prefetch_condVar.UnLock();
730 }

References XrdSysCondVar::Lock(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string &  cinfo_fname)

Definition at line 926 of file XrdPfc.cc.

927 {
928  if (m_metaXattr) {
929  char pfn[4096];
930  m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
931  long long fsize = -1ll;
932  int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
933  if (res == sizeof(long long))
934  {
935  return fsize;
936  }
937  else
938  {
939  TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
940  }
941  }
942 
943  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
944  XrdOucEnv env;
945  long long ret;
946  int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
947  if (res < 0) {
948  ret = res;
949  } else {
950  Info info(m_trace, 0);
951  if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
952  ret = -EBADF;
953  } else {
954  ret = info.GetFileSize();
955  }
956  infoFile->Close();
957  }
958  delete infoFile;
959  return ret;
960 }
XrdSysXAttr * XrdSysXAttrActive
Definition: XrdSysFAttr.cc:61
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Debug, XrdSysXAttr::Get(), XrdPfc::Info::GetFileSize(), XrdOss::Lfn2Pfn(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string &  command_url)

Definition at line 51 of file XrdPfcCommand.cc.

52 {
53  static const char *top_epfx = "ExecuteCommandUrl ";
54 
55  SplitParser cp(command_url, "/");
56 
57  std::string token = cp.get_token();
58 
59  if (token != "xrdpfc_command")
60  {
61  TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
62  return;
63  }
64 
65  // Get the command
66  token = cp.get_token_as_string();
67 
68  auto get_opt = [](SplitParser &sp) -> char {
69  const char *t = sp.get_token();
70  if (t)
71  return (t[0] == '-' && t[1] != 0) ? t[1] : 0;
72  else
73  return -1;
74  };
75 
76  //================================================================
77  // create_file
78  //================================================================
79 
80  if (token == "create_file")
81  {
82  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
83  static const char* usage =
84  "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
85  " Creates a cache file with given parameters. Data in file is random.\n"
86  " Useful for cache purge testing.\n"
87  "Notes:\n"
88  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
89  " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
90  " . -t and -d can be given multiple times to record several accesses.\n"
91  " . Negative arguments given to -t are interpreted as relative to now.\n";
92 
93  const Configuration &conf = m_configuration;
94 
95  token = cp.get_token_as_string();
96  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
97  if (token.empty()) {
98  TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
99  return;
100  }
101  TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
102  if ( ! cp.has_reminder()) {
103  TRACE(Error, err_prefix << "Path section must not be empty.");
104  return;
105  }
106 
107  long long file_size = ONE_GB;
108  long long block_size = conf.m_bufferSize;
109  int access_time [MAX_ACCESSES];
110  int access_duration[MAX_ACCESSES];
111  int at_count = 0, ad_count = 0;
112 
113  time_t time_now = time(0);
114 
115  SplitParser ap(token, " ");
116  char theOpt;
117 
118  while ((theOpt = get_opt(ap)) != (char) -1)
119  {
120  switch (theOpt)
121  {
122  case 'h': {
123  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
124  return;
125  }
126  case 's': {
127  if (XrdOuca2x::a2sz(m_log, "Error getting filesize", ap.get_token(),
128  &file_size, 0ll, 32 * ONE_GB))
129  return;
130  break;
131  }
132  case 'b': {
133  if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", ap.get_token(),
134  &block_size, 0ll, 64 * ONE_MB))
135  return;
136  break;
137  }
138  case 't': {
139  if (XrdOuca2x::a2i(m_log, "Error getting access time", ap.get_token(),
140  &access_time[at_count++], INT_MIN, INT_MAX))
141  return;
142  break;
143  }
144  case 'd': {
145  if (XrdOuca2x::a2i(m_log, "Error getting access duration", ap.get_token(),
146  &access_duration[ad_count++], 0, 24 * 3600))
147  return;
148  break;
149  }
150  default: {
151  TRACE(Error, err_prefix << "Unhandled command argument.");
152  return;
153  }
154  }
155  }
156 
157  if (at_count < 1) access_time [at_count++] = time_now - 10;
158  if (ad_count < 1) access_duration[ad_count++] = 10;
159 
160  if (at_count != ad_count)
161  {
162  TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
163  return;
164  }
165 
166  std::string file_path (cp.get_reminder_with_delim());
167  std::string cinfo_path(file_path + Info::s_infoExtension);
168 
169  TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
170 
171  // Check if cinfo exists ... bail out if it does.
172  {
173  struct stat infoStat;
174  if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
175  {
176  TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
177  return;
178  }
179  }
180 
181  TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
182 
183  {
184  const char *myUser = conf.m_username.c_str();
185  XrdOucEnv myEnv;
186 
187  // Create the data file.
188 
189  char size_str[32]; sprintf(size_str, "%lld", file_size);
190  myEnv.Put("oss.asize", size_str);
191  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
192  int cret;
193  if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
194  {
195  TRACE(Error, err_prefix << "Create failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
196  return;
197  }
198 
199  XrdOssDF *myFile = GetOss()->newFile(myUser);
200  if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
201  {
202  TRACE(Error, err_prefix << "Open failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
203  delete myFile;
204  return;
205  }
206 
207  // Create the info file.
208 
209  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
210  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
211  if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
212  {
213  TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
214  myFile->Close(); delete myFile;
215  return;
216  }
217 
218  XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
219  if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
220  {
221  TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
222  delete myInfoFile;
223  myFile->Close(); delete myFile;
224  return;
225  }
226 
227  // Allocate space for the data file.
228 
229  if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
230  {
231  TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ERRNO_AND_ERRSTR(cret));
232  }
233 
234  // Fill up cinfo.
235 
236  Info myInfo(m_trace, false);
237  myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
238  myInfo.SetAllBitsSynced();
239 
240  for (int i = 0; i < at_count; ++i)
241  {
242  time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
243 
244  myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
245  }
246 
247  myInfo.Write(myInfoFile, cinfo_path.c_str());
248 
249  // Fake last modified time to the last access_time
250  {
251  time_t last_detach;
252  myInfo.GetLatestDetachTime(last_detach);
253  struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
254 
255  futimens(myInfoFile->getFD(), acc_mod_time);
256  }
257 
258  myInfoFile->Close(); delete myInfoFile;
259  myFile->Close(); delete myFile;
260 
261  struct stat dstat;
262  GetOss()->Stat(file_path.c_str(), &dstat);
263  TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB, "
264  << "st_blocks=" << dstat.st_blocks);
265 
266  {
267  XrdSysCondVarHelper lock(&m_writeQ.condVar);
268 
269  m_writeQ.writes_between_purges += file_size;
270  }
271  {
272  int token = m_res_mon->register_file_open(file_path, time_now, false);
273  XrdPfc::Stats stats;
274  stats.m_BytesWritten = file_size;
275  stats.m_StBlocksAdded = dstat.st_blocks;
276  m_res_mon->register_file_update_stats(token, stats);
277  m_res_mon->register_file_close(token, time(0), stats);
278  }
279  }
280  }
281 
282  //================================================================
283  // remove_file
284  //================================================================
285 
286  else if (token == "remove_file")
287  {
288  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
289  static const char* usage =
290  "Usage: remove_file/ [-h] /<path>\n"
291  " Removes given file from the cache unless it is currently open.\n"
292  " Useful for removal of stale files or duplicate files in a caching cluster.\n"
293  "Notes:\n"
294  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
295 
296  token = cp.get_token_as_string();
297  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
298  if (token.empty()) {
299  TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
300  return;
301  }
302  TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
303  if ( ! cp.has_reminder()) {
304  TRACE(Error, err_prefix << "Path section must not be empty.");
305  return;
306  }
307 
308  SplitParser ap(token, " ");
309  char theOpt;
310 
311  while ((theOpt = get_opt(ap)) != (char) -1)
312  {
313  switch (theOpt)
314  {
315  case 'h': {
316  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
317  return;
318  }
319  default: {
320  TRACE(Error, err_prefix << "Unhandled command argument.");
321  return;
322  }
323  }
324  }
325 
326  std::string f_name(cp.get_reminder_with_delim());
327 
328  TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
329 
330  int ret = UnlinkFile(f_name, true);
331 
332  TRACE(Info, err_prefix << "returned with status " << ret);
333  }
334 
335  //================================================================
336  // unknown command
337  //================================================================
338 
339  else
340  {
341  TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
342  }
343 }
void usage()
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
bool Create
virtual int getFD()
Definition: XrdOss.hh:426
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
XrdOss * GetOss() const
Definition: XrdPfc.hh:280
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1117
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1188
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdPfc::SplitParser::get_token_as_string(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), GetOss(), XrdPfc::SplitParser::has_reminder(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::ResourceMonitor::register_file_close(), XrdPfc::ResourceMonitor::register_file_open(), XrdPfc::ResourceMonitor::register_file_update_stats(), XrdPfc::Info::s_infoExtension, XrdSysError::Say(), XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), stat, XrdOss::Stat(), Stat(), TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File f,
bool  high_debug 
)

Definition at line 542 of file XrdPfc.cc.

543 {
544  dec_ref_cnt(f, high_debug);
545 }

◆ GetFile()

File * Cache::GetFile ( const std::string &  path,
IO io,
long long  off = 0,
long long  filesize = 0 
)

Definition at line 389 of file XrdPfc.cc.

390 {
391  // Called from virtual IOFile constructor.
392 
393  TRACE(Debug, "GetFile " << path << ", io " << io);
394 
395  ActiveMap_i it;
396 
397  {
398  XrdSysCondVarHelper lock(&m_active_cond);
399 
400  while (true)
401  {
402  it = m_active.find(path);
403 
404  // File is not open or being opened. Mark it as being opened and
405  // proceed to opening it outside of while loop.
406  if (it == m_active.end())
407  {
408  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
409  break;
410  }
411 
412  if (it->second != 0)
413  {
414  it->second->AddIO(io);
415  inc_ref_cnt(it->second, false, true);
416 
417  return it->second;
418  }
419  else
420  {
421  // Wait for some change in m_active, then recheck.
422  m_active_cond.Wait();
423  }
424  }
425  }
426 
427  // This is always true, now that IOFileBlock is unsupported.
428  if (filesize == 0)
429  {
430  struct stat st;
431  int res = io->Fstat(st);
432  if (res < 0) {
433  errno = res;
434  TRACE(Error, "GetFile, could not get valid stat");
435  } else if (res > 0) {
436  errno = ENOTSUP;
437  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
438  } else {
439  filesize = st.st_size;
440  }
441  }
442 
443  File *file = 0;
444 
445  if (filesize >= 0)
446  {
447  file = File::FileOpen(path, off, filesize, io->GetInput());
448  }
449 
450  {
451  XrdSysCondVarHelper lock(&m_active_cond);
452 
453  if (file)
454  {
455  inc_ref_cnt(file, false, true);
456  it->second = file;
457 
458  file->AddIO(io);
459  }
460  else
461  {
462  m_active.erase(it);
463  }
464 
465  m_active_cond.Broadcast();
466  }
467 
468  return file;
469 }
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
void AddIO(IO *io)
Definition: XrdPfcFile.cc:349
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:141
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:31

References XrdPfc::File::AddIO(), XrdSysCondVar::Broadcast(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), XrdPfc::IO::GetInput(), stat, TRACE, and XrdSysCondVar::Wait().

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream* XrdPfc::Cache::GetGStream ( )
inline

Definition at line 298 of file XrdPfc.hh.

298 { return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 132 of file XrdPfc.cc.

132 { return *m_instance; }

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), ProcessWriteTaskThread(), Proto_ResourceMonitorHeartBeat(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError* XrdPfc::Cache::GetLog ( ) const
inline

Definition at line 294 of file XrdPfc.hh.

294 { return &m_log; }

Referenced by XrdPfc::File::GetLog().

+ Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 733 of file XrdPfc.cc.

734 {
735  m_prefetch_condVar.Lock();
736  while (m_prefetchList.empty())
737  {
738  m_prefetch_condVar.Wait();
739  }
740 
741  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
742 
743  size_t l = m_prefetchList.size();
744  int idx = rand() % l;
745  File* f = m_prefetchList[idx];
746 
747  m_prefetch_condVar.UnLock();
748  return f;
749 }

References XrdSysCondVar::Lock(), XrdSysCondVar::UnLock(), and XrdSysCondVar::Wait().

Referenced by Prefetch().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetOss()

XrdOss* XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 280 of file XrdPfc.hh.

280 { return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin* XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 284 of file XrdPfc.hh.

284 { return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace* XrdPfc::Cache::GetTrace ( ) const
inline

Definition at line 295 of file XrdPfc.hh.

295 { return m_trace; }

Referenced by XrdPfc::IO::GetTrace(), and XrdPfc::File::GetTrace().

+ Here is the caller graph for this function:

◆ is_prefetch_enabled()

bool XrdPfc::Cache::is_prefetch_enabled ( ) const
inline

Definition at line 307 of file XrdPfc.hh.

307 { return m_prefetch_enabled; }

Referenced by XrdOucGetCache().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string &  path) const

Definition at line 677 of file XrdPfc.cc.

678 {
679  XrdSysCondVarHelper lock(&m_active_cond);
680 
681  return m_active.find(path) != m_active.end() ||
682  m_purge_delay_set.find(path) != m_purge_delay_set.end();
683 }

◆ LocalFilePath()

int Cache::LocalFilePath ( const char *  curl,
char *  buff = 0,
int  blen = 0,
LFP_Reason  why = ForAccess,
bool  forall = false 
)
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 794 of file XrdPfc.cc.

796 {
797  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
798  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
799  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
800 
801  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
802 
803  if (buff && blen > 0) buff[0] = 0;
804 
805  XrdCl::URL url(curl);
806  std::string f_name = url.GetPath();
807  std::string i_name = f_name + Info::s_infoExtension;
808 
809  if (why == ForPath)
810  {
811  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
812  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
813  return ret;
814  }
815 
816  {
817  XrdSysCondVarHelper lock(&m_active_cond);
818  m_purge_delay_set.insert(f_name);
819  }
820 
821  struct stat sbuff, sbuff2;
822  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
823  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
824  {
825  if (S_ISDIR(sbuff.st_mode))
826  {
827  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
828  return -EISDIR;
829  }
830  else
831  {
832  bool read_ok = false;
833  bool is_complete = false;
834 
835  // Lock and check if the file is active. If NOT, keep the lock
836  // and add dummy access after successful reading of info file.
837  // If it IS active, just release the lock, this ongoing access will
838  // assure the file continues to exist.
839 
840  // XXXX How can I just loop over the cinfo file when active?
841  // Can I not get is_complete from the existing file?
842  // Do I still want to inject access record?
843  // Oh, it writes only if not active .... still let's try to use existing File.
844 
845  m_active_cond.Lock();
846 
847  bool is_active = m_active.find(f_name) != m_active.end();
848 
849  if (is_active) m_active_cond.UnLock();
850 
851  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
852  XrdOucEnv myEnv;
853  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
854  if (res >= 0)
855  {
856  Info info(m_trace, 0);
857  if (info.Read(infoFile, i_name.c_str()))
858  {
859  read_ok = true;
860 
861  is_complete = info.IsComplete();
862 
863  // Add full-size access if reason is for access.
864  if ( ! is_active && is_complete && why == ForAccess)
865  {
866  info.WriteIOStatSingle(info.GetFileSize());
867  info.Write(infoFile, i_name.c_str());
868  }
869  }
870  infoFile->Close();
871  }
872  delete infoFile;
873 
874  if ( ! is_active) m_active_cond.UnLock();
875 
876  if (read_ok)
877  {
878  if ((is_complete || why == ForInfo) && buff != 0)
879  {
880  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
881  if (res2 < 0)
882  return res2;
883 
884  // Normally, files are owned by us but when direct cache access
885  // is wanted and possible, make sure the file is world readable.
886  if (why == ForAccess)
887  {mode_t mode = (forall ? worldReadable : groupReadable);
888  if (((sbuff.st_mode & worldReadable) != mode)
889  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
890  {is_complete = false;
891  *buff = 0;
892  }
893  }
894  }
895 
896  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
897  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
898 
899  return is_complete ? 0 : -EREMOTE;
900  }
901  }
902  }
903 
904  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
905  return -ENOENT;
906 }
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0

References XrdOss::Chmod(), XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOss::Lfn2Pfn(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, XrdSysCondVar::UnLock(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 752 of file XrdPfc.cc.

753 {
754  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
755 
756  while (true)
757  {
758  m_RAM_mutex.Lock();
759  bool doPrefetch = (m_RAM_used < limit_RAM);
760  m_RAM_mutex.UnLock();
761 
762  if (doPrefetch)
763  {
764  File* f = GetNextFileToPrefetch();
765  f->Prefetch();
766  }
767  else
768  {
770  }
771  }
772 }
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:733
void Prefetch()
Definition: XrdPfcFile.cc:1624
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227

References GetNextFileToPrefetch(), XrdSysMutex::Lock(), XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::File::Prefetch(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ prefetch_str2value()

bool Cache::prefetch_str2value ( const char *  from,
const char *  str,
int &  val,
int  min,
int  max 
) const

Definition at line 118 of file XrdPfcConfiguration.cc.

120 {
121  if (XrdOuca2x::a2i(m_log, "Error parsing prefetch block count", str, &val, min, max))
122  return false;
123 
124  return true;
125 }

References XrdOuca2x::a2i().

+ Here is the call graph for this function:

◆ Prepare()

int Cache::Prepare ( const char *  curl,
int  oflags,
mode_t  mode 
)
virtual

Prepare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow deferring an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1066 of file XrdPfc.cc.

1067 {
1068  XrdCl::URL url(curl);
1069  std::string f_name = url.GetPath();
1070  std::string i_name = f_name + Info::s_infoExtension;
1071 
1072  // Do not allow write access.
1073  if ((oflags & O_ACCMODE) != O_RDONLY)
1074  {
1075  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1076  return -EROFS;
1077  }
1078 
1079  // Intercept xrdpfc_command requests.
1080  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1081  {
1082  // Schedule a job to process command request.
1083  {
1084  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1085 
1086  schedP->Schedule(ce);
1087  }
1088 
1089  return -EAGAIN;
1090  }
1091 
1092  {
1093  XrdSysCondVarHelper lock(&m_active_cond);
1094  m_purge_delay_set.insert(f_name);
1095  }
1096 
1097  struct stat sbuff;
1098  if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1099  {
1100  TRACE(Dump, "Prepare defer open " << f_name);
1101  return 1;
1102  }
1103  else
1104  {
1105  return 0;
1106  }
1107 }
static XrdScheduler * schedP
Definition: XrdPfc.hh:302
void Schedule(XrdJob *jp)
@ Warning

References XrdCl::URL::GetPath(), XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Info::s_infoExtension, schedP, XrdScheduler::Schedule(), stat, XrdOss::Stat(), TRACE, TPC::Warning, and XrdOssOK.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from RAM to disk.

Definition at line 273 of file XrdPfc.cc.

274 {
275  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
276 
277  while (true)
278  {
279  m_writeQ.condVar.Lock();
280  while (m_writeQ.size == 0)
281  {
282  m_writeQ.condVar.Wait();
283  }
284 
285  // MT -- optimize to pop several blocks if they are available (or swap the list).
286  // This makes sense especially for smallish block sizes.
287 
288  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
289  long long sum_size = 0;
290 
291  for (int bi = 0; bi < n_pushed; ++bi)
292  {
293  Block* block = m_writeQ.queue.front();
294  m_writeQ.queue.pop_front();
295  m_writeQ.writes_between_purges += block->get_size();
296  sum_size += block->get_size();
297 
298  blks_to_write[bi] = block;
299 
300  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
301  }
302  m_writeQ.size -= n_pushed;
303 
304  m_writeQ.condVar.UnLock();
305 
306  {
307  XrdSysMutexHelper lock(&m_RAM_mutex);
308  m_RAM_write_queue -= sum_size;
309  }
310 
311  for (int bi = 0; bi < n_pushed; ++bi)
312  {
313  Block* block = blks_to_write[bi];
314 
315  block->m_file->WriteBlockToDisk(block);
316  }
317  }
318 }
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1609
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1137

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, XrdPfc::Configuration::m_wqueue_blocks, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration& XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 215 of file XrdPfc.hh.

215 { return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Conf(), XrdPfc::File::WriteBlockToDisk(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor& XrdPfc::Cache::RefResMon ( )
inline

Definition at line 297 of file XrdPfc.hh.

297 { return *m_res_mon; }

Referenced by ResMon().

+ Here is the caller graph for this function:

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File file)

Definition at line 695 of file XrdPfc.cc.

696 {
697  // Can be called with other locks held.
698 
699  if ( ! m_prefetch_enabled)
700  {
701  return;
702  }
703 
704  m_prefetch_condVar.Lock();
705  m_prefetchList.push_back(file);
706  m_prefetch_condVar.Signal();
707  m_prefetch_condVar.UnLock();
708 }

References XrdSysCondVar::Lock(), XrdSysCondVar::Signal(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ReleaseFile()

void Cache::ReleaseFile ( File f,
IO io 
)

Definition at line 471 of file XrdPfc.cc.

472 {
473  // Called from virtual IO::DetachFinalize.
474 
475  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
476 
477  {
478  XrdSysCondVarHelper lock(&m_active_cond);
479 
480  f->RemoveIO(io);
481  }
482  dec_ref_cnt(f, true);
483 }
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:386

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize(), and XrdPfc::IOFileBlock::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char *  buf,
long long  size 
)

Definition at line 371 of file XrdPfc.cc.

372 {
373  bool std_size = (size == m_configuration.m_bufferSize);
374  {
375  XrdSysMutexHelper lock(&m_RAM_mutex);
376 
377  m_RAM_used -= size;
378 
379  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
380  {
381  m_RAM_std_blocks.push_back(buf);
382  ++m_RAM_std_size;
383  return;
384  }
385  }
386  free(buf);
387 }

References XrdPfc::Configuration::m_bufferSize, and XrdPfc::Configuration::m_RamKeepStdBlocks.

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File f)

Remove blocks from the write queue which belong to the given file. This method is used at the time of File destruction.

Definition at line 240 of file XrdPfc.cc.

241 {
242  std::list<Block*> removed_blocks;
243  long long sum_size = 0;
244 
245  m_writeQ.condVar.Lock();
246  std::list<Block*>::iterator i = m_writeQ.queue.begin();
247  while (i != m_writeQ.queue.end())
248  {
249  if ((*i)->m_file == file)
250  {
251  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
252  std::list<Block*>::iterator j = i++;
253  removed_blocks.push_back(*j);
254  sum_size += (*j)->get_size();
255  m_writeQ.queue.erase(j);
256  --m_writeQ.size;
257  }
258  else
259  {
260  ++i;
261  }
262  }
263  m_writeQ.condVar.UnLock();
264 
265  {
266  XrdSysMutexHelper lock(&m_RAM_mutex);
267  m_RAM_write_queue -= sum_size;
268  }
269 
270  file->BlocksRemovedFromWriteQ(removed_blocks);
271 }

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long  size)

Definition at line 331 of file XrdPfc.cc.

332 {
333  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
334 
335  bool std_size = (size == m_configuration.m_bufferSize);
336 
337  m_RAM_mutex.Lock();
338 
339  long long total = m_RAM_used + size;
340 
341  if (total <= m_configuration.m_RamAbsAvailable)
342  {
343  m_RAM_used = total;
344  if (std_size && m_RAM_std_size > 0)
345  {
346  char *buf = m_RAM_std_blocks.back();
347  m_RAM_std_blocks.pop_back();
348  --m_RAM_std_size;
349 
350  m_RAM_mutex.UnLock();
351 
352  return buf;
353  }
354  else
355  {
356  m_RAM_mutex.UnLock();
357  char *buf;
358  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
359  {
360  // Report out of mem? Probably should report it at least the first time,
361  // then periodically.
362  return 0;
363  }
364  return buf;
365  }
366  }
367  m_RAM_mutex.UnLock();
368  return 0;
369 }

References XrdSysMutex::Lock(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_RamAbsAvailable, and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 135 of file XrdPfc.cc.

135 { return m_instance->RefResMon(); }
ResourceMonitor & RefResMon()
Definition: XrdPfc.hh:297

References RefResMon().

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File f)
inline

Definition at line 290 of file XrdPfc.hh.

290 { schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char *  curl,
struct stat sbuff 
)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1117 of file XrdPfc.cc.

1118 {
1119  const char *tpfx = "Stat ";
1120 
1121  XrdCl::URL url(curl);
1122  std::string f_name = url.GetPath();
1123 
1124  File *file = nullptr;
1125  {
1126  XrdSysCondVarHelper lock(&m_active_cond);
1127  auto it = m_active.find(f_name);
1128  if (it != m_active.end()) {
1129  file = it->second;
1130  // If `file` is nullptr, the file-open is in progress; instead
1131  // of waiting for the file-open to finish, simply treat it as if
1132  // the file-open doesn't exist.
1133  if (file) {
1134  inc_ref_cnt(file, false, false);
1135  }
1136  }
1137  }
1138  if (file) {
1139  int res = file->Fstat(sbuff);
1140  dec_ref_cnt(file, false);
1141  TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1142  return res;
1143  }
1144 
1145  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1146  if (res != XrdOssOK) {
1147  TRACE(Debug, tpfx << curl << " -> " << res);
1148  return 1; // res; -- for only-if-cached
1149  }
1150  if (S_ISDIR(sbuff.st_mode))
1151  {
1152  TRACE(Debug, tpfx << curl << " -> EISDIR");
1153  return -EISDIR;
1154  }
1155 
1156  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1157  if (file_size < 0) {
1158  TRACE(Debug, tpfx << curl << " -> " << file_size);
1159  return 1; // (int) file_size; -- for only-if-cached
1160  }
1161  sbuff.st_size = file_size;
1162  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1163  if ( ! is_cached)
1164  sbuff.st_atime = 0;
1165 
1166  TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1167 
1168  return 0;
1169 }

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by ExecuteCommandUrl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 133 of file XrdPfc.cc.

133 { return *m_instance; }

Referenced by XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char *  curl)
virtual
Returns
<0 - Unlink failed, value is -errno. =0 - Unlink succeeded.

Reimplemented from XrdOucCache.

Definition at line 1178 of file XrdPfc.cc.

1179 {
1180  XrdCl::URL url(curl);
1181  std::string f_name = url.GetPath();
1182 
1183  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1184 
1185  return UnlinkFile(f_name, false);
1186 }

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string &  f_name,
bool  fail_if_open 
)

Remove cinfo and data files from cache.

Definition at line 1188 of file XrdPfc.cc.

1189 {
1190  static const char* trc_pfx = "UnlinkFile ";
1191  ActiveMap_i it;
1192  File *file = 0;
1193  long long st_blocks_to_purge = 0;
1194  {
1195  XrdSysCondVarHelper lock(&m_active_cond);
1196 
1197  it = m_active.find(f_name);
1198 
1199  if (it != m_active.end())
1200  {
1201  if (fail_if_open)
1202  {
1203  TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1204  return -EBUSY;
1205  }
1206 
1207  // Null File* in m_active map means an operation is ongoing, probably
1208  // Attach() with possible File::Open(). Ask for retry.
1209  if (it->second == 0)
1210  {
1211  TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1212  return -EAGAIN;
1213  }
1214 
1215  file = it->second;
1216  st_blocks_to_purge = file->initiate_emergency_shutdown();
1217  it->second = 0;
1218  }
1219  else
1220  {
1221  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1222  }
1223  }
1224 
1225  if (file) {
1226  RemoveWriteQEntriesFor(file);
1227  } else {
1228  struct stat f_stat;
1229  if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1230  st_blocks_to_purge = f_stat.st_blocks;
1231  }
1232 
1233  std::string i_name = f_name + Info::s_infoExtension;
1234 
1235  // Unlink file & cinfo
1236  int f_ret = m_oss->Unlink(f_name.c_str());
1237  int i_ret = m_oss->Unlink(i_name.c_str());
1238 
1239  if (st_blocks_to_purge)
1240  m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1241 
1242  TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1243 
1244  {
1245  XrdSysCondVarHelper lock(&m_active_cond);
1246  m_active.erase(it);
1247  m_active_cond.Broadcast();
1248  }
1249 
1250  return std::min(f_ret, i_ret);
1251 }
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:240
long long initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:154
void register_file_purge(DirState *target, long long size_in_st_blocks)

References XrdSysCondVar::Broadcast(), Debug, XrdPfc::File::initiate_emergency_shutdown(), XrdPfc::ResourceMonitor::register_file_purge(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, XrdOss::Unlink(), and XrdOssOK.

Referenced by ExecuteCommandUrl(), XrdPfcFSctl::FSctl(), XrdPfc::File::Sync(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo &  urVersion)
inlinestatic

Version check.

Definition at line 245 of file XrdPfc.hh.

245 { return true; }

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int  cinfo_fd,
long long  file_size 
)

Definition at line 911 of file XrdPfc.cc.

912 {
913  if (m_metaXattr) {
914  int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
915  if (res != 0) {
916  TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
917  }
918  }
919 }
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Debug, XrdSysXAttr::Set(), TRACE, and XrdSysXAttrActive.

+ Here is the call graph for this function:

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 320 of file XrdPfc.cc.

321 {
322  // Called from ResourceMonitor for an alternative estimation of disk writes.
323  XrdSysCondVarHelper lock(&m_writeQ.condVar);
324  long long ret = m_writeQ.writes_between_purges;
325  m_writeQ.writes_between_purges = 0;
326  return ret;
327 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: