XRootD
XrdThrottleManager Class Reference

#include <XrdThrottleManager.hh>

+ Collaboration diagram for XrdThrottleManager:

Public Member Functions

 XrdThrottleManager (XrdSysError *lP, XrdOucTrace *tP)
 
 ~XrdThrottleManager ()
 
void Apply (int reqsize, int reqops, int uid)
 
bool CheckLoadShed (const std::string &opaque)
 
bool CloseFile (const std::string &entity)
 
void FromConfig (XrdThrottle::Configuration &config)
 
std::tuple< std::string, uint16_t > GetUserInfo (const XrdSecEntity *client)
 
void Init ()
 
bool IsThrottling ()
 
bool OpenFile (const std::string &entity, std::string &open_error_message)
 
void PerformLoadShed (const std::string &opaque, std::string &host, unsigned &port)
 
void PrepLoadShed (const char *opaque, std::string &lsOpaque)
 
void SetLoadShed (std::string &hostname, unsigned port, unsigned frequency)
 
void SetMaxConns (unsigned long max_conns)
 
void SetMaxOpen (unsigned long max_open)
 
void SetMaxWait (unsigned long max_wait)
 
void SetMonitor (XrdXrootdGStream *gstream)
 
void SetThrottles (float reqbyterate, float reqoprate, int concurrency, float interval_length)
 
XrdThrottleTimer StartIOTimer (uint16_t uid, bool &ok)
 

Protected Member Functions

void StopIOTimer (std::chrono::steady_clock::duration &event_duration, uint16_t uid)
 

Friends

class XrdThrottleTimer
 

Detailed Description

Definition at line 53 of file XrdThrottleManager.hh.

Constructor & Destructor Documentation

◆ XrdThrottleManager()

XrdThrottleManager::XrdThrottleManager ( XrdSysError lP,
XrdOucTrace tP 
)

Definition at line 44 of file XrdThrottleManager.cc.

44  :
45  m_trace(tP),
46  m_log(lP),
47  m_interval_length_seconds(1.0),
48  m_bytes_per_second(-1),
49  m_ops_per_second(-1),
50  m_concurrency_limit(-1),
51  m_last_round_allocation(100*1024),
52  m_loadshed_host(""),
53  m_loadshed_port(0),
54  m_loadshed_frequency(0)
55 {
56 }

◆ ~XrdThrottleManager()

XrdThrottleManager::~XrdThrottleManager ( )
inline

Definition at line 107 of file XrdThrottleManager.hh.

107 {} // The buffmanager is never deleted

Member Function Documentation

◆ Apply()

void XrdThrottleManager::Apply ( int  reqsize,
int  reqops,
int  uid 
)

Definition at line 322 of file XrdThrottleManager.cc.

323 {
324  if (m_bytes_per_second < 0)
325  reqsize = 0;
326  if (m_ops_per_second < 0)
327  reqops = 0;
328  while (reqsize || reqops)
329  {
330  // Subtract the requested out of the shares
331  AtomicBeg(m_compute_var);
332  GetShares(m_primary_bytes_shares[uid], reqsize);
333  if (reqsize)
334  {
335  TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
336  GetShares(m_secondary_bytes_shares[uid], reqsize);
337  TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
338  }
339  else
340  {
341  TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
342  }
343  GetShares(m_primary_ops_shares[uid], reqops);
344  if (reqops)
345  {
346  GetShares(m_secondary_ops_shares[uid], reqops);
347  }
348  StealShares(uid, reqsize, reqops);
349  AtomicEnd(m_compute_var);
350 
351  if (reqsize || reqops)
352  {
353  if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
354  if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
355  m_compute_var.Wait();
356  m_loadshed_limit_hit++;
357  }
358  }
359 
360 }
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
#define TRACE(act, x)
Definition: XrdTrace.hh:63

References AtomicBeg, AtomicEnd, TRACE, and XrdSysCondVar::Wait().

+ Here is the call graph for this function:

◆ CheckLoadShed()

bool XrdThrottleManager::CheckLoadShed ( const std::string &  opaque)

Definition at line 774 of file XrdThrottleManager.cc.

775 {
776  if (m_loadshed_port == 0)
777  {
778  return false;
779  }
780  if (m_loadshed_limit_hit == 0)
781  {
782  return false;
783  }
784  if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
785  {
786  return false;
787  }
788  if (opaque.empty())
789  {
790  return false;
791  }
792  return true;
793 }

◆ CloseFile()

bool XrdThrottleManager::CloseFile ( const std::string &  entity)

Definition at line 256 of file XrdThrottleManager.cc.

