146 Info(
"TTreeCacheUnzip",
"Enabling Parallel Unzipping");
156 Warning(
"TTreeCacheUnzip",
"Parallel Option unknown");
237 if (entry == -1) entry=0;
266 if (
b->GetDirectory()==0)
continue;
267 if (
b->GetDirectory()->GetFile() !=
fFile)
continue;
268 Int_t nb =
b->GetMaxBaskets();
269 Int_t *lbaskets =
b->GetBasketBytes();
271 if (!lbaskets || !entries)
continue;
274 Int_t blistsize =
b->GetListOfBaskets()->GetSize();
275 for (
Int_t j=0;j<nb;j++) {
277 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j))
continue;
280 Int_t len = lbaskets[j];
281 if (pos <= 0 || len <= 0)
continue;
284 if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry))
continue;
287 if (j<nb-1) emax = entries[j+1]-1;
288 if (!elist->
ContainsRange(entries[j]+chainOffset,emax+chainOffset))
continue;
426 if (
gDebug > 0)
Info(
"SendSignal",
" fUnzipCondition->Signal()");
456 class TTreeCacheUnzipData {
471 if (nt > 10) nt = 10;
474 Info(
"StartThreadUnzip",
"Going to start %d threads.", nt);
476 for (
Int_t i = 0; i < nt; i++) {
482 Info(
"StartThreadUnzip",
"Going to start thread '%s'",
nm.Data());
484 TTreeCacheUnzipData *d =
new TTreeCacheUnzipData;
490 Error(
"TTreeCacheUnzip::StartThreadUnzip",
" Unable to create new thread.");
515 for (
Int_t i = 0; i < 1; i++) {
542 TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
548 Int_t thrnum = d->fCount;
549 Int_t startindex = thrnum;
550 Int_t locbuffsz = 16384;
551 char *locbuff =
new char[16384];
560 if (myCycle != unzipMng->
fCycle) startindex = thrnum;
561 myCycle = unzipMng->
fCycle;
562 if (unzipMng->
fNseek) startindex = startindex % unzipMng->
fNseek;
563 else startindex = -1;
567 res = unzipMng->
UnzipCache(startindex, locbuffsz, locbuff);
617 Int_t nread = maxbytes;
620 if (nb < 0)
return nread;
622 const Int_t headerSize = 16;
623 if (nread < headerSize)
return nread;
628 if (!olen) olen = nbytes-klen;
675 char **aUnzipChunks =
new char *[
fNseek];
676 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
740 char **aUnzipChunks =
new char *[
fNseek];
741 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
807 if ( myCycle !=
fCycle ) {
809 Info(
"GetUnzipBuffer",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
953 const Int_t hlen=128;
954 Int_t nbytes=0, objlen=0, keylen=0;
962 Error(
"UnzipBuffer",
"Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
976 Bool_t oldCase = objlen==nbytes-keylen
980 if (objlen > nbytes-keylen || oldCase) {
983 memcpy(*
dest, src, keylen);
986 char *objbuf = *
dest + keylen;
996 Info(
"UnzipBuffer",
" nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
997 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
998 if (oldCase && (nin > objlen || nbuf > objlen)) {
1000 Info(
"UnzipBuffer",
"oldcase objlen :%d ", objlen);
1003 memcpy( *
dest + keylen, src + keylen, objlen);
1008 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
1011 Info(
"UnzipBuffer",
"R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
1012 nin, bufcur, nbuf, objbuf, nout);
1016 if (noutot >= objlen)
break;
1021 if (noutot != objlen) {
1022 Error(
"UnzipBuffer",
"nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
1023 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
1025 if(alloc)
delete [] *
dest;
1031 memcpy(*
dest, src, keylen);
1033 memcpy(*
dest + keylen, src + keylen, objlen);
1063 const Int_t hlen=128;
1064 Int_t objlen=0, keylen=0;
1068 Int_t idxtounzip = -1;
1076 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1098 rdoffs =
fSeek[idxtounzip];
1109 if (idxtounzip < 0) {
1111 Info(
"UnzipCache",
"Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
1121 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1129 if(locbuffsz < rdlen) {
1130 if (locbuff)
delete [] locbuff;
1132 locbuff =
new char[locbuffsz];
1134 }
else if(locbuffsz > rdlen*3) {
1135 if (locbuff)
delete [] locbuff;
1136 locbuffsz = rdlen*2;
1137 locbuff =
new char[locbuffsz];
1142 Info(
"UnzipCache",
"Going to unzip block %d", idxtounzip);
1151 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1168 Info(
"UnzipCache",
"Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
1174 Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
1182 Info(
"UnzipCache",
"Block %d is too big, skipping.", idxtounzip);
1200 if ((loclen > 0) && (loclen == objlen+keylen)) {
1205 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1226 Info(
"UnzipCache",
"reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
1227 idxtounzip, rdoffs, rdlen, loclen);
1233 Info(
"argh",
"loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
1246 printf(
"******TreeCacheUnzip statistics for file: %s ******\n",
fFile->
GetName());
1248 printf(
"Number of blocks unzipped by threads: %d\n",
fNUnzip);
1249 printf(
"Number of hits: %d\n",
fNFound);
1250 printf(
"Number of stalls: %d\n",
fNStalls);
1251 printf(
"Number of misses: %d\n",
fNMissed);
TCondition * fUnzipStartCondition
Used to signal the threads to start.
virtual const char * GetName() const
Returns name of object.
static Int_t SetCancelDeferred()
Static method to set the cancellation response type of the calling thread to deferred, i.e.
Int_t fNtot
Total size of prefetched blocks.
void frombuf(char *&buf, Bool_t *x)
Long64_t fEntryMax
! last entry in the cache
Int_t fNUnzip
! number of blocks that were unzipped
void WaitUnzipStartSignal()
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
TFile * fFile
Pointer to file.
Int_t StopThreadUnzip()
To stop the thread we only need to change the value of the variable fActiveThread to false and the lo...
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
Int_t fNStalls
! number of hits which caused a stall
TObjArray * fBranches
! List of branches to be stored in the cache
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
TMutex * fMutexList
Mutex to protect the various lists. Used by the condvars.
A specialized TFileCacheRead object for a TTree.
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. pos is either kBeg, kCur or kEnd.
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
Byte_t * fUnzipStatus
! [fNSeek] For each blk, tells us if it's unzipped or pending
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Long64_t fEntryMin
! first entry in the cache
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
static Int_t SetCancelOn()
Static method to turn on thread cancellation.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Int_t * fUnzipLen
! [fNseek] Length of the unzipped buffers
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
TThread * fUnzipThread[10]
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache; this function is called by TBranch::Ge...
Long64_t * GetTreeOffset() const
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
Bool_t fActiveThread
Used to terminate gracefully the unzippers.
Int_t fNFound
! number of blocks that were found in the cache
static Bool_t IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure...
std::queue< Int_t > fActiveBlks
The blocks which are active now.
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Helper class to iterate over cluster of baskets.
static void * UnzipLoop(void *arg)
This is a static function.
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc. into the SysInfo_t s...
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void Print(Option_t *option="") const
Print cache statistics.
static Long_t SelfId()
Static method returning the id for the current thread.
Specialization of TTreeCache for parallel Unzipping.
Int_t Run(void *arg=0)
Start the thread.
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
! pointer to the current Tree
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache; this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
void SendUnzipStartSignal(Bool_t broadcast)
This will send the signal corresponding to the queue...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
R__EXTERN TSystem * gSystem
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
Int_t fNseekMax
! fNseek can change so we need to know its max size
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
A TEventList object is a list of selected events (entries) in a TTree.
Long_t Join(void **ret=0)
Join this thread.
static constexpr double nm
Int_t fNReadPref
Number of blocks that were prefetched.
Long64_t fEntryCurrent
! current lowest entry number in the cache
Bool_t IsQueueEmpty()
It says if the queue is empty... useful to see if we have to process it.
TObject * UncheckedAt(Int_t i) const
virtual void Print(Option_t *option="") const
Print cache statistics.
Bool_t fIsLearning
! true if cache is in learning mode
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
virtual Int_t GetTreeNumber() const
virtual Int_t GetBufferSize() const
TCondition * fUnzipDoneCondition
Used to wait for an unzip tour to finish. Gives the Async feel.
#define R__LOCKGUARD(mutex)
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
Static function: sets the unzip relative buffer size.
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
char ** fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
Int_t fNbranches
! Number of branches in the cache
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Bool_t IsActiveThread()
This indicates if the thread is active at this moment...
Int_t StartThreadUnzip(Int_t nthreads)
The Thread is only a part of the TTreeCache but it is the part that waits for info in the queue and p...
#define dest(otri, vertexptr)
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containing TTree objects.
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
Int_t TimedWaitRelative(ULong_t ms)
Wait to be signaled or till the timer times out.
A TTree object has a header with a name and a title.
TEventList * GetEventList() const
virtual const char * GetName() const
Returns name of object.
A TTree is a list of TBranches.
Long64_t fEntryNext
! next entry number where cache must be filled
Long64_t fTotalUnzipBytes
! The total sum of the currently unzipped blks
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
This inflates all the buffers in the cache.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Int_t fNseek
Number of blocks to be prefetched.