Home || Visual Search || Applications || Architecture || Important Messages || OGL || Src

DistributedAccess.h

Go to the documentation of this file.
00001 #ifndef Impala_Core_Matrix_DistributedAccess_h
00002 #define Impala_Core_Matrix_DistributedAccess_h
00003 
00004 #include "Basis/String.h"
00005 #include "Core/Matrix/Mat.h"
00006 #include "Core/Matrix/MatFunc.h"
00007 #include "Core/Array/PrintData.h"
00008 #include "Core/Array/Set.h"
00009 #include "Core/Array/SetPart.h"
00010 #include "Link/Mpi/MpiFuncs.h"
00011 #include "Util/PropertySet.h"
00012 #include "Core/Matrix/MatTranspose.h"
00013 #include "Core/Matrix/GetDiagonal.h"
00014 #include "Core/Matrix/GetRow.h"
00015 #include "Core/Matrix/GetColumn.h"
00016 #include "Core/Vector/VectorTem.h"
00017 #include "Core/Table/QuidTable.h"
00018 #include "Core/Table/Copy.h"
00019 
00020 #include "Core/Matrix/IDistributedAccess.h"
00021 #include "Core/Matrix/VirtualMatrix.h"
00022 
00023 namespace Impala
00024 {
00025 namespace Core
00026 {
00027 namespace Matrix
00028 {
00029 
00041     class DistributedAccess : public IDistributedAccess, public VirtualMatrix
00042 {
00043 public:
00044 
00045     class IndexConverter
00046     {
00047     public:
00048         IndexConverter(int totalSize, int partCount)
00049             : mTotalSize(totalSize), mPartCount(partCount)
00050         {
00051             mBlockSize = mTotalSize / mPartCount + 1;
00052             ILOG_DEBUG_NODE("IndexConverter created: totalSize=" << mTotalSize
00053                             << "; partCount=" << mPartCount << "; blockSize="
00054                             << mBlockSize);
00055         }
00056         
00057         int
00058         PartToIndex(int part) const
00059         {
00060             return Min(part * mBlockSize, mTotalSize);
00061         }
00062         
00063         int
00064         IndexToPart(int index)
00065         {
00066             if ((index < 0) || (index >= mTotalSize))
00067             {
00068                 ILOG_ERROR_NODE("DistributedAccess::IndexConverter::IndexToPart"
00069                                 <<" range error: "<< index <<" not in [0,"<<
00070                                 mTotalSize <<")");
00071                 return 0;
00072             }
00073             return index / mBlockSize;
00074         }
00075         
00076         int
00077         PartCount() const
00078         {
00079             return mPartCount;
00080         }
00081         
00082         int
00083         TotalSize() const
00084         {
00085             return mTotalSize;
00086         }
00087 
00088     private:
00089 
00090         const int mTotalSize;
00091         const int mPartCount;
00092         int mBlockSize;
00093     };
00094 
00096 
00097     typedef Vector::VectorTem<Real64> Vector64;
00098 
00099     DistributedAccess()
00100     {
00101         int cpuCount = Link::Mpi::NrProcs();
00102         int partCount = sqrt((double)cpuCount);
00103         if (partCount * partCount != cpuCount)
00104         {
00105             ILOG_ERROR("Unsupported number of nodes, number of nodes MUST be"
00106                        << " sqare of a natural number");
00107             mRows = 0;
00108             return;
00109         }
00110 
00111         mRows = new IndexConverter(0, partCount);
00112         mColumns = new IndexConverter(0, partCount);
00113         mStartNode = 0;
00114         mNodeCount = cpuCount;
00115         mRowQuids = new Table::QuidTable();
00116         mColumnQuids = new Table::QuidTable();
00117         mHasOwnAverages = false;
00118     }
00119 
00120     DistributedAccess(int totalRows, int totalColumns,
00121                       int rowParts, int columnParts,
00122                       int startNode, int nodeCount)
00123     {
00124         mRows = new IndexConverter(totalRows, rowParts);
00125         mColumns = new IndexConverter(totalColumns, columnParts);
00126         mStartNode = startNode;
00127         mNodeCount = nodeCount;
00128         mRowQuids = new Table::QuidTable();
00129         mColumnQuids = new Table::QuidTable();
00130         mHasOwnAverages = false;
00131     }
00132 
00133     virtual
00134     ~DistributedAccess()
00135     {
00136         for (int i=0 ; i<mParts.size() ; i++)
00137             delete mParts[i];
00138         mParts.clear();
00139         if (mRowQuids)
00140             delete mRowQuids;
00141         if (mColumnQuids)
00142             delete mColumnQuids;
00143         if (mRows)
00144             delete mRows;
00145         if (mColumns)
00146             delete mColumns;
00147     }
00148 
00149     bool
00150     Valid(bool ctorOnly, bool withParts)
00151     {
00152         if (mRows == 0)
00153             return false;
00154         if (ctorOnly)
00155             return true;
00156         if ((mColumnQuids->Size() == 0) || (mRowQuids->Size() == 0))
00157             return false;
00158         if (!withParts)
00159             return true;
00160         return (mParts.size() != 0);
00161     }
00162 
00163     bool
00164     Symmetric()
00165     {
00166         if (mRows == 0)
00167             return false;
00168         if (GetRows() != GetColumns())
00169             return false;
00170         // Check some quids?
00171         return true;
00172     }
00173 
00174     // Specialiation of VirtualMatrix
00175     virtual int
00176     NrRow()
00177     {
00178         return mRows->TotalSize();
00179     }
00180 
00181     // Specialiation of VirtualMatrix
00182     virtual int
00183     NrCol()
00184     {
00185         return mColumns->TotalSize();
00186     }
00187 
00188     int
00189     GetRows()
00190     {
00191         return mRows->TotalSize();
00192     }
00193 
00194     void
00195     SetRows(int nr)
00196     {
00197         int partCount = mRows->PartCount();
00198         delete mRows;
00199         mRows = new IndexConverter(nr, partCount);
00200     }
00201 
00202     int
00203     GetColumns()
00204     {
00205         return mColumns->TotalSize();
00206     }
00207 
00208     void
00209     SetColumns(int nr)
00210     {
00211         int partCount = mColumns->PartCount();
00212         delete mColumns;
00213         mColumns = new IndexConverter(nr, partCount);
00214     }
00215 
00216     int
00217     GetRowPartCount() const
00218     {
00219         return mRows->PartCount();
00220     }
00221 
00222     int
00223     GetColumnPartCount() const
00224     {
00225         return mColumns->PartCount();
00226     }
00227 
00228     int
00229     GetTotalPartCount() const
00230     {
00231         return GetRowPartCount() * GetColumnPartCount();
00232     }
00233 
00234     int
00235     GetStartNode() const
00236     {
00237         return mStartNode;
00238     }
00239 
00240     Table::QuidTable*
00241     GetRowQuids()
00242     {
00243         return mRowQuids;
00244     }
00245 
00246     Table::QuidTable*
00247     GetColumnQuids()
00248     {
00249         return mColumnQuids;
00250     }
00251 
00252     void
00253     CopyQuidsFrom(DistributedAccess* arg)
00254     {
00255         Copy(mRowQuids, arg->mRowQuids);
00256         Copy(mColumnQuids, arg->mColumnQuids);
00257     }
00258 
00259     int
00260     GetRowStartOfPart(int rowPartIdx) const
00261     {
00262         return mRows->PartToIndex(rowPartIdx);
00263     }
00264 
00265     int
00266     GetRowEndOfPart(int rowPartIdx)
00267     {
00268         return mRows->PartToIndex(rowPartIdx + 1);
00269     }
00270 
00271     int
00272     GetColumnStartOfPart(int columnPartIdx)
00273     {
00274         return mColumns->PartToIndex(columnPartIdx);
00275     }
00276 
00277     int
00278     GetColumnEndOfPart(int columnPartIdx)
00279     {
00280         return mColumns->PartToIndex(columnPartIdx + 1);
00281     }
00282 
00283     int
00284     GetPartNr(int rowpart, int columnpart)
00285     {
00286         if (rowpart >= GetRowPartCount() || rowpart < 0 ||
00287             columnpart >= GetColumnPartCount() || columnpart < 0)
00288         {
00289            ILOG_ERROR_NODE("GetPartNr: invalid args: r=" << rowpart << 
00290                            " c=" << columnpart);
00291         }
00292         int partNr = rowpart * GetColumnPartCount() + columnpart;
00293         return partNr;
00294     }
00295 
00296     int
00297     GetOwnerOfPart(int partNr)
00298     {
00299         int totalParts = GetRowPartCount() * GetColumnPartCount();
00300         return mStartNode + ((partNr * mNodeCount) / totalParts);
00301     }
00302 
00303     int
00304     GetRowPartOfPart(int partNr)
00305     {
00306         return partNr / GetColumnPartCount();
00307     }
00308 
00309     int
00310     GetColumnPartOfPart(int partNr)
00311     {
00312         return partNr % GetColumnPartCount();
00313     }
00314 
00315     VirtualMatrix*
00316     GetPart(int partNr)
00317     {
00318         if (GetOwnerOfPart(partNr) != Link::Mpi::MyId())
00319         {
00320             ILOG_ERROR_NODE("GetPart: I am not the owner of part " << partNr);
00321             return 0;
00322         }
00323         int index = 0;
00324         for (int i=partNr-1 ; i>=0 ; --i)
00325         {
00326             if (GetOwnerOfPart(i) != Link::Mpi::MyId())
00327                 break;
00328             index++;
00329         }
00330         if ((index < 0) || (index >= mParts.size()))
00331         {
00332             ILOG_ERROR_NODE("GetPart: cannot find part " << partNr <<
00333                             " (index = " << index << ")");
00334             return 0;
00335         }
00336         return mParts[index];
00337     }
00338 
00339     Matrix::Mat*
00340     StealPart()
00341     {
00342         if (mParts.size() != 1)
00343         {
00344             ILOG_ERROR("[StealPart] Only works with one part");
00345             return 0;
00346         }
00347         Matrix::Mat* m = mParts[0]->GetStorage();
00348         if (!m)
00349         {
00350             ILOG_ERROR("[StealPart] Couldn't steal from VirtualMatrix");
00351             return 0;
00352         }
00353         mParts.clear();
00354         return m;
00355     }
00356 
00357     void
00358     AddPart(VirtualMatrix* part)
00359     {
00360         mParts.push_back(part);
00361     }
00362 
00363     void
00364     AddFeature(CString feature, double weight, double average)
00365     {
00366         mFeatures.push_back(feature);
00367         mWeights.push_back(weight);
00368         mAverages.push_back(average);
00369     }
00370 
00371     int
00372     GetNrFeatures() const
00373     {
00374         return mFeatures.size();
00375     }
00376 
00377     String
00378     GetFeature(int i) const
00379     {
00380         return mFeatures[i];
00381     }
00382 
00383     double
00384     GetWeight(int i) const
00385     {
00386         return mWeights[i];
00387     }
00388 
00389     double
00390     GetTotalWeight() const
00391     {
00392         int total = 0;
00393         for (int i=0 ; i<GetNrFeatures() ; i++)
00394             total += GetWeight(i);
00395         return total;
00396     }
00397 
00398     double
00399     GetAverage(int i) const
00400     {
00401         return mAverages[i];
00402     }
00403 
00404     void
00405     SetAverage(int i, double value)
00406     {
00407         if ((i >= 0) && (i < mAverages.size()))
00408             mAverages[i] = value;
00409         else
00410             ILOG_ERROR("SetAverage: index " << i << " out of range");
00411     }
00412 
00413     // Indicates whether this has its own averages or whether they come from
00414     // a development set.
00415     bool
00416     GetHasOwnAverages() const
00417     {
00418         return mHasOwnAverages;
00419     }
00420 
00421     void
00422     SetHasOwnAverages(bool flag)
00423     {
00424         mHasOwnAverages = flag;
00425     }
00426 
00427     void
00428     CopyFeaturesFrom(DistributedAccess* arg)
00429     {
00430         mFeatures = arg->mFeatures;
00431         mWeights = arg->mWeights;
00432         mAverages = arg->mAverages;
00433         mHasOwnAverages = arg->mHasOwnAverages;
00434     }
00435 
00436     void
00437     Subscribe()
00438     {
00439         Util::PropertySet cmd;
00440         cmd.Add("cmd","subscribe");
00441         for (int i=mStartNode ; i<mStartNode+mNodeCount ; ++i)
00442             Link::Mpi::SendString(cmd.GetDescription(), i);
00443     }
00444 
00445     void
00446     Unsubscribe()
00447     {
00448         Util::PropertySet cmd;
00449         cmd.Add("cmd","unsubscribe");
00450         for (int i=mStartNode ; i<mStartNode+mNodeCount ; ++i)
00451             Link::Mpi::SendString(cmd.GetDescription(), i);
00452     }
00453 
00454     int
00455     GetColumn(int index, double* buffer, int bufferlength)
00456     {
00457         if (Symmetric())
00458             return GetRow(index, buffer, bufferlength);
00459 
00460         Util::PropertySet cmd;
00461         cmd.Add("cmd", "column");
00462         cmd.Add("arg", index);
00463         int columnPart = mColumns->IndexToPart(index);
00464         std::vector<int> parts;
00465         for (int i=0 ; i<GetRowPartCount() ; i++)
00466            parts.push_back(GetPartNr(i, columnPart));
00467         int msglength = GetData(buffer, bufferlength, cmd, parts);
00468         return msglength;
00469     }
00470 
00471     int
00472     GetRow(int index, double* buffer, int bufferlength)
00473     {
00474         Util::PropertySet cmd;
00475         cmd.Add("cmd", "row");
00476         cmd.Add("arg", index);
00477         int rowPart = mRows->IndexToPart(index);
00478         std::vector<int> parts;
00479         for (int i=0 ; i<GetColumnPartCount() ; i++)
00480            parts.push_back(GetPartNr(rowPart, i));
00481         int msglength = GetData(buffer, bufferlength, cmd, parts);
00482         return msglength;
00483     }
00484 
00485     int
00486     GetDiagonal(double* buffer, int bufferlength)
00487     {
00488         Util::PropertySet cmd;
00489         cmd.Add("cmd", "diagonal");
00490         std::vector<int> parts;
00491         for (int i=0 ; i<GetRowPartCount() ; ++i)
00492            parts.push_back(GetPartNr(i, i));
00493         int msglength = GetData(buffer, bufferlength, cmd, parts);
00494         return msglength;
00495     }
00496 
00497     void
00498     StartEventLoop()
00499     {
00500         int subscribers = 0;
00501         do
00502         {
00503             int source;
00504             String messageString = Link::Mpi::ReceiveString(source);
00505             Util::PropertySet message(messageString);
00506             String cmd = message.GetString("cmd");
00507             ILOG_DEBUG_NODE("received cmd: " << messageString);
00508             if (cmd == "subscribe")
00509             {
00510                 subscribers++;
00511                 ILOG_DEBUG_NODE("subscription added: id=" << source <<
00512                                 " new #=" << subscribers);
00513                 continue;
00514             }
00515             if (cmd == "unsubscribe")
00516             {
00517                 subscribers--;
00518                 ILOG_DEBUG_NODE("subscription removed: #=" << subscribers);
00519                 continue;
00520             }
00521             if ((cmd != "diagonal") && (cmd != "row") && (cmd != "column"))
00522                 ILOG_ERROR("unknown command: " << cmd);
00523             int part = message.GetInt("part");
00524             int index = message.GetInt("arg");
00525             Vector64 v = GetDataFromPart(cmd, part, index);
00526             Link::Mpi::SendData(v.GetData(), v.Size(), source);
00527         }
00528         while (subscribers > 0);
00529     }
00530 
00531     void
00532     Dump(int cornerWidth = 4, int cornerHeight = 4)
00533     {
00534         std::cout << "Dumping DistributedAccess" << std::endl;
00535         std::cout << "rows x columns = " << GetRows() << " x " << GetColumns()
00536                   << ", parts = " << GetRowPartCount() << " x "
00537                   << GetColumnPartCount() << std::endl;
00538         for (int i=0 ; i<mFeatures.size() ; i++)
00539         {
00540             std::cout << "feature " << i << " = " << mFeatures[i]
00541                       << " weight=" << mWeights[i] << " average="
00542                       << mAverages[i] << std::endl;
00543         }
00544         int id = Link::Mpi::MyId();
00545         std::cout << "my id = " << id << std::endl;
00546         if (mParts.size() == 0)
00547         {
00548             std::cout << "no parts loaded" << std::endl;
00549             return;
00550         }
00551 
00552         for (int r=0 ; r<GetRowPartCount() ; r++)
00553         {
00554             for (int c=0 ; c<GetColumnPartCount() ; c++)
00555             {
00556                 int i = GetPartNr(r,c);
00557                 std::cout << "r=" << r << ", c=" << c << ", part=" << i
00558                           << ", owner=" << GetOwnerOfPart(i);
00559                 VirtualMatrix* mat = GetPart(i);
00560                 if (mat)
00561                     std::cout << ", part rows x cols = " << mat->NrRow()
00562                               << " x " << mat->NrCol() << std::endl;
00563                     else
00564                         std::cout << ", part=0" << std::endl;
00565             }
00566         }
00567 
00568         int rowSize = GetColumns();
00569         std::cout << "rowSize = " << rowSize << std::endl;
00570         double* rowBuf = new double[rowSize];
00571         for (int y=0 ; y<GetRows() ; y++)
00572         {
00573             if ((y < cornerHeight) || (y >= (GetRows()-cornerHeight)))
00574             {
00575                 int res = GetRow(y, rowBuf, rowSize);
00576                 if (res != rowSize)
00577                 {
00578                     ILOG_ERROR("[Dump2]: row=" << y << ", received " << res <<
00579                                " instead of " << rowSize);
00580                 }
00581                 std::cout << "row = " << y << " :- ";
00582                 for (int x=0 ; x<rowSize ; x++)
00583                 {
00584                     if ((x < cornerWidth) || (x >= (rowSize-cornerWidth)))
00585                     {
00586                         std::cout << rowBuf[x] << ", ";
00587                     }
00588                     else
00589                     {
00590                         if (x == cornerWidth)
00591                             std::cout << "..., ";
00592                     }
00593                 }
00594                 std::cout << std::endl;
00595             }
00596             else
00597             {
00598                 if (y == cornerHeight)
00599                     std::cout << "................." << std::endl;
00600             }
00601         }
00602         delete rowBuf;
00603         std::cout << "Dump done" << std::endl;
00604     }
00605 
00606 private:
00607 
00608     // Specialiation of VirtualMatrix
00609     virtual bool
00610     HasGetRowImplReal64()
00611     {
00612         return true;
00613     }
00614 
00615     // Specialiation of VirtualMatrix
00616     virtual int
00617     GetRowImpl(int rowNr, Real64* buffer, int bufferSize)
00618     {
00619         return GetRow(rowNr, buffer, bufferSize);
00620     }
00621 
00622     // Specialiation of VirtualMatrix
00623     virtual int
00624     GetDiagonalImpl(Real64* buffer, int bufferSize)
00625     {
00626         return GetDiagonal(buffer, bufferSize);
00627     }
00628 
00629     int
00630     GetData(double* buffer, int bufferlength, Util::PropertySet& cmd,
00631             const std::vector<int>& parts)
00632     {
00633         int received=0;
00634         for (int i=0 ; i<parts.size(); i++)
00635         {
00636             if (Link::Mpi::MpiUsed())
00637             {
00638                 int owner = GetOwnerOfPart(parts[i]);
00639                 cmd.Add("part", parts[i]);
00640                 Link::Mpi::SendString(cmd.GetDescription(), owner);
00641                 double* b = buffer + received;
00642                 int l = Link::Mpi::ReceiveData(b, bufferlength-received, owner);
00643                 received += l;
00644             }
00645             else
00646             {
00647                 Vector64 v = GetDataFromPart(cmd.GetString("cmd"), parts[i],
00648                                              cmd.GetInt("arg"));
00649                 double* b = buffer + received;
00650                 if (v.Size() > bufferlength-received)
00651                     ILOG_ERROR("[GetData] buffer too small");
00652                 for (int j=0 ; j<v.Size() ; j++)
00653                     b[j] = v.Elem(j);
00654                 received += v.Size();
00655             }
00656         }
00657         return received;
00658     }
00659 
00660     Vector64
00661     GetDataFromPart(CString cmd, int part, int index)
00662     {
00663         Vector64 v;
00664         if (cmd == "diagonal")
00665             v = GetDiagonalFromPart(part);
00666         if (cmd == "column")
00667             v = GetColumnFromPart(part, index);
00668         if (cmd == "row")
00669             v = GetRowFromPart(part, index);
00670         return v;
00671     }
00672 
00673     Vector64
00674     GetColumnFromPart(int part, int index)
00675     {
00676         Vector64 v;
00677         int colPart = mColumns->IndexToPart(index);
00678         index -= mColumns->PartToIndex(colPart);
00679         ILOG_ERROR("[GetColumnFromPart] Not supported");
00680         return v;
00681     }
00682 
00683     Vector64
00684     GetRowFromPart(int part, int index)
00685     {
00686         Vector64 v;
00687         int rowPart = mRows->IndexToPart(index);
00688         index -= mRows->PartToIndex(rowPart);
00689         if (mParts.size() != 0)
00690             v = GetPart(part)->GetRow(index);
00691         return v;
00692     }
00693 
00694     Vector64
00695     GetDiagonalFromPart(int part)
00696     {
00697         Vector64 v;
00698         if (mParts.size() != 0)
00699             v = GetPart(part)->GetDiagonal();
00700         return v;
00701     }
00702 
00703     IndexConverter*             mRows;
00704     IndexConverter*             mColumns;
00705     int                         mStartNode;
00706     int                         mNodeCount;
00707     Table::QuidTable*           mRowQuids;
00708     Table::QuidTable*           mColumnQuids;
00709     std::vector<VirtualMatrix*> mParts;
00710     std::vector<String>         mFeatures;
00711     std::vector<double>         mWeights;
00712     std::vector<double>         mAverages;
00713     bool                        mHasOwnAverages;
00714 
00715     ILOG_VAR_DEC;
00716 };
00717 
00718 ILOG_VAR_INIT(DistributedAccess, Impala.Core.Matrix);
00719 
00720 
00721 } // namespace Matrix
00722 } // namespace Core
00723 } // namespace Impala
00724 
00725 #endif

Generated on Thu Jan 13 09:04:32 2011 for ImpalaSrc by  doxygen 1.5.1