257 {
258  if (m_max_open == 0 && m_max_conns == 0) return true;
259 
260  bool result = true;
261  const std::lock_guard<std::mutex> lock(m_file_mutex);
262  if (m_max_open) {
263  auto iter = m_file_counters.find(entity);
264  if (iter == m_file_counters.end()) {
265  TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
266  result = false;
267  } else if (iter->second == 0) {
268  TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
269  result = false;
270  } else {
271  iter->second--;
272  }
273  if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
274  " remain open");
275  }
276 
277  if (m_max_conns) {
278  auto pid = XrdSysThread::Num();
279  auto conn_iter = m_active_conns.find(entity);
280  auto conn_count_iter = m_conn_counters.find(entity);
281  if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
282  TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
283  " tracking");
284  return false;
285  }
286  auto pid_iter = conn_iter->second->find(pid);
287  if (pid_iter == conn_iter->second->end()) {
288  TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
289  " tracking");
290  return false;
291  }
292  if (pid_iter->second == 0) {
293  TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
294  " plugin thinks was idle");
295  } else {
296  pid_iter->second--;
297  }
298  if (conn_count_iter == m_conn_counters.end()) {
299  TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
300  " observed an open file");
301  } else if (pid_iter->second == 0) {
302  if (conn_count_iter->second == 0) {
303  TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the "
304  " throttle plugin already thought all connections were idle");
305  } else {
306  conn_count_iter->second--;
307  TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
308  << conn_count_iter->second << " active connections remain");
309  }
310  }
311  }
312 
313  return result;
314 }
static unsigned long Num(void)

References XrdSysThread::Num(), and TRACE.

Referenced by XrdThrottle::File::close(), and XrdThrottle::File::open().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ FromConfig()

void XrdThrottleManager::FromConfig ( XrdThrottle::Configuration config)

Definition at line 59 of file XrdThrottleManager.cc.

60 {
61 
62  auto max_open = config.GetMaxOpen();
63  if (max_open != -1) SetMaxOpen(max_open);
64  auto max_conn = config.GetMaxConn();
65  if (max_conn != -1) SetMaxConns(max_conn);
66  auto max_wait = config.GetMaxWait();
67  if (max_wait != -1) SetMaxWait(max_wait);
68 
70  config.GetThrottleIOPSRate(),
71  config.GetThrottleConcurrency(),
72  static_cast<float>(config.GetThrottleRecomputeIntervalMS())/1000.0);
73 
74  m_trace->What = config.GetTraceLevels();
75 
76  auto loadshed_host = config.GetLoadshedHost();
77  auto loadshed_port = config.GetLoadshedPort();
78  auto loadshed_freq = config.GetLoadshedFreq();
79  if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
80  {
81  // Loadshed specified, so set it.
82  SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
83  }
84 }
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMaxWait(unsigned long max_wait)
void SetMaxConns(unsigned long max_conns)
long long GetLoadshedPort() const
long long GetThrottleDataRate() const
long long GetThrottleConcurrency() const
const std::string & GetLoadshedHost() const
long long GetLoadshedFreq() const
long long GetThrottleIOPSRate() const
long long GetThrottleRecomputeIntervalMS() const

References XrdThrottle::Configuration::GetLoadshedFreq(), XrdThrottle::Configuration::GetLoadshedHost(), XrdThrottle::Configuration::GetLoadshedPort(), XrdThrottle::Configuration::GetMaxConn(), XrdThrottle::Configuration::GetMaxOpen(), XrdThrottle::Configuration::GetMaxWait(), XrdThrottle::Configuration::GetThrottleConcurrency(), XrdThrottle::Configuration::GetThrottleDataRate(), XrdThrottle::Configuration::GetThrottleIOPSRate(), XrdThrottle::Configuration::GetThrottleRecomputeIntervalMS(), XrdThrottle::Configuration::GetTraceLevels(), SetLoadShed(), SetMaxConns(), SetMaxOpen(), SetMaxWait(), SetThrottles(), and XrdOucTrace::What.

+ Here is the call graph for this function:

◆ GetUserInfo()

std::tuple< std::string, uint16_t > XrdThrottleManager::GetUserInfo ( const XrdSecEntity client)

Definition at line 116 of file XrdThrottleManager.cc.

116  {
117  // client can be null, if so, return nobody
118  if (!client) {
119  return std::make_tuple("nobody", GetUid("nobody"));
120  }
121 
122  // Try various potential "names" associated with the request, from the most
123  // specific to most generic.
124  std::string user;
125 
126  if (client->eaAPI && client->eaAPI->Get("token.subject", user)) {
127  if (client->vorg) user = std::string(client->vorg) + ":" + user;
128  } else if (client->eaAPI) {
129  std::string request_name;
130  if (client->eaAPI->Get("request.name", request_name) && !request_name.empty()) user = request_name;
131  }
132  if (user.empty()) {user = client->name ? client->name : "nobody";}
133  uint16_t uid = GetUid(user.c_str());
134  return std::make_tuple(user, uid);
135 }
XrdSecAttr * Get(const void *sigkey)
char * vorg
Entity's virtual organization(s)
Definition: XrdSecEntity.hh:71
XrdSecEntityAttr * eaAPI
non-const API to attributes
Definition: XrdSecEntity.hh:92
char * name
Entity's name.
Definition: XrdSecEntity.hh:69

