00001
00002 #include "cafe/Write.hpp"
00003 #include "cafe/Config.hpp"
00004 #include "cafe/Event.hpp"
00005
00006 #include <cstdlib>
00007 #include <iostream>
00008 #include <sstream>
00009 #include <algorithm>
00010 #include <cassert>
00011 #include "TFile.h"
00012 #include "TTree.h"
00013 #include "TBranchRef.h"
00014 #include "TRefTable.h"
00015 #include "TObjArray.h"
00016 #include "TBranchElement.h"
00017
00018 namespace {
00019
00020
00021 void enableBranch(TBranch *br)
00022 {
00023 if(br != 0) {
00024 br->ResetBit(kDoNotProcess);
00025 TIter iter(br->GetListOfBranches());
00026 while(TBranch *sub = (TBranch *)iter.Next()) {
00027 enableBranch(sub);
00028 }
00029 }
00030 }
00031
00032
00033 void disableBranch(TBranch *br)
00034 {
00035 if(br != 0) {
00036 br->SetBit(kDoNotProcess);
00037 TIter iter(br->GetListOfBranches());
00038 while(TBranch *sub = (TBranch *)iter.Next()) {
00039 disableBranch(sub);
00040 }
00041 }
00042 }
00043
00044
00045 }
00046
00047 namespace cafe {
00048
00049 Write::Write(const char *name)
00050 : Processor(name),
00051 _file(0),
00052 _tree(0),
00053 _fileOwner(false),
00054 _updateObjectVersion(true)
00055 {
00056 Config config(name);
00057 _pattern.set(config.get("File", ""));
00058 _branches = config.getVString("Branches", " ,");
00059 if(_branches.empty()) _branches = config.getVString("Enable", " ,");
00060 _disable = config.getVString("Disable", " ,");
00061 _treeName = config.get("Tree", "TMBTree");
00062 _autosave = config.get("AutoSave", -1);
00063 _destination = config.get("Destination", "");
00064 _filesPerOutput = config.get("FilesPerOutput", 0);
00065 _filesPerOutputDone = _filesPerOutput;
00066 _updateObjectVersion = config.get("UpdateObjectVersion", 1);
00067 }
00068
00069 Write::~Write()
00070 {
00071 }
00072
00073
00074 void Write::finish()
00075 {
00076 if(_file && _fileOwner) {
00077 _file->Write();
00078 _file->Close();
00079 if(!_destination.empty()) {
00080 copy_file();
00081 }
00082 delete _file;
00083 }
00084 }
00085
00086
00087 bool Write::processEvent(Event& event)
00088 {
00089 if(_tree) {
00090 TTree *tree = event.getTree();
00091 Long64_t current = tree->GetReadEntry();
00092
00093
00094
00095
00096 TIter iter(_tree->GetListOfBranches());
00097 while(TBranch *br = (TBranch *)iter.Next()) {
00098
00099
00100 if(!br->TestBit(kDoNotProcess)) {
00101 event.readBranch(br->GetName());
00102 }
00103 }
00104
00105 _tree->Fill();
00106
00107 if(_autosave > 0 && (eventCount() % _autosave == 0)) {
00108 _tree->AutoSave();
00109 }
00110 }
00111 return true;
00112 }
00113
00114 void Write::inputFileOpened(TFile *file)
00115 {
00116 TTree *tree = (TTree *)file->Get(_treeName.c_str());
00117 if(tree == 0) tree = (TTree *)getDirectory()->Get(_treeName.c_str());
00118 if(tree == 0) tree = (TTree *)gROOT->Get(_treeName.c_str());
00119 if(tree == 0) {
00120 _tree = 0;
00121 err() << "Write[" << name() << "] No tree in file named: " << _treeName << std::endl;
00122 return;
00123 }
00124
00125
00126 if(_file && _filesPerOutput && (++_filesPerOutputDone < _filesPerOutput)) {
00127
00128 tree->CopyAddresses(_tree);
00129 return;
00130 }
00131
00132
00133 std::string new_name = _pattern.replace(file->GetName());
00134 if(_file) {
00135 if(new_name == _file->GetName()) {
00136
00137 tree->CopyAddresses(_tree);
00138 return;
00139 }
00140 if(_fileOwner) {
00141 _file->Write();
00142 _file->Close();
00143 if(!_destination.empty()) {
00144 copy_file();
00145 }
00146 delete _file;
00147 }
00148 _file = 0;
00149 }
00150
00151 _filesPerOutputDone = 0;
00152
00153 _file = dynamic_cast<TFile*>(gROOT->GetFile(new_name.c_str()));
00154
00155
00156 if(_file == 0) {
00157 out() << "Write[" << name() << "] Opening new output file: " << new_name << std::endl;
00158 _file = new TFile(new_name.c_str(), "RECREATE");
00159 _fileOwner = true;
00160 } else {
00161 out() << "Write[" << name() << "] Using existing file: " << new_name << std::endl;
00162 _fileOwner = false;
00163 }
00164
00165
00166
00167 if(!_branches.empty()) {
00168
00169 tree->SetBranchStatus("*",0);
00170
00171 for(std::vector<std::string>::iterator it = _branches.begin();
00172 it != _branches.end();
00173 ++it) {
00174 out() << "Write[" << name() << "] Enabling branch: " << *it << std::endl;
00175 if(TBranch *br = tree->GetBranch((*it).c_str())) {
00176 enableBranch(br);
00177 } else {
00178 err() << "Write[" << name() << "] Branch not found: " << *it << std::endl;
00179 _branches.erase(it--);
00180 }
00181 }
00182
00183 } else if(!_disable.empty()) {
00184
00185 tree->SetBranchStatus("*", 1);
00186
00187
00188 for(std::vector<std::string>::iterator it = _disable.begin();
00189 it != _disable.end();
00190 ++it) {
00191 out() << "Write[" << name() << "] Disabling branch: " << *it << std::endl;
00192 if(TBranch *br = tree->GetBranch((*it).c_str())) {
00193 disableBranch(br);
00194 } else {
00195 err() << "Write[" << name() << "] Branch not found: " << *it << std::endl;
00196 _disable.erase(it--);
00197 }
00198 }
00199 }
00200
00201
00202
00203
00204
00205
00206 _updatedBranches.clear();
00207
00208 if (_updateObjectVersion) {
00209 TIter branchNext (tree->GetListOfBranches());
00210 while (TBranch *br = static_cast<TBranch*>(branchNext.Next())) {
00211 if (!br->TestBit(kDoNotProcess) && containsUpdatedClass (br)) {
00212 _updatedBranches.push_back (br->GetName());
00213 disableBranch(br);
00214 }
00215 }
00216 }
00217
00218
00219 _tree = tree->CloneTree(0);
00220
00221
00222 tree->SetBranchStatus("*",1);
00223
00224 for(std::vector<std::string>::iterator it = _updatedBranches.begin();
00225 it != _updatedBranches.end();
00226 ++it) {
00227 out() << "Write[" << name() << "] Updating branch: " << *it << std::endl;
00228 updateBranch(tree, *it);
00229 }
00230
00231
00232 if(TBranchRef *ref = _tree->GetBranchRef()) {
00233
00234 ref->Reset();
00235 if(TRefTable *table = ref->GetRefTable()) {
00236
00237
00238 if(TObjArray *parents = table->GetParents()) {
00239 parents->Clear();
00240 }
00241 }
00242 } else {
00243
00244
00245 _tree->BranchRef();
00246 }
00247
00248
00249
00250
00251
00252 if(TList *fr = _tree->GetListOfFriends()) {
00253 fr->Clear();
00254 }
00255
00256 }
00257
00261 bool Write::containsUpdatedClass (TBranch *br)
00262 {
00263 TBranchElement *be = static_cast<TBranchElement*>(br);
00264
00265 int tree_version = be->GetInfo()->GetClassVersion();
00266 int class_version = TClass::GetClass(be->GetClassName())->GetStreamerInfo()->GetClassVersion();
00267 if (tree_version < class_version) {
00268 return true;
00269 }
00270
00271 TIter branchNext (be->GetListOfBranches());
00272 while (TBranchElement *be = static_cast<TBranchElement*>(branchNext.Next())) {
00273 if (containsUpdatedClass(be)) {
00274 return true;
00275 }
00276 }
00277 return false;
00278 }
00279
00280 void Write::updateBranch(TTree *input_tree, const std::string& branch_name)
00281 {
00282 TBranch *br = input_tree->GetBranch(branch_name.c_str());
00283 assert(br != 0);
00284
00285
00286
00287 br->GetEntry(0);
00288
00290 void* obj = br->GetAddress();
00291 if (obj != 0) {
00292 if (std::string(br->GetClassName()) == "TClonesArray") {
00293 _tree->Branch(branch_name.c_str(), "TClonesArray", obj);
00294 } else {
00295 _tree->Branch(branch_name.c_str(), br->GetClassName(), obj);
00296 }
00297 }
00298 }
00299
00300 void Write::copy_file()
00301 {
00302 std::ostringstream s;
00303
00304 if(_destination.find(':') != std::string::npos) {
00305 s << "/usr/krb5/bin/kbatch && /usr/krb5/bin/rcp "
00306 << _file->GetName() << " "
00307 << _destination;
00308 } else {
00309 s << "cp "
00310 << _file->GetName() << " "
00311 << _destination;
00312 }
00313 out() << "Write[" << name() << "] : Copied " << _file->GetName()
00314 << " to " << _destination << std::endl;
00315
00316 if(system(s.str().c_str()) == 0) {
00317 remove(_file->GetName());
00318 out() << "Write[" << name() << "] : Removed " << _file->GetName() << std::endl;
00319 } else {
00320 err() << "Write[" << name() << "] : Cannot copy " << _file->GetName()
00321 << "to" << _destination << std::endl;
00322 }
00323 }
00324
00325
00326 }
00327
00328 ClassImp(cafe::Write)