Home || Visual Search || Applications || Architecture || Important Messages || OGL || Src

DistributedAccessRepositoryInFile.h

Go to the documentation of this file.
00001 #ifndef Impala_Persistency_DistributedAccessRepositoryInFile_h
00002 #define Impala_Persistency_DistributedAccessRepositoryInFile_h
00003 
00004 #include "Persistency/RepositoryInFileSystem.h"
00005 #include "Persistency/KernelMatrixLocator.h"
00006 #include "Core/Matrix/DistributedAccess.h"
00007 #include "Core/Matrix/VirtualMatrixFactory.h"
00008 #include "Core/Array/Set.h"
00009 #include "Core/Table/Read.h"
00010 #include "Core/Table/Write.h"
00011 #include "Core/Table/Copy.h"
00012 #include "Util/StringParser.h"
00013 
00014 namespace Impala
00015 {
00016 namespace Persistency
00017 {
00018 
00019 
00020 class DistributedAccessRepositoryInFile
00021 {
00022 public:
00023 
00024     typedef Core::Database::RawDataSet RawDataSet;
00025     typedef Core::Matrix::DistributedAccess DistributedAccess;
00026 
00027     DistributedAccessRepositoryInFile()
00028     {
00029     }
00030 
00031     bool
00032     Exists(const KernelMatrixLocator& loc)
00033     {
00034         String filename = loc.GetFeature() + ".info";
00035         String dir = GetDir(loc);
00036         Persistency::File file = RepFS().GetFile(loc, dir, filename, false,
00037                                                  true);
00038         return file.Valid();
00039     }
00040 
00041     DistributedAccess*
00042     Get(const KernelMatrixLocator& loc)
00043     {
00044         DistributedAccess* da = ReadInfoFile(loc);
00045         if (da)
00046         {
00047             LoadFeatures(da, loc);
00048             LoadQuids(da, loc);
00049             if (loc.GetDoParts() > 0)
00050                 LoadParts(da, loc);
00051         }
00052         return da;
00053     }
00054 
00055     void
00056     Add(const KernelMatrixLocator& loc, DistributedAccess* da)
00057     {
00058         if (!WriteInfoFile(da, loc))
00059             return;
00060         WriteFeatures(da, loc);
00061         WriteQuids(da, loc);
00062         if (loc.GetDoParts() > 0)
00063         {
00064             if (loc.GetDoParts() == 3)
00065                 WritePartsAsOne(da, loc);
00066             else
00067                 WriteParts(da, loc);
00068         }
00069     }
00070 
00071     File
00072     ExposeFile(const KernelMatrixLocator& loc, bool toWrite)
00073     {
00074         String dir = GetDir(loc);
00075         String fName = loc.GetFeature() + ".info";
00076         bool silent = !toWrite; // if (toWrite) force creating directory
00077         return RepFS().GetFile(loc, dir, fName, toWrite, silent);
00078     }
00079 
00080 private:
00081 
00082     RepositoryInFileSystem&
00083     RepFS()
00084     {
00085         return RepositoryInFileSystem::GetInstance();
00086     }
00087 
00088     // This function should follow KernelMatrixRepositoryInFile::GetDir
00089     String
00090     GetDir(const KernelMatrixLocator& loc)
00091     {
00092         String dir = loc.GetTopString();
00093         if (!loc.GetIsIndex())
00094             dir = FileNameConcat(dir, loc.GetWalkType());
00095         dir = FileNameConcat(dir, loc.GetAnnoSet());
00096         dir = FileNameConcat(dir, loc.GetFeatureIndexCat());
00097         dir = FileNameConcat(dir, loc.GetModel());
00098         dir = FileNameConcat(dir, loc.GetFeature());
00099         int inc = loc.GetIncrement();
00100         if (inc != 0)
00101             dir = FileNameConcat(dir, "inc_" + MakeString(inc));
00102         return dir;
00103     }
00104 
00105     DistributedAccess*
00106     ReadInfoFile(const KernelMatrixLocator& loc)
00107     {
00108         String description;
00109         if (Link::Mpi::MyId() == loc.GetStartNode())
00110         {
00111             String filename = loc.GetFeature() + ".info";
00112             String dir = GetDir(loc);
00113             Persistency::File file = RepFS().GetFile(loc, dir, filename, false,
00114                                                      false);
00115             if (!file.Valid())
00116                 return 0;
00117 
00118             Util::IOBuffer* buf = file.GetReadBuffer(true, "");
00119             if (!buf)
00120             {
00121                 ILOG_ERROR("Could not open " << loc.GetFeature() << ".info");
00122                 return 0;
00123             }
00124             description = buf->ReadLine();
00125             delete buf;
00126         }
00127         description = Link::Mpi::BroadcastString(description,
00128                                                  loc.GetStartNode());
00129         Util::PropertySet properties(description);
00130         DistributedAccess* da = new DistributedAccess
00131             (properties.GetInt("totalrows"), properties.GetInt("totalcolumns"),
00132              properties.GetInt("rowparts"), properties.GetInt("columnparts"),
00133              loc.GetStartNode(), loc.GetNodeCount());
00134         return da;
00135     }
00136 
00137     void
00138     LoadFeatures(DistributedAccess* da, const KernelMatrixLocator& loc)
00139     {
00140         String dir = GetDir(loc);
00141         String filename = loc.GetFeature() + ".input.txt";
00142         File file = RepFS().GetFile(loc, dir, filename, false, true);
00143         if (!file.Valid())
00144             return;
00145         std::vector<String> lines;
00146         file.ReadStrings(std::back_inserter(lines));
00147 
00148         std::vector<double> weights;
00149         std::vector<String> features;
00150         for (int i=0 ; i<lines.size() ; i++)
00151         {
00152             Util::StringParser p(lines[i]);
00153             Real64 weight = p.GetDouble();
00154             String feature = p.GetString(' ', false);
00155             if ((weight == 0) || (feature.empty()))
00156             {
00157                 ILOG_ERROR_HEADNODE("LoadFeatures: parse error in .input.txt");
00158                 return;
00159             }
00160             weights.push_back(weight);
00161             features.push_back(feature);
00162         }
00163         if (features.size() == 0)
00164         {
00165             ILOG_ERROR_HEADNODE("LoadFeatures: no features found");
00166             return;
00167         }
00168 
00169         filename = loc.GetFeature() + ".averages.raw";
00170         file = RepFS().GetFile(loc, dir, filename, false, true);
00171         std::vector<double> averages;
00172         if (file.Valid())
00173         {
00174             file.ReadNative(std::back_inserter(averages));
00175             if (features.size() != averages.size())
00176             {
00177                 ILOG_ERROR_HEADNODE("LoadFeatures: averages do not match");
00178                 return;
00179             }
00180         }
00181 
00182         for (int i=0 ; i<features.size() ; i++)
00183         {
00184             if (averages.size() == 0)
00185                 da->AddFeature(features[i], weights[i], -1);
00186             else
00187                 da->AddFeature(features[i], weights[i], averages[i]);
00188         }
00189         if (averages.size() != 0)
00190             da->SetHasOwnAverages(true);
00191     }
00192 
00193     void
00194     LoadQuids(DistributedAccess* da, const KernelMatrixLocator& loc)
00195     {
00196         String dir = GetDir(loc);
00197         String filenamebase = loc.GetFeature();
00198 
00199         Core::Table::QuidTable* rowQuids = da->GetRowQuids();
00200         Core::Table::QuidTable* columnQuids = da->GetColumnQuids();
00201         if (Link::Mpi::MyId() == da->GetStartNode())
00202         {
00203             String filename = filenamebase + ".columns.tab";
00204             File file = RepFS().GetFile(loc, dir, filename, false, false);
00205             Read(columnQuids, file);
00206             filename = filenamebase + ".rows.tab";
00207             file = RepFS().GetFile(loc, dir, filename, false, true);
00208             if (file.Valid())
00209             {
00210                 file = RepFS().GetFile(loc, dir, filename, false, false);
00211                 Read(rowQuids, file);
00212             }
00213             else
00214             {   // assume square kernel
00215                 Core::Table::Copy(rowQuids, columnQuids);
00216             }
00217 
00218             if (rowQuids->Size() != da->GetRows())
00219                 ILOG_ERROR("number of quids doesn't match totalrows: "
00220                            << rowQuids->Size() << " vs " << da->GetRows());
00221             if (columnQuids->Size() != da->GetColumns())
00222                 ILOG_ERROR("number of quids doesn't match totalcolumns: "
00223                            << columnQuids->Size() << " vs " << da->GetColumns());
00224             Util::IOBuffer* buf = new Util::IOBuffer(da->GetRows()*
00225                                                      sizeof(Quid) + 0x1000);
00226             Write(rowQuids, buf, true);
00227             Broadcast(buf, da->GetStartNode());
00228             delete buf;
00229             buf = new Util::IOBuffer(da->GetColumns()*sizeof(Quid)+0x1000);
00230             Write(columnQuids, buf, true);
00231             Broadcast(buf, da->GetStartNode());
00232             delete buf;
00233         }
00234         else
00235         {
00236             Util::IOBuffer* buf = new Util::IOBuffer();
00237             Broadcast(buf, da->GetStartNode());
00238             if (buf && buf->Valid())
00239             {
00240                 Read(rowQuids, buf);
00241                 delete buf;
00242             }
00243             buf = new Util::IOBuffer();
00244             Broadcast(buf, da->GetStartNode());
00245             if (buf && buf->Valid())
00246             {
00247                 Read(columnQuids, buf);
00248                 delete buf;
00249             }
00250             if (rowQuids->Size() != da->GetRows())
00251                 ILOG_ERROR_NODE("number of quids doesn't match totalrows");
00252             if (columnQuids->Size() != da->GetColumns())
00253                 ILOG_ERROR_NODE("number of quids doesn't match totalcolumns");
00254         }
00255     }
00256 
00257     void
00258     LoadParts(DistributedAccess* da, const KernelMatrixLocator& loc)
00259     {
00260         typedef Core::Matrix::VirtualMatrixFactory VirtualMatrixFactory;
00261         typedef Core::Matrix::VirtualMatrix VirtualMatrix;
00262         String dir = GetDir(loc);
00263         VirtualMatrixFactory& vmf = VirtualMatrixFactory::GetInstance();
00264         for (int r=0 ; r<da->GetRowPartCount() ; r++)
00265         {
00266             for (int c=0 ; c<da->GetColumnPartCount() ; c++)
00267             {
00268                 int i = da->GetPartNr(r,c);
00269                 if (Link::Mpi::MyId() == da->GetOwnerOfPart(i))
00270                 {
00271                     String filename = loc.GetFeature() + ".precomputed.part-R" +
00272                                MakeString(r) + "-C" + MakeString(c) + ".raw";
00273                     File file = RepFS().GetFile(loc, dir, filename, false,
00274                                                 false);
00275                     VirtualMatrix* vm;
00276                     if (loc.GetDoParts() == 1)
00277                         vm = vmf.ConstructMemory(file, loc.GetEmployReal32());
00278                     else if (loc.GetDoParts() == 2)
00279                         vm = vmf.ConstructIOBufferReader(file);
00280                     else
00281                         ILOG_ERROR("[LoadParts] Do not understand GetDoParts");
00282                     da->AddPart(vm);
00283                 }
00284             }
00285         }
00286     }
00287 
00288     bool
00289     WriteInfoFile(DistributedAccess* da, const KernelMatrixLocator& loc)
00290     {
00291         if (Link::Mpi::MyId() != da->GetStartNode())
00292             return true;
00293 
00294         String filename = loc.GetFeature() + ".info";
00295         String dir = GetDir(loc);
00296         Persistency::File file = RepFS().GetFile(loc, dir, filename, true,
00297                                                  false);
00298         if (!file.Valid())
00299             return false;
00300 
00301         ILOG_INFO("Saving info in " << filename);
00302         Util::IOBuffer* buf = file.GetWriteBuffer();
00303         if (!buf)
00304         {
00305             ILOG_ERROR("Could not open " << loc.GetFeature() << ".info");
00306             return false;
00307         }
00308         Util::PropertySet ps;
00309         ps.Add("totalrows", da->GetRows());
00310         ps.Add("totalcolumns", da->GetColumns());
00311         if (loc.GetDoParts() == 3)
00312         {
00313             ps.Add("rowparts", 1);
00314             ps.Add("columnparts", 1);
00315         }
00316         else
00317         {
00318             ps.Add("rowparts", da->GetRowPartCount());
00319             ps.Add("columnparts", da->GetColumnPartCount());
00320         }
00321         ps.Print(buf);
00322         delete buf;
00323         return true;
00324     }
00325 
00326     void
00327     WriteFeatures(DistributedAccess* da, const KernelMatrixLocator& loc)
00328     {
00329         if (Link::Mpi::MyId() != da->GetStartNode())
00330             return;
00331 
00332         String dir = GetDir(loc);
00333         String filename = loc.GetFeature() + ".input.txt";
00334         File file = RepFS().GetFile(loc, dir, filename, true, false);
00335         Util::IOBuffer* buf = file.GetWriteBuffer();
00336         if (buf)
00337         {
00338             for (int i=0 ; i<da->GetNrFeatures() ; i++)
00339             {
00340                 String s = MakeString(da->GetWeight(i)) + " " +
00341                     da->GetFeature(i);
00342                 buf->Puts(s);
00343             }
00344             delete buf;
00345         }
00346         else
00347         {
00348             ILOG_ERROR("Could not write input.txt");
00349         }
00350 
00351         if (da->GetHasOwnAverages())
00352         {
00353             String filename = loc.GetFeature() + ".averages.raw";
00354             File file = RepFS().GetFile(loc, dir, filename, true, false);
00355             Util::IOBuffer* buf = file.GetWriteBuffer();
00356             if (buf)
00357             {
00358                 for (int i=0 ; i<da->GetNrFeatures() ; i++)
00359                 {
00360                     double val = da->GetAverage(i);
00361                     buf->Write(&val, sizeof(double));
00362                 }
00363                 delete buf;
00364             }
00365             else
00366             {
00367                 ILOG_ERROR("Could not write averages.raw");
00368             }
00369         }
00370     }
00371 
00372     void
00373     WriteQuids(DistributedAccess* da, const KernelMatrixLocator& loc)
00374     {
00375         if (Link::Mpi::MyId() != da->GetStartNode())
00376             return;
00377 
00378         String dir = GetDir(loc);
00379         String filename = loc.GetFeature() + ".columns.tab";
00380         File file = RepFS().GetFile(loc, dir, filename, true, false);
00381         ILOG_INFO("Saving columns in " << file.GetPath());
00382         Write(da->GetColumnQuids(), file);
00383 
00384         filename = loc.GetFeature() + ".rows.tab";
00385         file = RepFS().GetFile(loc, dir, filename, true, false);
00386         ILOG_INFO("Saving rows in " << file.GetPath());
00387         Write(da->GetRowQuids(), file);
00388     }
00389 
00390     void
00391     WriteParts(DistributedAccess* da, const KernelMatrixLocator& loc)
00392     {
00393         typedef Core::Matrix::VirtualMatrixFactory VirtualMatrixFactory;
00394         String dir = GetDir(loc);
00395         VirtualMatrixFactory& vmf = VirtualMatrixFactory::GetInstance();
00396         for (int r=0 ; r<da->GetRowPartCount() ; r++)
00397         {
00398             for (int c=0 ; c<da->GetColumnPartCount() ; c++)
00399             {
00400                 int i = da->GetPartNr(r,c);
00401                 if (Link::Mpi::MyId() == da->GetOwnerOfPart(i))
00402                 {
00403                     Core::Matrix::VirtualMatrix* part = da->GetPart(i);
00404                     String filename = loc.GetFeature() + ".precomputed.part-R"
00405                         + MakeString(r) + "-C" + MakeString(c) + ".raw" ;
00406                     File file = RepFS().GetFile(loc, dir, filename, true,
00407                                                 false);
00408                     ILOG_INFO_NODE("Saving part in " << file.GetPath());
00409                     Core::Matrix::VirtualMatrix* dst = 
00410                         vmf.ConstructIOBufferWriter(part->NrRow(), part->NrCol(),
00411                                                     file, loc.GetWriteReal32(),
00412                                                     false);
00413                     dst->CopyFrom(part);
00414                     delete dst;
00415                 }
00416             }
00417         }
00418     }
00419 
00420     void
00421     WritePartsAsOne(DistributedAccess* da, const KernelMatrixLocator& loc)
00422     {
00423         typedef Core::Matrix::VirtualMatrixFactory VirtualMatrixFactory;
00424         String dir = GetDir(loc);
00425         String filename = loc.GetFeature() + ".precomputed.part-R0-C0.raw" ;
00426         File file = RepFS().GetFile(loc, dir, filename, true, false);
00427         ILOG_INFO_NODE("Saving part in " << file.GetPath());
00428         VirtualMatrixFactory& vmf = VirtualMatrixFactory::GetInstance();
00429         Core::Matrix::VirtualMatrix* dst = 
00430             vmf.ConstructIOBufferWriter(da->NrRow(), da->NrCol(), file,
00431                                         loc.GetWriteReal32(), false);
00432         dst->CopyFrom(da);
00433         delete dst;
00434     }
00435 
00436     ILOG_VAR_DEC;
00437 };
00438 
00439 ILOG_VAR_INIT(DistributedAccessRepositoryInFile, Impala.Persistency);
00440 
00441 } // namespace Persistency
00442 } // namespace Impala
00443 
00444 #endif

Generated on Thu Jan 13 09:05:01 2011 for ImpalaSrc by  doxygen 1.5.1