References XrdSecEntity::eaAPI, XrdSecEntityAttr::Get(), XrdSecEntity::name, and XrdSecEntity::vorg.

Referenced by XrdThrottle::File::open().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Init()

void XrdThrottleManager::Init ( )

Definition at line 87 of file XrdThrottleManager.cc.

88 {
89  TRACE(DEBUG, "Initializing the throttle manager.");
90  // Initialize all our shares to zero.
91  m_primary_bytes_shares.resize(m_max_users);
92  m_secondary_bytes_shares.resize(m_max_users);
93  m_primary_ops_shares.resize(m_max_users);
94  m_secondary_ops_shares.resize(m_max_users);
95  for (auto & waiter : m_waiter_info) {
96  waiter.m_manager = this;
97  }
98 
99  // Allocate each user 100KB and 10 ops to bootstrap;
100  for (int i=0; i<m_max_users; i++)
101  {
102  m_primary_bytes_shares[i] = m_last_round_allocation;
103  m_secondary_bytes_shares[i] = 0;
104  m_primary_ops_shares[i] = 10;
105  m_secondary_ops_shares[i] = 0;
106  }
107 
108  int rc;
109  pthread_t tid;
110  if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
111  m_log->Emsg("ThrottleManager", rc, "create throttle thread");
112 
113 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References DEBUG, XrdSysError::Emsg(), XrdSysThread::Run(), and TRACE.

+ Here is the call graph for this function:

◆ IsThrottling()

bool XrdThrottleManager::IsThrottling ( )
inline

Definition at line 69 of file XrdThrottleManager.hh.

69 {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}

◆ OpenFile()

bool XrdThrottleManager::OpenFile ( const std::string &  entity,
std::string &  open_error_message 
)

Definition at line 179 of file XrdThrottleManager.cc.

180 {
181  if (m_max_open == 0 && m_max_conns == 0) return true;
182 
183  const std::lock_guard<std::mutex> lock(m_file_mutex);
184  auto iter = m_file_counters.find(entity);
185  unsigned long cur_open_files = 0, cur_open_conns;
186  if (m_max_open) {
187  if (iter == m_file_counters.end()) {
188  m_file_counters[entity] = 1;
189  TRACE(FILES, "User " << entity << " has opened their first file");
190  cur_open_files = 1;
191  } else if (iter->second < m_max_open) {
192  iter->second++;
193  cur_open_files = iter->second;
194  } else {
195  std::stringstream ss;
196  ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
197  TRACE(FILES, ss.str());
198  error_message = ss.str();
199  return false;
200  }
201  }
202 
203  if (m_max_conns) {
204  auto pid = XrdSysThread::Num();
205  auto conn_iter = m_active_conns.find(entity);
206  auto conn_count_iter = m_conn_counters.find(entity);
207  if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
208  (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
209  {
210  // note: we are rolling back the increment in open files
211  if (m_max_open) iter->second--;
212  std::stringstream ss;
213  ss << "User " << entity << " has hit the limit of " << m_max_conns <<
214  " open connections";
215  TRACE(CONNS, ss.str());
216  error_message = ss.str();
217  return false;
218  }
219  if (conn_iter == m_active_conns.end()) {
220  std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
221  new std::unordered_map<pid_t, unsigned long>());
222  (*conn_map)[pid] = 1;
223  m_active_conns[entity] = std::move(conn_map);
224  if (conn_count_iter == m_conn_counters.end()) {
225  m_conn_counters[entity] = 1;
226  cur_open_conns = 1;
227  } else {
228  m_conn_counters[entity] ++;
229  cur_open_conns = m_conn_counters[entity];
230  }
231  } else {
232  auto pid_iter = conn_iter->second->find(pid);
233  if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
234  (*(conn_iter->second))[pid] = 1;
235  conn_count_iter->second++;
236  cur_open_conns = conn_count_iter->second;
237  } else {
238  (*(conn_iter->second))[pid] ++;
239  cur_open_conns = conn_count_iter->second;
240  }
241  }
242  TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");
243  }
244  if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
245  return true;
246 }

References XrdSysThread::Num(), and TRACE.

Referenced by XrdThrottle::File::open().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ PerformLoadShed()

void XrdThrottleManager::PerformLoadShed ( const std::string &  opaque,
std::string &  host,
unsigned &  port 
)

Definition at line 820 of file XrdThrottleManager.cc.

