XRootD
XrdBwmHandle.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d B w m H a n d l e . c c */
4 /* */
5 /* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstring>
33 
34 #include "XrdBwm/XrdBwmHandle.hh"
35 #include "XrdBwm/XrdBwmLogger.hh"
36 #include "XrdBwm/XrdBwmTrace.hh"
38 #include "XrdSys/XrdSysError.hh"
39 #include "XrdSys/XrdSysPlatform.hh"
40 
41 #include "XProtocol/XProtocol.hh"
42 
43 /******************************************************************************/
44 /* S t a t i c O b j e c t s */
45 /******************************************************************************/
46 
47 XrdBwmLogger *XrdBwmHandle::Logger = 0;
48 XrdBwmPolicy *XrdBwmHandle::Policy = 0;
49 XrdBwmHandle *XrdBwmHandle::Free = 0;
50 unsigned int XrdBwmHandle::numQueued = 0;
51 
52 extern XrdSysError BwmEroute;
53 
54 /******************************************************************************/
55 /* L o c a l C l a s s e s */
56 /******************************************************************************/
57 
58 class XrdBwmHandleCB : public XrdOucEICB, public XrdOucErrInfo
59 {
60 public:
61 
62 static
64  {XrdBwmHandleCB *mP;
65  xMutex.Lock();
66  if (!(mP = Free)) mP = new XrdBwmHandleCB;
67  else Free = mP->Next;
68  xMutex.UnLock();
69  return mP;
70  }
71 
72 void Done(int &Results, XrdOucErrInfo *eInfo, const char *Path=0)
73  {xMutex.Lock();
74  Next = Free;
75  Free = this;
76  xMutex.UnLock();
77  }
78 
79 int Same(unsigned long long arg1, unsigned long long arg2) {return 0;}
80 
81  XrdBwmHandleCB() : Next(0) {}
83 
84 private:
85  XrdBwmHandleCB *Next;
86 static XrdSysMutex xMutex;
87 static XrdBwmHandleCB *Free;
88 };
89 
90 XrdSysMutex XrdBwmHandleCB::xMutex;
91 XrdBwmHandleCB *XrdBwmHandleCB::Free = 0;
92 
93 /******************************************************************************/
94 /* E x t e r n a l L i n k a g e s */
95 /******************************************************************************/
96 
97 void *XrdBwmHanXeq(void *pp)
98 {
99  return XrdBwmHandle::Dispatch();
100 }
101 
102 /******************************************************************************/
103 /* c l a s s X r d B w m H a n d l e */
104 /******************************************************************************/
105 /******************************************************************************/
106 /* A c t i v a t e */
107 /******************************************************************************/
108 
109 #define tident Parms.Tident
110 
112 {
113  EPNAME("Activate");
114  XrdSysMutexHelper myHelper(hMutex);
115  char *rBuff;
116  int rSize, rc;
117 
118 // Check the status of this request.
119 //
120  if (Status != Idle)
121  {if (Status == Scheduled)
122  einfo.setErrInfo(kXR_inProgress, "Request already scheduled.");
123  else einfo.setErrInfo(kXR_InvalidRequest, "Visa already issued.");
124  return SFS_ERROR;
125  }
126 
127 // Try to schedule this request.
128 //
129  qTime = time(0);
130  rBuff = einfo.getMsgBuff(rSize);
131  if (!(rc = Policy->Schedule(rBuff, rSize, Parms))) return SFS_ERROR;
132 
133 // If resource immediately available, let client run
134 //
135  if (rc > 0)
136  {rHandle = rc;
137  Status = Dispatched;
138  rTime = time(0);
139  ZTRACE(sched,"Run " <<Parms.Lfn <<' ' <<Parms.LclNode
140  <<(Parms.Direction==XrdBwmPolicy::Incoming?" <- ":" -> ")
141  <<Parms.RmtNode);
142  einfo.setErrCode(strlen(rBuff));
143  return (*rBuff ? SFS_DATA : SFS_OK);
144  }
145 
146 // Request was queued. We need to hold on to this so we can issue an async
147 // response later when the resource becomes available.
148 //
149  rHandle = -rc;
150  ErrCB = einfo.getErrCB(ErrCBarg);
151  einfo.setErrCB((XrdOucEICB *)&myEICB);
152  Status = Scheduled;
153  refHandle(rHandle, this);
154  ZTRACE(sched, "inQ " <<Parms.Lfn <<' ' <<Parms.LclNode
155  <<(Parms.Direction==XrdBwmPolicy::Incoming?" <- ":" -> ")
156  <<Parms.RmtNode);
157 
158 // Indicate that client needs to wait
159 //
160  return SFS_STARTED;
161 }
162 #undef tident
163 
164 /******************************************************************************/
165 /* static public A l l o c # 1 */
166 /******************************************************************************/
167 
168 XrdBwmHandle *XrdBwmHandle::Alloc(const char *theUsr, const char *thePath,
169  const char *LclNode, const char *RmtNode,
170  int Incoming)
171 {
172  XrdBwmHandle *hP = Alloc();
173 
174 // Initialize the hanlde
175 //
176  if (hP)
177  {hP->Parms.Tident = theUsr; // Always available
178  hP->Parms.Lfn = strdup(thePath);
179  hP->Parms.LclNode = strdup(LclNode);
180  hP->Parms.RmtNode = strdup(RmtNode);
181  hP->Parms.Direction = (Incoming ? XrdBwmPolicy::Incoming
183  hP->Status = Idle;
184  hP->qTime = 0;
185  hP->rTime = 0;
186  hP->xSize = 0;
187  hP->xTime = 0;
188  }
189 
190 // All done
191 //
192  return hP;
193 }
194 
195 /******************************************************************************/
196 /* private A l l o c # 2 */
197 /******************************************************************************/
198 
200 {
201  static XrdSysMutex aMutex;
202  constexpr int minAlloc = 4096/sizeof(XrdBwmHandle);
203  XrdBwmHandle *hP = nullptr;
204 
205 // No handle currently in the table. Get a new one off the free list or
206 // return one to the free list.
207 //
208  aMutex.Lock();
209  if (old_hP) {old_hP->Next = Free; Free = old_hP; hP = 0;}
210  else {if (!Free)
211  if ((hP = new XrdBwmHandle[minAlloc]()))
212  {int i = minAlloc; while(i--) {hP->Next = Free; Free = hP; hP++;}}
213  if ((hP = Free)) Free = hP->Next;
214  }
215  aMutex.UnLock();
216 
217  return hP;
218 }
219 
220 /******************************************************************************/
221 /* D i s p a t c h */
222 /******************************************************************************/
223 
224 #define tident hP->Parms.Tident
225 
227 {
228  EPNAME("Dispatch");
230  XrdBwmHandle *hP;
231  char *RespBuff;
232  int RespSize, readyH, Result, Err;
233 
234 // Dispatch ready requests in an endless loop
235 //
236  do {
237 
238 // Setup buffer
239 //
240  RespBuff = erP->getMsgBuff(RespSize);
241  *RespBuff = '\0';
242  erP->setErrCode(0);
243 
244 // Get next ready request and test if it ended with an error
245 //
246  if ((Err = (readyH = Policy->Dispatch(RespBuff, RespSize)) < 0))
247  readyH = -readyH;
248 
249 // Find the matching handle
250 //
251  if (!(hP = refHandle(readyH)))
252  {sprintf(RespBuff, "%d", readyH);
253  BwmEroute.Emsg("Dispatch", "Lost handle from", RespBuff);
254  if (!Err) Policy->Done(readyH);
255  continue;
256  }
257 
258 // Lock the handle and make sure it can be dispatched
259 //
260  hP->hMutex.Lock();
261  if (hP->Status != Scheduled)
262  {BwmEroute.Emsg("Dispatch", "ref to unscheduled handle",
263  hP->Parms.Tident, hP->Parms.Lfn);
264  if (!Err) Policy->Done(readyH);
265  } else {
266  hP->myEICB.Wait(); hP->rTime = time(0);
267  erP->setErrCB((XrdOucEICB *)erP, hP->ErrCBarg);
268  if (Err) {hP->Status = Idle; Result = SFS_ERROR;}
269  else {hP->Status = Dispatched;
270  erP->setErrCode(strlen(RespBuff));
271  Result = (*RespBuff ? SFS_DATA : SFS_OK);
272  }
273  ZTRACE(sched,(Err?"Err ":"Run ") <<hP->Parms.Lfn <<' ' <<hP->Parms.LclNode
274  <<(hP->Parms.Direction == XrdBwmPolicy::Incoming ? " <- ":" -> ")
275  <<hP->Parms.RmtNode);
276  hP->ErrCB->Done(Result, (XrdOucErrInfo *)erP);
277  erP = XrdBwmHandleCB::Alloc();
278  }
279  hP->hMutex.UnLock();
280  } while(1);
281 
282 // Keep the compiler happy
283 //
284  return (void *)0;
285 }
286 
287 #undef tident
288 
289 /******************************************************************************/
290 /* private r e f H a n d l e */
291 /******************************************************************************/
292 
293 XrdBwmHandle *XrdBwmHandle::refHandle(int refID, XrdBwmHandle *hP)
294 {
295  static XrdSysMutex tMutex;
296  static struct {XrdBwmHandle *First;
297  XrdBwmHandle *Last;
298  } hTab[256] = {{0,0}};
299  XrdBwmHandle *pP = 0;
300  int i = refID % 256;
301 
302 // If we have a handle passed, add the handle to the table
303 //
304  tMutex.Lock();
305  if (hP)
306  {hP->Next = 0;
307  if (hTab[i].Last) {hTab[i].Last->Next = hP; hTab[i].Last = hP;}
308  else {hTab[i].First = hTab[i].Last = hP; hP->Next = 0;}
309  numQueued++;
310  } else {
311  hP = hTab[i].First;
312  while(hP && hP->rHandle != refID) {pP = hP; hP = hP->Next;}
313  if (hP)
314  {if (pP) pP->Next = hP->Next;
315  else hTab[i].First = hP->Next;
316  if (hTab[i].Last == hP) hTab[i].Last = pP;
317  numQueued--;
318  }
319  }
320  tMutex.UnLock();
321 
322 // All done.
323 //
324  return hP;
325 }
326 
327 /******************************************************************************/
328 /* public R e t i r e */
329 /******************************************************************************/
330 
331 // The handle must be locked upon entry! It is unlocked upon exit.
332 
334 {
335  XrdSysMutexHelper myHelper(hMutex);
336 
337 // Get the global lock as the links field can only be manipulated with it.
338 // If not idle, cancel the resource. If scheduled, remove it from the table.
339 //
340  if (Status != Idle)
341  {Policy->Done(rHandle);
342  if (Status == Scheduled && !refHandle(rHandle, this))
343  BwmEroute.Emsg("Retire", "Lost handle to", Parms.Tident, Parms.Lfn);
344  Status = Idle; rHandle = 0;
345  }
346 
347 // If we have a logger, then log this event
348 //
349  if (Logger && qTime)
350  {XrdBwmLogger::Info myInfo;
351  myInfo.Tident = Parms.Tident;
352  myInfo.Lfn = Parms.Lfn;
353  myInfo.lclNode = Parms.LclNode;
354  myInfo.rmtNode = Parms.RmtNode;
355  myInfo.ATime = qTime;
356  myInfo.BTime = rTime;
357  myInfo.CTime = time(0);
358  myInfo.Size = xSize;
359  myInfo.ESec = xTime;
360  myInfo.Flow = (Parms.Direction == XrdBwmPolicy::Incoming ? 'I':'O');
361  Policy->Status(myInfo.numqIn, myInfo.numqOut, myInfo.numqXeq);
362  Logger->Event(myInfo);
363  }
364 
365 // Free storage appendages and recycle handle
366 //
367  if (Parms.Lfn) {free(Parms.Lfn); Parms.Lfn = 0;}
368  if (Parms.LclNode) {free(Parms.LclNode); Parms.LclNode = 0;}
369  if (Parms.RmtNode) {free(Parms.RmtNode); Parms.RmtNode = 0;}
370  Alloc(this);
371 }
372 
373 /******************************************************************************/
374 /* s e t P o l i c y */
375 /******************************************************************************/
376 
378 {
379  pthread_t tid;
380  int rc, startThread = (Policy == 0);
381 
382 // Set the policy and then start a thread to do dispatching if we have none
383 //
384  Policy = pP;
385  if (startThread)
386  if ((rc = XrdSysThread::Run(&tid, XrdBwmHanXeq, (void *)0,
387  0, "Handle Dispatcher")))
388  {BwmEroute.Emsg("setPolicy", rc, "create handle dispatch thread");
389  return 1;
390  }
391 
392 // All done
393 //
394  Logger = lP;
395  return 0;
396 }
@ kXR_InvalidRequest
Definition: XProtocol.hh:996
@ kXR_inProgress
Definition: XProtocol.hh:1010
XrdSysError BwmEroute
void * XrdBwmHanXeq(void *pp)
Definition: XrdBwmHandle.cc:97
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define ZTRACE(act, x)
Definition: XrdBwmTrace.hh:52
#define Err(p, a, b, c)
Definition: XrdNetSocket.cc:81
XrdOucString Path
#define SFS_DATA
#define SFS_ERROR
#define SFS_STARTED
#define SFS_OK
void Done(int &Results, XrdOucErrInfo *eInfo, const char *Path=0)
Definition: XrdBwmHandle.cc:72
static XrdBwmHandleCB * Alloc()
Definition: XrdBwmHandle.cc:63
int Same(unsigned long long arg1, unsigned long long arg2)
Definition: XrdBwmHandle.cc:79
static void * Dispatch()
int Activate(XrdOucErrInfo &einfo)
HandleState Status
Definition: XrdBwmHandle.hh:47
static XrdBwmHandle * Alloc(const char *theUsr, const char *thePath, const char *lclNode, const char *rmtNode, int Incoming)
static int setPolicy(XrdBwmPolicy *pP, XrdBwmLogger *lP)
const char * rmtNode
Definition: XrdBwmLogger.hh:48
const char * lclNode
Definition: XrdBwmLogger.hh:47
const char * Tident
Definition: XrdBwmLogger.hh:45
void Event(Info &eInfo)
const char * Lfn
Definition: XrdBwmLogger.hh:46
virtual int Done(int rHandle)=0
virtual int Schedule(char *RespBuff, int RespSize, SchedParms &Parms)=0
virtual void Status(int &numqIn, int &numqOut, int &numXeq)=0
virtual int Dispatch(char *RespBuff, int RespSize)=0
virtual void Done(int &Result, XrdOucErrInfo *eInfo, const char *Path=0)=0
XrdOucEICB * getErrCB()
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
char * getMsgBuff(int &mblen)
int setErrCode(int code)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)