00001 #ifndef Impala_Core_Matrix_VirtualMatrixIOBufferReader_h
00002 #define Impala_Core_Matrix_VirtualMatrixIOBufferReader_h
00003
00004 #include "Core/Matrix/VirtualMatrix.h"
00005 #include "Util/IOBuffer.h"
00006 #include "Core/Array/ReadRaw.h"
00007
00008 namespace Impala
00009 {
00010 namespace Core
00011 {
00012 namespace Matrix
00013 {
00014
00015
00016 template<class ArrayT>
00017 class VirtualMatrixIOBufferReader : public VirtualMatrix
00018 {
00019 public:
00020
00021 typedef typename ArrayT::StorType StorT;
00022
00023 VirtualMatrixIOBufferReader(Util::IOBuffer* buffer, bool ownBuffer)
00024 {
00025 Init(buffer, ownBuffer);
00026 }
00027
00028 virtual
00029 ~VirtualMatrixIOBufferReader()
00030 {
00031 delete mChunkBuffer;
00032 mChunkBuffer = 0;
00033 delete mChunkBufferWrapper;
00034 mChunkBufferWrapper = 0;
00035 delete mChunkOffsets;
00036 mChunkOffsets = 0;
00037 if (mOwnBuffer)
00038 {
00039 delete mIOBuffer;
00040 mIOBuffer = 0;
00041 }
00042 }
00043
00044 virtual int
00045 NrRow()
00046 {
00047 return mHeight;
00048 }
00049
00050 virtual int
00051 NrCol()
00052 {
00053 return mWidth;
00054 }
00055
00056 private:
00057
00058 virtual bool
00059 HasGetRowImplReal64()
00060 {
00061
00062 return (sizeof(StorT) == 8);
00063 }
00064
00065 virtual int
00066 GetRowImpl(int rowNr, Real64* buffer, int bufferSize)
00067 {
00068 return ReadRow(rowNr, buffer, bufferSize);
00069 }
00070
00071 virtual bool
00072 HasGetRowImplReal32()
00073 {
00074
00075 return (sizeof(StorT) == 4);
00076 }
00077
00078 virtual int
00079 GetRowImpl(int rowNr, Real32* buffer, int bufferSize)
00080 {
00081 return ReadRow(rowNr, buffer, bufferSize);
00082 }
00083
00084 virtual int
00085 GetDiagonalImpl(Real64* buffer, int bufferSize)
00086 {
00087 for (int i=0 ; i<NrRow() ; i++)
00088 {
00089 StorT* chunkBufferLine = SwitchToRow(i);
00090 buffer[i] = static_cast<Real64>(chunkBufferLine[i]);
00091 }
00092 return NrRow();
00093 }
00094
00095 template<class OutputT>
00096 int
00097 ReadRow(int rowNr, OutputT* buffer, int bufferSize)
00098 {
00099 StorT* chunkBufferLine = SwitchToRow(rowNr);
00100 for (int i=0 ; i<mWidth ; i++)
00101 {
00102 buffer[i] = static_cast<OutputT>(chunkBufferLine[i]);
00103 }
00104 return mWidth;
00105 }
00106
00107 StorT*
00108 SwitchToRow(int rowNr)
00109 {
00110 int chunkNo = rowNr / mLinesPerChunk;
00111 SwitchToChunk(chunkNo);
00112 int chunkBufferLineNr = rowNr - mCurrentChunk * mLinesPerChunk;
00113 return mChunkBuffer + (chunkBufferLineNr * mWidth);
00114 }
00115
00116 void
00117 SwitchToChunk(Int64 chunkNo)
00118 {
00119 if (mCurrentChunk == chunkNo)
00120 return;
00121 if (chunkNo >= mChunkCount)
00122 {
00123 ILOG_ERROR("Invalid chunkNo " << chunkNo << " " << mChunkCount);
00124 return;
00125 }
00126 Int64 nrBytes = mChunkSize;
00127 if (chunkNo == mChunkCount - 1)
00128 {
00129
00130 Int64 linesLeft = NrRow() - mLinesPerChunk * chunkNo;
00131 nrBytes = linesLeft * sizeof(StorT) * mWidth;
00132 }
00133 if (mLinesPerCompressedBlock)
00134 {
00135 if (mChunkOffsets[chunkNo] == -1)
00136 {
00137 for (Int64 i=1 ; i<chunkNo ; i++)
00138 {
00139 if (mChunkOffsets[i] == -1)
00140 {
00141 SwitchToChunk(i-1);
00142 }
00143 }
00144 }
00145 mIOBuffer->SetPosition(mChunkOffsets[chunkNo]);
00146 Int64 nrRead = 0;
00147 if (mCompressor == "zlib")
00148 nrRead = mIOBuffer->ReadZlib(mChunkBuffer, nrBytes);
00149 if (mCompressor == "bzip")
00150 nrRead = mIOBuffer->ReadBzip(mChunkBuffer, nrBytes);
00151 if (nrRead != nrBytes)
00152 {
00153 ILOG_ERROR("readCompressed " << nrRead << " bytes instead of "
00154 << nrBytes);
00155 }
00156 mChunkOffsets[chunkNo + 1] = mIOBuffer->GetPosition();
00157 }
00158 else
00159 {
00160 mIOBuffer->SetPosition(mChunkOffsets[chunkNo]);
00161 Int64 nrRead = mIOBuffer->Read(mChunkBuffer, nrBytes);
00162 if (nrRead != nrBytes)
00163 {
00164 ILOG_ERROR("read " << nrRead << " bytes instead of " << nrBytes);
00165 }
00166 }
00167 Array::Endian(mChunkBufferWrapper, mChunkBufferWrapper);
00168 mCurrentChunk = chunkNo;
00169 }
00170
00171 void
00172 Init(Util::IOBuffer* buffer, bool ownBuffer)
00173 {
00174 mIOBuffer = buffer;
00175 mOwnBuffer = ownBuffer;
00176 mBaseOffset = buffer->GetPosition();
00177 mChunkOffsets = 0;
00178 mChunkBuffer = 0;
00179 if (! Array::ReadRawHeader<ArrayT>
00180 (mIOBuffer, &mVersion, &mBinary, &mElemSize, &mWidth, &mHeight,
00181 &mNrA, mCompressor, &mLinesPerCompressedBlock))
00182 {
00183 ILOG_ERROR("Could not parse header");
00184 return;
00185 }
00186 if (!mBinary)
00187 {
00188 ILOG_ERROR("Only binary files are supported");
00189 }
00190 if (mElemSize != 1)
00191 {
00192 ILOG_ERROR("IOBuffer does not contain a Matrix");
00193 }
00194
00195 Int64 lineSize = sizeof(StorT) * mWidth;
00196 buffer->SetPosition(mBaseOffset + 200);
00197 if (mLinesPerCompressedBlock > 0)
00198 {
00199 mLinesPerChunk = mLinesPerCompressedBlock;
00200 }
00201 else
00202 {
00203
00204 mLinesPerChunk = Util::IOBuffer::cBzipBlockSize / lineSize;
00205 }
00206 mChunkCount = mHeight / mLinesPerChunk;
00207 if (mHeight % mLinesPerChunk != 0)
00208 mChunkCount++;
00209 mChunkSize = lineSize * mLinesPerChunk;
00210 mChunkOffsets = new Int64[mChunkCount+1];
00211 mChunkOffsets[0] = mBaseOffset + 200;
00212 for (Int64 i=1 ; i<mChunkCount+1 ; i++)
00213 {
00214 if (mLinesPerCompressedBlock == 0)
00215 {
00216 mChunkOffsets[i] = mChunkOffsets[i-1] + mChunkSize;
00217 }
00218 else
00219 {
00220 mChunkOffsets[i] = -1;
00221 }
00222 }
00223
00224 mChunkBuffer = new StorT[mWidth * mLinesPerChunk];
00225 mChunkBufferWrapper = new ArrayT(mWidth * mLinesPerChunk, 1, 0, 0,
00226 mChunkBuffer, true);
00227 mCurrentChunk = -1;
00228 SwitchToChunk(0);
00229 }
00230
00231 Util::IOBuffer* mIOBuffer;
00232 bool mOwnBuffer;
00233 Int64 mBaseOffset;
00234 Int64* mChunkOffsets;
00235 Int64 mChunkSize;
00236 Int64 mChunkCount;
00237 Int64 mLinesPerChunk;
00238
00239
00240 Int64 mCurrentChunk;
00241 StorT* mChunkBuffer;
00242 ArrayT* mChunkBufferWrapper;
00243
00244 Int64 mVersion;
00245 Int64 mBinary;
00246 Int64 mElemSize;
00247 Int64 mWidth;
00248 Int64 mHeight;
00249 Int64 mNrA;
00250 String mCompressor;
00251 Int64 mLinesPerCompressedBlock;
00252
00253 ILOG_VAR_DEC;
00254 };
00255
// NOTE(review): presumably defines the logger that ILOG_VAR_DEC declares
// inside VirtualMatrixIOBufferReader<ArrayT>, registered under the name
// Impala.Core.Matrix - confirm against the ILOG macro definitions.
00256 ILOG_VAR_INIT_TEMPL_1(VirtualMatrixIOBufferReader, ArrayT, Impala.Core.Matrix);
00257
00258 }
00259 }
00260 }
00261
00262 #endif