27 #include "Xrd/XrdBuffer.hh" 28 #include "Xrd/XrdPoll.hh" 29 #include "Xrd/XrdScheduler.hh" 30 #include "XrdNet/XrdNet.hh" 31 #include "XrdOuc/XrdOucRash.hh" 32 #include "XrdOuc/XrdOucStream.hh" 33 #include "XrdSys/XrdSysPriv.hh" 34 #include "XrdSys/XrdSysPlugin.hh" 54 } XpdBroadcastPriority_t;
63 #define PutEnv(x,e) { if (e) { putenv(x); } else { delete[] x; } } 84 XPDLOC(SMGR,
"ProofServCron")
90 TRACE(XERR,
"undefined session manager: cannot start");
96 int quickcheckfreq = 5;
100 int lastrun = time(0);
101 int lastcheck = lastrun, ckfreq = mgr->
CheckFrequency(), waitt = 0;
102 int deltat = ((int)(0.1*ckfreq) >= 1) ? (
int)(0.1*ckfreq) : 1;
103 int maxdelay = 5*ckfreq;
105 TRACE(ALL,
"next full sessions check in "<<ckfreq<<
" secs");
110 waitt = ckfreq - (time(0) - lastcheck);
111 if (waitt > quickcheckfreq || waitt <= 0)
112 waitt = quickcheckfreq;
113 int pollRet = mgr->
Pipe()->
Poll(waitt);
119 if ((rc = mgr->
Pipe()->
Recv(msg)) != 0) {
120 TRACE(XERR,
"problems receiving message; errno: "<<-rc);
127 if ((rc = msg.
Get(fpid)) != 0) {
128 TRACE(XERR,
"kSessionRemoval: problems receiving process ID (buf: '"<<
129 msg.
Buf()<<
"'); errno: "<<-rc);
140 TRACE(XERR,
"kSessionRemoval: problem posting the scheduler pipe");
144 TRACE(REQ,
"kSessionRemoval: session: "<<fpid<<
145 " has been removed from the active list");
148 TRACE(XERR,
"obsolete type: XrdProofdProofServMgr::kClientDisconnect");
155 rc = (rc == 0) ? msg.
Get(svrtype) : rc;
157 TRACE(XERR,
"kCleanSessions: problems parsing message (buf: '"<<
158 msg.
Buf()<<
"'); errno: "<<-rc);
162 TRACE(REQ,
"kCleanSessions: request for user: '"<<usr<<
"', server type: "<<svrtype);
174 TRACE(XERR,
"unknown type: "<<msg.
Type());
185 if ((now - lastrun) < maxdelay) {
187 lastcheck = now + 5 - ckfreq;
190 TRACE(ALL,
"postponing sessions check (will retry in 5 secs)");
194 TRACE(ALL,
"Max time without checks reached ("<<maxdelay<<
"): force a session check");
205 if (clnlostscale <= 0) {
213 TRACE(ALL, cursess <<
" sessions are currently active");
242 XPDLOC(SMGR,
"ProofServRecover")
247 TRACE(XERR,
"undefined session manager: cannot start");
256 TRACE(ALL,
"timeout recovering sessions: "<<rc<<
" sessions not recovered");
258 TRACE(XERR,
"some problem occured while recovering sessions");
260 TRACE(ALL,
"recovering successfully terminated");
274 XPDLOC(SMGR,
"XrdProofdProofServMgr")
310 TRACE(XERR,
"unable to generate pipe for the session poller");
324 XPDLOC(SMGR,
"ProofServMgr::Config")
328 bool notify = (rcf) ? 0 : 1;
339 TRACE(XERR,
"problems parsing file ");
344 msg = (rcf) ?
"re-configuring" :
"configuring";
352 msg =
"client sessions shutdown after disconnection";
354 XPDFORM(msg,
"client sessions kept %sfor %d secs after disconnection",
388 for ( ; ircs !=
fProofServRCs.end(); ircs++) { (*ircs).Print(
"rc"); }
393 for ( ; ienvs !=
fProofServEnvs.end(); ienvs++) { (*ienvs).Print(
"env"); }
398 XPDFORM(msg,
"using %s to start proofserv sessions",
fUseFork ?
"fork()" :
"system()");
405 TRACE(XERR,
"problems trying to recover active sessions");
407 XPDFORM(msg,
"%d active sessions have been recovered", nr);
417 (
void *)&
fManagerCron, 0,
"ProofServMgr cron thread") != 0) {
418 TRACE(XERR,
"could not start cron thread");
421 XPDPRT(
"cron thread started");
433 XPDLOC(SMGR,
"ProofServMgr::AddSession")
435 TRACE(REQ,
"adding new active session ...");
439 TRACE(XERR,
"invalid inputs: "<<(
s ?
"" :
"s, ") <<
", "<< (p->
Client() ?
"" :
"p->Client()"));
450 int rc = info.SaveToFile(path.c_str());
461 XPDLOC(SMGR,
"ProofServMgr::IsSessionSocket")
463 TRACE(REQ,
"checking "<<fpid<<
" ...");
466 if (!fpid || strlen(fpid) <= 0) {
467 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
472 XrdOucString spath(fpid);
473 if (!spath.endswith(
".sock"))
return 0;
478 XrdOucString apath = spath;
479 apath.replace(
".sock",
"");
483 if (stat(apath.c_str(), &st) != 0 && (errno == ENOENT)) {
486 unlink(spath.c_str());
487 TRACE(REQ,
"missing admin path: removing "<<spath<<
" ...");
500 XPDLOC(SMGR,
"ProofServMgr::MvSession")
502 TRACE(REQ,
"moving "<<fpid<<
" ...");
505 if (!fpid || strlen(fpid) <= 0) {
506 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
511 XrdOucString opath(fpid), npath;
515 opath.replace(
".status",
"");
518 opath.replace(
".status",
"");
525 XrdOucString spath = opath;
527 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
528 TRACE(XERR,
"problems removing the UNIX socket path: "<<spath<<
"; errno: "<<errno);
529 spath.replace(
".sock",
".status");
530 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
531 TRACE(XERR,
"problems removing the status file: "<<spath<<
"; errno: "<<errno);
536 if ((rc = rename(opath.c_str(), npath.c_str())) == 0 || (errno == ENOENT)) {
543 TRACE(XERR,
"session pid file cannot be moved: "<<opath<<
544 "; target file: "<<npath<<
"; errno: "<<errno);
553 XPDLOC(SMGR,
"ProofServMgr::RmSession")
555 TRACE(REQ,
"removing "<<fpid<<
" ...");
558 if (!fpid || strlen(fpid) <= 0) {
559 TRACE(XERR,
"invalid input: "<< (fpid ? fpid :
"<nul>"));
568 if (unlink(path.c_str()) == 0)
571 TRACE(XERR,
"session pid file cannot be unlinked: "<<
572 path<<
"; error: "<<errno);
581 XPDLOC(SMGR,
"ProofServMgr::TouchSession")
583 TRACE(REQ,
"touching "<<(fpid ? fpid :
"<nul>")<<
", "<<(fpath ? fpath :
"<nul>")<<
" ...");
586 if (!fpid || strlen(fpid) <= 0) {
587 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
592 XrdOucString path(fpath);
593 if (!fpath || !fpath[0])
597 if (utime(path.c_str(), 0) == 0)
600 TRACE(XERR,
"time stamps for session pid file cannot be updated: "<<
601 path<<
"; error: "<<errno);
612 int to,
const char *fpath)
614 XPDLOC(SMGR,
"ProofServMgr::VerifySession")
617 if (!fpid || strlen(fpid) <= 0) {
618 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
624 if (fpath && strlen(fpath) > 0)
625 XPDFORM(path,
"%s/%s", fpath, fpid);
635 if (stat(path.c_str(), &st)) {
636 TRACE(XERR,
"session status file cannot be stat'ed: "<<
637 path<<
"; error: "<<errno);
642 deltat = time(0) - st.st_mtime;
644 if (path.endswith(
".status")) {
646 path.erase(path.rfind(
".status"));
649 TRACE(DBG,
"admin path for session "<<fpid<<
" hase not been touched" 650 " since at least "<< xto <<
" secs");
660 TRACE(DBG,
"admin path for session "<<fpid<<
" was touched " <<
661 deltat <<
" secs ago");
671 XPDLOC(SMGR,
"ProofServMgr::DeleteFromSessions")
673 TRACE(REQ,
"session: "<<fpid);
676 if (!fpid || strlen(fpid) <= 0) {
677 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
681 XrdOucString key = fpid;
682 key.replace(
".status",
"");
683 key.erase(0, key.rfind(
'.') + 1);
689 XPDFORM(msg,
"session: %s terminated by peer", fpid);
711 XPDLOC(SMGR,
"ProofServMgr::PrepareSessionRecovering")
719 TRACE(REQ,
"preparing recovering of active sessions ...");
723 struct dirent *ent = 0;
724 while ((ent = (
struct dirent *)readdir(dir))) {
725 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
727 XrdOucString rest,
a;
730 if (
a.length() > 0)
continue;
735 TRACE(DBG,
"found active session: "<<pid);
757 0,
"ProofServMgr session recover thread") != 0) {
758 TRACE(XERR,
"could not start session recover thread");
761 XPDPRT(
"session recover thread started");
782 XPDLOC(SMGR,
"ProofServMgr::RecoverActiveSessions")
788 TRACE(XERR,
"recovering clients list undefined");
794 TRACE(REQ,
"start recovering of "<<nrc<<
" clients");
823 TRACE(REQ, nrc<<
" clients still to recover");
836 std::list<XpdClientSessions* >::iterator ii =
fRecoverClients->begin();
838 rc += (*ii)->fProofServs.size();
863 XPDLOC(SMGR,
"ProofServMgr::IsClientRecovering")
866 TRACE(XERR,
"invalid inputs: usr: "<<(usr ? usr :
"")<<
", grp:"<<(grp ? grp :
"")<<
" ...");
874 std::list<XpdClientSessions *>::iterator ii =
fRecoverClients->begin();
876 if ((*ii)->fClient && (*ii)->fClient->Match(usr, grp)) {
884 TRACE(DBG,
"checking usr: "<<usr<<
", grp:"<<grp<<
" ... recovering? "<<
885 rc<<
", until: "<<deadline);
899 XPDLOC(SMGR,
"ProofServMgr::CheckActiveSessions")
901 TRACE(REQ,
"checking active sessions ...");
911 struct dirent *ent = 0;
912 while ((ent = (
struct dirent *)readdir(dir))) {
913 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
916 if (strstr(ent->d_name,
".sock") &&
IsSessionSocket(ent->d_name))
continue;
918 XrdOucString rest, key, after;
921 if (after !=
"status")
continue;
931 bool sessionalive = (
VerifySession(ent->d_name) == 0) ? 1 : 0;
934 if (!xps->
IsValid() || !sessionalive) rmsession = 1;
938 if (sessionalive)
continue;
956 if (!rmsession && verify && !oldvers) {
962 TRACE(REQ,
"session: "<<ent->d_name<<
"; nc: "<<nc<<
"; rm: "<<rmsession);
981 XPDLOC(SMGR,
"ProofServMgr::CheckTerminatedSessions")
983 TRACE(REQ,
"checking terminated sessions ...");
994 struct dirent *ent = 0;
995 while ((ent = (
struct dirent *)readdir(dir))) {
996 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
998 XrdOucString rest,
a;
1003 now = (now > 0) ? now : time(0);
1011 int rcst = stat(path.c_str(), &st);
1012 TRACE(DBG, pid<<
": rcst: "<<rcst<<
", now - mtime: "<<now - st.st_mtime<<
" secs")
1040 XPDLOC(SMGR,
"ProofServMgr::CleanClientSessions")
1042 TRACE(REQ,
"cleaning "<<usr<<
" ...");
1045 bool all = (!usr || strlen(usr) <= 0 || !strcmp(usr,
"all")) ? 1 : 0;
1051 XrdOucString path, rest, key,
a;
1061 if (c) mtx = c->
Mutex();
1064 std::list<int> tobedel;
1073 struct dirent *ent = 0;
1074 while ((ent = (
struct dirent *)readdir(dir))) {
1076 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
1084 if (!all && info.fUser != usr)
continue;
1086 if (srvtype !=
kXPD_AnyServer && info.fSrvType != srvtype)
continue;
1111 struct dirent *ent = 0;
1112 while ((ent = (
struct dirent *)readdir(dir))) {
1114 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
1117 if (
a ==
"status")
continue;
1122 if (!all && info.fUser != usr)
continue;
1124 if (srvtype !=
kXPD_AnyServer && info.fSrvType != srvtype)
continue;
1131 tobedel.push_back(pid);
1143 std::list<int>::iterator ii = tobedel.begin();
1144 while (ii != tobedel.end()) {
1149 std::list<XrdProofdProofServ *>::iterator ixps =
fActiveSessions.begin();
1157 if (!active)
fSessions.Del(key.c_str());
1191 char *val, XrdOucStream *cfg,
bool rcf)
1193 XPDLOC(SMGR,
"ProofServMgr::DoDirective")
1199 if (d->
fName ==
"proofservmgr") {
1201 }
else if (d->
fName ==
"putenv") {
1203 }
else if (d->
fName ==
"putrc") {
1205 }
else if (d->
fName ==
"shutdown") {
1218 XPDLOC(SMGR,
"ProofServMgr::DoDirectiveProofServMgr")
1236 XrdOucString tok(val);
1237 if (tok.beginswith(
"checkfq:")) {
1238 tok.replace(
"checkfq:",
"");
1239 checkfq = strtol(tok.c_str(), 0, 10);
1240 }
else if (tok.beginswith(
"termto:")) {
1241 tok.replace(
"termto:",
"");
1242 termto = strtol(tok.c_str(), 0, 10);
1243 }
else if (tok.beginswith(
"verifyto:")) {
1244 tok.replace(
"verifyto:",
"");
1245 verifyto = strtol(tok.c_str(), 0, 10);
1246 }
else if (tok.beginswith(
"recoverto:")) {
1247 tok.replace(
"recoverto:",
"");
1248 recoverto = strtol(tok.c_str(), 0, 10);
1249 }
else if (tok.beginswith(
"checklost:")) {
1250 tok.replace(
"checklost:",
"");
1251 checklost = strtol(tok.c_str(), 0, 10);
1252 }
else if (tok.beginswith(
"usefork:")) {
1253 tok.replace(
"usefork:",
"");
1254 usefork = strtol(tok.c_str(), 0, 10);
1257 val = cfg->GetWord();
1275 XPDFORM(msg,
"checkfq: %d s, termto: %d s, verifyto: %d s, recoverto: %d s, checklost: %d, usefork: %d",
1292 XrdOucString users, groups, rcval, rcnam;
1293 int smi = -1, smx = -1, vmi = -1, vmx = -1;
1295 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
1298 int iequ = rcnam.find(
'=');
1299 if (iequ == STR_NPOS)
return -1;
1304 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
1323 XrdOucString users, groups, rcval, rcnam;
1324 int smi = -1, smx = -1, vmi = -1, vmx = -1;
1326 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
1330 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
1339 XrdOucString &users, XrdOucString &groups,
1340 XrdOucString &rcval, XrdOucString &rcnam,
1341 int &smi,
int &smx,
int &vmi,
int &vmx,
bool &hex)
1343 XrdOucString ssvn, sver;
1345 while (val && val[0]) {
1346 if (!strncmp(val,
"u:", 2)) {
1349 }
else if (!strncmp(val,
"g:", 2)) {
1352 }
else if (!strncmp(val,
"s:", 2)) {
1355 idash = ssvn.find(
'-');
1356 if (idash != STR_NPOS) {
1357 if (ssvn.isdigit(0, idash-1)) smi = ssvn.atoi(0, idash-1);
1358 if (ssvn.isdigit(idash+1)) smx = ssvn.atoi(idash+1);
1360 if (ssvn.isdigit()) smi = ssvn.atoi();
1362 }
else if (!strncmp(val,
"v:", 2)) {
1366 if (sver.beginswith(
'x')) {
1370 idash = sver.find(
'-');
1371 if (idash != STR_NPOS) {
1372 if (sver.isdigit(0, idash-1)) vmi = sver.atoi(0, idash-1);
1373 if (sver.isdigit(idash+1)) vmx = sver.atoi(idash+1);
1375 if (sver.isdigit()) vmi = sver.atoi();
1378 if (rcval.length() > 0) {
1385 val = cfg->GetWord();
1395 const char *usrs,
const char *grps,
1396 int smi,
int smx,
int vmi,
int vmx,
bool hex)
1398 XPDLOC(SMGR,
"ProofServMgr::FillEnvList")
1401 TRACE(ALL,
"env list undefined!");
1405 XrdOucString users(usrs), groups(grps);
1410 XpdEnv xpe(nam, val, users.c_str(), groups.c_str(), smi, smx, vmi, vmx);
1411 if (users.length() > 0) {
1414 while ((from = users.tokenize(usr, from,
',')) != -1) {
1415 if (usr.length() > 0) {
1416 if (groups.length() > 0) {
1419 while ((fromg = groups.tokenize(grp, from,
',')) != -1) {
1420 if (grp.length() > 0) {
1421 xpe.
Reset(nam, val, usr.c_str(), grp.c_str(), smi, smx, vmi, vmx);
1426 xpe.Reset(nam, val, usr.c_str(), 0, smi, smx, vmi, vmx);
1432 if (groups.length() > 0) {
1435 while ((fromg = groups.tokenize(grp, fromg,
',')) != -1) {
1436 if (grp.length() > 0) {
1437 xpe.Reset(nam, val, 0, grp.c_str(), smi, smx, vmi, vmx);
1462 int dp = strtol(val,0,10);
1463 if (dp >= 0 && dp <= 2)
1466 if ((val = cfg->GetWord())) {
1467 int l = strlen(val);
1469 XrdOucString tval = val;
1471 if (val[
l-1] ==
's') {
1473 }
else if (val[
l-1] ==
'm') {
1476 }
else if (val[
l-1] ==
'h') {
1479 }
else if (val[
l-1] < 48 || val[
l-1] > 57) {
1483 int de = strtol(val,0,10);
1506 XPDLOC(SMGR,
"ProofServMgr::Process")
1517 XrdOucString emsg(
"Invalid request code: ");
1522 response->Send(kXR_ServerError,
1523 "ProofServMgr::Process: error posting internal pipe for authorization to proceed");
1527 response->Send(kXR_ServerError,
1528 "ProofServMgr::Process: timed-out waiting for authorization to proceed - retry later");
1550 response->Send(kXR_InvalidRequest, emsg.c_str());
1559 XPDLOC(SMGR,
"ProofServMgr::Attach")
1561 int psid = -1, rc = 0;
1566 TRACEP(p, REQ,
"psid: "<<psid<<
", CID = "<<p->
CID());
1571 TRACEP(p, XERR,
"client instance undefined");
1572 response->Send(kXR_ServerError,
"client instance undefined");
1581 while ((deadline < 0) || (now < deadline)) {
1582 if (!(xps = c->GetServer(psid)) || !xps->
IsValid()) {
1586 TRACEP(p, XERR,
"session ID not found: "<<psid);
1587 response->Send(kXR_InvalidRequest,
"session ID not found");
1591 deadline = (deadline > 0) ? deadline : defdeadline;
1602 if (!xps || !xps->
IsValid()) {
1603 TRACEP(p, XERR,
"session ID not found: "<<psid);
1604 response->Send(kXR_InvalidRequest,
"session ID not found");
1607 TRACEP(p, DBG,
"xps: "<<xps<<
", status: "<< xps->
Status());
1611 memcpy((
void *)&sid, (
const void *)&(p->
Request()->
header.streamid[0]), 2);
1628 if (!dpu.endswith(
'/'))
1632 (
void *) dpu.c_str(), dpu.length());
1652 unsigned short &sid)
1654 XPDLOC(SMGR,
"ProofServMgr::PrepareProofServ")
1662 memcpy((
void *)&sid, (
const void *)&(p->
Request()->
header.streamid[0]), 2);
1679 XPDFORM(msg,
"++++ Using NON-default ROOT version: %s ++++\n", xps->
ROOT()->
Export());
1680 r->Send(kXR_attn,
kXPD_srvmsg, (
char *) msg.c_str(), msg.length());
1693 XrdOucString &tag, XrdOucString &ord,
1694 XrdOucString &cffile,
1695 XrdOucString &uenvs,
int &intwait)
1697 XPDLOC(SMGR,
"ProofServMgr::ParseCreateBuffer")
1700 char *buf = p->
Argp()->buff;
1704 tag.assign(buf,0,len-1);
1706 TRACEP(p, DBG,
"received buf: "<<tag);
1708 tag.erase(tag.find(
'|'));
1709 xps->
SetTag(tag.c_str());
1710 TRACEP(p, DBG,
"tag: "<<tag);
1715 ord.assign(buf,0,len-1);
1716 int iord = ord.find(
"|ord:");
1717 if (iord != STR_NPOS) {
1718 ord.erase(0,iord+5);
1719 ord.erase(ord.find(
"|"));
1726 cffile.assign(buf,0,len-1);
1727 int icf = cffile.find(
"|cf:");
1728 if (icf != STR_NPOS) {
1729 cffile.erase(0,icf+4);
1730 cffile.erase(cffile.find(
"|"));
1735 XrdOucString plitenwk;
1736 plitenwk.assign(buf,0,len-1);
1737 int inwk = plitenwk.find(
"|plite:");
1738 if (inwk != STR_NPOS) {
1739 plitenwk.erase(0,inwk+7);
1740 plitenwk.erase(plitenwk.find(
"|"));
1741 int nwk = plitenwk.atoi();
1744 TRACEP(p, DBG,
"P-Lite master with "<<nwk<<
" workers (0 means # or cores)");
1749 uenvs.assign(buf,0,len-1);
1750 int ienv = uenvs.find(
"|envs:");
1751 if (ienv != STR_NPOS) {
1752 uenvs.erase(0,ienv+6);
1753 uenvs.erase(uenvs.find(
"|"));
1760 if (uenvs.length() > 0) {
1761 TRACEP(p, DBG,
"user envs: "<<uenvs);
1763 if ((iiw = uenvs.find(
"PROOF_INTWAIT=")) != STR_NPOS) {
1764 XrdOucString
s(uenvs, iiw + strlen(
"PROOF_INTWAIT="));
1765 s.erase(
s.find(
','));
1768 TRACEP(p, ALL,
"startup internal wait set by user to "<<intwait);
1779 XPDLOC(SMGR,
"ProofServMgr::Create")
1781 int psid = -1, rc = 0;
1794 TRACEP(p,ALL,
" cursess: "<<cursess);
1795 if (mxsess <= cursess) {
1796 XPDFORM(msg,
" ++++ Max number of sessions reached (%d) - please retry later ++++ \n", cursess);
1797 response->Send(kXR_attn,
kXPD_srvmsg, (
char *) msg.c_str(), msg.length());
1809 TRACEP(p, DBG, nc <<
" threads are creating a new session");
1822 XrdOucString tag, ord, cffile, uenvs;
1826 TRACEP(p, DBG,
"{ord,cfg,psid,cid,log}: {"<<ord<<
","<<cffile<<
","<<psid
1827 <<
","<<p->
CID()<<
","<<loglevel<<
"}");
1835 response->Send(
kXP_ServerError,
"timed-out acquiring fork semaphore");
1845 "unable to create pipes for communication during setup");
1850 ProofServEnv_t in = {xps, loglevel, cffile.c_str(),
"",
"", tag.c_str(),
"",
"", 1};
1855 TRACEP(p, FORK,
"Forking external proofsrv");
1856 if (!(pid =
fMgr->
Sched()->Fork(
"proofsrv"))) {
1869 XrdOucString pmsg =
"*** spawned child process ";
1870 pmsg += (int) getpid();
1876 TRACE(XERR,
"chown on '"<<in.
fLogFile.c_str()<<
"'; errno: "<<errno);
1879 XrdOucString path, sockpath, emsg;
1882 if (fpc.
Poll() < 0) {
1883 TRACE(XERR,
"error while polling to receive the admin path from parent - EXIT" );
1886 if (fpc.
Recv(xmsg) != 0) {
1887 TRACE(XERR,
"error reading message while waiting for the admin path from parent - EXIT" );
1890 if (xmsg.Type() < 0) {
1891 TRACE(XERR,
"the parent failed to setup the admin path - EXIT" );
1897 TRACE(FORK,
"admin path: "<<path);
1901 if (fpc.
Poll() < 0) {
1902 TRACE(XERR,
"error while polling to receive the sock path from parent - EXIT" );
1905 if (fpc.
Recv(xmsg) != 0) {
1906 TRACE(XERR,
"error reading message while waiting for the sock path from parent - EXIT" );
1909 if (xmsg.Type() < 0) {
1910 TRACE(XERR,
"the parent failed to setup the sock path - EXIT" );
1914 sockpath = xmsg.Buf();
1916 TRACE(FORK,
"UNIX sock path: "<<sockpath);
1919 bool asserdatadir = 1;
1921 TRACE(ALL,
"srvtype = "<< srvtype);
1927 const char *pord = asserdatadir ? ord.c_str() : 0;
1928 const char *ptag = asserdatadir ? in.
fSessionTag.c_str() : 0;
1930 emsg =
"SetUserOwnerships did not return OK - EXIT";
1932 if (fcp.
Post(0, emsg.c_str()) != 0)
1933 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1939 emsg =
"SetUserEnvironment did not return OK - EXIT";
1941 if (fcp.
Post(0, emsg.c_str()) != 0)
1942 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1946 char *argvv[7] = {0};
1950 emsg =
"XrdProofdManager instance undefined!";
1952 if (fcp.
Post(0, emsg.c_str()) != 0)
1953 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1959 size_t len = strlen(
fMgr->
AdminPath()) + strlen(
"xpdpath:") + 1;
1960 sxpd =
new char[len];
1964 sxpd =
new char[10];
1965 snprintf(sxpd, 10,
"%d", getppid());
1969 char slog[10] = {0};
1970 snprintf(slog, 10,
"%d", loglevel);
1973 char ssrv[10] = {0};
1980 argvv[2] = (
char *)
"xpd";
1981 argvv[3] = (
char *)sxpd;
1982 argvv[4] = (
char *)slog;
1983 argvv[5] = (
char *)ssrv;
1988 emsg =
"SetProofServEnv did not return OK - EXIT";
1990 if (fcp.
Post(0, emsg.c_str()) != 0)
1991 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1994 TRACE(FORK, (
int)getpid() <<
": proofserv env set up");
1999 TRACE(XERR,
"cannot write log file path to internal pipe; errno: "<<errno);
2002 TRACE(FORK, (
int)getpid()<<
": log file path communicated");
2006 sigemptyset(&myset);
2007 sigaddset(&myset, SIGUSR1);
2008 sigaddset(&myset, SIGUSR2);
2009 pthread_sigmask(SIG_UNBLOCK, &myset, 0);
2016 ", uid: "<<getuid()<<
", euid:"<<geteuid()<<
2017 ", psrv: "<<xps->
ROOT()->
PrgmSrv()<<
", argvv[1]: "<<argvv[1]);
2022 TRACE(XERR,
"returned from execv: bad, bad sign !!! errno:" << (
int)errno);
2037 TRACEP(p, FORK,
"Parent process: child is "<<pid);
2060 XrdOucString path, sockpath;
2065 struct sockaddr_un unserver;
2066 if (sockpath.length() > (int)(
sizeof(unserver.sun_path) - 1)) {
2067 emsg =
"socket path very long (";
2068 emsg += sockpath.length();
2069 emsg +=
"): this may lead to stack corruption!";
2070 emsg +=
" Use xpd.sockpathdir to change it";
2071 TRACEP(p, XERR, emsg.c_str());
2076 if ((pathrc = fpc.
Post(0, path.c_str())) != 0) {
2077 emsg =
"failed to communicating path to child";
2079 TRACEP(p, XERR, emsg.c_str());
2082 emsg =
"failed to setup child admin path";
2084 if ((pathrc = fpc.
Post(-1, path.c_str())) != 0) {
2085 emsg +=
": failed communicating failure to child";
2087 TRACEP(p, XERR, emsg.c_str());
2095 emsg =
"failure creating UNIX socket on " ;
2098 TRACEP(p, XERR, emsg.c_str());
2104 emsg =
"failure changing ownership of the UNIX socket on " ;
2106 emsg +=
"; errno: " ;
2109 TRACEP(p, XERR, emsg.c_str());
2115 if ((pathrc = fpc.
Post(0, sockpath.c_str())) != 0) {
2116 emsg =
"failed to communicating path to child";
2118 TRACEP(p, XERR, emsg.c_str());
2121 emsg =
"failed to setup child admin path";
2123 if ((pathrc = fpc.
Post(-1, sockpath.c_str())) != 0) {
2124 emsg +=
": failed communicating failure to child";
2126 TRACEP(p, XERR, emsg.c_str());
2137 emsg.insert(npfx, 0);
2142 TRACEP(p, FORK,
"waiting for client setup status ...");
2144 emsg =
"proofserv setup";
2148 int ntry = 10, prc = 0, rst = -1;
2149 while (prc == 0 && ntry--) {
2151 if ((prc = fcp.
Poll(2)) > 0) {
2154 if (fcp.
Recv(xmsg) != 0) {
2155 emsg =
"error receiving message from pipe";
2157 TRACEP(p, XERR, emsg.c_str());
2164 XrdOucString xbuf = xmsg.
Buf();
2165 if (xbuf.length() <= 0) {
2166 emsg =
"error reading buffer {logfile, error message} from message received on the pipe";
2168 TRACEP(p, XERR, emsg.c_str());
2176 XrdOucString stag(xbuf);
2177 stag.erase(stag.rfind(
'/'));
2178 stag.erase(0, stag.find(
"session-") + strlen(
"session-"));
2179 xps->
SetTag(stag.c_str());
2187 TRACEP(p, XERR, emsg.c_str());
2191 }
else if (prc < 0) {
2192 emsg =
"error receive status-of-setup from pipe";
2194 TRACEP(p, XERR, emsg.c_str());
2197 TRACEP(p, FORK,
"receiving status-of-setup from pipe: waiting 2 s ..."<<pid);
2210 emsg =
"failure setting up proofserv" ;
2211 if (prc == 0) emsg +=
": timed-out receiving status-of-setup from pipe";
2220 TRACEP(p, XERR, emsg.c_str());
2221 emsg.insert(npfx, 0);
2233 (
void *) info.c_str(), info.length());
2237 TRACEP(p, FORK,
"server launched: wait for callback ");
2244 emsg =
"problems accepting callback: ";
2247 emsg +=
"process could not be killed - pid: ";
2249 emsg +=
"process killed - pid: ";
2256 TRACEP(p, XERR, emsg.c_str());
2257 emsg.insert(npfx, 0);
2258 response->Send(kXR_attn,
kXPD_errmsg, (
char *) emsg.c_str(), emsg.length());
2268 TRACEP(p, XERR,
"problems changing child process priority");
2269 }
else if (dp > 0) {
2270 TRACEP(p, DBG,
"priority of the child process changed by " << dp <<
" units");
2274 TRACEP(p, FORK,
"xps: "<<xps<<
", ClientID: "<<(
int *)cid<<
" (sid: "<<sid<<
")"<<
" NClients: "<<xps->
GetNClients(1));
2278 TRACEP(p, REQ,
"problems recording session in sandbox");
2284 XrdOucString key; key += pid;
2286 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
2294 TRACEP(p, XERR,
"PROOF session is invalid: protocol error? " <<emsg);
2309 bool assert = (pid > 0) ? 1 : 0;
2312 if (pid > 0) path += pid;
2314 XPDFORM(emsg,
"failure setting admin path '%s'", path.c_str());
2327 unsigned int seq, XrdOucString &emsg)
2329 XPDLOC(SMGR,
"ProofServMgr::CreateSockPath")
2331 XrdOucString sockpath;
2334 TRACEP(p, ALL,
"socket path: " << sockpath);
2335 struct sockaddr_un unserver;
2336 if (sockpath.length() > (int)(
sizeof(unserver.sun_path) - 1)) {
2337 XPDFORM(emsg,
"socket path very long (%d): this may lead to stack corruption! ", sockpath.length());
2344 XPDFORM(emsg,
"failure creating UNIX socket on '%s'", sockpath.c_str());
2347 if (chmod(sockpath.c_str(), 0755) != 0) {
2348 XPDFORM(emsg,
"failure changing permissions of the UNIX socket on '%s'; errno: %d",
2349 sockpath.c_str(), (int)errno);
2362 XPDLOC(SMGR,
"ProofServMgr::SendErrLog")
2364 XrdOucString emsg(
"An error occured: the content of errlog follows:");
2365 r->Send(kXR_attn,
kXPD_srvmsg, (
char *) emsg.c_str(), emsg.length());
2366 emsg =
"------------------------------------------------\n";
2367 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2369 int ierr =
open(errlog, O_RDONLY);
2371 XPDFORM(emsg,
"cannot open '%s' (errno: %d)", errlog, errno);
2372 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2376 if (fstat(ierr, &st) != 0) {
2377 XPDFORM(emsg,
"cannot stat '%s' (errno: %d)", errlog, errno);
2378 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2382 off_t len = st.st_size;
2383 TRACE(ALL,
" reading "<<len<<
" bytes from "<<errlog);
2384 ssize_t chunk = 2048, nb, nr;
2388 nb = (left > chunk) ? chunk : left;
2389 if ((nr = read(ierr, buf, nb)) < 0) {
2390 XPDFORM(emsg,
"problems reading from '%s' (errno: %d)", errlog, errno);
2391 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2400 emsg =
"------------------------------------------------";
2401 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2412 XPDLOC(SMGR,
"ProofServMgr::ResolveSession")
2414 TRACE(REQ,
"resolving "<< (fpid ? fpid :
"<nul>")<<
" ...");
2431 if (si.fSrvProtVers < 18) {
2432 TRACE(DBG,
"session does not support recovering: protocol " 2433 <<si.fSrvProtVers<<
" < 18");
2439 si.fUnixPath.c_str());
2441 TRACE(DBG,
"client instance not initialized");
2449 TRACE(DBG,
"server object not initialized");
2467 std::list<XpdClientSessions *>::iterator ii =
fRecoverClients->begin();
2469 if ((*ii)->fClient == c)
2474 (*ii)->fProofServs.push_back(xps);
2490 XPDLOC(SMGR,
"ProofServMgr::Recover")
2493 TRACE(XERR,
"invalid input!");
2511 if (emsg ==
"timeout") {
2512 TRACE(DBG,
"timeout while accepting callback");
2514 TRACE(XERR,
"problems accepting callback: "<<emsg);
2518 XrdOucString key; key += xps->
SrvPID();
2519 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
2532 " successfully recovered ("<<left<<
" left); pid: "<<pid);
2541 #ifndef ROOT_XrdFour 2548 int to, XrdOucString &msg)
2550 XPDLOC(SMGR,
"ProofServMgr::AcceptPeer")
2553 XrdNetPeer peerpsrv;
2557 XPDFORM(msg,
"session pointer undefined or socket invalid: %p", xps);
2560 TRACE(REQ,
"waiting for server callback for "<<to<<
" secs ... on "<<xps->
UNIXSockPath());
2563 if (!(xps->
UNIXSock()->Accept(peerpsrv, XRDNET_NODNTRIM, to))) {
2570 msg =
"could not assert connected peer: ";
2584 XPDLOC(SMGR,
"ProofServMgr::SetupProtocol")
2587 XrdLink *linkpsrv = 0;
2588 XrdProtocol *xp = 0;
2593 if (peerpsrv.InetName)
free(peerpsrv.InetName);
2594 peerpsrv.InetName = XrdSysDNS::getHostName(
"localhost");
2597 if (!(linkpsrv = XrdLink::Alloc(peerpsrv, lnkopts))) {
2598 msg =
"could not allocate network object: ";
2604 peerpsrv.InetBuff = 0;
2605 TRACE(DBG,
"connection accepted: matching protocol ... ");
2608 if (!(xp = p->
Match(linkpsrv))) {
2609 msg =
"match failed: protocol error: ";
2621 if (xp->Process(linkpsrv) != 0) {
2622 msg =
"handshake with internal link failed: ";
2628 if (go && !XrdPoll::Attach(linkpsrv)) {
2629 msg =
"could not attach new internal link to poller: ";
2641 linkpsrv->setProtocol(xp);
2643 TRACE(REQ,
"Protocol "<<xp<<
" attached to link "<<linkpsrv<<
" ("<< peerpsrv.InetName <<
")");
2646 fMgr->
Sched()->Schedule((XrdJob *)linkpsrv);
2663 int to, XrdOucString &msg)
2665 XPDLOC(SMGR,
"ProofServMgr::AcceptPeer")
2671 if (!xps || !xps->UNIXSock()) {
2672 XPDFORM(msg,
"session pointer undefined or socket invalid: %p", xps);
2675 TRACE(REQ,
"waiting for server callback for "<<to<<
" secs ... on "<<xps->
UNIXSockPath());
2678 if (!(xps->
UNIXSock()->Accept(netaddr, 0, to))) {
2685 msg =
"could not assert connected peer: ";
2699 XPDLOC(SMGR,
"ProofServMgr::SetupProtocol")
2702 XrdLink *linkpsrv = 0;
2703 XrdProtocol *xp = 0;
2708 if (!(linkpsrv = XrdLink::Alloc(netaddr, lnkopts))) {
2709 msg =
"could not allocate network object: ";
2714 TRACE(DBG,
"connection accepted: matching protocol ... ");
2717 if (!(xp = p->
Match(linkpsrv))) {
2718 msg =
"match failed: protocol error: ";
2730 if (xp->Process(linkpsrv) != 0) {
2731 msg =
"handshake with internal link failed: ";
2737 if (go && !XrdPoll::Attach(linkpsrv)) {
2738 msg =
"could not attach new internal link to poller: ";
2750 linkpsrv->setProtocol(xp);
2752 TRACE(REQ,
"Protocol "<<xp<<
" attached to link "<<linkpsrv<<
" ("<< netaddr.Name() <<
")");
2755 fMgr->
Sched()->Schedule((XrdJob *)linkpsrv);
2771 XPDLOC(SMGR,
"ProofServMgr::Detach")
2773 int psid = -1, rc = 0;
2778 TRACEP(p, REQ,
"psid: "<<psid);
2783 TRACEP(p, XERR,
"session ID not found: "<<psid);
2784 response->Send(kXR_InvalidRequest,
"session ID not found");
2800 XPDLOC(SMGR,
"ProofServMgr::Destroy")
2802 int psid = -1, rc = 0;
2807 TRACEP(p, REQ,
"psid: "<<psid);
2816 TRACEP(p, XERR,
"reference session ID not found");
2817 response->Send(kXR_InvalidRequest,
"reference session ID not found");
2822 XPDFORM(msg,
"all sessions destroyed by %s", p->
Link()->ID);
2844 XPDLOC(SMGR,
"WriteSessEnvs")
2848 XpdWriteEnv_t *xwe = (XpdWriteEnv_t *)
s;
2850 if (env && xwe && xwe->fMgr && xwe->fClient && xwe->
fEnv) {
2851 if (env->
fEnv.length() > 0) {
2853 xwe->fMgr->ResolveKeywords(env->
fEnv, xwe->fClient);
2855 char *ev =
new char[env->
fEnv.length()+1];
2856 strncpy(ev, env->
fEnv.c_str(), env->
fEnv.length());
2857 ev[env->
fEnv.length()] = 0;
2858 fprintf(xwe->fEnv,
"%s\n", ev);
2860 PutEnv(ev, xwe->fExport);
2865 emsg =
"some input undefined";
2869 TRACE(XERR,
"protocol error: "<<emsg);
2879 XPDLOC(SMGR,
"ProofServMgr::SetProofServEnvOld")
2884 if (!p || !p->
Client() || !input) {
2885 TRACE(XERR,
"at leat one input is invalid - cannot continue");
2891 TRACE(XERR,
"problems setting basic environment - exit");
2900 TRACE(XERR,
"unable to get instance of proofserv proxy");
2903 int psid = xps->
ID();
2910 size_t len = strlen(
"ROOTPROOFSESSDIR=") + in->
fWrkDir.length() + 2;
2917 len = strlen(
"ROOTPROOFLOGLEVEL=") + 5;
2924 len = strlen(
"ROOTPROOFORDINAL=")+strlen(xps->
Ordinal()) + 2;
2931 len = strlen(
"ROOTVERSIONTAG=")+strlen(p->
Client()->
ROOT()->
Tag())+2;
2938 TRACE(DBG,
"creating env file");
2939 XrdOucString envfile = in->
fWrkDir;
2941 FILE *fenv = fopen(envfile.c_str(),
"w");
2944 "unable to open env file: "<<envfile);
2947 TRACE(DBG,
"environment file: "<< envfile);
2953 XrdOucString secenvs(getenv(
"XrdSecENVS"));
2954 if (secenvs.length() > 0) {
2958 while ((from = secenvs.tokenize(env, from,
',')) != -1) {
2959 if (env.length() > 0) {
2961 ev =
new char[env.length()+1];
2962 strncpy(ev, env.c_str(), env.length());
2963 ev[env.length()] = 0;
2965 fprintf(fenv,
"%s\n", ev);
2972 XrdSecCredentials *creds = p->
AuthProt()->getCredentials();
2974 len = strlen(
"XrdSecCREDS=")+creds->size;
2975 ev =
new char[len + 1];
2976 strcpy(ev,
"XrdSecCREDS=");
2977 memcpy(ev + strlen(
"XrdSecCREDS="), creds->buffer, creds->size);
2980 TRACE(DBG,
"XrdSecCREDS set");
2982 XrdOucString credsdir = udir;
2983 credsdir +=
"/.creds";
2987 TRACE(DBG,
"problems in saving authentication creds under "<<credsdir);
2990 TRACE(XERR,
"unable to create creds dir: "<<credsdir);
2999 fprintf(fenv,
"ROOTSYS=%s\n", xps->
ROOT()->
Dir());
3002 fprintf(fenv,
"ROOTCONFDIR=%s\n", xps->
ROOT()->
Dir());
3005 fprintf(fenv,
"ROOTTMPDIR=%s\n",
fMgr->
TMPdir());
3008 fprintf(fenv,
"ROOTXPDPORT=%d\n",
fMgr->
Port());
3011 fprintf(fenv,
"ROOTPROOFWORKDIR=%s\n", udir.c_str());
3014 fprintf(fenv,
"ROOTPROOFSESSIONTAG=%s\n", in->
fSessionTag.c_str());
3018 fprintf(fenv,
"ROOTUSEUSERCFG=1\n");
3021 fprintf(fenv,
"ROOTOPENSOCK=%s\n", xps->
UNIXSockPath());
3024 fprintf(fenv,
"ROOTENTITY=%s@%s\n", p->
Client()->
User(), p->
Link()->Host());
3027 fprintf(fenv,
"ROOTSESSIONID=%d\n", psid);
3030 fprintf(fenv,
"ROOTCLIENTID=%d\n", p->
CID());
3033 fprintf(fenv,
"ROOTPROOFCLNTVERS=%d\n", p->
ProofProtocol());
3036 fprintf(fenv,
"ROOTPROOFORDINAL=%s\n", xps->
Ordinal());
3039 if (getenv(
"ROOTVERSIONTAG"))
3040 fprintf(fenv,
"ROOTVERSIONTAG=%s\n", getenv(
"ROOTVERSIONTAG"));
3043 if (in->
fCfg.length() > 0)
3044 fprintf(fenv,
"ROOTPROOFCFGFILE=%s\n", in->
fCfg.c_str());
3047 fprintf(fenv,
"ROOTPROOFLOGFILE=%s\n", in->
fLogFile.c_str());
3054 XrdOucHash<XpdEnv> sessenvs;
3059 if (envmatch >= 0) {
3060 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
3064 if (envmatch > envmtcex) {
3067 sessenvs.Rep(env->
fName.c_str(), env, 0, Hash_keepdata);
3072 sessenvs.Add(env->
fName.c_str(), env, 0, Hash_keepdata);
3074 TRACE(HDBG,
"Adding: "<<(*ienvs).fEnv);
3087 XrdOucString env, namelist;
3088 int from = 0, ieq = -1;
3089 while ((from = ue.tokenize(env, from,
',')) != -1) {
3090 if (env.length() > 0 && (ieq = env.find(
'=')) != -1) {
3093 ev =
new char[env.length()+1];
3094 strncpy(ev, env.c_str(), env.length());
3095 ev[env.length()] = 0;
3097 fprintf(fenv,
"%s\n", ev);
3100 if (namelist.length() > 0)
3106 len = strlen(
"PROOF_ALLVARS=") + namelist.length() + 2;
3108 snprintf(ev, len,
"PROOF_ALLVARS=%s", namelist.c_str());
3110 fprintf(fenv,
"%s\n", ev);
3118 TRACE(DBG,
"creating symlink");
3119 XrdOucString syml = udir;
3121 syml +=
"/last-worker-session";
3123 syml +=
"/last-master-session";
3125 TRACE(XERR,
"problems creating symlink to last session (errno: "<<errno<<
")");
3138 XPDLOC(SMGR,
"ProofServMgr::SetProofServEnv")
3143 TRACE(REQ,
"ROOT dir: "<< (
r ?
r->Dir() :
"*** undef ***"));
3146 char *libdir = (
char *)
r->LibDir();
3149 len = 32 + strlen(libdir) + strlen(mgr->
BareLibPath());
3150 ldpath =
new char[len];
3153 len = 32 + strlen(libdir);
3154 ldpath =
new char[len];
3159 char *rootsys = (
char *)
r->Dir();
3160 len = 15 + strlen(rootsys);
3162 snprintf(ev, len,
"ROOTSYS=%s", rootsys);
3166 char *bindir = (
char *)
r->BinDir();
3167 len = 15 + strlen(bindir);
3169 snprintf(ev, len,
"ROOTBINDIR=%s", bindir);
3173 char *confdir = (
char *)
r->DataDir();
3174 len = 20 + strlen(confdir);
3176 snprintf(ev, len,
"ROOTCONFDIR=%s", confdir);
3180 len = 20 + strlen(mgr->
TMPdir());
3190 TRACE(XERR,
"XrdROOT instance undefined!");
3198 const char *sessiondir,
3200 XrdOucString &outfn)
3203 XrdOucString ord = xps->
Ordinal();
3207 if (host.find(
".") != STR_NPOS)
3208 host.erase(host.find(
"."));
3211 else role =
"master";
3216 XPDFORM(outfn,
"%s/%s-%s-%s.%s",
3230 XrdOucString &sesstag, XrdOucString &topsesstag,
3231 XrdOucString &sessiondir, XrdOucString &sesswrkdir)
3233 XPDLOC(SMGR,
"GetTagDirs")
3242 if (host.find(
".") != STR_NPOS)
3243 host.erase(host.find(
"."));
3244 XPDFORM(sesstag,
"%s-%d-", host.c_str(), (int)time(0));
3249 sessiondir +=
"/session-";
3250 sessiondir += sesstag;
3251 topsesstag = sesstag;
3254 sessiondir += xps->
Tag();
3255 topsesstag = xps->
Tag();
3256 topsesstag.replace(
"session-",
"");
3260 TRACE(XERR,
"problems asserting dir '"<<sessiondir<<
"' - errno: "<<errno);
3265 }
else if (pid > 0) {
3272 topsesstag = sesstag;
3274 xps->
SetTag(sesstag.c_str());
3278 if (pid == (
int) getpid()) {
3286 sesswrkdir = sessiondir;
3288 XPDFORM(sesswrkdir,
"%s/worker-%s-%s", sessiondir.c_str(), xps->
Ordinal(), sesstag.c_str());
3290 XPDFORM(sesswrkdir,
"%s/master-%s-%s", sessiondir.c_str(), xps->
Ordinal(), sesstag.c_str());
3293 TRACE(XERR,
"negative pid ("<<pid<<
"): should not have got here!");
3305 XPDLOC(SMGR,
"WriteSessRCs")
3308 FILE *frc = (FILE *)f;
3310 XrdOucString rc = erc->
fEnv;
3311 if (rc.length() > 0) {
3312 if (rc.find(
"Proof.DataSetManager") != STR_NPOS) {
3313 TRACE(ALL,
"Proof.DataSetManager ignored: use xpd.datasetsrc to define dataset managers");
3315 fprintf(frc,
"%s\n", rc.c_str());
3321 emsg =
"file or input entry undefined";
3325 TRACE(XERR,
"protocol error: "<<emsg);
3334 XPDLOC(SMGR,
"ProofServMgr::SetProofServEnv")
3337 if (!p || !p->
Client() || !input) {
3338 TRACE(XERR,
"at leat one input is invalid - cannot continue");
3344 TRACE(DBG,
"rootvers: "<< rootvers);
3345 if (rootvers < 14 && rootvers > -1)
3353 TRACE(XERR,
"unable to get instance of proofserv proxy");
3356 int psid = xps->
ID();
3375 TRACE(XERR,
"problems setting basic environment - exit");
3380 TRACE(DBG,
"creating rc and env files");
3381 XrdOucString rcfile, envfile;
3384 TRACE(XERR,
"problems creating RC file "<<rcfile.c_str());
3390 TRACE(XERR,
"problems creating environment file "<<envfile.c_str());
3396 TRACE(REQ,
"creating symlink");
3397 XrdOucString syml = udir;
3399 syml +=
"/last-worker-session";
3401 syml +=
"/last-master-session";
3403 TRACE(XERR,
"problems creating symlink to " 3404 " last session (errno: "<<errno<<
")");
3418 const char *envfn,
const char *rcfn)
3420 XPDLOC(SMGR,
"ProofServMgr::CreateProofServEnvFile")
3423 if (!p || !input || (!envfn ||
3424 (envfn && strlen(envfn) <= 0)) || (!rcfn || (rcfn && strlen(rcfn) <= 0))) {
3425 TRACE(XERR,
"invalid inputs!");
3435 TRACE(XERR,
"unable to get instance of proofserv proxy");
3439 FILE *fenv = fopen(envfn,
"w");
3441 TRACE(XERR,
"unable to open env file: "<<envfn);
3444 TRACE(REQ,
"environment file: "<< envfn);
3452 XrdOucString secenvs(getenv(
"XrdSecENVS"));
3453 if (secenvs.length() > 0) {
3457 while ((from = secenvs.tokenize(env, from,
',')) != -1) {
3458 if (env.length() > 0) {
3460 ev =
new char[env.length()+1];
3461 strncpy(ev, env.c_str(), env.length());
3462 ev[env.length()] = 0;
3463 fprintf(fenv,
"%s\n", ev);
3471 XrdSecCredentials *creds = p->
AuthProt()->getCredentials();
3473 int lev = strlen(
"XrdSecCREDS=") + creds->size;
3474 ev =
new char[lev+1];
3475 strncpy(ev,
"XrdSecCREDS=", lev);
3476 memcpy(ev+strlen(
"XrdSecCREDS="), creds->buffer, creds->size);
3479 TRACE(DBG,
"XrdSecCREDS set");
3483 credsdir +=
"/.creds";
3487 TRACE(DBG,
"problems in saving authentication creds under "<<credsdir);
3490 TRACE(XERR,
"unable to create creds dir: "<<credsdir);
3502 fprintf(fenv,
"ROOTSYS=%s\n", xps->
ROOT()->
Dir());
3505 fprintf(fenv,
"ROOTCONFDIR=%s\n", xps->
ROOT()->
Dir());
3508 fprintf(fenv,
"TMPDIR=%s\n",
fMgr->
TMPdir());
3512 len = strlen(
"ROOTRCFILE=") + strlen(rcfn) + 2;
3514 snprintf(ev, len,
"ROOTRCFILE=%s", rcfn);
3515 fprintf(fenv,
"%s\n", ev);
3521 len = strlen(
"ROOTVERSIONTAG=") + strlen(p->
Client()->
ROOT()->
Tag()) + 2;
3524 fprintf(fenv,
"%s\n", ev);
3530 len = strlen(
"ROOTPROOFLOGFILE=") + in->
fLogFile.length() + 2;
3533 fprintf(fenv,
"%s\n", ev);
3540 XrdOucString locdatasrv;
3553 TRACE(HDBG, nrk <<
" placeholders resolved for LOCALDATASERVER");
3554 len = strlen(
"LOCALDATASERVER=") + locdatasrv.length() + 2;
3556 snprintf(ev, len,
"LOCALDATASERVER=%s", locdatasrv.c_str());
3557 fprintf(fenv,
"%s\n", ev);
3563 len = strlen(
"XRDCF=") + strlen(
CfgFile()) + 2;
3566 fprintf(fenv,
"%s\n", ev);
3575 XrdOucHash<XpdEnv> sessenvs;
3580 if (envmatch >= 0) {
3581 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
3585 if (envmatch > envmtcex) {
3588 sessenvs.Rep(env->
fName.c_str(), env, 0, Hash_keepdata);
3593 sessenvs.Add(env->
fName.c_str(), env, 0, Hash_keepdata);
3595 TRACE(HDBG,
"Adding: "<<(*ienvs).fEnv);
3607 XrdOucString env, namelist;
3608 int from = 0, ieq = -1;
3609 while ((from = ue.tokenize(env, from,
',')) != -1) {
3610 if (env.length() > 0 && (ieq = env.find(
'=')) != -1) {
3613 ev =
new char[env.length()+1];
3614 strncpy(ev, env.c_str(), env.length());
3615 ev[env.length()] = 0;
3616 if (env.find(
"WRAPPERCMD") == STR_NPOS || !xps->
IsPLite())
3617 fprintf(fenv,
"%s\n", ev);
3620 if (env.find(
"WRAPPERCMD") == STR_NPOS || !xps->
IsPLite()) {
3622 if (namelist.length() > 0)
3629 len = strlen(
"PROOF_ALLVARS=") + namelist.length() + 2;
3631 snprintf(ev, len,
"PROOF_ALLVARS=%s", namelist.c_str());
3632 fprintf(fenv,
"%s\n", ev);
3649 void *input,
const char *rcfn)
3651 XPDLOC(SMGR,
"ProofServMgr::CreateProofServRootRc")
3654 if (!p || !input || (!rcfn || (rcfn && strlen(rcfn) <= 0))) {
3655 TRACE(XERR,
"invalid inputs!");
3665 TRACE(XERR,
"unable to get instance of proofserv proxy");
3668 int psid = xps->
ID();
3670 FILE *frc = fopen(rcfn,
"w");
3672 TRACE(XERR,
"unable to open rootrc file: "<<rcfn);
3678 TRACE(XERR,
"problems creating symlink to 'session.rootrc' (errno: "<<errno<<
")");
3681 TRACE(REQ,
"session rootrc file: "<< rcfn);
3684 fprintf(frc,
"# XrdProofdProtocol listening port\n");
3685 fprintf(frc,
"ProofServ.XpdPort: %d\n",
fMgr->
Port());
3689 fprintf(frc,
"# Prefix to be prepended to local paths\n");
3696 if (!purl.endswith(
"/"))
3698 fprintf(frc,
"# URL for the data pool entry-point\n");
3699 fprintf(frc,
"ProofServ.PoolUrl: %s\n", purl.c_str());
3704 fprintf(frc,
"# The session working dir\n");
3705 fprintf(frc,
"ProofServ.SessionDir: %s\n", in->
fWrkDir.c_str());
3709 fprintf(frc,
"# Proof Log/Debug level\n");
3710 fprintf(frc,
"Proof.DebugLevel: %d\n", in->
fLogLevel);
3713 fprintf(frc,
"# Ordinal number\n");
3714 fprintf(frc,
"ProofServ.Ordinal: %s\n", xps->
Ordinal());
3718 fprintf(frc,
"# ROOT Version tag\n");
3719 fprintf(frc,
"ProofServ.RootVersionTag: %s\n", p->
Client()->
ROOT()->
Tag());
3723 fprintf(frc,
"# Proof group\n");
3724 fprintf(frc,
"ProofServ.ProofGroup: %s\n", p->
Client()->
Group());
3729 fprintf(frc,
"# File with group information\n");
3735 fprintf(frc,
"# Users sandbox\n");
3736 fprintf(frc,
"ProofServ.Sandbox: %s\n", udir.c_str());
3740 fprintf(frc,
"# Server image\n");
3741 fprintf(frc,
"ProofServ.Image: %s\n",
fMgr->
Image());
3746 fprintf(frc,
"# Session tag\n");
3747 fprintf(frc,
"ProofServ.SessionTag: %s\n", in->
fSessionTag.c_str());
3748 fprintf(frc,
"# Top Session tag\n");
3749 fprintf(frc,
"ProofServ.TopSessionTag: %s\n", in->
fTopSessionTag.c_str());
3753 fprintf(frc,
"# Session admin path\n");
3755 if (proofvrs < 0 || proofvrs < 27) {
3757 fprintf(frc,
"ProofServ.AdminPath: %s\n", xps->
AdminPath());
3761 fprintf(frc,
"ProofServ.AdminPath: %s.status\n", xps->
AdminPath());
3767 fprintf(frc,
"# Whether user specific config files are enabled\n");
3768 fprintf(frc,
"ProofServ.UseUserCfg: 1\n");
3771 fprintf(frc,
"# Open socket\n");
3772 fprintf(frc,
"ProofServ.OpenSock: %s\n", xps->
UNIXSockPath());
3774 fprintf(frc,
"# Entity\n");
3776 fprintf(frc,
"ProofServ.Entity: %s:%s@%s\n",
3779 fprintf(frc,
"ProofServ.Entity: %s@%s\n", p->
Client()->
User(), p->
Link()->Host());
3783 fprintf(frc,
"# Session ID\n");
3784 fprintf(frc,
"ProofServ.SessionID: %d\n", psid);
3787 fprintf(frc,
"# Client ID\n");
3788 fprintf(frc,
"ProofServ.ClientID: %d\n", p->
CID());
3791 fprintf(frc,
"# Client Protocol\n");
3792 fprintf(frc,
"ProofServ.ClientVersion: %d\n", p->
ProofProtocol());
3795 if (in->
fCfg.length() > 0) {
3796 if (in->
fCfg ==
"masteronly") {
3797 fprintf(frc,
"# MasterOnly option\n");
3799 fprintf(frc,
"Proof.MasterOnly: 1\n");
3801 fprintf(frc,
"# Config file\n");
3803 fprintf(frc,
"ProofServ.ProofConfFile: %s\n", in->
fCfg.c_str());
3806 fprintf(frc,
"# Config file\n");
3808 fprintf(frc,
"ProofServ.ProofConfFile: sm:\n");
3810 fprintf(frc,
"ProofServ.ProofConfFile: lite:\n");
3811 fprintf(frc,
"# Number of ProofLite workers\n");
3812 fprintf(frc,
"ProofLite.Workers: %d\n", xps->
PLiteNWrks());
3813 fprintf(frc,
"# Users sandbox\n");
3814 fprintf(frc,
"ProofLite.Sandbox: %s\n", udir.c_str());
3815 fprintf(frc,
"# No subpaths\n");
3816 fprintf(frc,
"ProofLite.SubPath: 0\n");
3818 fprintf(frc,
"ProofServ.ProofConfFile: %s\n",
fProofPlugin.c_str());
3824 fprintf(frc,
"# Default settings for XrdClient\n");
3825 fprintf(frc,
"XNet.FirstConnectMaxCnt 3\n");
3826 fprintf(frc,
"XNet.ConnectTimeout 5\n");
3831 fprintf(frc,
"# Force remote reading also for local files to avoid a wrong TTreeCache initialization\n");
3832 fprintf(frc,
"Path.ForceRemote 1\n");
3838 fprintf(frc,
"# Additional rootrcs (xpd.putrc directives)\n");
3840 XrdOucHash<XpdEnv> sessrcs;
3846 XpdEnv *rcenv = sessrcs.Find((*ircs).fName.c_str());
3850 if (rcmatch > rcmtcex) {
3853 sessrcs.Rep(rcenv->
fName.c_str(), rcenv, 0, Hash_keepdata);
3858 sessrcs.Add(rcenv->
fName.c_str(), rcenv, 0, Hash_keepdata);
3860 TRACE(HDBG,
"Adding: "<<(*ircs).fEnv);
3868 fprintf(frc,
"# Dataset sources\n");
3869 XrdOucString rc(
"Proof.DataSetManager: ");
3870 std::list<XrdProofdDSInfo *>::iterator ii;
3879 rc += (*ii)->fObscure;
3881 fprintf(frc,
"%s\n", rc.c_str());
3886 fprintf(frc,
"# Dataset staging requests repository\n");
3892 fprintf(frc,
"# Data directory\n");
3900 fprintf(frc,
"%s\n", rc.c_str());
3920 XPDLOC(SMGR,
"ProofServMgr::CleanupLostProofServ")
3923 TRACE(REQ,
"disabled ...");
3927 TRACE(REQ,
"checking for orphalin proofserv processes ...");
3931 std::map<int,XrdOucString> procs;
3933 TRACE(DBG,
" no proofservs around: nothing to do");
3944 XrdOucRash<int, int> controlled, xrdproc;
3947 XrdOucHash<XrdOucString> sessionspaths;
3951 XrdOucString cmd, apath, pidpath, sessiondir, emsg, rest, after;
3952 std::map<int,XrdOucString>::iterator ip;
3953 for (ip = procs.begin(); ip != procs.end(); ip++) {
3956 if ((ia = cmd.find(
"xpdpath:")) != STR_NPOS) {
3957 cmd.tokenize(apath, ia,
' ');
3958 apath.replace(
"xpdpath:",
"");
3959 if (apath.length() <= 0) {
3960 TRACE(ALL,
"admin path not found; initial cmd line: "<<cmd);
3964 XPDFORM(pidpath,
"%s/xrootd.pid", apath.c_str());
3965 TRACE(ALL,
"pidpath: "<<pidpath);
3967 int *alive = xrdproc.Find(xpid);
3970 xrdproc.Add(xpid,
a);
3978 const char *subdir[2] = {
"activesessions",
"terminatedsessions"};
3979 for (
int i = 0; i < 2; i++) {
3980 XPDFORM(sessiondir,
"%s/%s", apath.c_str(), subdir[i]);
3981 if (!sessionspaths.Find(sessiondir.c_str())) {
3982 DIR *sdir = opendir(sessiondir.c_str());
3984 XPDFORM(emsg,
"cannot open '%s' - errno: %d", apath.c_str(), errno);
3985 TRACE(XERR, emsg.c_str());
3988 struct dirent *sent = 0;
3989 while ((sent = readdir(sdir))) {
3990 if (!strncmp(sent->d_name,
".", 1) || !strncmp(sent->d_name,
"..", 2))
3995 controlled.Add(ppid, ppid);
3998 sessionspaths.Add(sessiondir.c_str(), 0, 0, Hash_data_is_key);
4000 ok = (controlled.Find(pid)) ? 1 : ok;
4007 TRACE(ALL,
"process: "<<pid<<
" lost its controller: killing");
4030 XPDLOC(SMGR,
"ProofServMgr::CleanupProofServ")
4032 TRACE(REQ,
"all: "<<all<<
", usr: " << (usr ? usr :
"undef"));
4036 const char *pn =
"proofserv";
4043 TRACE(DBG,
"usr must be defined for all = FALSE");
4047 TRACE(DBG,
"problems getting info for user " << usr);
4055 DIR *dir = opendir(
"/proc");
4057 XrdOucString emsg(
"cannot open /proc - errno: ");
4059 TRACE(DBG, emsg.c_str());
4063 struct dirent *ent = 0;
4064 while ((ent = readdir(dir))) {
4065 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
4066 if (
DIGIT(ent->d_name[0])) {
4067 XrdOucString fn(
"/proc/", 256);
4071 FILE *ffn = fopen(fn.c_str(),
"r");
4073 XrdOucString emsg(
"cannot open file ");
4074 emsg += fn; emsg +=
" - errno: "; emsg += errno;
4079 bool xname = 1, xpid = 1, xppid = 1;
4080 bool xuid = (all) ? 0 : 1;
4083 char line[2048] = { 0 };
4084 while (fgets(
line,
sizeof(
line), ffn) &&
4085 (xname || xpid || xppid || xuid)) {
4087 if (xname && strstr(
line,
"Name:")) {
4088 if (!strstr(
line, pn))
4092 if (xpid && strstr(
line,
"Pid:")) {
4096 if (xppid && strstr(
line,
"PPid:")) {
4104 if (xuid && strstr(
line,
"Uid:")) {
4113 if (!xname && !xpid && !xppid && !xuid) {
4121 if (!srv || (srv && !strcmp(usr, srv->
Client())))
4133 #elif defined(__sun) 4136 DIR *dir = opendir(
"/proc");
4138 XrdOucString emsg(
"cannot open /proc - errno: ");
4144 struct dirent *ent = 0;
4145 while ((ent = readdir(dir))) {
4146 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
4147 if (
DIGIT(ent->d_name[0])) {
4148 XrdOucString fn(
"/proc/", 256);
4152 int ffd =
open(fn.c_str(), O_RDONLY);
4154 XrdOucString emsg(
"cannot open file ");
4155 emsg += fn; emsg +=
" - errno: "; emsg += errno;
4161 bool xuid = (all) ? 0 : 1;
4165 if (read(ffd, &psi,
sizeof(psinfo_t)) !=
sizeof(psinfo_t)) {
4166 XrdOucString emsg(
"cannot read ");
4167 emsg += fn; emsg +=
": errno: "; emsg += errno;
4177 if (!strstr(psi.pr_fname, pn))
4183 if (refuid == psi.pr_uid)
4187 int ppid = psi.pr_ppid;
4195 if (!xname && !xppid && !xuid) {
4202 if (!srv || (srv && !strcmp(usr, srv->
Client())))
4214 #elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__APPLE__) 4220 if ((ern = XrdProofdAux::GetMacProcList(&pl, np)) != 0) {
4221 XrdOucString emsg(
"cannot get the process list: errno: ");
4230 if (strstr(pl[ii].kp_proc.p_comm, pn)) {
4231 if (all || (
int)(pl[ii].kp_eproc.e_ucred.cr_uid) == refuid) {
4233 int ppid = pl[ii].kp_eproc.e_ppid;
4235 if (ppid != getpid()) {
4238 if (strstr(pl[jj].kp_proc.p_comm,
"xrootd") &&
4239 pl[jj].kp_proc.p_pid == ppid) {
4252 if (!srv || (srv && !strcmp(usr, srv->
Client())))
4269 XrdOucString cmd =
"ps ";
4273 const char *cusr = (usr && strlen(usr) && fSuperUser) ? usr : fPClient->ID();
4275 const char *cusr = (usr && strlen(usr)) ? usr : 0;
4289 cmd +=
" | grep proofserv 2>/dev/null";
4293 snprintf(cpid, 10,
"%d", getpid());
4296 XrdOucString pids =
":";
4297 FILE *fp = popen(cmd.c_str(),
"r");
4299 char line[2048] = { 0 };
4300 while (fgets(
line,
sizeof(
line), fp)) {
4302 char *px = strstr(
line,
"xpd");
4306 char *
pi = strstr(px+3, cpid);
4311 TRACE(HDBG,
"found alternative parent ID: "<< ppid);
4319 from += strlen(cusr);
4327 if (!srv || (srv && !strcmp(usr, srv->
Client())))
4351 const char *ord,
const char *stag)
4353 XPDLOC(SMGR,
"ProofServMgr::SetUserOwnerships")
4355 TRACE(REQ,
"enter");
4362 std::list<XrdProofdDSInfo *>::iterator ii;
4364 TRACE(ALL,
"Checking dataset source: url:"<<(*ii)->fUrl<<
", local:" 4365 <<(*ii)->fLocal<<
", rw:"<<(*ii)->fRW);
4366 if ((*ii)->fLocal && (*ii)->fRW) {
4375 TRACE(XERR,
"problems setting permissions 0755 on: "<<d);
4378 TRACE(XERR,
"problems asserting: "<<d);
4381 TRACE(XERR,
"problems setting permissions 0777 on: "<<d);
4384 TRACE(XERR,
"problems asserting: "<<d);
4396 XrdOucString dgr, dus[3];
4400 unsigned int mode = 0755;
4404 XPDFORM(dus[1],
"%s/%s", dus[0].c_str(), ord);
4405 XPDFORM(dus[2],
"%s/%s", dus[1].c_str(), stag);
4406 for (
int i = 0; i < 3; i++) {
4409 std::ios_base::fmtflags oflags = std::cerr.flags();
4410 TRACE(XERR,
"problems setting permissions "<< oct << mode<<
" on: "<<dus[i]);
4411 std::cerr.flags(oflags);
4414 TRACE(XERR,
"problems asserting: "<<dus[i]);
4419 TRACE(XERR,
"problems setting permissions 0777 on: "<<dgr);
4422 TRACE(XERR,
"problems asserting: "<<dgr);
4431 TRACE(XERR,
"can't change ownership of "<<creds);
4449 XPDLOC(SMGR,
"ProofServMgr::SetUserEnvironment")
4451 TRACE(REQ,
"enter");
4462 char *
h =
new char[len];
4469 char *u =
new char[len];
4472 TRACE(DBG,
"set "<<u);
4476 TRACE(DBG,
"setting ACLs");
4479 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
4481 TRACE(XERR,
"could not get privileges");
4492 if (XrdSysPriv::ChangePerm((uid_t)p->
Client()->
UI().
fUid,
4509 XrdOucString key; key += pid;
4518 XPDLOC(SMGR,
"BroadcastPriority")
4520 XpdBroadcastPriority_t *bp = (XpdBroadcastPriority_t *)
s;
4522 int nb = *(bp->fNBroadcast);
4529 ? bp->fGroupMgr->GetGroup(
ps->Group()) : 0;
4530 TRACE(DBG,
"group: "<<
g<<
", client: "<<
ps->Client());
4531 if (
g &&
g->Active() > 0) {
4532 TRACE(DBG,
"priority: "<<
g->Priority()<<
" active: "<<
g->Active());
4533 int prio = (int) (
g->Priority() * 100);
4534 ps->BroadcastPriority(prio);
4541 emsg =
"input entry undefined";
4545 TRACE(XERR,
"protocol error: "<<emsg);
4554 XPDLOC(SMGR,
"ProofServMgr::BroadcastClusterInfo")
4556 TRACE(REQ,
"enter");
4558 int tot = 0, act = 0;
4559 std::list<XrdProofdProofServ *>::iterator si =
fActiveSessions.begin();
4568 XPDPRT(
"tot: "<<tot<<
", act: "<<act);
4573 (*si)->SrvType() !=
kXPD_Worker) (*si)->SendClusterInfo(tot, act);
4577 TRACE(DBG,
"No master or submaster controlled by this manager");
4587 XPDLOC(SMGR,
"ProofServMgr::BroadcastPriorities")
4589 TRACE(REQ,
"enter");
4640 std::map<XrdProofdProtocol*,int>::iterator iter =
fDestroyTimes.begin();
4642 int rect = now - iter->second;
4644 if (p == iter->first) alive =
false;
4659 XPDLOC(SMGR,
"FreeClientID")
4661 int pid = *((
int *)
s);
4664 ps->FreeClientID(pid);
4670 TRACE(XERR,
"protocol error: undefined session!");
4690 XPDLOC(SMGR,
"CountTopMasters")
4692 int *ntm = (
int *)
s;
4700 emsg =
"input entry undefined";
4704 TRACE(XERR,
"protocol error: "<<emsg);
4713 XPDLOC(SMGR,
"ProofServMgr::CurrentSessions")
4715 TRACE(REQ,
"enter");
4739 if (!isWorker &&
s.find(
"<logfilemst>") != STR_NPOS) {
4741 if (lfr.endswith(
".log")) lfr.erase(lfr.rfind(
".log"));
4742 s.replace(
"<logfilemst>", lfr);
4743 }
else if (isWorker &&
s.find(
"<logfilewrk>") != STR_NPOS) {
4745 if (lfr.endswith(
".log")) lfr.erase(lfr.rfind(
".log"));
4746 s.replace(
"<logfilewrk>", lfr);
4750 if (getenv(
"USER") &&
s.find(
"<user>") != STR_NPOS) {
4751 XrdOucString usr(getenv(
"USER"));
4752 s.replace(
"<user>", usr);
4756 if (getenv(
"ROOTSYS") &&
s.find(
"<rootsys>") != STR_NPOS) {
4757 XrdOucString rootsys(getenv(
"ROOTSYS"));
4758 s.replace(
"<rootsys>", rootsys);
4778 fPid =
s ?
s->SrvPID() : -1;
4779 fID =
s ?
s->ID() : -1;
4784 fTag =
s ?
s->Tag() :
"";
4787 fROOTTag = (
s &&
s->ROOT())?
s->ROOT()->Tag() :
"";
4799 XPDLOC(SMGR,
"SessionInfo::FillProofServ")
4801 s.SetClient(
fUser.c_str());
4811 s.SetTag(
fTag.c_str());
4819 "' not availabe anymore: setting the default");
4833 XPDLOC(SMGR,
"SessionInfo::SaveToFile")
4840 TRACE(HDBG,
"session saved to file: "<<
file);
4843 FILE *fpid = fopen(
file,
"w");
4845 fprintf(fpid,
"%s %s\n",
fUser.c_str(),
fGroup.c_str());
4846 fprintf(fpid,
"%s\n",
fUnixPath.c_str());
4849 fprintf(fpid,
"%s\n",
fLogFile.c_str());
4852 fprintf(fpid,
"\n%s",
fUserEnvs.c_str());
4857 if (chmod(
file, 0666) != 0) {
4858 TRACE(XERR,
"could not change mode to 0666 on file "<<
4859 file<<
"; error: "<<errno);
4865 TRACE(XERR,
"session pid file cannot be (re-)created: "<<
4866 file<<
"; error: "<<errno);
4899 XPDLOC(SMGR,
"SessionInfo::ReadFromFile")
4910 FILE *fpid = fopen(
file,
"r");
4913 XrdOucString sline, t;
4915 if (fgets(
line,
sizeof(
line), fpid)) {
4918 if ((from = sline.tokenize(
fUser, from,
' ')) == -1)
4919 TRACE(XERR,
"warning: fUser: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4920 if ((from = sline.tokenize(
fGroup, from,
' ')) == -1)
4921 TRACE(XERR,
"warning: fGroup: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4923 if (fgets(
line,
sizeof(
line), fpid)) {
4927 if (fgets(
line,
sizeof(
line), fpid)) {
4931 if ((from = sline.tokenize(t, from,
' ')) == -1)
4932 TRACE(XERR,
"warning: fPid: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4934 if ((from = sline.tokenize(t, from,
' ')) == -1)
4935 TRACE(XERR,
"warning: fID: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4937 if ((from = sline.tokenize(t, from,
' ')) == -1)
4938 TRACE(XERR,
"warning: fSrvType: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4941 if (fgets(
line,
sizeof(
line), fpid)) {
4945 if ((from = sline.tokenize(
fOrdinal, from,
' ')) == -1)
4946 TRACE(XERR,
"warning: fOrdinal: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4947 if ((from = sline.tokenize(
fTag, from,
' ')) == -1)
4948 TRACE(XERR,
"warning: fTag: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4949 if ((from = sline.tokenize(
fAlias, from,
' ')) == -1)
4952 if (fgets(
line,
sizeof(
line), fpid)) {
4956 if (fgets(
line,
sizeof(
line), fpid)) {
4960 if ((from = sline.tokenize(t, from,
' ')) == -1)
4961 TRACE(XERR,
"warning: fSrvProtVers: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4963 if ((from = sline.tokenize(
fROOTTag, from,
' ')) == -1)
4964 TRACE(XERR,
"warning: fROOTTag: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4968 off_t lnow = lseek(fileno(fpid), (off_t) 0, SEEK_CUR);
4969 off_t ltot = lseek(fileno(fpid), (off_t) 0, SEEK_END);
4970 int left = (int)(ltot - lnow);
4973 int wanted = (left > 4095) ? 4095 : left;
4974 while ((len = read(fileno(fpid),
line, wanted)) < 0 &&
4977 if (len < 0 || len < wanted) {
4985 }
while (len > 0 && left > 0);
4992 if (!stat(
file, &st))
4995 TRACE(XERR,
"session file cannot be open: "<<
file<<
"; error: "<<errno);
5000 XrdOucString fs(
file);
5002 fpid = fopen(fs.c_str(),
"r");
5005 if (fgets(
line,
sizeof(
line), fpid)) {
5012 TRACE(DBG,
"no session status file for: "<< fs<<
"; session was probably terminated");
5027 XPDLOC(SMGR,
"XpdEnv::Matches")
5031 if (
fUsers.length() > 0) {
5032 XrdOucString u(usr);
5033 if ((nmtc = u.matches(
fUsers.c_str())) == 0)
return -1;
5041 XrdOucString
g(grp);
5042 if ((nmtcg =
g.matches(
fGroups.c_str())) == 0)
return -1;
5044 nmtcg = strlen(grp);
5048 TRACE(HDBG,
fEnv <<
", u:"<<usr<<
", g:"<<grp<<
" --> nmtc: "<<nmtc);
5065 int maj = -1, min = -1, ptc = -1, xv = ver;
5070 ptc = xv - min * 256;
5075 ptc = xv - min * 100;
5078 int vc = (maj << 16) + (min << 8) + ptc;
5089 XrdOucString vmi(
"-1"), vmx(
"-1");
5092 int min = ((
fVerMin - maj * 65536) >> 8);
5093 int ptc =
fVerMin - maj * 65536 - min * 256;
5094 XPDFORM(vmi,
"%d%d%d", maj, min, ptc);
5098 int min = ((
fVerMax - maj * 65536) >> 8);
5099 int ptc =
fVerMax - maj * 65536 - min * 256;
5100 XPDFORM(vmx,
"%d%d%d", maj, min, ptc);
5102 XrdOucString u(
"allusers"),
g(
"allgroups");
5107 "} svn:["<<
fSvnMin<<
","<<
fSvnMax<<
"] vers:["<<vmi<<
","<<vmx<<
"]");
int CreateSockPath(XrdProofdProofServ *xps, XrdProofdProtocol *p, unsigned int seq, XrdOucString &emsg)
Create the socket path for the starting session. Return 0 on success, -1 on error (error message in 'emsg').
int FreeClientID(int pid)
Free instance corresponding to protocol connecting process 'pid'.
void SetParent(XrdClientID *cid)
XrdSysRecMutex * Mutex() const
int SetupProtocol(XrdNetPeer &peerpsrv, XrdProofdProofServ *xps, XrdOucString &e)
Setup the protocol object serving the peer described by 'peerpsrv'.
static int BroadcastPriority(const char *, XrdProofdProofServ *ps, void *s)
Run through entries to broadcast the relevant priority.
void Reset(const char *n, const char *env, const char *usr=0, const char *grp=0, int smi=-1, int smx=-1, int vmi=-1, int vmx=-1)
int BroadcastPriorities()
Broadcast priorities to the active sessions.
XrdOucHash< XrdProofdProofServ > fSessions
void SetReconnectTime(bool on=1)
Change reconnecting status.
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
int CurrentSessions(bool recalculate=0)
Return the number of current sessions (top masters)
static int ToVersCode(int ver, bool hex=0)
Transform version number ver (format patch + 100*minor + 10000*maj, e.g.
static constexpr double pi
const char * RootdExe() const
int Process(XrdProofdProtocol *p)
Process manager request.
const char * BareLibPath() const
void Print(const char *what)
Print the content of this env.
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way.
XrdProtocol * Match(XrdLink *lp)
Check whether the request matches this protocol.
int CleanupProofServ(bool all=0, const char *usr=0)
Cleanup (kill) all 'proofserv' processes from the process table.
const char * Group() const
XrdROOT * DefaultVersion() const
bool IsClientRecovering(const char *usr, const char *grp, int &deadline)
Returns true (and the recovering deadline) if the client has sessions in recovering state; returns fal...
void ParseCreateBuffer(XrdProofdProtocol *p, XrdProofdProofServ *xps, XrdOucString &tag, XrdOucString &ord, XrdOucString &cffile, XrdOucString &uenvs, int &intwait)
Extract relevant quantities from the buffer received during a create request.
XrdProofdNetMgr * NetMgr() const
int SetUserOwnerships(XrdProofdProtocol *p, const char *ord, const char *stag)
Set user ownerships on some critical files or directories.
int SetProofServEnv(XrdProofdProtocol *p, void *in)
Set environment for proofserv.
const char * Client() const
std::map< XrdProofdProtocol *, int > fDestroyTimes
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
bool IsSessionSocket(const char *fpid)
Checks if fpid is the path of a session UNIX socket. Returns TRUE if yes; cleans the socket if the ses...
int CheckSession(bool oldvers, bool isrec, int shutopt, int shutdel, bool changeown, int &nc)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
int CreateProofServRootRc(XrdProofdProtocol *p, void *input, const char *rcfn)
Create in 'rcfn' the rootrc file for the proofserv being created; return 0 on success, -1 on error.
#define TRACE(Flag, Args)
void UpdateCounter(int t, int n)
void SetSid(unsigned short sid)
int RmSession(const char *fpid)
Remove session file from the terminated sessions area.
XrdProofdProofServ * GetServObj(int id)
Get server at 'id'. If needed, increase the vector size.
virtual int MaxSessions() const
void SetPLiteNWrks(int n)
int DeleteFromSessions(const char *pid)
Delete from the hash list the session with ID pid.
void Reset()
Reset this instance.
static constexpr double ps
static int FreeClientID(const char *, XrdProofdProofServ *ps, void *s)
Run through entries to reset the disconnecting client slots.
XrdNet * UNIXSock() const
XrdScheduler * Sched() const
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
XrdSysSemWait fProcessSem
#define kXPD_MasterMaster
XrdSrvBuffer * StartMsg() const
void ResolveKeywords(XrdOucString &s, ProofServEnv_t *in)
Resolve some keywords in 's' <logfileroot>, <user>, <rootsys>
void SetOrdinal(const char *o)
void SetAdminPath(const char *p)
const char * GetCfgFile() const
bool Alive(XrdProofdProtocol *p)
Check destroyed status.
XrdSysRecMutex fRecoverMutex
int AddSession(XrdProofdProtocol *p, XrdProofdProofServ *s)
Add new active session.
#define kXPD_ClientMaster
short int ProofProtocol() const
const char * DataDir() const
kXR_int16 SrvProtVers() const
XrdClientID * GetClientID(int cid)
Get instance corresponding to cid.
XrdSecProtocol * AuthProt() const
XrdOucString fParentExecs
const char * StageReqRepo() const
struct ClientRequestHdr header
XrdSysSemWait * ProcessSem()
const char * SockPathDir() const
int CreateUNIXSock(XrdSysError *edest)
Create UNIX socket for internal connections.
int Recv(XpdMsg &msg)
Recv message from the pipe.
static int CountTopMasters(const char *, XrdProofdProofServ *ps, void *s)
Run through entries to count top-masters.
void * XrdProofdProofServRecover(void *p)
Waiting for session to recover after an abrupt shutdown.
XrdSecCredsSaver_t fCredsSaver
int CreateProofServEnvFile(XrdProofdProtocol *p, void *input, const char *envfn, const char *rcfn)
Create in 'envfn' the environment file for the proofserv being created; return 0 on success, -1 on error.
static int GetVersionCode(const char *release)
Translate 'release' into a version code integer following the rules in $ROOTSYS/include/RVersion.h.
void ExtractEnv(char *, XrdOucStream *, XrdOucString &users, XrdOucString &groups, XrdOucString &rcval, XrdOucString &rcnam, int &smi, int &smx, int &vmi, int &vmx, bool &hex)
Extract env information from the stream 'cfg'.
static int ParsePidPath(const char *path, XrdOucString &before, XrdOucString &after)
Parse a path in the form of "<before>[.<pid>][.<after>]", filling 'before' and 'after' and returning 'pid'...
XrdProofSched * ProofSched() const
int Destroy(XrdProofdProtocol *p)
Handle a request to shutdown an existing session.
const char * UNIXSockPath() const
static int SymLink(const char *path, const char *link)
Create a symlink 'link' to 'path'. Return 0 in case of success, -1 in case of error.
void SetProtocol(XrdProofdProtocol *p)
XrdProofGroupMgr * GroupsMgr() const
const char * User() const
void FormFileNameInSessionDir(XrdProofdProtocol *p, XrdProofdProofServ *xps, const char *sessiondir, const char *extension, XrdOucString &outfn)
int CreateAdminPath(XrdProofdProofServ *xps, XrdProofdProtocol *p, int pid, XrdOucString &emsg)
Create the admin path for the starting session. Return 0 on success, -1 on error (error message in 'emsg').
XrdProofSessionInfo(XrdProofdClient *c, XrdProofdProofServ *s)
Construct from 'c' and 's'.
int SetProcessPriority(int pid, const char *usr, int &dp)
Change priority of process pid belonging to user, if needed.
std::list< XpdEnv > fProofServEnvs
bool IsReconnecting()
Return true if in reconnection state, i.e.
XrdOucString fTermAdminPath
XrdSysRecMutex fEnvsMutex
void Close()
If open, close and invalidate the pipe descriptors.
void DeleteUNIXSock()
Delete the current UNIX socket.
bool WorkerUsrCfg() const
XrdProofdClientMgr * fClientMgr
int CheckActiveSessions(bool verify=1)
Go through the active sessions admin path and make sure sessions are alive.
const char * Ordinal() const
void SetUNIXSockPath(const char *s)
int TouchSession(const char *fpid, const char *path=0)
Update the access time for the session pid file to the current time.
std::list< XrdProofdProofServ * > fActiveSessions
static int EUidAtStartup()
void DisconnectFromProofServ(int pid)
Change reconnecting status.
void SetGroup(const char *g)
int Matches(const char *usr, const char *grp, int ver=-1)
Check if this env applies to 'usr', 'grp, 'ver'.
std::list< XpdClientSessions * > * fRecoverClients
const char * NameSpace() const
void FillProofServ(XrdProofdProofServ &s, XrdROOTMgr *rmgr)
Fill 's' fields using the stored info.
void RegisterDirectives()
Register directives for configuration.
XrdProofSched * fProofSched
std::list< XrdProofdProofServ * > fProofServs
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
void SetFileout(const char *f)
int Attach(XrdProofdProtocol *p)
Handle a request to attach to an existing session.
int GetNClients(bool check)
Get the number of connected clients.
std::list< XpdEnv > fProofServRCs
int CheckFrequency() const
#define kXPD_MasterWorker
static XpdManagerCron_t fManagerCron
static const char * ProofRequestTypes(int type)
Translates the proof request type in a human readable string.
static int GetIDFromPath(const char *path, XrdOucString &emsg)
Extract an integer from a file.
const char * DataDirUrlOpts() const
int ResolveSession(const char *fpid)
Handle a request to recover a session after stop&restart.
void SetP(XrdProofdProtocol *p)
XrdROOT * GetVersion(const char *tag)
Return pointer to the ROOT version corresponding to 'tag' or 0 if not found.
#define XrdSysMutexHelper
XrdProofdSandbox * Sandbox() const
static int WriteSessEnvs(const char *, XpdEnv *env, void *s)
Run through entries to broadcast the relevant priority.
struct XPClientProofRequest proof
const char * Host() const
void Reset()
Reset the content.
int Recover(XpdClientSessions *cl)
Handle a request to recover a session after stop&restart for a specific client.
#define TRACEP(p, act, x)
int CleanClientSessions(const char *usr, int srvtype)
Go through the sessions admin path and clean all sessions belonging to 'usr'.
int MvSession(const char *fpid)
Move session file from the active to the terminated areas.
#define XpdBadPGuard(g, u)
XrdProofdClientMgr * ClientMgr() const
int PrepareSessionRecovering()
Go through the active sessions admin path and prepare reconnection of those still alive...
int VerifySession(const char *fpid, int to=-1, const char *path=0)
Check if the session is alive, i.e.
std::list< XrdProofdDSInfo * > * DataSetSrcs()
int Detach(XrdProofdProtocol *p)
Handle a request to detach from an existing session.
static long int GetLong(char *str)
Extract first integer from string at 'str', if any.
void GetTagDirs(int opt, XrdProofdProtocol *p, XrdProofdProofServ *xps, XrdOucString &sesstag, XrdOucString &topsesstag, XrdOucString &sessiondir, XrdOucString &sesswrkdir)
Determine the unique tag and relevant dirs for this session.
void SetTag(const char *t)
int AddSession(const char *tag)
Record entry for new proofserv session tagged 'tag' in the active sessions file (<SandBox>/.sessions).
XrdOucString fProofPlugin
const char * TMPdir() const
const char * AdminPath() const
const char * Fileout() const
int SetAdminPath(const char *a, bool assert, bool setown)
Set the admin path and make sure the file exists.
static int AssertDir(const char *path, XrdProofUI ui, bool changeown)
Make sure that 'path' exists and is owned by the entity described by 'ui'.
XrdROOTMgr * ROOTMgr() const
void TerminateSessions(int srvtype, XrdProofdProofServ *ref, const char *msg, XrdProofdPipe *pipe, bool changeown)
Terminate client sessions; IDs of signalled processes are added to sigpid.
XrdProofdProtocol * Protocol() const
XrdProofdProofServMgr * fSessionMgr
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive. If none (valid) is found, return -1.
int CheckTerminatedSessions()
Go through the terminated sessions admin path and make sure the sessions are gone.
int ResolveKeywords(XrdOucString &s, XrdProofdClient *pcl)
Resolve special keywords in 's' for client 'pcl'.
const char * LocalROOT() const
int DoDirectiveString(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for a string.
void SendErrLog(const char *errlog, XrdProofdResponse *r)
Send content of errlog upstream asynchronously.
int CleanupLostProofServ()
Cleanup (kill) all 'proofserv' processes which lost control from their creator or controller daemon...
int SetProofServEnvOld(XrdProofdProtocol *p, void *in)
Set environment for proofserv; old version preparing the environment for proofserv protocol version <...
static constexpr double s
void SetValid(bool valid=1)
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
void SetUserEnvs(const char *t)
const char * AdminPath() const
const char * EffectiveUser() const
XrdProofdProofServ * PrepareProofServ(XrdProofdProtocol *p, XrdProofdResponse *r, unsigned short &sid)
Allocate and prepare the XrdProofdProofServ object describing this session.
static int ChangeMod(const char *path, unsigned int mode)
Change the permission mode of 'path' to 'mode'.
const char * PoolURL() const
int DoDirectiveShutdown(char *, XrdOucStream *, bool)
Process 'shutdown' directive.
const char * Export() const
#define XPD_SETRESP(p, x)
R__EXTERN C unsigned int sleep(unsigned int seconds)
static int WriteSessRCs(const char *, XpdEnv *erc, void *f)
Run through entries to broadcast the relevant priority.
unsigned int fSeqSessionN
void FillEnvList(std::list< XpdEnv > *el, const char *nam, const char *val, const char *usrs=0, const char *grps=0, int smi=-1, int smx=-1, int vmi=-1, int vmx=-1, bool hex=0)
Fill env entry(ies) in the relevant list.
void SetNextSessionsCheck(int t)
XrdClientID * Parent() const
XrdProofdPriorityMgr * PriorityMgr() const
int Get(int &i)
Get next token and interpret it as an int.
const char * DataDirOpts() const
XrdProofdProofServMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.
int Create(XrdProofdProtocol *p)
Handle a request to create a new session.
static int ChangeToDir(const char *dir, XrdProofUI ui, bool changeown)
Change current directory to 'dir'.
void * XrdProofdProofServCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
const char * CfgFile() const
XrdOucString fTopSessionTag
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
int DoDirectivePutRc(char *, XrdOucStream *, bool)
Process 'putrc' directives.
const char * Image() const
XrdProofdProofServ * GetActiveSession(int pid)
Return active session with process ID pid, if any.
int ReadFromFile(const char *file)
Read content from 'file'.
XrdProofdClient * GetClient(const char *usr, const char *grp=0, bool create=1)
Handle request for localizing a client instance for {usr, grp} from the list.
XrdProofdClient * Client() const
static int ChangeOwn(const char *path, XrdProofUI ui)
Change the ownership of 'path' to the entity described by 'ui'.
int Post(int type, const char *msg)
Post message on the pipe.
static int VerifyProcessByID(int pid, const char *pname="proofserv")
Check if a process named 'pname' and process 'pid' is still in the process table. ...
int AcceptPeer(XrdProofdProofServ *xps, int to, XrdOucString &e)
Accept a callback from a starting-up server and setup the related protocol object.
int RecoverActiveSessions()
Accept connections from sessions still alive.
int DoDirectivePutEnv(char *, XrdOucStream *, bool)
Process 'putenv' directives.
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
static int GetProcesses(const char *pn, std::map< int, XrdOucString > *plist)
Get from the process table the list of PIDs for processes named 'proofserv'. For {linux, sun, macosx} it uses the system info; for other systems it invokes the command shell 'ps ax' via popen.
static void LogEmsgToFile(const char *flog, const char *emsg, const char *pfx=0)
Logs error message 'emsg' to file 'flog' using standard technology.
XrdOucString fActiAdminPath
int fCounters[PSMMAXCNTS]
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
XrdProofdProofServ * GetFreeServObj()
Get next free server ID.
virtual int Config(bool rcf=0)
const char * UserEnvs() const
bool ReadFile(bool update=true)
Return true if the file has never been read or did change since last reading, false otherwise...
XrdProofdClient * fClient
void SetClient(const char *c)
static int KillProcess(int pid, bool forcekill, XrdProofUI ui, bool changeown)
Kill the process 'pid'.
void Register(const char *dname, XrdProofdDirective *d)
XPClientRequest * Request() const
int SaveToFile(const char *file)
Save content to 'file'.
int DoDirectiveProofServMgr(char *, XrdOucStream *, bool)
Process 'proofservmgr' directive, e.g.: xpd.proofservmgr checkfq:120 termto:100 verifyto:5 recoverto:20...
void BroadcastClusterInfo()
Broadcast cluster info to the active sessions.
int SetUserEnvironment(XrdProofdProtocol *p)
Set user environment: set effective user and group ID of the process to the ones of the owner of this...
const char * PrgmSrv() const
static constexpr double g