Definition at line 203 of file DistributedAccess.h. References Impala::Core::Array::Array2dTem< StorT, elemSize, ArithT >::CPB(), Impala::Core::Matrix::GetColumn(), Impala::Core::Vector::VectorTem< ElemT >::GetData(), Impala::Core::Matrix::GetDiagonal(), Impala::Util::PropertySet::GetInt(), GetPart(), Impala::Util::PropertySet::GetString(), Impala::Core::Array::Array2dTem< StorT, elemSize, ArithT >::H(), ILOG_ERROR, Impala::Core::Matrix::DistributedAccess::IndexConverter::IndexToPart(), mColumns, mRows, Impala::Core::Matrix::DistributedAccess::IndexConverter::PartToIndex(), Impala::Core::Vector::VectorTem< ElemT >::Size(), and Impala::Core::Array::Array2dTem< StorT, elemSize, ArithT >::W(). Referenced by Impala::Core::DataFactory::ServeDistributedAccess(). 00204 { 00205 int subscribers = 0; 00206 do 00207 { 00208 int source; 00209 String messageString = Link::Mpi::ReceiveString(source); 00210 Util::PropertySet message(messageString); 00211 String cmd = message.GetString("cmd"); 00212 if(cmd == "subscribe") 00213 { 00214 ++subscribers; 00215 ILOG_DEBUG_NODE("subscription added: id=" << source << " new #=" << subscribers); 00216 continue; 00217 } 00218 if(cmd == "unsubscribe") 00219 { 00220 --subscribers; 00221 ILOG_DEBUG_NODE("subscription removed: #=" << subscribers); 00222 continue; 00223 } 00224 if(cmd != "diagonal" && cmd != "row" && cmd != "column") 00225 ILOG_ERROR("unknown command: " << cmd); 00226 int part = message.GetInt("part"); 00227 ILOG_DEBUG_NODE("received cmd: " << messageString); 00228 Vector::VectorTem<double> v; 00229 Matrix::Mat* m = GetPart(part); 00230 if(cmd == "diagonal") 00231 v = Matrix::GetDiagonal(m); 00232 if(cmd == "column") 00233 { 00234 int column = message.GetInt("arg"); 00235 int columnpart = mColumns->IndexToPart(column); 00236 column -= mColumns->PartToIndex(columnpart); 00237 if(column < 0 || column >= m->H()) 00238 ILOG_ERROR_NODE("faulty column index: " << column << 00239 ". column = " << message.GetInt("arg") << "part = " << part); 00240 v = Vector::VectorTem<double>(m->W(), m->CPB(0,column), true); 00241 } 00242 if(cmd == "row") 00243 { 00244 int row = message.GetInt("arg"); 00245 int rowpart = mRows->IndexToPart(row); 00246 row -= mRows->PartToIndex(rowpart); 00247 v = Matrix::GetColumn(m, row); // the parts are mirrored, remember? 00248 } 00249 //ILOG_DEBUG_NODE("sending answer"); 00250 Link::Mpi::SendData(v.GetData(), v.Size(), source); 00251 //ILOG_DEBUG_NODE("data sent"); 00252 } while(subscribers>0); 00253 //ILOG_INFO("EventLoop returned"); 00254 }
Here is the call graph for this function:
|