Home || Visual Search || Applications || Architecture || Important Messages || OGL || Src

VirtualMatrixIOBufferReader.h

Go to the documentation of this file.
00001 #ifndef Impala_Core_Matrix_VirtualMatrixIOBufferReader_h
00002 #define Impala_Core_Matrix_VirtualMatrixIOBufferReader_h
00003 
00004 #include "Core/Matrix/VirtualMatrix.h"
00005 #include "Util/IOBuffer.h"
00006 #include "Core/Array/ReadRaw.h"
00007 
00008 namespace Impala
00009 {
00010 namespace Core
00011 {
00012 namespace Matrix
00013 {
00014 
00015 
00016 template<class ArrayT>
00017 class VirtualMatrixIOBufferReader : public VirtualMatrix
00018 {
00019 public:
00020 
00021     typedef typename ArrayT::StorType StorT;
00022 
00023     VirtualMatrixIOBufferReader(Util::IOBuffer* buffer, bool ownBuffer)
00024     {
00025         Init(buffer, ownBuffer);
00026     }
00027 
00028     virtual
00029     ~VirtualMatrixIOBufferReader()
00030     {
00031         delete mChunkBuffer;
00032         mChunkBuffer = 0;
00033         delete mChunkBufferWrapper;
00034         mChunkBufferWrapper = 0;
00035         delete mChunkOffsets;
00036         mChunkOffsets = 0;
00037         if (mOwnBuffer)
00038         {
00039             delete mIOBuffer;
00040             mIOBuffer = 0;
00041         }
00042     }
00043 
00044     virtual int
00045     NrRow()
00046     {
00047         return mHeight;
00048     }
00049 
00050     virtual int
00051     NrCol()
00052     {
00053         return mWidth;
00054     }
00055 
00056 private:
00057 
00058     virtual bool
00059     HasGetRowImplReal64()
00060     {
00061         // Actually, we can do both but we have a preference
00062         return (sizeof(StorT) == 8);
00063     }
00064 
00065     virtual int
00066     GetRowImpl(int rowNr, Real64* buffer, int bufferSize)
00067     {
00068         return ReadRow(rowNr, buffer, bufferSize);
00069     }
00070 
00071     virtual bool
00072     HasGetRowImplReal32()
00073     {
00074         // Actually, we can do both but we have a preference
00075         return (sizeof(StorT) == 4);
00076     }
00077 
00078     virtual int
00079     GetRowImpl(int rowNr, Real32* buffer, int bufferSize)
00080     {
00081         return ReadRow(rowNr, buffer, bufferSize);
00082     }
00083 
00084     virtual int
00085     GetDiagonalImpl(Real64* buffer, int bufferSize)
00086     {
00087         for (int i=0 ; i<NrRow() ; i++)
00088         {
00089             StorT* chunkBufferLine = SwitchToRow(i);
00090             buffer[i] = static_cast<Real64>(chunkBufferLine[i]);
00091         }
00092         return NrRow();
00093     }
00094 
00095     template<class OutputT>
00096     int
00097     ReadRow(int rowNr, OutputT* buffer, int bufferSize)
00098     {
00099         StorT* chunkBufferLine = SwitchToRow(rowNr);
00100         for (int i=0 ; i<mWidth ; i++)
00101         {
00102             buffer[i] = static_cast<OutputT>(chunkBufferLine[i]);
00103         }
00104         return mWidth;
00105     }
00106 
00107     StorT*
00108     SwitchToRow(int rowNr)
00109     {
00110         int chunkNo = rowNr / mLinesPerChunk;
00111         SwitchToChunk(chunkNo);
00112         int chunkBufferLineNr = rowNr - mCurrentChunk * mLinesPerChunk;
00113         return mChunkBuffer + (chunkBufferLineNr * mWidth);
00114     }
00115 
00116     void
00117     SwitchToChunk(Int64 chunkNo)
00118     {
00119         if (mCurrentChunk == chunkNo)
00120             return;
00121         if (chunkNo >= mChunkCount)
00122         {
00123             ILOG_ERROR("Invalid chunkNo " << chunkNo << " " << mChunkCount);
00124             return;
00125         }
00126         Int64 nrBytes = mChunkSize;
00127         if (chunkNo == mChunkCount - 1)
00128         {
00129             // last chunk might be smaller
00130             Int64 linesLeft = NrRow() - mLinesPerChunk * chunkNo;
00131             nrBytes = linesLeft * sizeof(StorT) * mWidth;
00132         }
00133         if (mLinesPerCompressedBlock)
00134         {
00135             if (mChunkOffsets[chunkNo] == -1)
00136             {
00137                 for (Int64 i=1 ; i<chunkNo ; i++)
00138                 {
00139                     if (mChunkOffsets[i] == -1)
00140                     {
00141                         SwitchToChunk(i-1);
00142                     }
00143                 }
00144             }
00145             mIOBuffer->SetPosition(mChunkOffsets[chunkNo]);
00146             Int64 nrRead = 0;
00147             if (mCompressor == "zlib")
00148                 nrRead = mIOBuffer->ReadZlib(mChunkBuffer, nrBytes);
00149             if (mCompressor == "bzip")
00150                 nrRead = mIOBuffer->ReadBzip(mChunkBuffer, nrBytes);
00151             if (nrRead != nrBytes)
00152             {
00153                 ILOG_ERROR("readCompressed " << nrRead << " bytes instead of "
00154                             << nrBytes);
00155             }
00156             mChunkOffsets[chunkNo + 1] = mIOBuffer->GetPosition();
00157         }
00158         else
00159         {
00160             mIOBuffer->SetPosition(mChunkOffsets[chunkNo]);
00161             Int64 nrRead = mIOBuffer->Read(mChunkBuffer, nrBytes);
00162             if (nrRead != nrBytes)
00163             {
00164                 ILOG_ERROR("read " << nrRead << " bytes instead of " << nrBytes);
00165             }
00166         }
00167         Array::Endian(mChunkBufferWrapper, mChunkBufferWrapper);
00168         mCurrentChunk = chunkNo;
00169     }
00170     
00171     void
00172     Init(Util::IOBuffer* buffer, bool ownBuffer)
00173     {
00174         mIOBuffer = buffer;
00175         mOwnBuffer = ownBuffer;
00176         mBaseOffset = buffer->GetPosition();
00177         mChunkOffsets = 0;
00178         mChunkBuffer = 0;
00179         if (! Array::ReadRawHeader<ArrayT>
00180                 (mIOBuffer, &mVersion, &mBinary, &mElemSize, &mWidth, &mHeight,
00181                  &mNrA, mCompressor, &mLinesPerCompressedBlock))
00182         {
00183             ILOG_ERROR("Could not parse header");
00184             return;
00185         }
00186         if (!mBinary)
00187         {
00188             ILOG_ERROR("Only binary files are supported");
00189         }
00190         if (mElemSize != 1)
00191         {
00192             ILOG_ERROR("IOBuffer does not contain a Matrix");
00193         }
00194 
00195         Int64 lineSize = sizeof(StorT) * mWidth;
00196         buffer->SetPosition(mBaseOffset + 200);
00197         if (mLinesPerCompressedBlock > 0)
00198         {
00199             mLinesPerChunk = mLinesPerCompressedBlock;
00200         }
00201         else
00202         {
00203             // uncompressed file: still use the same chunk size
00204             mLinesPerChunk = Util::IOBuffer::cBzipBlockSize / lineSize;
00205         }
00206         mChunkCount = mHeight / mLinesPerChunk;
00207         if (mHeight % mLinesPerChunk != 0)
00208             mChunkCount++;
00209         mChunkSize = lineSize * mLinesPerChunk;
00210         mChunkOffsets = new Int64[mChunkCount+1];
00211         mChunkOffsets[0] = mBaseOffset + 200;
00212         for (Int64 i=1 ; i<mChunkCount+1 ; i++)
00213         {
00214             if (mLinesPerCompressedBlock == 0)
00215             {
00216                 mChunkOffsets[i] = mChunkOffsets[i-1] + mChunkSize;
00217             }
00218             else
00219             {
00220                 mChunkOffsets[i] = -1;
00221             }
00222         }
00223 
00224         mChunkBuffer = new StorT[mWidth * mLinesPerChunk];
00225         mChunkBufferWrapper = new ArrayT(mWidth * mLinesPerChunk, 1, 0, 0,
00226                                          mChunkBuffer, true);
00227         mCurrentChunk = -1;
00228         SwitchToChunk(0);
00229     }
00230 
00231     Util::IOBuffer* mIOBuffer;
00232     bool   mOwnBuffer;
00233     Int64  mBaseOffset;
00234     Int64* mChunkOffsets;
00235     Int64  mChunkSize;  // of the uncompressed data within one chunk
00236     Int64  mChunkCount;
00237     Int64  mLinesPerChunk;
00238 
00239     // current cache
00240     Int64   mCurrentChunk;
00241     StorT*  mChunkBuffer;
00242     ArrayT* mChunkBufferWrapper;
00243 
00244     Int64  mVersion;
00245     Int64  mBinary;
00246     Int64  mElemSize;
00247     Int64  mWidth;
00248     Int64  mHeight;
00249     Int64  mNrA;
00250     String mCompressor;
00251     Int64  mLinesPerCompressedBlock;
00252 
00253     ILOG_VAR_DEC;
00254 };
00255 
00256 ILOG_VAR_INIT_TEMPL_1(VirtualMatrixIOBufferReader, ArrayT, Impala.Core.Matrix);
00257 
00258 } // namespace Matrix
00259 } // namespace Core
00260 } // namespace Impala
00261 
00262 #endif

Generated on Thu Jan 13 09:04:34 2011 for ImpalaSrc by  doxygen 1.5.1