Home || Visual Search || Applications || Architecture || Important Messages || OGL || Src

IOBufferChannel.h

Go to the documentation of this file.
00001 #ifndef Impala_Util_IOBufferChannel_h
00002 #define Impala_Util_IOBufferChannel_h
00003 
00004 #include "Util/IOBuffer.h"
00005 #include "Basis/CmdOptions.h"
00006 #include "Util/ChannelPool.h"
00007 #include "Basis/NativeTypeFormats.h"
00008 
00009 namespace Impala
00010 {
00011 namespace Util
00012 {
00013 
00014 
00020 class IOBufferChannel : public IOBuffer
00021 {
00022 public:
00023 
00024     IOBufferChannel(CString fileName, bool readMode, Channel* channel = 0)
00025     {
00026         mReadMode = readMode;
00027         mId = -1;
00028         if (channel == 0)
00029         {
00030             CmdOptions& options = CmdOptions::GetInstance();
00031             String dataServer = options.GetString("dataServer");
00032             String passwordFile = options.GetString("passwordFile");
00033             channel = ChannelPool::Instance().Get(dataServer, passwordFile);
00034         }
00035         mMyChannel = channel;
00036         char* buf = mMyChannel->Buffer();
00037         sprintf(buf, "openfilebuffer \"%s\" %d\0", fileName.c_str(), readMode);
00038         mMyChannel->SendRequest(strlen(buf)+1);
00039         if (mMyChannel->LastSendHadError())
00040         {
00041             ILOG_ERROR("Failed to open " << fileName);
00042             return;
00043         }
00044         Int64 bufSize;
00045         sscanf(buf, "%d %lld", &mId, &bufSize);
00046         if (mReadMode)
00047         {
00048             SetSize(bufSize);
00049             ILOG_DEBUG(fileName << ", id = " << mId << ", size = " << bufSize);
00050         }
00051         else
00052         {
00053             ILOG_DEBUG(fileName << ", id = " << mId);
00054         }
00055     }
00056 
00057     virtual
00058     ~IOBufferChannel()
00059     {
00060         if (mId != -1)
00061         {
00062             char* buf = mMyChannel->Buffer();
00063             sprintf(buf, "closefilebuffer %d\0", mId);
00064             mMyChannel->SendRequest(strlen(buf)+1);
00065             mId = -1;
00066         }
00067     }
00068 
00069     virtual bool
00070     Valid()
00071     {
00072         return (mId != -1);
00073     }
00074 
00075     virtual Int64
00076     Read(void* buf, Int64 bytesToRead)
00077     {
00078         char* chBuf = mMyChannel->Buffer();
00079         Int64 nrRead = 0;
00080         char* bufPtr = (char*) buf;
00081         while (nrRead < bytesToRead)
00082         {
00083             int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE, 
00084                                     bytesToRead - nrRead);
00085             sprintf(chBuf, "readfilebuffer %d %d\0", mId, nrData);
00086             int len = mMyChannel->SendRequest(strlen(chBuf)+1);
00087             if (mMyChannel->LastSendHadError())
00088                 return 0;
00089             memcpy(bufPtr + nrRead, chBuf, len);
00090             if (len == 0)
00091                 break;
00092             nrRead += len;
00093         }
00094         SetPosition(GetPosition() + nrRead);
00095         return nrRead;
00096     }
00097 
00098     virtual String
00099     ReadLine()
00100     {
00101         // todo : this is a very inefficient implementation
00102         Int64 available = Available();
00103         Int64 maxLineSize = 16384; // 16 Kb, todo : give user control
00104         if (available < maxLineSize)
00105             maxLineSize = available + 1; // extra space for '\0'
00106         char* buf = new char[maxLineSize];
00107         PositionType start = GetPosition();
00108         Int64 nrRead = Read(buf, maxLineSize); // does SetPosition
00109         Int64 i = 0;
00110         while ((i < maxLineSize) && (buf[i] != '\n'))
00111             i++;
00112         Int64 skipN = (buf[i] == '\n') ? 1 : 0;
00113         SetPosition(start + i + skipN);
00114         if ((i > 0) && (buf[i-1] == '\r')) // strip '\r' if present
00115             i--;
00116         buf[i] = '\0';
00117         String res(buf);
00118         delete buf;
00119         return res;
00120     }
00121 
00122     virtual Int64
00123     Gets(char* buf, Int64 bytesToRead)
00124     {
00125         Int64 available = Available();
00126         if (available < bytesToRead)
00127             bytesToRead = available;
00128         PositionType start = GetPosition();
00129         Int64 nrRead = Read(buf, bytesToRead); // does SetPosition
00130         Int64 i = 0;
00131         while ((i < bytesToRead) && (buf[i] != '\n'))
00132             i++;
00133         bytesToRead = i;
00134         if (buf[i] == '\n')
00135             i++;
00136         SetPosition(start + i);
00137         return bytesToRead;
00138     }
00139 
00140     virtual void
00141     NativeTypeRead(Int8* ptr)
00142     {
00143         DoNativeTypeRead(ptr);
00144     }
00145 
00146     virtual void
00147     NativeTypeRead(UInt8* ptr)
00148     {
00149         DoNativeTypeRead(ptr);
00150     }
00151 
00152     virtual void
00153     NativeTypeRead(Int16* ptr)
00154     {
00155         DoNativeTypeRead(ptr);
00156     }
00157 
00158     virtual void
00159     NativeTypeRead(UInt16* ptr)
00160     {
00161         DoNativeTypeRead(ptr);
00162     }
00163 
00164     virtual void
00165     NativeTypeRead(Int32* ptr)
00166     {
00167         DoNativeTypeRead(ptr);
00168     }
00169 
00170     virtual void
00171     NativeTypeRead(UInt32* ptr)
00172     {
00173         DoNativeTypeRead(ptr);
00174     }
00175 
00176     virtual void
00177     NativeTypeRead(Int64* ptr)
00178     {
00179         DoNativeTypeRead(ptr);
00180     }
00181 
00182     virtual void
00183     NativeTypeRead(UInt64* ptr)
00184     {
00185         DoNativeTypeRead(ptr);
00186     }
00187 
00188     virtual void
00189     NativeTypeRead(Real32* ptr)
00190     {
00191         DoNativeTypeRead(ptr);
00192     }
00193 
00194     virtual void
00195     NativeTypeRead(Real64* ptr)
00196     {
00197         DoNativeTypeRead(ptr);
00198     }
00199 
00200     virtual Int64
00201     Write(const void* buf, Int64 bytesToWrite)
00202     {
00203         char* chBuf = mMyChannel->Buffer();
00204         Int64 nrWritten = 0;
00205         char* bufPtr = (char*) buf;
00206         while (nrWritten < bytesToWrite)
00207         {
00208             sprintf(chBuf, "writefilebuffer %d\0", mId);
00209             int used = strlen(chBuf) + 1;
00210             int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE - used,
00211                                     bytesToWrite - nrWritten);
00212             memcpy(chBuf + used, bufPtr + nrWritten, nrData);
00213             int len = mMyChannel->SendRequest(used + nrData);
00214             if (mMyChannel->LastSendHadError())
00215                 break;
00216             nrWritten += nrData;
00217         }
00218         SetPosition(GetPosition() + bytesToWrite);
00219         return nrWritten;
00220     }
00221 
00222     virtual Int64
00223     Puts(const char* buf)
00224     {
00225         Int64 len = strlen(buf) + 1;
00226         char* writeBuf = new char[len];
00227         memcpy(writeBuf, buf, len);
00228         writeBuf[len-1] = '\n';
00229         Write(writeBuf, len);
00230         delete writeBuf;
00231         return 1;
00232     }
00233 
00234     virtual void
00235     NativeTypeWrite(Int8 val)
00236     {
00237         DoNativeTypeWrite(val);
00238     }
00239 
00240     virtual void
00241     NativeTypeWrite(UInt8 val)
00242     {
00243         DoNativeTypeWrite(val);
00244     }
00245 
00246     virtual void
00247     NativeTypeWrite(Int16 val)
00248     {
00249         DoNativeTypeWrite(val);
00250     }
00251 
00252     virtual void
00253     NativeTypeWrite(UInt16 val)
00254     {
00255         DoNativeTypeWrite(val);
00256     }
00257 
00258     virtual void
00259     NativeTypeWrite(Int32 val)
00260     {
00261         DoNativeTypeWrite(val);
00262     }
00263 
00264     virtual void
00265     NativeTypeWrite(UInt32 val)
00266     {
00267         DoNativeTypeWrite(val);
00268     }
00269 
00270     virtual void
00271     NativeTypeWrite(Int64 val)
00272     {
00273         DoNativeTypeWrite(val);
00274     }
00275 
00276     virtual void
00277     NativeTypeWrite(UInt64 val)
00278     {
00279         DoNativeTypeWrite(val);
00280     }
00281 
00282     virtual void
00283     NativeTypeWrite(Real32 val)
00284     {
00285         DoNativeTypeWrite(val);
00286     }
00287 
00288     virtual void
00289     NativeTypeWrite(Real64 val)
00290     {
00291         DoNativeTypeWrite(val);
00292     }
00293 
00294     virtual void
00295     SetPosition(PositionType position)
00296     {
00297         if (GetPosition() == position)
00298             return;
00299 
00300         char* buf = mMyChannel->Buffer();
00301         sprintf(buf, "seekfilebuffer %d %lld %d\0", mId, position, SEEK_SET);
00302         mMyChannel->SendRequest(strlen(buf)+1);
00303         if (mMyChannel->LastSendHadError())
00304         {
00305             ILOG_ERROR("Failed to seek");
00306             return;
00307         }
00308         PositionType newPos;
00309         sscanf(buf, "%lld", &newPos);
00310         IOBuffer::SetPosition(newPos);
00311     }
00312 
00313 private:
00314 
00315     template <class NativeType>
00316     void
00317     DoNativeTypeRead(NativeType* ptr)
00318     {
00319         String fs = NativeTypeFormat<NativeType>(0);
00320         char buf[1024];
00321         PositionType start = GetPosition();
00322         Int64 nrRead = Read(buf, 1024); // does SetPosition
00323         Int64 i = 0;
00324         while ((buf[i] != ' ') && (i < nrRead))
00325             i++;
00326         if(nrRead == 0)
00327             ILOG_ERROR("DoNativeTypeRead read 0 bytes")
00328         sscanf(buf, fs.c_str(), ptr);
00329         SetPosition(start + i + 1);
00330     }
00331 
00332     template <class NativeType>
00333     void
00334     DoNativeTypeWrite(NativeType val)
00335     {
00336         String fs = NativeTypeFormat<NativeType>(0) + " ";
00337         char buf[1024];
00338         sprintf(buf, fs.c_str(), val);
00339         Int64 i = 0;
00340         while (buf[i] != ' ')
00341             i++;
00342         Write(buf, i+1);
00343     }
00344 
00345     bool     mReadMode;
00346     int      mId;
00347     Channel* mMyChannel; // watch out for IOBuffer::mDataChannel conflicts
00348 
00349     ILOG_VAR_DEC;
00350 };
00351 
00352 ILOG_VAR_INIT(IOBufferChannel, Impala.Util);
00353 
00354 } // namespace Util
00355 } // namespace Impala
00356 
00357 #endif

Generated on Thu Jan 13 09:05:15 2011 for ImpalaSrc by  doxygen 1.5.1