821 {
822  host = m_loadshed_host;
823  host += "?";
824  host += opaque;
825  port = m_loadshed_port;
826 }

◆ PrepLoadShed()

void XrdThrottleManager::PrepLoadShed ( const char *  opaque,
std::string &  lsOpaque 
)

Definition at line 796 of file XrdThrottleManager.cc.

797 {
798  if (m_loadshed_port == 0)
799  {
800  return;
801  }
802  if (opaque && opaque[0])
803  {
804  XrdOucEnv env(opaque);
805  // Do not load shed client if it has already been done once.
806  if (env.Get("throttle.shed") != 0)
807  {
808  return;
809  }
810  lsOpaque = opaque;
811  lsOpaque += "&throttle.shed=1";
812  }
813  else
814  {
815  lsOpaque = "throttle.shed=1";
816  }
817 }

References XrdOucEnv::Get().

Referenced by XrdThrottle::File::open().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetLoadShed()

void XrdThrottleManager::SetLoadShed ( std::string &  hostname,
unsigned  port,
unsigned  frequency 
)
inline

Definition at line 80 of file XrdThrottleManager.hh.

81  {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMaxConns()

void XrdThrottleManager::SetMaxConns ( unsigned long  max_conns)
inline

Definition at line 85 of file XrdThrottleManager.hh.

85 {m_max_conns = max_conns;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMaxOpen()

void XrdThrottleManager::SetMaxOpen ( unsigned long  max_open)
inline

Definition at line 83 of file XrdThrottleManager.hh.

83 {m_max_open = max_open;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMaxWait()

void XrdThrottleManager::SetMaxWait ( unsigned long  max_wait)
inline

Definition at line 87 of file XrdThrottleManager.hh.

87 {m_max_wait_time = std::chrono::seconds(max_wait);}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMonitor()

void XrdThrottleManager::SetMonitor ( XrdXrootdGStream gstream)
inline

Definition at line 89 of file XrdThrottleManager.hh.

89 {m_gstream = gstream;}

◆ SetThrottles()

void XrdThrottleManager::SetThrottles ( float  reqbyterate,
float  reqoprate,
int  concurrency,
float  interval_length 
)
inline

Definition at line 76 of file XrdThrottleManager.hh.

77  {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
78  m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ StartIOTimer()

XrdThrottleTimer XrdThrottleManager::StartIOTimer ( uint16_t  uid,
bool &  ok 
)

Definition at line 708 of file XrdThrottleManager.cc.

709 {
710  int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
711  m_io_total++;
712 
713  while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
714  {
715  // If the user has essentially no concurrency, then we let them
716  // temporarily exceed the limit. This prevents potential waits for
717  // every single read for an infrequent user.
718  if (m_waiter_info[uid].m_concurrency < 1)
719  {
720  break;
721  }
722  m_loadshed_limit_hit++;
723  m_io_active.fetch_sub(1, std::memory_order_acq_rel);
724  TRACE(DEBUG, "ThrottleManager (user=" << uid << "): IO concurrency limit hit; waiting for other IOs to finish.");
725  ok = m_waiter_info[uid].Wait();
726  if (!ok) {
727  TRACE(DEBUG, "ThrottleManager (user=" << uid << "): timed out waiting for other IOs to finish.");
728  return XrdThrottleTimer();
729  }
730  cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
731  }
732 
733  ok = true;
734  return XrdThrottleTimer(this, uid);
735 }
friend class XrdThrottleTimer

References DEBUG, TRACE, and XrdThrottleTimer.

◆ StopIOTimer()

void XrdThrottleManager::StopIOTimer ( std::chrono::steady_clock::duration &  event_duration,
uint16_t  uid 
)
protected

Definition at line 741 of file XrdThrottleManager.cc.

742 {
743  m_io_active_time += event_duration.count();
744  auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
745  m_waiter_info[uid].m_io_time += event_duration.count();
746  if (old_active == static_cast<unsigned>(m_concurrency_limit))
747  {
748  // If we are below the concurrency limit threshold and have another waiter
749  // for our user, then execute it immediately. Otherwise, we will give
750  // someone else a chance to run (as we have gotten more than our share recently).
751  unsigned waiting_users = m_waiting_users;
752  if (waiting_users == 0) waiting_users = 1;
753  if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
754  {
755  std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
756  if (m_waiter_info[uid].m_waiting > 0)
757  {
758  m_waiter_info[uid].NotifyOne(std::move(lock));
759  return;
760  }
761  }
762  NotifyOne();
763  }
764 }

Friends And Related Function Documentation

◆ XrdThrottleTimer

friend class XrdThrottleTimer
friend

Definition at line 56 of file XrdThrottleManager.hh.

Referenced by StartIOTimer().


The documentation for this class was generated from the following files: