00001 #ifndef Impala_Persistency_DistributedAccessRepositoryInFile_h
00002 #define Impala_Persistency_DistributedAccessRepositoryInFile_h
00003
00004 #include "Persistency/RepositoryInFileSystem.h"
00005 #include "Persistency/KernelMatrixLocator.h"
00006 #include "Core/Matrix/DistributedAccess.h"
00007 #include "Core/Matrix/VirtualMatrixFactory.h"
00008 #include "Core/Array/Set.h"
00009 #include "Core/Table/Read.h"
00010 #include "Core/Table/Write.h"
00011 #include "Core/Table/Copy.h"
00012 #include "Util/StringParser.h"
00013
00014 namespace Impala
00015 {
00016 namespace Persistency
00017 {
00018
00019
00020 class DistributedAccessRepositoryInFile
00021 {
00022 public:
00023
00024 typedef Core::Database::RawDataSet RawDataSet;
00025 typedef Core::Matrix::DistributedAccess DistributedAccess;
00026
00027 DistributedAccessRepositoryInFile()
00028 {
00029 }
00030
00031 bool
00032 Exists(const KernelMatrixLocator& loc)
00033 {
00034 String filename = loc.GetFeature() + ".info";
00035 String dir = GetDir(loc);
00036 Persistency::File file = RepFS().GetFile(loc, dir, filename, false,
00037 true);
00038 return file.Valid();
00039 }
00040
00041 DistributedAccess*
00042 Get(const KernelMatrixLocator& loc)
00043 {
00044 DistributedAccess* da = ReadInfoFile(loc);
00045 if (da)
00046 {
00047 LoadFeatures(da, loc);
00048 LoadQuids(da, loc);
00049 if (loc.GetDoParts() > 0)
00050 LoadParts(da, loc);
00051 }
00052 return da;
00053 }
00054
00055 void
00056 Add(const KernelMatrixLocator& loc, DistributedAccess* da)
00057 {
00058 if (!WriteInfoFile(da, loc))
00059 return;
00060 WriteFeatures(da, loc);
00061 WriteQuids(da, loc);
00062 if (loc.GetDoParts() > 0)
00063 {
00064 if (loc.GetDoParts() == 3)
00065 WritePartsAsOne(da, loc);
00066 else
00067 WriteParts(da, loc);
00068 }
00069 }
00070
00071 File
00072 ExposeFile(const KernelMatrixLocator& loc, bool toWrite)
00073 {
00074 String dir = GetDir(loc);
00075 String fName = loc.GetFeature() + ".info";
00076 bool silent = !toWrite;
00077 return RepFS().GetFile(loc, dir, fName, toWrite, silent);
00078 }
00079
00080 private:
00081
00082 RepositoryInFileSystem&
00083 RepFS()
00084 {
00085 return RepositoryInFileSystem::GetInstance();
00086 }
00087
00088
00089 String
00090 GetDir(const KernelMatrixLocator& loc)
00091 {
00092 String dir = loc.GetTopString();
00093 if (!loc.GetIsIndex())
00094 dir = FileNameConcat(dir, loc.GetWalkType());
00095 dir = FileNameConcat(dir, loc.GetAnnoSet());
00096 dir = FileNameConcat(dir, loc.GetFeatureIndexCat());
00097 dir = FileNameConcat(dir, loc.GetModel());
00098 dir = FileNameConcat(dir, loc.GetFeature());
00099 int inc = loc.GetIncrement();
00100 if (inc != 0)
00101 dir = FileNameConcat(dir, "inc_" + MakeString(inc));
00102 return dir;
00103 }
00104
00105 DistributedAccess*
00106 ReadInfoFile(const KernelMatrixLocator& loc)
00107 {
00108 String description;
00109 if (Link::Mpi::MyId() == loc.GetStartNode())
00110 {
00111 String filename = loc.GetFeature() + ".info";
00112 String dir = GetDir(loc);
00113 Persistency::File file = RepFS().GetFile(loc, dir, filename, false,
00114 false);
00115 if (!file.Valid())
00116 return 0;
00117
00118 Util::IOBuffer* buf = file.GetReadBuffer(true, "");
00119 if (!buf)
00120 {
00121 ILOG_ERROR("Could not open " << loc.GetFeature() << ".info");
00122 return 0;
00123 }
00124 description = buf->ReadLine();
00125 delete buf;
00126 }
00127 description = Link::Mpi::BroadcastString(description,
00128 loc.GetStartNode());
00129 Util::PropertySet properties(description);
00130 DistributedAccess* da = new DistributedAccess
00131 (properties.GetInt("totalrows"), properties.GetInt("totalcolumns"),
00132 properties.GetInt("rowparts"), properties.GetInt("columnparts"),
00133 loc.GetStartNode(), loc.GetNodeCount());
00134 return da;
00135 }
00136
00137 void
00138 LoadFeatures(DistributedAccess* da, const KernelMatrixLocator& loc)
00139 {
00140 String dir = GetDir(loc);
00141 String filename = loc.GetFeature() + ".input.txt";
00142 File file = RepFS().GetFile(loc, dir, filename, false, true);
00143 if (!file.Valid())
00144 return;
00145 std::vector<String> lines;
00146 file.ReadStrings(std::back_inserter(lines));
00147
00148 std::vector<double> weights;
00149 std::vector<String> features;
00150 for (int i=0 ; i<lines.size() ; i++)
00151 {
00152 Util::StringParser p(lines[i]);
00153 Real64 weight = p.GetDouble();
00154 String feature = p.GetString(' ', false);
00155 if ((weight == 0) || (feature.empty()))
00156 {
00157 ILOG_ERROR_HEADNODE("LoadFeatures: parse error in .input.txt");
00158 return;
00159 }
00160 weights.push_back(weight);
00161 features.push_back(feature);
00162 }
00163 if (features.size() == 0)
00164 {
00165 ILOG_ERROR_HEADNODE("LoadFeatures: no features found");
00166 return;
00167 }
00168
00169 filename = loc.GetFeature() + ".averages.raw";
00170 file = RepFS().GetFile(loc, dir, filename, false, true);
00171 std::vector<double> averages;
00172 if (file.Valid())
00173 {
00174 file.ReadNative(std::back_inserter(averages));
00175 if (features.size() != averages.size())
00176 {
00177 ILOG_ERROR_HEADNODE("LoadFeatures: averages do not match");
00178 return;
00179 }
00180 }
00181
00182 for (int i=0 ; i<features.size() ; i++)
00183 {
00184 if (averages.size() == 0)
00185 da->AddFeature(features[i], weights[i], -1);
00186 else
00187 da->AddFeature(features[i], weights[i], averages[i]);
00188 }
00189 if (averages.size() != 0)
00190 da->SetHasOwnAverages(true);
00191 }
00192
00193 void
00194 LoadQuids(DistributedAccess* da, const KernelMatrixLocator& loc)
00195 {
00196 String dir = GetDir(loc);
00197 String filenamebase = loc.GetFeature();
00198
00199 Core::Table::QuidTable* rowQuids = da->GetRowQuids();
00200 Core::Table::QuidTable* columnQuids = da->GetColumnQuids();
00201 if (Link::Mpi::MyId() == da->GetStartNode())
00202 {
00203 String filename = filenamebase + ".columns.tab";
00204 File file = RepFS().GetFile(loc, dir, filename, false, false);
00205 Read(columnQuids, file);
00206 filename = filenamebase + ".rows.tab";
00207 file = RepFS().GetFile(loc, dir, filename, false, true);
00208 if (file.Valid())
00209 {
00210 file = RepFS().GetFile(loc, dir, filename, false, false);
00211 Read(rowQuids, file);
00212 }
00213 else
00214 {
00215 Core::Table::Copy(rowQuids, columnQuids);
00216 }
00217
00218 if (rowQuids->Size() != da->GetRows())
00219 ILOG_ERROR("number of quids doesn't match totalrows: "
00220 << rowQuids->Size() << " vs " << da->GetRows());
00221 if (columnQuids->Size() != da->GetColumns())
00222 ILOG_ERROR("number of quids doesn't match totalcolumns: "
00223 << columnQuids->Size() << " vs " << da->GetColumns());
00224 Util::IOBuffer* buf = new Util::IOBuffer(da->GetRows()*
00225 sizeof(Quid) + 0x1000);
00226 Write(rowQuids, buf, true);
00227 Broadcast(buf, da->GetStartNode());
00228 delete buf;
00229 buf = new Util::IOBuffer(da->GetColumns()*sizeof(Quid)+0x1000);
00230 Write(columnQuids, buf, true);
00231 Broadcast(buf, da->GetStartNode());
00232 delete buf;
00233 }
00234 else
00235 {
00236 Util::IOBuffer* buf = new Util::IOBuffer();
00237 Broadcast(buf, da->GetStartNode());
00238 if (buf && buf->Valid())
00239 {
00240 Read(rowQuids, buf);
00241 delete buf;
00242 }
00243 buf = new Util::IOBuffer();
00244 Broadcast(buf, da->GetStartNode());
00245 if (buf && buf->Valid())
00246 {
00247 Read(columnQuids, buf);
00248 delete buf;
00249 }
00250 if (rowQuids->Size() != da->GetRows())
00251 ILOG_ERROR_NODE("number of quids doesn't match totalrows");
00252 if (columnQuids->Size() != da->GetColumns())
00253 ILOG_ERROR_NODE("number of quids doesn't match totalcolumns");
00254 }
00255 }
00256
00257 void
00258 LoadParts(DistributedAccess* da, const KernelMatrixLocator& loc)
00259 {
00260 typedef Core::Matrix::VirtualMatrixFactory VirtualMatrixFactory;
00261 typedef Core::Matrix::VirtualMatrix VirtualMatrix;
00262 String dir = GetDir(loc);
00263 VirtualMatrixFactory& vmf = VirtualMatrixFactory::GetInstance();
00264 for (int r=0 ; r<da->GetRowPartCount() ; r++)
00265 {
00266 for (int c=0 ; c<da->GetColumnPartCount() ; c++)
00267 {
00268 int i = da->GetPartNr(r,c);
00269 if (Link::Mpi::MyId() == da->GetOwnerOfPart(i))
00270 {
00271 String filename = loc.GetFeature() + ".precomputed.part-R" +
00272 MakeString(r) + "-C" + MakeString(c) + ".raw";
00273 File file = RepFS().GetFile(loc, dir, filename, false,
00274 false);
00275 VirtualMatrix* vm;
00276 if (loc.GetDoParts() == 1)
00277 vm = vmf.ConstructMemory(file, loc.GetEmployReal32());
00278 else if (loc.GetDoParts() == 2)
00279 vm = vmf.ConstructIOBufferReader(file);
00280 else
00281 ILOG_ERROR("[LoadParts] Do not understand GetDoParts");
00282 da->AddPart(vm);
00283 }
00284 }
00285 }
00286 }
00287
00288 bool
00289 WriteInfoFile(DistributedAccess* da, const KernelMatrixLocator& loc)
00290 {
00291 if (Link::Mpi::MyId() != da->GetStartNode())
00292 return true;
00293
00294 String filename = loc.GetFeature() + ".info";
00295 String dir = GetDir(loc);
00296 Persistency::File file = RepFS().GetFile(loc, dir, filename, true,
00297 false);
00298 if (!file.Valid())
00299 return false;
00300
00301 ILOG_INFO("Saving info in " << filename);
00302 Util::IOBuffer* buf = file.GetWriteBuffer();
00303 if (!buf)
00304 {
00305 ILOG_ERROR("Could not open " << loc.GetFeature() << ".info");
00306 return false;
00307 }
00308 Util::PropertySet ps;
00309 ps.Add("totalrows", da->GetRows());
00310 ps.Add("totalcolumns", da->GetColumns());
00311 if (loc.GetDoParts() == 3)
00312 {
00313 ps.Add("rowparts", 1);
00314 ps.Add("columnparts", 1);
00315 }
00316 else
00317 {
00318 ps.Add("rowparts", da->GetRowPartCount());
00319 ps.Add("columnparts", da->GetColumnPartCount());
00320 }
00321 ps.Print(buf);
00322 delete buf;
00323 return true;
00324 }
00325
00326 void
00327 WriteFeatures(DistributedAccess* da, const KernelMatrixLocator& loc)
00328 {
00329 if (Link::Mpi::MyId() != da->GetStartNode())
00330 return;
00331
00332 String dir = GetDir(loc);
00333 String filename = loc.GetFeature() + ".input.txt";
00334 File file = RepFS().GetFile(loc, dir, filename, true, false);
00335 Util::IOBuffer* buf = file.GetWriteBuffer();
00336 if (buf)
00337 {
00338 for (int i=0 ; i<da->GetNrFeatures() ; i++)
00339 {
00340 String s = MakeString(da->GetWeight(i)) + " " +
00341 da->GetFeature(i);
00342 buf->Puts(s);
00343 }
00344 delete buf;
00345 }
00346 else
00347 {
00348 ILOG_ERROR("Could not write input.txt");
00349 }
00350
00351 if (da->GetHasOwnAverages())
00352 {
00353 String filename = loc.GetFeature() + ".averages.raw";
00354 File file = RepFS().GetFile(loc, dir, filename, true, false);
00355 Util::IOBuffer* buf = file.GetWriteBuffer();
00356 if (buf)
00357 {
00358 for (int i=0 ; i<da->GetNrFeatures() ; i++)
00359 {
00360 double val = da->GetAverage(i);
00361 buf->Write(&val, sizeof(double));
00362 }
00363 delete buf;
00364 }
00365 else
00366 {
00367 ILOG_ERROR("Could not write averages.raw");
00368 }
00369 }
00370 }
00371
00372 void
00373 WriteQuids(DistributedAccess* da, const KernelMatrixLocator& loc)
00374 {
00375 if (Link::Mpi::MyId() != da->GetStartNode())
00376 return;
00377
00378 String dir = GetDir(loc);
00379 String filename = loc.GetFeature() + ".columns.tab";
00380 File file = RepFS().GetFile(loc, dir, filename, true, false);
00381 ILOG_INFO("Saving columns in " << file.GetPath());
00382 Write(da->GetColumnQuids(), file);
00383
00384 filename = loc.GetFeature() + ".rows.tab";
00385 file = RepFS().GetFile(loc, dir, filename, true, false);
00386 ILOG_INFO("Saving rows in " << file.GetPath());
00387 Write(da->GetRowQuids(), file);
00388 }
00389
00390 void
00391 WriteParts(DistributedAccess* da, const KernelMatrixLocator& loc)
00392 {
00393 typedef Core::Matrix::VirtualMatrixFactory VirtualMatrixFactory;
00394 String dir = GetDir(loc);
00395 VirtualMatrixFactory& vmf = VirtualMatrixFactory::GetInstance();
00396 for (int r=0 ; r<da->GetRowPartCount() ; r++)
00397 {
00398 for (int c=0 ; c<da->GetColumnPartCount() ; c++)
00399 {
00400 int i = da->GetPartNr(r,c);
00401 if (Link::Mpi::MyId() == da->GetOwnerOfPart(i))
00402 {
00403 Core::Matrix::VirtualMatrix* part = da->GetPart(i);
00404 String filename = loc.GetFeature() + ".precomputed.part-R"
00405 + MakeString(r) + "-C" + MakeString(c) + ".raw" ;
00406 File file = RepFS().GetFile(loc, dir, filename, true,
00407 false);
00408 ILOG_INFO_NODE("Saving part in " << file.GetPath());
00409 Core::Matrix::VirtualMatrix* dst =
00410 vmf.ConstructIOBufferWriter(part->NrRow(), part->NrCol(),
00411 file, loc.GetWriteReal32(),
00412 false);
00413 dst->CopyFrom(part);
00414 delete dst;
00415 }
00416 }
00417 }
00418 }
00419
00420 void
00421 WritePartsAsOne(DistributedAccess* da, const KernelMatrixLocator& loc)
00422 {
00423 typedef Core::Matrix::VirtualMatrixFactory VirtualMatrixFactory;
00424 String dir = GetDir(loc);
00425 String filename = loc.GetFeature() + ".precomputed.part-R0-C0.raw" ;
00426 File file = RepFS().GetFile(loc, dir, filename, true, false);
00427 ILOG_INFO_NODE("Saving part in " << file.GetPath());
00428 VirtualMatrixFactory& vmf = VirtualMatrixFactory::GetInstance();
00429 Core::Matrix::VirtualMatrix* dst =
00430 vmf.ConstructIOBufferWriter(da->NrRow(), da->NrCol(), file,
00431 loc.GetWriteReal32(), false);
00432 dst->CopyFrom(da);
00433 delete dst;
00434 }
00435
00436 ILOG_VAR_DEC;
00437 };
00438
00439 ILOG_VAR_INIT(DistributedAccessRepositoryInFile, Impala.Persistency);
00440
00441 }
00442 }
00443
00444 #endif