00001 #ifndef Impala_Core_Matrix_DistributedAccess_h
00002 #define Impala_Core_Matrix_DistributedAccess_h
00003
00004 #include "Basis/String.h"
00005 #include "Core/Matrix/Mat.h"
00006 #include "Core/Array/ReadRaw.h"
00007 #include "Link/Mpi/MpiFuncs.h"
00008 #include "Util/PropertySet.h"
00009 #include "Util/IOBufferFile.h"
00010 #include "Core/Matrix/MatTranspose.h"
00011 #include "Core/Matrix/GetDiagonal.h"
00012 #include "Core/Matrix/GetColumn.h"
00013 #include "Core/Vector/VectorTem.h"
00014 #include "Core/Table/QuidTable.h"
00015 #include "Core/Table/Read.h"
00016 #include "Core/Table/Write.h"
00017 #include "Core/Feature/FeatureTable.h"
00018
00019 #include "Core/Matrix/IDistributedAccess.h"
00020
00021 namespace Impala
00022 {
00023 namespace Core
00024 {
00025 namespace Matrix
00026 {
00027
00039 class DistributedAccess : public IDistributedAccess
00040 {
00041 public:
00042 class IndexConverter
00043 {
00044 public:
00045 IndexConverter(int totalSize, int partCount)
00046 : mTotalSize(totalSize), mPartCount(partCount)
00047 {
00048 mBlockSize = mTotalSize / mPartCount + 1;
00049 ILOG_DEBUG_NODE("IndexConverter created: totalSize=" << mTotalSize
00050 << "; partCount=" << mPartCount << "; blockSize="
00051 << mBlockSize);
00052 }
00053
00054 inline int PartToIndex(int part)
00055 {
00056 return Min(part * mBlockSize, mTotalSize);
00057 }
00058
00059 inline int IndexToPart(int index)
00060 {
00061 if((index < 0) || (index >= mTotalSize))
00062 {
00063 ILOG_ERROR_NODE("DistributedAccess::IndexConverter::IndexToPart"
00064 <<" range error: "<< index <<" not in [0,"<<
00065 mTotalSize <<")");
00066 return 0;
00067 }
00068 return index / mBlockSize;
00069 }
00070
00071 inline int PartCount()
00072 {
00073 return mPartCount;
00074 }
00075
00076 inline int TotalSize()
00077 {
00078 return mTotalSize;
00079 }
00080 private:
00081 const int mTotalSize;
00082 const int mPartCount;
00083 int mBlockSize;
00084 };
00085
00086 DistributedAccess(String filenamebase, Core::Database::RawDataSet* set,
00087 Core::Database::RawDataSet* set2, int startNode, int nodeCount)
00088 : mRows(0), mColumns(0)
00089 {
00090 mValid = false;
00091 mStartNode = startNode;
00092 mNodeCount = nodeCount;
00093 bool read = ReadInfoFile(filenamebase, set, set2);
00094 if(!read)
00095 {
00096 ILOG_ERROR("failed to read info file");
00097 }
00098 else
00099 {
00100 LoadQuids(filenamebase, set, set2);
00101 LoadParts(filenamebase, set, set2);
00102 }
00103 }
00104
00105 ~DistributedAccess()
00106 {
00107 if(Valid())
00108 {
00109 for(int i=0 ; i<mParts.size() ; ++i)
00110 delete mParts[i];
00111 delete mRowQuids;
00112 delete mColumnQuids;
00113 }
00114 if(mRows) delete mRows;
00115 if(mColumns) delete mColumns;
00116 }
00117
00118 bool Valid()
00119 {
00120 return mValid;
00121 }
00122
00123 int GetRows()
00124 {
00125 return mRows->TotalSize();
00126 }
00127
00128 int GetColumns()
00129 {
00130 return mColumns->TotalSize();
00131 }
00132
00133 Table::QuidTable* GetColumnQuids()
00134 {
00135 return mColumnQuids;
00136 }
00137
00138 Table::QuidTable* GetRowQuids()
00139 {
00140 return mRowQuids;
00141 }
00142
00143 void Subscribe()
00144 {
00145 Util::PropertySet cmd;
00146 cmd.Add("cmd","subscribe");
00147 for(int i=mStartNode ; i<mStartNode+mNodeCount ; ++i)
00148 Link::Mpi::SendString(cmd.GetDescription(), i);
00149 }
00150
00151 void Unsubscribe()
00152 {
00153 Util::PropertySet cmd;
00154 cmd.Add("cmd","unsubscribe");
00155 for(int i=mStartNode ; i<mStartNode+mNodeCount ; ++i)
00156 Link::Mpi::SendString(cmd.GetDescription(), i);
00157 }
00158
00159 int GetColumn(int index, double* buffer, int bufferlength)
00160 {
00161
00162 ILOG_DEBUG_NODE("*** GetColumn " << index);
00163 Util::PropertySet cmd;
00164 cmd.Add("cmd","column");
00165 cmd.Add("arg",index);
00166 int columnPart = mColumns->IndexToPart(index);
00167 std::vector<int> parts;
00168 for(int i=0 ; i<mRows->PartCount() ; ++i)
00169 parts.push_back(GetPartNr(i, columnPart));
00170 int msglength = GetData(buffer, bufferlength, cmd, parts);
00171
00172 return msglength;
00173 }
00174
00175 int GetRow(int index, double* buffer, int bufferlength)
00176 {
00177
00178 ILOG_DEBUG_NODE("*** GetRow " << index);
00179 Util::PropertySet cmd;
00180 cmd.Add("cmd","row");
00181 cmd.Add("arg",index);
00182 int rowPart = mRows->IndexToPart(index);
00183 std::vector<int> parts;
00184 for(int i=0 ; i<mColumns->PartCount() ; ++i)
00185 parts.push_back(GetPartNr(rowPart, i));
00186 int msglength = GetData(buffer, bufferlength, cmd, parts);
00187
00188 return msglength;
00189 }
00190
00191 int GetDiagonal(double* buffer, int bufferlength)
00192 {
00193 Util::PropertySet cmd;
00194 cmd.Add("cmd","diagonal");
00195 std::vector<int> parts;
00196 for(int i=0 ; i<mRows->PartCount() ; ++i)
00197 parts.push_back(GetPartNr(i, i));
00198 int msglength = GetData(buffer, bufferlength, cmd, parts);
00199 ILOG_DEBUG_NODE("GetDiagonal done, received " << msglength << " elements");
00200 return msglength;
00201 }
00202
00203 void StartEventLoop()
00204 {
00205 int subscribers = 0;
00206 do
00207 {
00208 int source;
00209 String messageString = Link::Mpi::ReceiveString(source);
00210 Util::PropertySet message(messageString);
00211 String cmd = message.GetString("cmd");
00212 if(cmd == "subscribe")
00213 {
00214 ++subscribers;
00215 ILOG_DEBUG_NODE("subscription added: id=" << source << " new #=" << subscribers);
00216 continue;
00217 }
00218 if(cmd == "unsubscribe")
00219 {
00220 --subscribers;
00221 ILOG_DEBUG_NODE("subscription removed: #=" << subscribers);
00222 continue;
00223 }
00224 if(cmd != "diagonal" && cmd != "row" && cmd != "column")
00225 ILOG_ERROR("unknown command: " << cmd);
00226 int part = message.GetInt("part");
00227 ILOG_DEBUG_NODE("received cmd: " << messageString);
00228 Vector::VectorTem<double> v;
00229 Matrix::Mat* m = GetPart(part);
00230 if(cmd == "diagonal")
00231 v = Matrix::GetDiagonal(m);
00232 if(cmd == "column")
00233 {
00234 int column = message.GetInt("arg");
00235 int columnpart = mColumns->IndexToPart(column);
00236 column -= mColumns->PartToIndex(columnpart);
00237 if(column < 0 || column >= m->H())
00238 ILOG_ERROR_NODE("faulty column index: " << column <<
00239 ". column = " << message.GetInt("arg") << "part = " << part);
00240 v = Vector::VectorTem<double>(m->W(), m->CPB(0,column), true);
00241 }
00242 if(cmd == "row")
00243 {
00244 int row = message.GetInt("arg");
00245 int rowpart = mRows->IndexToPart(row);
00246 row -= mRows->PartToIndex(rowpart);
00247 v = Matrix::GetColumn(m, row);
00248 }
00249
00250 Link::Mpi::SendData(v.GetData(), v.Size(), source);
00251
00252 } while(subscribers>0);
00253
00254 }
00255
00261 void CopyParts(Matrix::Mat* dst)
00262 {
00263 for(int i=0 ; i<mColumns->PartCount()*mRows->PartCount() ; ++i)
00264 {
00265 Matrix::Mat* part = GetPart(i);
00266 if(part)
00267 {
00268
00269 Matrix::Mat* t = MatTranspose(part);
00270 Array::SetPart(dst, t, mColumns->PartToIndex(i%mColumns->PartCount()),
00271 mRows->PartToIndex(i/mColumns->PartCount()));
00272 delete t;
00273 }
00274 }
00275 }
00276 private:
00277 bool ReadInfoFile(String filenamebase, Core::Database::RawDataSet* set,
00278 Core::Database::RawDataSet* set2)
00279 {
00280 String description;
00281 if(Link::Mpi::MyId() == 0)
00282 {
00283 Util::Database* db;
00284 String filename = filenamebase + ".info";
00285 if (set2)
00286 {
00287 filename = set2->GetFilePathPrecomputedKernels(filename,
00288 set->GetSetName(),
00289 false, false);
00290 db = set2->GetDatabase();
00291 }
00292 else
00293 {
00294 filename = set->GetFilePathPrecomputedKernels(filename,
00295 "", false, false);
00296 db = set->GetDatabase();
00297 }
00298 Util::IOBuffer* buf = db->GetIOBuffer(filename, true, true, "");
00299 if (!buf)
00300 {
00301 ILOG_ERROR("could not open " << filenamebase << ".info");
00302 return false;
00303 }
00304 description = buf->ReadLine();
00305 delete buf;
00306 }
00307 description = Link::Mpi::BroadcastString(description);
00308 ILOG_DEBUG("description = " << description);
00309 Util::PropertySet properties(description);
00310
00311 mRows = new IndexConverter(properties.GetInt("totalrows"), properties.GetInt("rowparts"));
00312 mColumns = new IndexConverter(properties.GetInt("totalcolumns"), properties.GetInt("columnparts"));
00313
00314 return true;
00315 }
00316
00317 void LoadParts(String filenamebase, Core::Database::RawDataSet* set,
00318 Core::Database::RawDataSet* set2)
00319 {
00320 ILOG_DEBUG_NODE("starting LoadParts");
00321 int id = Link::Mpi::MyId();
00322 for(int r=0 ; r<mRows->PartCount() ; r++)
00323 {
00324 for(int c=0 ; c<mColumns->PartCount() ; c++)
00325 {
00326 int i = GetPartNr(r,c);
00327 if(GetOwnerOfPart(i) == id)
00328 {
00329 Matrix::Mat* m = 0;
00330 String filename = filenamebase + ".precomputed.part-R" +
00331 MakeString(r) + "-C" + MakeString(c) + ".raw";
00332 if (set2)
00333 {
00334 filename = set2->GetFilePathPrecomputedKernels
00335 (filename, set->GetSetName(), false, false);
00336 ReadRaw(m, filename, set2->GetDatabase());
00337 }
00338 else
00339 {
00340 filename = set->GetFilePathPrecomputedKernels
00341 (filename, "", false, false);
00342 ReadRaw(m, filename, set->GetDatabase());
00343 }
00344 if(!m)
00345 {
00346 ILOG_ERROR_NODE("did not load matrix");
00347 exit(0);
00348
00349
00350 }
00351
00352 mParts.push_back(MatTranspose(m));
00353 delete m;
00354 }
00355 }
00356 }
00357 mValid = true;
00358 }
00359
00360 void LoadQuids(String filenamebase, Core::Database::RawDataSet* set,
00361 Core::Database::RawDataSet* set2)
00362 {
00363
00364 mRowQuids = new Table::QuidTable();
00365 mColumnQuids = new Table::QuidTable();
00366 int id = Link::Mpi::MyId();
00367 if(id == 0)
00368 {
00369 Util::Database* db = (set2) ? set2->GetDatabase()
00370 : set->GetDatabase();
00371 for(int r=0 ; r<mRows->PartCount() ; ++r)
00372 {
00373 ILOG_DEBUG_NODE("loading row quids part " << r);
00374 Table::QuidTable* temp = new Table::QuidTable();
00375 String filename = filenamebase + ".rowindices" + MakeString(r)
00376 + "of" + MakeString(mRows->PartCount()) + ".quids";
00377 if (set2)
00378 {
00379 filename = set2->GetFilePathPrecomputedKernels
00380 (filename, set->GetSetName(), false, false);
00381 }
00382 else
00383 {
00384 filename = set->GetFilePathPrecomputedKernels
00385 (filename, "", false, false);
00386 }
00387 Read(temp, filename, db);
00388 mRowQuids->Append(temp);
00389 delete temp;
00390 }
00391 for (int c=0 ; c<mColumns->PartCount() ; ++c)
00392 {
00393 ILOG_DEBUG_NODE("loading column quids part " << c);
00394 Table::QuidTable* temp = new Table::QuidTable();
00395 String filename = filenamebase + ".columnindices" + MakeString(c)
00396 + "of" + MakeString(mColumns->PartCount()) + ".quids";
00397 if (set2)
00398 {
00399 filename = set2->GetFilePathPrecomputedKernels
00400 (filename, set->GetSetName(), false, false);
00401 }
00402 else
00403 {
00404 filename = set->GetFilePathPrecomputedKernels
00405 (filename, "", false, false);
00406 }
00407 Read(temp, filename, db);
00408 mColumnQuids->Append(temp);
00409 delete temp;
00410 }
00411 if (mRowQuids->Size() != mRows->TotalSize())
00412 ILOG_ERROR_NODE("number of row quids doesn't match totalrows "
00413 << mRows->TotalSize() << " != "
00414 << mRowQuids->Size());
00415 if (mColumnQuids->Size() != mColumns->TotalSize())
00416 ILOG_ERROR_NODE("number of column quids doesn't match total "
00417 << mColumns->TotalSize() << " != "
00418 << mColumnQuids->Size());
00419 Util::IOBuffer* buf = new Util::IOBuffer(mRows->TotalSize()*
00420 sizeof(Quid) + 0x1000);
00421 Write(mRowQuids, buf, true);
00422
00423 Broadcast(buf);
00424
00425 delete buf;
00426 buf = new Util::IOBuffer(mColumns->TotalSize()*sizeof(Quid) + 0x1000);
00427 Write(mColumnQuids, buf, true);
00428 Broadcast(buf);
00429 delete buf;
00430 }
00431 else
00432 {
00433 Util::IOBuffer* buf = new Util::IOBuffer();
00434
00435 Broadcast(buf);
00436
00437 if (buf && buf->Valid())
00438 {
00439 Read(mRowQuids, buf);
00440
00441 delete buf;
00442 }
00443 buf = new Util::IOBuffer();
00444 Broadcast(buf);
00445 if (buf && buf->Valid())
00446 {
00447 Read(mColumnQuids, buf);
00448
00449 delete buf;
00450 }
00451 if(mRowQuids->Size() != mRows->TotalSize())
00452 ILOG_ERROR_NODE("number of row quids doesn't match totalrows");
00453 if(mColumnQuids->Size() != mColumns->TotalSize())
00454 ILOG_ERROR_NODE("number of column quids doesn't match totalcolumns");
00455 }
00456 }
00457
00458 int GetPartNr(int rowpart, int columnpart)
00459 {
00460 if(rowpart >= mRows->PartCount() || rowpart < 0 ||
00461 columnpart >= mColumns->PartCount() || columnpart < 0)
00462 ILOG_ERROR_NODE("GetPartNr: invalid args: r=" << rowpart <<
00463 " c=" << columnpart);
00464 int partNr = rowpart*mColumns->PartCount() + columnpart;
00465 return partNr;
00466 }
00467
00468 int GetOwnerOfPart(int partNr)
00469 {
00470 int totalParts = mRows->PartCount() * mColumns->PartCount();
00471 return mStartNode + ((partNr * mNodeCount) / totalParts);
00472 }
00473
00474 Matrix::Mat* GetPart(int partNr)
00475 {
00476 if(GetOwnerOfPart(partNr) != Link::Mpi::MyId())
00477 return 0;
00478 int index = 0;
00479 for(int i=partNr-1 ; i>=0 ; --i)
00480 {
00481 if(GetOwnerOfPart(i) != Link::Mpi::MyId())
00482 break;
00483 ++index;
00484 }
00485 return mParts[index];
00486 }
00487
00488 int GetData(double* buffer, int bufferlength, Util::PropertySet& cmd,
00489 const std::vector<int>& parts)
00490 {
00491
00492 int received=0;
00493 for(int i=0 ; i<parts.size(); ++i)
00494 {
00495 int owner = GetOwnerOfPart(parts[i]);
00496 cmd.Add("part",parts[i]);
00497 Link::Mpi::SendString(cmd.GetDescription(), owner);
00498 double* b = buffer+received;
00499 int l = Link::Mpi::ReceiveData(b, bufferlength-received, owner);
00500 received +=l;
00501
00502 }
00503 return received;
00504 }
00505
00506 bool mValid;
00507 IndexConverter* mRows;
00508 IndexConverter* mColumns;
00509 int mStartNode, mNodeCount;
00510 std::vector<Matrix::Mat*> mParts;
00511 Table::QuidTable* mColumnQuids;
00512 Table::QuidTable* mRowQuids;
00513
00514 ILOG_VAR_DEC;
00515 };
00516
00517 ILOG_VAR_INIT(DistributedAccess, Impala.Core.Matrix);
00518
00519
00520 Feature::FeatureTable* LoadDistributedMatrix(DistributedAccess& access)
00521 {
00522 Table::QuidTable* quids = access.GetRowQuids();
00523 int rows = access.GetRows();
00524 int columns = access.GetColumns();
00525 Matrix::Mat* m = new Matrix::Mat(rows, columns, 0, 0);
00526 access.CopyParts(m);
00527
00528 Feature::FeatureDefinition fdef("precomputed");
00529 Feature::FeatureTable* f = new Feature::FeatureTable(fdef, rows, columns);
00530 f->GetColumn2()->SetStorage(m);
00531 Copy(f->GetColumn1(), quids->GetColumn1(), rows, 0, 0);
00532 f->SetSize(rows);
00533 return f;
00534 }
00535
00536
00537 Feature::FeatureTable* LoadDistributedMatrix(String name,
00538 Core::Database::RawDataSet* set,
00539 Core::Database::RawDataSet* set2)
00540 {
00541 ILOG_VAR(Core.Matrix.LoadDistributedMatrix);
00542 DistributedAccess access(name, set, set2, Link::Mpi::MyId(), 1);
00543 if(!access.Valid())
00544 {
00545 ILOG_ERROR("could not open distributed matrix \"" << name << "\"");
00546 return 0;
00547 }
00548 return LoadDistributedMatrix(access);
00549 }
00550
00551 }
00552 }
00553 }
00554
00555 #endif