Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

DistributedAccess.h

Go to the documentation of this file.
00001 #ifndef Impala_Core_Matrix_DistributedAccess_h
00002 #define Impala_Core_Matrix_DistributedAccess_h
00003 
00004 #include "Basis/String.h"
00005 #include "Core/Matrix/Mat.h"
00006 #include "Core/Array/ReadRaw.h"
00007 #include "Link/Mpi/MpiFuncs.h"
00008 #include "Util/PropertySet.h"
00009 #include "Util/IOBufferFile.h"
00010 #include "Core/Matrix/MatTranspose.h"
00011 #include "Core/Matrix/GetDiagonal.h"
00012 #include "Core/Matrix/GetColumn.h"
00013 #include "Core/Vector/VectorTem.h"
00014 #include "Core/Table/QuidTable.h"
00015 #include "Core/Table/Read.h"
00016 #include "Core/Table/Write.h"
00017 #include "Core/Feature/FeatureTable.h"
00018 
00019 #include "Core/Matrix/IDistributedAccess.h"
00020 
00021 namespace Impala
00022 {
00023 namespace Core
00024 {
00025 namespace Matrix
00026 {
00027 
00039 class DistributedAccess : public IDistributedAccess
00040 {
00041 public:
00042     class IndexConverter
00043     {
00044     public:
00045         IndexConverter(int totalSize, int partCount)
00046             : mTotalSize(totalSize), mPartCount(partCount)
00047         {
00048             mBlockSize = mTotalSize / mPartCount + 1;
00049             ILOG_DEBUG_NODE("IndexConverter created: totalSize=" << mTotalSize
00050                             << "; partCount=" << mPartCount << "; blockSize="
00051                             << mBlockSize);
00052         }
00053         
00054         inline int PartToIndex(int part)
00055         {
00056             return Min(part * mBlockSize, mTotalSize);
00057         }
00058         
00059         inline int IndexToPart(int index)
00060         {
00061             if((index < 0) || (index >= mTotalSize))
00062             {
00063                 ILOG_ERROR_NODE("DistributedAccess::IndexConverter::IndexToPart"
00064                                 <<" range error: "<< index <<" not in [0,"<<
00065                                 mTotalSize <<")");
00066                 return 0;
00067             }
00068             return index / mBlockSize;
00069         }
00070         
00071         inline int PartCount()
00072         {
00073             return mPartCount;
00074         }
00075         
00076         inline int TotalSize()
00077         {
00078             return mTotalSize;
00079         }
00080     private:
00081         const int mTotalSize;
00082         const int mPartCount;
00083         int mBlockSize;
00084     };
00085 
00086     DistributedAccess(String filenamebase, Core::Database::RawDataSet* set, 
00087         Core::Database::RawDataSet* set2, int startNode, int nodeCount)
00088         : mRows(0), mColumns(0)
00089     {
00090         mValid = false;
00091         mStartNode = startNode;
00092         mNodeCount = nodeCount;
00093         bool read = ReadInfoFile(filenamebase, set, set2);
00094         if(!read)
00095         {
00096             ILOG_ERROR("failed to read info file");
00097         }
00098         else
00099         {
00100             LoadQuids(filenamebase, set, set2);
00101             LoadParts(filenamebase, set, set2);
00102         }
00103     }
00104 
00105     ~DistributedAccess()
00106     {
00107         if(Valid())
00108         {
00109             for(int i=0 ; i<mParts.size() ; ++i)
00110                 delete mParts[i];
00111             delete mRowQuids;
00112             delete mColumnQuids;
00113         }
00114         if(mRows) delete mRows;
00115         if(mColumns) delete mColumns;
00116     }
00117 
00118     bool Valid()
00119     {
00120         return mValid;
00121     }
00122 
00123     int GetRows()
00124     {
00125         return mRows->TotalSize();
00126     }
00127 
00128     int GetColumns()
00129     {
00130         return mColumns->TotalSize();
00131     }
00132 
00133     Table::QuidTable* GetColumnQuids()
00134     {
00135         return mColumnQuids;
00136     }
00137 
00138     Table::QuidTable* GetRowQuids()
00139     {
00140         return mRowQuids;
00141     }
00142 
00143     void Subscribe()
00144     {
00145         Util::PropertySet cmd;
00146         cmd.Add("cmd","subscribe");
00147         for(int i=mStartNode ; i<mStartNode+mNodeCount ; ++i)
00148             Link::Mpi::SendString(cmd.GetDescription(), i);
00149     }
00150 
00151     void Unsubscribe()
00152     {
00153         Util::PropertySet cmd;
00154         cmd.Add("cmd","unsubscribe");
00155         for(int i=mStartNode ; i<mStartNode+mNodeCount ; ++i)
00156             Link::Mpi::SendString(cmd.GetDescription(), i);
00157     }
00158 
00159     int GetColumn(int index, double* buffer, int bufferlength)
00160     {
00161         // for all pieces that contain that column, broadcast it.
00162         ILOG_DEBUG_NODE("*** GetColumn " << index);
00163         Util::PropertySet cmd;
00164         cmd.Add("cmd","column");
00165         cmd.Add("arg",index);
00166         int columnPart = mColumns->IndexToPart(index);
00167         std::vector<int> parts;
00168         for(int i=0 ; i<mRows->PartCount() ; ++i)
00169            parts.push_back(GetPartNr(i, columnPart));
00170         int msglength = GetData(buffer, bufferlength, cmd, parts);
00171         //ILOG_DEBUG_NODE("GetColumn done, received " << msglength << " elements");
00172         return msglength;
00173     }
00174 
00175     int GetRow(int index, double* buffer, int bufferlength)
00176     {
00177         // for all pieces that contain that row, broadcast it.
00178         ILOG_DEBUG_NODE("*** GetRow " << index);
00179         Util::PropertySet cmd;
00180         cmd.Add("cmd","row");
00181         cmd.Add("arg",index);
00182         int rowPart = mRows->IndexToPart(index);
00183         std::vector<int> parts;
00184         for(int i=0 ; i<mColumns->PartCount() ; ++i)
00185            parts.push_back(GetPartNr(rowPart, i));
00186         int msglength = GetData(buffer, bufferlength, cmd, parts);
00187         //ILOG_DEBUG_NODE("GetRow done, received " << msglength << " elements");
00188         return msglength;
00189     }
00190 
00191     int GetDiagonal(double* buffer, int bufferlength)
00192     {
00193         Util::PropertySet cmd;
00194         cmd.Add("cmd","diagonal");
00195         std::vector<int> parts;
00196         for(int i=0 ; i<mRows->PartCount() ; ++i)
00197            parts.push_back(GetPartNr(i, i));
00198         int msglength = GetData(buffer, bufferlength, cmd, parts);
00199         ILOG_DEBUG_NODE("GetDiagonal done, received " << msglength << " elements");
00200         return msglength;
00201     }
00202 
00203     void StartEventLoop()
00204     {
00205         int subscribers = 0;
00206         do
00207         {
00208             int source;
00209             String messageString = Link::Mpi::ReceiveString(source);
00210             Util::PropertySet message(messageString);
00211             String cmd = message.GetString("cmd");
00212             if(cmd == "subscribe")
00213             {
00214                 ++subscribers;
00215                 ILOG_DEBUG_NODE("subscription added: id=" << source << " new #=" << subscribers);
00216                 continue;
00217             }
00218             if(cmd == "unsubscribe")
00219             {
00220                 --subscribers;
00221                 ILOG_DEBUG_NODE("subscription removed: #=" << subscribers);
00222                 continue;
00223             }
00224             if(cmd != "diagonal" && cmd != "row" && cmd != "column")
00225                 ILOG_ERROR("unknown command: " << cmd);
00226             int part = message.GetInt("part");
00227             ILOG_DEBUG_NODE("received cmd: " << messageString);
00228             Vector::VectorTem<double> v;
00229             Matrix::Mat* m = GetPart(part);
00230             if(cmd == "diagonal")
00231                 v = Matrix::GetDiagonal(m);
00232             if(cmd == "column")
00233             {
00234                 int column = message.GetInt("arg");
00235                 int columnpart = mColumns->IndexToPart(column);
00236                 column -= mColumns->PartToIndex(columnpart);
00237                 if(column < 0 || column >= m->H())
00238                     ILOG_ERROR_NODE("faulty column index: " << column <<
00239                             ". column = " << message.GetInt("arg") << "part = " << part);
00240                 v = Vector::VectorTem<double>(m->W(), m->CPB(0,column), true);
00241             }
00242             if(cmd == "row")
00243             {
00244                 int row = message.GetInt("arg");
00245                 int rowpart = mRows->IndexToPart(row);
00246                 row -= mRows->PartToIndex(rowpart);
00247                 v = Matrix::GetColumn(m, row); // the parts are mirrored, remember?
00248             }
00249             //ILOG_DEBUG_NODE("sending answer");
00250             Link::Mpi::SendData(v.GetData(), v.Size(), source);
00251             //ILOG_DEBUG_NODE("data sent");
00252         } while(subscribers>0);
00253         //ILOG_INFO("EventLoop returned");
00254     }
00255 
00261     void CopyParts(Matrix::Mat* dst)
00262     {
00263         for(int i=0 ; i<mColumns->PartCount()*mRows->PartCount() ; ++i)
00264         {
00265             Matrix::Mat* part = GetPart(i);
00266             if(part)
00267             {
00268                 // transpose because of column acces optimization (see LoadParts)
00269                 Matrix::Mat* t = MatTranspose(part);
00270                 Array::SetPart(dst, t, mColumns->PartToIndex(i%mColumns->PartCount()),
00271                                        mRows->PartToIndex(i/mColumns->PartCount()));
00272                 delete t;
00273             }
00274         }
00275     }
00276 private:
00277     bool ReadInfoFile(String filenamebase, Core::Database::RawDataSet* set, 
00278         Core::Database::RawDataSet* set2)
00279     {
00280         String description;
00281         if(Link::Mpi::MyId() == 0)
00282         {
00283             Util::Database* db;
00284             String filename = filenamebase + ".info";
00285             if (set2)
00286             {
00287                 filename = set2->GetFilePathPrecomputedKernels(filename,
00288                                                                set->GetSetName(),
00289                                                                false, false);
00290                 db = set2->GetDatabase();
00291             }
00292             else
00293             {
00294                 filename = set->GetFilePathPrecomputedKernels(filename,
00295                                                               "", false, false);
00296                 db = set->GetDatabase();
00297             }
00298             Util::IOBuffer* buf = db->GetIOBuffer(filename, true, true, "");
00299             if (!buf)
00300             {
00301                 ILOG_ERROR("could not open " << filenamebase << ".info");
00302                 return false;
00303             }
00304             description = buf->ReadLine();
00305             delete buf;
00306         }
00307         description = Link::Mpi::BroadcastString(description);
00308         ILOG_DEBUG("description  = " << description);
00309         Util::PropertySet properties(description);
00310         // read the properties
00311         mRows = new IndexConverter(properties.GetInt("totalrows"), properties.GetInt("rowparts"));
00312         mColumns = new IndexConverter(properties.GetInt("totalcolumns"), properties.GetInt("columnparts"));
00313         //ILOG_DEBUG_NODE("ReadInfo done: " << mTotalColumns << " " << mColumnParts);
00314         return true;
00315     }
00316 
00317     void LoadParts(String filenamebase, Core::Database::RawDataSet* set, 
00318         Core::Database::RawDataSet* set2)
00319     {
00320         ILOG_DEBUG_NODE("starting LoadParts");
00321         int id = Link::Mpi::MyId();
00322         for(int r=0 ; r<mRows->PartCount() ; r++)
00323         {
00324             for(int c=0 ; c<mColumns->PartCount() ; c++)
00325             {
00326                 int i = GetPartNr(r,c);
00327                 if(GetOwnerOfPart(i) == id)
00328                 {
00329                     Matrix::Mat* m = 0;
00330                     String filename = filenamebase + ".precomputed.part-R" +
00331                                MakeString(r) + "-C" + MakeString(c) + ".raw";
00332                     if (set2)
00333                     {
00334                         filename = set2->GetFilePathPrecomputedKernels
00335                             (filename, set->GetSetName(), false, false);
00336                         ReadRaw(m, filename, set2->GetDatabase());
00337                     }
00338                     else
00339                     {
00340                         filename = set->GetFilePathPrecomputedKernels
00341                             (filename, "", false, false);
00342                         ReadRaw(m, filename, set->GetDatabase());
00343                     }
00344                     if(!m)
00345                     {
00346                         ILOG_ERROR_NODE("did not load matrix");
00347                         exit(0); 
00348                         // exiting one node makes the whole thing crash,
00349                         // this is intentionally
00350                     }
00351                     //ILOG_DEBUG_NODE("loaded matrix r" << r << "c" << c);
00352                     mParts.push_back(MatTranspose(m));
00353                     delete m;
00354                 }
00355             }
00356         }
00357         mValid = true;
00358     }
00359 
00360     void LoadQuids(String filenamebase, Core::Database::RawDataSet* set, 
00361         Core::Database::RawDataSet* set2)
00362     {
00363         //ILOG_DEBUG_NODE("starting LoadQuids");
00364         mRowQuids = new Table::QuidTable();
00365         mColumnQuids = new Table::QuidTable();
00366         int id = Link::Mpi::MyId();
00367         if(id == 0)
00368         {
00369             Util::Database* db = (set2) ? set2->GetDatabase()
00370                                         : set->GetDatabase();
00371             for(int r=0 ; r<mRows->PartCount() ; ++r)
00372             {
00373                 ILOG_DEBUG_NODE("loading row quids part " << r);
00374                 Table::QuidTable* temp = new Table::QuidTable();
00375                 String filename = filenamebase + ".rowindices" + MakeString(r)
00376                     + "of" + MakeString(mRows->PartCount()) + ".quids";
00377                 if (set2)
00378                 {
00379                     filename = set2->GetFilePathPrecomputedKernels
00380                         (filename, set->GetSetName(), false, false);
00381                 }
00382                 else
00383                 {
00384                     filename = set->GetFilePathPrecomputedKernels
00385                         (filename, "", false, false);
00386                 }
00387                 Read(temp, filename, db);
00388                 mRowQuids->Append(temp);
00389                 delete temp;
00390             }
00391             for (int c=0 ; c<mColumns->PartCount() ; ++c)
00392             {
00393                 ILOG_DEBUG_NODE("loading column quids part " << c);
00394                 Table::QuidTable* temp = new Table::QuidTable();
00395                 String filename = filenamebase + ".columnindices" + MakeString(c)
00396                     + "of" + MakeString(mColumns->PartCount()) + ".quids";
00397                 if (set2)
00398                 {
00399                     filename = set2->GetFilePathPrecomputedKernels
00400                         (filename, set->GetSetName(), false, false);
00401                 }
00402                 else
00403                 {
00404                     filename = set->GetFilePathPrecomputedKernels
00405                         (filename, "", false, false);
00406                 }
00407                 Read(temp, filename, db);
00408                 mColumnQuids->Append(temp);
00409                 delete temp;
00410             }
00411             if (mRowQuids->Size() != mRows->TotalSize())
00412                 ILOG_ERROR_NODE("number of row quids doesn't match totalrows "
00413                                 << mRows->TotalSize() << " != "
00414                                 << mRowQuids->Size());
00415             if (mColumnQuids->Size() != mColumns->TotalSize())
00416                 ILOG_ERROR_NODE("number of column quids doesn't match total "
00417                                 << mColumns->TotalSize() << " != "
00418                                 << mColumnQuids->Size());
00419             Util::IOBuffer* buf = new Util::IOBuffer(mRows->TotalSize()*
00420                                                      sizeof(Quid) + 0x1000);
00421             Write(mRowQuids, buf, true);
00422             //ILOG_DEBUG_NODE("broadcasting...");
00423             Broadcast(buf);
00424             //ILOG_DEBUG_NODE("done");
00425             delete buf;
00426             buf = new Util::IOBuffer(mColumns->TotalSize()*sizeof(Quid) + 0x1000);
00427             Write(mColumnQuids, buf, true);
00428             Broadcast(buf);
00429             delete buf;
00430         }
00431         else
00432         {
00433             Util::IOBuffer* buf = new Util::IOBuffer();
00434             //ILOG_DEBUG_NODE("broadcasting...");
00435             Broadcast(buf);
00436             //ILOG_DEBUG_NODE("done");
00437             if (buf && buf->Valid())
00438             {
00439                 Read(mRowQuids, buf);
00440                 //ILOG_DEBUG_NODE("rows received");
00441                 delete buf;
00442             }
00443             buf = new Util::IOBuffer();
00444             Broadcast(buf);
00445             if (buf && buf->Valid())
00446             {
00447                 Read(mColumnQuids, buf);
00448                 //ILOG_DEBUG_NODE("columns received");
00449                 delete buf;
00450             }
00451             if(mRowQuids->Size() != mRows->TotalSize())
00452                 ILOG_ERROR_NODE("number of row quids doesn't match totalrows");
00453             if(mColumnQuids->Size() != mColumns->TotalSize())
00454                 ILOG_ERROR_NODE("number of column quids doesn't match totalcolumns");
00455         }
00456     }
00457 
00458     int GetPartNr(int rowpart, int columnpart)
00459     {
00460         if(rowpart >= mRows->PartCount() || rowpart < 0 ||
00461            columnpart >= mColumns->PartCount() || columnpart < 0)
00462            ILOG_ERROR_NODE("GetPartNr: invalid args: r=" << rowpart << 
00463                       " c=" << columnpart);
00464         int partNr = rowpart*mColumns->PartCount() + columnpart;
00465         return partNr;
00466     }
00467 
00468     int GetOwnerOfPart(int partNr)
00469     {
00470         int totalParts = mRows->PartCount() * mColumns->PartCount();
00471         return mStartNode + ((partNr * mNodeCount) / totalParts);
00472     }
00473 
00474     Matrix::Mat* GetPart(int partNr)
00475     {
00476         if(GetOwnerOfPart(partNr) != Link::Mpi::MyId())
00477             return 0;
00478         int index = 0;
00479         for(int i=partNr-1 ; i>=0 ; --i)
00480         {
00481             if(GetOwnerOfPart(i) != Link::Mpi::MyId())
00482                 break;
00483             ++index;
00484         }
00485         return mParts[index];
00486     }
00487 
00488     int GetData(double* buffer, int bufferlength, Util::PropertySet& cmd,
00489                 const std::vector<int>& parts)
00490     {
00491         //ILOG_DEBUG_NODE("GetData: bufferlength = " << bufferlength << ", received = " << 0);
00492         int received=0;
00493         for(int i=0 ; i<parts.size(); ++i)
00494         {
00495             int owner = GetOwnerOfPart(parts[i]);
00496             cmd.Add("part",parts[i]);
00497             Link::Mpi::SendString(cmd.GetDescription(), owner);
00498             double* b = buffer+received;
00499             int l = Link::Mpi::ReceiveData(b, bufferlength-received, owner);
00500             received +=l;
00501             //ILOG_DEBUG_NODE("GetData: bufferlength = " << bufferlength << ", received = " << received);
00502         }
00503         return received;
00504     }
00505 
00506     bool mValid;
00507     IndexConverter* mRows;
00508     IndexConverter* mColumns;
00509     int mStartNode, mNodeCount;
00510     std::vector<Matrix::Mat*> mParts;
00511     Table::QuidTable* mColumnQuids;
00512     Table::QuidTable* mRowQuids;
00513 
00514     ILOG_VAR_DEC;
00515 };
00516 
00517 ILOG_VAR_INIT(DistributedAccess, Impala.Core.Matrix);
00518 
00519 
00520 Feature::FeatureTable* LoadDistributedMatrix(DistributedAccess& access)
00521 {
00522     Table::QuidTable* quids = access.GetRowQuids();
00523     int rows = access.GetRows();
00524     int columns = access.GetColumns();
00525     Matrix::Mat* m = new Matrix::Mat(rows, columns, 0, 0);
00526     access.CopyParts(m);
00527 
00528     Feature::FeatureDefinition fdef("precomputed");
00529     Feature::FeatureTable* f = new Feature::FeatureTable(fdef, rows, columns);
00530     f->GetColumn2()->SetStorage(m);
00531     Copy(f->GetColumn1(), quids->GetColumn1(), rows, 0, 0);
00532     f->SetSize(rows);
00533     return f;
00534 }
00535 
00536 
00537 Feature::FeatureTable* LoadDistributedMatrix(String name, 
00538                                              Core::Database::RawDataSet* set,
00539                                              Core::Database::RawDataSet* set2)
00540 {
00541     ILOG_VAR(Core.Matrix.LoadDistributedMatrix);
00542     DistributedAccess access(name, set, set2, Link::Mpi::MyId(), 1);
00543     if(!access.Valid())
00544     {
00545         ILOG_ERROR("could not open distributed matrix \"" << name << "\"");
00546         return 0;
00547     }
00548     return LoadDistributedMatrix(access);
00549 }
00550 
00551 } // namespace Matrix
00552 } // namespace Core
00553 } // namespace Impala
00554 
00555 #endif

Generated on Fri Mar 19 09:31:15 2010 for ImpalaSrc by  doxygen 1.5.1