00001 #ifndef Impala_Core_Matrix_DistributedAccess_h
00002 #define Impala_Core_Matrix_DistributedAccess_h
00003
00004 #include "Basis/String.h"
00005 #include "Core/Matrix/Mat.h"
00006 #include "Core/Matrix/MatFunc.h"
00007 #include "Core/Array/PrintData.h"
00008 #include "Core/Array/Set.h"
00009 #include "Core/Array/SetPart.h"
00010 #include "Link/Mpi/MpiFuncs.h"
00011 #include "Util/PropertySet.h"
00012 #include "Core/Matrix/MatTranspose.h"
00013 #include "Core/Matrix/GetDiagonal.h"
00014 #include "Core/Matrix/GetRow.h"
00015 #include "Core/Matrix/GetColumn.h"
00016 #include "Core/Vector/VectorTem.h"
00017 #include "Core/Table/QuidTable.h"
00018 #include "Core/Table/Copy.h"
00019
00020 #include "Core/Matrix/IDistributedAccess.h"
00021 #include "Core/Matrix/VirtualMatrix.h"
00022
00023 namespace Impala
00024 {
00025 namespace Core
00026 {
00027 namespace Matrix
00028 {
00029
00041 class DistributedAccess : public IDistributedAccess, public VirtualMatrix
00042 {
00043 public:
00044
00045 class IndexConverter
00046 {
00047 public:
00048 IndexConverter(int totalSize, int partCount)
00049 : mTotalSize(totalSize), mPartCount(partCount)
00050 {
00051 mBlockSize = mTotalSize / mPartCount + 1;
00052 ILOG_DEBUG_NODE("IndexConverter created: totalSize=" << mTotalSize
00053 << "; partCount=" << mPartCount << "; blockSize="
00054 << mBlockSize);
00055 }
00056
00057 int
00058 PartToIndex(int part) const
00059 {
00060 return Min(part * mBlockSize, mTotalSize);
00061 }
00062
00063 int
00064 IndexToPart(int index)
00065 {
00066 if ((index < 0) || (index >= mTotalSize))
00067 {
00068 ILOG_ERROR_NODE("DistributedAccess::IndexConverter::IndexToPart"
00069 <<" range error: "<< index <<" not in [0,"<<
00070 mTotalSize <<")");
00071 return 0;
00072 }
00073 return index / mBlockSize;
00074 }
00075
00076 int
00077 PartCount() const
00078 {
00079 return mPartCount;
00080 }
00081
00082 int
00083 TotalSize() const
00084 {
00085 return mTotalSize;
00086 }
00087
00088 private:
00089
00090 const int mTotalSize;
00091 const int mPartCount;
00092 int mBlockSize;
00093 };
00094
00096
00097 typedef Vector::VectorTem<Real64> Vector64;
00098
00099 DistributedAccess()
00100 {
00101 int cpuCount = Link::Mpi::NrProcs();
00102 int partCount = sqrt((double)cpuCount);
00103 if (partCount * partCount != cpuCount)
00104 {
00105 ILOG_ERROR("Unsupported number of nodes, number of nodes MUST be"
00106 << " sqare of a natural number");
00107 mRows = 0;
00108 return;
00109 }
00110
00111 mRows = new IndexConverter(0, partCount);
00112 mColumns = new IndexConverter(0, partCount);
00113 mStartNode = 0;
00114 mNodeCount = cpuCount;
00115 mRowQuids = new Table::QuidTable();
00116 mColumnQuids = new Table::QuidTable();
00117 mHasOwnAverages = false;
00118 }
00119
00120 DistributedAccess(int totalRows, int totalColumns,
00121 int rowParts, int columnParts,
00122 int startNode, int nodeCount)
00123 {
00124 mRows = new IndexConverter(totalRows, rowParts);
00125 mColumns = new IndexConverter(totalColumns, columnParts);
00126 mStartNode = startNode;
00127 mNodeCount = nodeCount;
00128 mRowQuids = new Table::QuidTable();
00129 mColumnQuids = new Table::QuidTable();
00130 mHasOwnAverages = false;
00131 }
00132
00133 virtual
00134 ~DistributedAccess()
00135 {
00136 for (int i=0 ; i<mParts.size() ; i++)
00137 delete mParts[i];
00138 mParts.clear();
00139 if (mRowQuids)
00140 delete mRowQuids;
00141 if (mColumnQuids)
00142 delete mColumnQuids;
00143 if (mRows)
00144 delete mRows;
00145 if (mColumns)
00146 delete mColumns;
00147 }
00148
00149 bool
00150 Valid(bool ctorOnly, bool withParts)
00151 {
00152 if (mRows == 0)
00153 return false;
00154 if (ctorOnly)
00155 return true;
00156 if ((mColumnQuids->Size() == 0) || (mRowQuids->Size() == 0))
00157 return false;
00158 if (!withParts)
00159 return true;
00160 return (mParts.size() != 0);
00161 }
00162
00163 bool
00164 Symmetric()
00165 {
00166 if (mRows == 0)
00167 return false;
00168 if (GetRows() != GetColumns())
00169 return false;
00170
00171 return true;
00172 }
00173
00174
00175 virtual int
00176 NrRow()
00177 {
00178 return mRows->TotalSize();
00179 }
00180
00181
00182 virtual int
00183 NrCol()
00184 {
00185 return mColumns->TotalSize();
00186 }
00187
00188 int
00189 GetRows()
00190 {
00191 return mRows->TotalSize();
00192 }
00193
00194 void
00195 SetRows(int nr)
00196 {
00197 int partCount = mRows->PartCount();
00198 delete mRows;
00199 mRows = new IndexConverter(nr, partCount);
00200 }
00201
00202 int
00203 GetColumns()
00204 {
00205 return mColumns->TotalSize();
00206 }
00207
00208 void
00209 SetColumns(int nr)
00210 {
00211 int partCount = mColumns->PartCount();
00212 delete mColumns;
00213 mColumns = new IndexConverter(nr, partCount);
00214 }
00215
00216 int
00217 GetRowPartCount() const
00218 {
00219 return mRows->PartCount();
00220 }
00221
00222 int
00223 GetColumnPartCount() const
00224 {
00225 return mColumns->PartCount();
00226 }
00227
00228 int
00229 GetTotalPartCount() const
00230 {
00231 return GetRowPartCount() * GetColumnPartCount();
00232 }
00233
00234 int
00235 GetStartNode() const
00236 {
00237 return mStartNode;
00238 }
00239
00240 Table::QuidTable*
00241 GetRowQuids()
00242 {
00243 return mRowQuids;
00244 }
00245
00246 Table::QuidTable*
00247 GetColumnQuids()
00248 {
00249 return mColumnQuids;
00250 }
00251
00252 void
00253 CopyQuidsFrom(DistributedAccess* arg)
00254 {
00255 Copy(mRowQuids, arg->mRowQuids);
00256 Copy(mColumnQuids, arg->mColumnQuids);
00257 }
00258
00259 int
00260 GetRowStartOfPart(int rowPartIdx) const
00261 {
00262 return mRows->PartToIndex(rowPartIdx);
00263 }
00264
00265 int
00266 GetRowEndOfPart(int rowPartIdx)
00267 {
00268 return mRows->PartToIndex(rowPartIdx + 1);
00269 }
00270
00271 int
00272 GetColumnStartOfPart(int columnPartIdx)
00273 {
00274 return mColumns->PartToIndex(columnPartIdx);
00275 }
00276
00277 int
00278 GetColumnEndOfPart(int columnPartIdx)
00279 {
00280 return mColumns->PartToIndex(columnPartIdx + 1);
00281 }
00282
00283 int
00284 GetPartNr(int rowpart, int columnpart)
00285 {
00286 if (rowpart >= GetRowPartCount() || rowpart < 0 ||
00287 columnpart >= GetColumnPartCount() || columnpart < 0)
00288 {
00289 ILOG_ERROR_NODE("GetPartNr: invalid args: r=" << rowpart <<
00290 " c=" << columnpart);
00291 }
00292 int partNr = rowpart * GetColumnPartCount() + columnpart;
00293 return partNr;
00294 }
00295
00296 int
00297 GetOwnerOfPart(int partNr)
00298 {
00299 int totalParts = GetRowPartCount() * GetColumnPartCount();
00300 return mStartNode + ((partNr * mNodeCount) / totalParts);
00301 }
00302
00303 int
00304 GetRowPartOfPart(int partNr)
00305 {
00306 return partNr / GetColumnPartCount();
00307 }
00308
00309 int
00310 GetColumnPartOfPart(int partNr)
00311 {
00312 return partNr % GetColumnPartCount();
00313 }
00314
00315 VirtualMatrix*
00316 GetPart(int partNr)
00317 {
00318 if (GetOwnerOfPart(partNr) != Link::Mpi::MyId())
00319 {
00320 ILOG_ERROR_NODE("GetPart: I am not the owner of part " << partNr);
00321 return 0;
00322 }
00323 int index = 0;
00324 for (int i=partNr-1 ; i>=0 ; --i)
00325 {
00326 if (GetOwnerOfPart(i) != Link::Mpi::MyId())
00327 break;
00328 index++;
00329 }
00330 if ((index < 0) || (index >= mParts.size()))
00331 {
00332 ILOG_ERROR_NODE("GetPart: cannot find part " << partNr <<
00333 " (index = " << index << ")");
00334 return 0;
00335 }
00336 return mParts[index];
00337 }
00338
00339 Matrix::Mat*
00340 StealPart()
00341 {
00342 if (mParts.size() != 1)
00343 {
00344 ILOG_ERROR("[StealPart] Only works with one part");
00345 return 0;
00346 }
00347 Matrix::Mat* m = mParts[0]->GetStorage();
00348 if (!m)
00349 {
00350 ILOG_ERROR("[StealPart] Couldn't steal from VirtualMatrix");
00351 return 0;
00352 }
00353 mParts.clear();
00354 return m;
00355 }
00356
00357 void
00358 AddPart(VirtualMatrix* part)
00359 {
00360 mParts.push_back(part);
00361 }
00362
00363 void
00364 AddFeature(CString feature, double weight, double average)
00365 {
00366 mFeatures.push_back(feature);
00367 mWeights.push_back(weight);
00368 mAverages.push_back(average);
00369 }
00370
00371 int
00372 GetNrFeatures() const
00373 {
00374 return mFeatures.size();
00375 }
00376
00377 String
00378 GetFeature(int i) const
00379 {
00380 return mFeatures[i];
00381 }
00382
00383 double
00384 GetWeight(int i) const
00385 {
00386 return mWeights[i];
00387 }
00388
00389 double
00390 GetTotalWeight() const
00391 {
00392 int total = 0;
00393 for (int i=0 ; i<GetNrFeatures() ; i++)
00394 total += GetWeight(i);
00395 return total;
00396 }
00397
00398 double
00399 GetAverage(int i) const
00400 {
00401 return mAverages[i];
00402 }
00403
00404 void
00405 SetAverage(int i, double value)
00406 {
00407 if ((i >= 0) && (i < mAverages.size()))
00408 mAverages[i] = value;
00409 else
00410 ILOG_ERROR("SetAverage: index " << i << " out of range");
00411 }
00412
00413
00414
00415 bool
00416 GetHasOwnAverages() const
00417 {
00418 return mHasOwnAverages;
00419 }
00420
00421 void
00422 SetHasOwnAverages(bool flag)
00423 {
00424 mHasOwnAverages = flag;
00425 }
00426
00427 void
00428 CopyFeaturesFrom(DistributedAccess* arg)
00429 {
00430 mFeatures = arg->mFeatures;
00431 mWeights = arg->mWeights;
00432 mAverages = arg->mAverages;
00433 mHasOwnAverages = arg->mHasOwnAverages;
00434 }
00435
00436 void
00437 Subscribe()
00438 {
00439 Util::PropertySet cmd;
00440 cmd.Add("cmd","subscribe");
00441 for (int i=mStartNode ; i<mStartNode+mNodeCount ; ++i)
00442 Link::Mpi::SendString(cmd.GetDescription(), i);
00443 }
00444
00445 void
00446 Unsubscribe()
00447 {
00448 Util::PropertySet cmd;
00449 cmd.Add("cmd","unsubscribe");
00450 for (int i=mStartNode ; i<mStartNode+mNodeCount ; ++i)
00451 Link::Mpi::SendString(cmd.GetDescription(), i);
00452 }
00453
00454 int
00455 GetColumn(int index, double* buffer, int bufferlength)
00456 {
00457 if (Symmetric())
00458 return GetRow(index, buffer, bufferlength);
00459
00460 Util::PropertySet cmd;
00461 cmd.Add("cmd", "column");
00462 cmd.Add("arg", index);
00463 int columnPart = mColumns->IndexToPart(index);
00464 std::vector<int> parts;
00465 for (int i=0 ; i<GetRowPartCount() ; i++)
00466 parts.push_back(GetPartNr(i, columnPart));
00467 int msglength = GetData(buffer, bufferlength, cmd, parts);
00468 return msglength;
00469 }
00470
00471 int
00472 GetRow(int index, double* buffer, int bufferlength)
00473 {
00474 Util::PropertySet cmd;
00475 cmd.Add("cmd", "row");
00476 cmd.Add("arg", index);
00477 int rowPart = mRows->IndexToPart(index);
00478 std::vector<int> parts;
00479 for (int i=0 ; i<GetColumnPartCount() ; i++)
00480 parts.push_back(GetPartNr(rowPart, i));
00481 int msglength = GetData(buffer, bufferlength, cmd, parts);
00482 return msglength;
00483 }
00484
00485 int
00486 GetDiagonal(double* buffer, int bufferlength)
00487 {
00488 Util::PropertySet cmd;
00489 cmd.Add("cmd", "diagonal");
00490 std::vector<int> parts;
00491 for (int i=0 ; i<GetRowPartCount() ; ++i)
00492 parts.push_back(GetPartNr(i, i));
00493 int msglength = GetData(buffer, bufferlength, cmd, parts);
00494 return msglength;
00495 }
00496
00497 void
00498 StartEventLoop()
00499 {
00500 int subscribers = 0;
00501 do
00502 {
00503 int source;
00504 String messageString = Link::Mpi::ReceiveString(source);
00505 Util::PropertySet message(messageString);
00506 String cmd = message.GetString("cmd");
00507 ILOG_DEBUG_NODE("received cmd: " << messageString);
00508 if (cmd == "subscribe")
00509 {
00510 subscribers++;
00511 ILOG_DEBUG_NODE("subscription added: id=" << source <<
00512 " new #=" << subscribers);
00513 continue;
00514 }
00515 if (cmd == "unsubscribe")
00516 {
00517 subscribers--;
00518 ILOG_DEBUG_NODE("subscription removed: #=" << subscribers);
00519 continue;
00520 }
00521 if ((cmd != "diagonal") && (cmd != "row") && (cmd != "column"))
00522 ILOG_ERROR("unknown command: " << cmd);
00523 int part = message.GetInt("part");
00524 int index = message.GetInt("arg");
00525 Vector64 v = GetDataFromPart(cmd, part, index);
00526 Link::Mpi::SendData(v.GetData(), v.Size(), source);
00527 }
00528 while (subscribers > 0);
00529 }
00530
00531 void
00532 Dump(int cornerWidth = 4, int cornerHeight = 4)
00533 {
00534 std::cout << "Dumping DistributedAccess" << std::endl;
00535 std::cout << "rows x columns = " << GetRows() << " x " << GetColumns()
00536 << ", parts = " << GetRowPartCount() << " x "
00537 << GetColumnPartCount() << std::endl;
00538 for (int i=0 ; i<mFeatures.size() ; i++)
00539 {
00540 std::cout << "feature " << i << " = " << mFeatures[i]
00541 << " weight=" << mWeights[i] << " average="
00542 << mAverages[i] << std::endl;
00543 }
00544 int id = Link::Mpi::MyId();
00545 std::cout << "my id = " << id << std::endl;
00546 if (mParts.size() == 0)
00547 {
00548 std::cout << "no parts loaded" << std::endl;
00549 return;
00550 }
00551
00552 for (int r=0 ; r<GetRowPartCount() ; r++)
00553 {
00554 for (int c=0 ; c<GetColumnPartCount() ; c++)
00555 {
00556 int i = GetPartNr(r,c);
00557 std::cout << "r=" << r << ", c=" << c << ", part=" << i
00558 << ", owner=" << GetOwnerOfPart(i);
00559 VirtualMatrix* mat = GetPart(i);
00560 if (mat)
00561 std::cout << ", part rows x cols = " << mat->NrRow()
00562 << " x " << mat->NrCol() << std::endl;
00563 else
00564 std::cout << ", part=0" << std::endl;
00565 }
00566 }
00567
00568 int rowSize = GetColumns();
00569 std::cout << "rowSize = " << rowSize << std::endl;
00570 double* rowBuf = new double[rowSize];
00571 for (int y=0 ; y<GetRows() ; y++)
00572 {
00573 if ((y < cornerHeight) || (y >= (GetRows()-cornerHeight)))
00574 {
00575 int res = GetRow(y, rowBuf, rowSize);
00576 if (res != rowSize)
00577 {
00578 ILOG_ERROR("[Dump2]: row=" << y << ", received " << res <<
00579 " instead of " << rowSize);
00580 }
00581 std::cout << "row = " << y << " :- ";
00582 for (int x=0 ; x<rowSize ; x++)
00583 {
00584 if ((x < cornerWidth) || (x >= (rowSize-cornerWidth)))
00585 {
00586 std::cout << rowBuf[x] << ", ";
00587 }
00588 else
00589 {
00590 if (x == cornerWidth)
00591 std::cout << "..., ";
00592 }
00593 }
00594 std::cout << std::endl;
00595 }
00596 else
00597 {
00598 if (y == cornerHeight)
00599 std::cout << "................." << std::endl;
00600 }
00601 }
00602 delete rowBuf;
00603 std::cout << "Dump done" << std::endl;
00604 }
00605
00606 private:
00607
00608
00609 virtual bool
00610 HasGetRowImplReal64()
00611 {
00612 return true;
00613 }
00614
00615
00616 virtual int
00617 GetRowImpl(int rowNr, Real64* buffer, int bufferSize)
00618 {
00619 return GetRow(rowNr, buffer, bufferSize);
00620 }
00621
00622
00623 virtual int
00624 GetDiagonalImpl(Real64* buffer, int bufferSize)
00625 {
00626 return GetDiagonal(buffer, bufferSize);
00627 }
00628
00629 int
00630 GetData(double* buffer, int bufferlength, Util::PropertySet& cmd,
00631 const std::vector<int>& parts)
00632 {
00633 int received=0;
00634 for (int i=0 ; i<parts.size(); i++)
00635 {
00636 if (Link::Mpi::MpiUsed())
00637 {
00638 int owner = GetOwnerOfPart(parts[i]);
00639 cmd.Add("part", parts[i]);
00640 Link::Mpi::SendString(cmd.GetDescription(), owner);
00641 double* b = buffer + received;
00642 int l = Link::Mpi::ReceiveData(b, bufferlength-received, owner);
00643 received += l;
00644 }
00645 else
00646 {
00647 Vector64 v = GetDataFromPart(cmd.GetString("cmd"), parts[i],
00648 cmd.GetInt("arg"));
00649 double* b = buffer + received;
00650 if (v.Size() > bufferlength-received)
00651 ILOG_ERROR("[GetData] buffer too small");
00652 for (int j=0 ; j<v.Size() ; j++)
00653 b[j] = v.Elem(j);
00654 received += v.Size();
00655 }
00656 }
00657 return received;
00658 }
00659
00660 Vector64
00661 GetDataFromPart(CString cmd, int part, int index)
00662 {
00663 Vector64 v;
00664 if (cmd == "diagonal")
00665 v = GetDiagonalFromPart(part);
00666 if (cmd == "column")
00667 v = GetColumnFromPart(part, index);
00668 if (cmd == "row")
00669 v = GetRowFromPart(part, index);
00670 return v;
00671 }
00672
00673 Vector64
00674 GetColumnFromPart(int part, int index)
00675 {
00676 Vector64 v;
00677 int colPart = mColumns->IndexToPart(index);
00678 index -= mColumns->PartToIndex(colPart);
00679 ILOG_ERROR("[GetColumnFromPart] Not supported");
00680 return v;
00681 }
00682
00683 Vector64
00684 GetRowFromPart(int part, int index)
00685 {
00686 Vector64 v;
00687 int rowPart = mRows->IndexToPart(index);
00688 index -= mRows->PartToIndex(rowPart);
00689 if (mParts.size() != 0)
00690 v = GetPart(part)->GetRow(index);
00691 return v;
00692 }
00693
00694 Vector64
00695 GetDiagonalFromPart(int part)
00696 {
00697 Vector64 v;
00698 if (mParts.size() != 0)
00699 v = GetPart(part)->GetDiagonal();
00700 return v;
00701 }
00702
00703 IndexConverter* mRows;
00704 IndexConverter* mColumns;
00705 int mStartNode;
00706 int mNodeCount;
00707 Table::QuidTable* mRowQuids;
00708 Table::QuidTable* mColumnQuids;
00709 std::vector<VirtualMatrix*> mParts;
00710 std::vector<String> mFeatures;
00711 std::vector<double> mWeights;
00712 std::vector<double> mAverages;
00713 bool mHasOwnAverages;
00714
00715 ILOG_VAR_DEC;
00716 };
00717
00718 ILOG_VAR_INIT(DistributedAccess, Impala.Core.Matrix);
00719
00720
00721 }
00722 }
00723 }
00724
00725 #endif