Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

IOBufferChannel.h

Go to the documentation of this file.
00001 #ifndef Impala_Util_IOBufferChannel_h
00002 #define Impala_Util_IOBufferChannel_h
00003 
00004 #include "Util/IOBuffer.h"
00005 #include "Basis/CmdOptions.h"
00006 #include "Util/ChannelPool.h"
00007 #include "Basis/NativeTypeFormats.h"
00008 
00009 namespace Impala
00010 {
00011 namespace Util
00012 {
00013 
00014 
00020 class IOBufferChannel : public IOBuffer
00021 {
00022 public:
00023 
00024     IOBufferChannel(CString fileName, bool readMode, Channel* channel = 0)
00025     {
00026         mReadMode = readMode;
00027         mId = -1;
00028         if (channel == 0)
00029         {
00030             CmdOptions& options = CmdOptions::GetInstance();
00031             String dataServer = options.GetString("dataServer");
00032             String passwordFile = options.GetString("passwordFile");
00033             channel = ChannelPool::Instance().Get(dataServer, passwordFile);
00034         }
00035         mMyChannel = channel;
00036         char* buf = mMyChannel->Buffer();
00037         sprintf(buf, "openfilebuffer \"%s\" %d\0", fileName.c_str(), readMode);
00038         mMyChannel->SendRequest(strlen(buf)+1);
00039         if (mMyChannel->LastSendHadError())
00040         {
00041             ILOG_ERROR("Failed to open " << fileName);
00042             return;
00043         }
00044         Int64 bufSize;
00045         sscanf(buf, "%d %lld", &mId, &bufSize);
00046         if (mReadMode)
00047         {
00048             SetSize(bufSize);
00049             ILOG_DEBUG(fileName << ", id = " << mId << ", size = " << bufSize);
00050         }
00051         else
00052         {
00053             ILOG_DEBUG(fileName << ", id = " << mId);
00054         }
00055     }
00056 
00057     virtual
00058     ~IOBufferChannel()
00059     {
00060         if (mId != -1)
00061         {
00062             char* buf = mMyChannel->Buffer();
00063             sprintf(buf, "closefilebuffer %d\0", mId);
00064             mMyChannel->SendRequest(strlen(buf)+1);
00065             mId = -1;
00066         }
00067     }
00068 
00069     virtual bool
00070     Valid()
00071     {
00072         return (mId != -1);
00073     }
00074 
00075     virtual Int64
00076     Read(void* buf, Int64 bytesToRead)
00077     {
00078         char* chBuf = mMyChannel->Buffer();
00079         Int64 nrRead = 0;
00080         char* bufPtr = (char*) buf;
00081         while (nrRead < bytesToRead)
00082         {
00083             int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE, bytesToRead);
00084             sprintf(chBuf, "readfilebuffer %d %d\0", mId, nrData);
00085             int len = mMyChannel->SendRequest(strlen(chBuf)+1);
00086             if (mMyChannel->LastSendHadError())
00087                 return 0;
00088             memcpy(bufPtr + nrRead, chBuf, len);
00089             if (len == 0)
00090                 break;
00091             nrRead += len;
00092         }
00093         SetPosition(GetPosition() + nrRead);
00094         return nrRead;
00095     }
00096 
00097     virtual String
00098     ReadLine()
00099     {
00100         // todo : this is a very inefficient implementation
00101         Int64 available = Available();
00102         Int64 maxLineSize = 16384; // 16 Kb, todo : give user control
00103         if (available < maxLineSize)
00104             maxLineSize = available + 1; // extra space for '\0'
00105         char* buf = new char[maxLineSize];
00106         PositionType start = GetPosition();
00107         Int64 nrRead = Read(buf, maxLineSize); // does SetPosition
00108         Int64 i = 0;
00109         while ((i < maxLineSize) && (buf[i] != '\n'))
00110             i++;
00111         Int64 skipN = (buf[i] == '\n') ? 1 : 0;
00112         SetPosition(start + i + skipN);
00113         if (buf[i-1] == '\r') // strip '\r' if present
00114             i--;
00115         buf[i] = '\0';
00116         String res(buf);
00117         delete buf;
00118         return res;
00119     }
00120 
00121     virtual Int64
00122     Gets(char* buf, Int64 bytesToRead)
00123     {
00124         Int64 available = Available();
00125         if (available < bytesToRead)
00126             bytesToRead = available;
00127         PositionType start = GetPosition();
00128         Int64 nrRead = Read(buf, bytesToRead); // does SetPosition
00129         Int64 i = 0;
00130         while ((i < bytesToRead) && (buf[i] != '\n'))
00131             i++;
00132         bytesToRead = i;
00133         if (buf[i] == '\n')
00134             i++;
00135         SetPosition(start + i);
00136         return bytesToRead;
00137     }
00138 
00139     virtual void
00140     NativeTypeRead(Int8* ptr)
00141     {
00142         DoNativeTypeRead(ptr);
00143     }
00144 
00145     virtual void
00146     NativeTypeRead(UInt8* ptr)
00147     {
00148         DoNativeTypeRead(ptr);
00149     }
00150 
00151     virtual void
00152     NativeTypeRead(Int16* ptr)
00153     {
00154         DoNativeTypeRead(ptr);
00155     }
00156 
00157     virtual void
00158     NativeTypeRead(UInt16* ptr)
00159     {
00160         DoNativeTypeRead(ptr);
00161     }
00162 
00163     virtual void
00164     NativeTypeRead(Int32* ptr)
00165     {
00166         DoNativeTypeRead(ptr);
00167     }
00168 
00169     virtual void
00170     NativeTypeRead(UInt32* ptr)
00171     {
00172         DoNativeTypeRead(ptr);
00173     }
00174 
00175     virtual void
00176     NativeTypeRead(Real32* ptr)
00177     {
00178         DoNativeTypeRead(ptr);
00179     }
00180 
00181     virtual void
00182     NativeTypeRead(Real64* ptr)
00183     {
00184         DoNativeTypeRead(ptr);
00185     }
00186 
00187     virtual void
00188     Write(const void* buf, Int64 bytesToWrite)
00189     {
00190         char* chBuf = mMyChannel->Buffer();
00191         Int64 nrWritten = 0;
00192         char* bufPtr = (char*) buf;
00193         while (nrWritten < bytesToWrite)
00194         {
00195             sprintf(chBuf, "writefilebuffer %d\0", mId);
00196             int used = strlen(chBuf) + 1;
00197             int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE - used,
00198                                     bytesToWrite - nrWritten);
00199             memcpy(chBuf + used, bufPtr + nrWritten, nrData);
00200             int len = mMyChannel->SendRequest(used + nrData);
00201             if (mMyChannel->LastSendHadError())
00202                 break;
00203             nrWritten += nrData;
00204         }
00205         SetPosition(GetPosition() + bytesToWrite);
00206     }
00207 
00208     virtual Int64
00209     Puts(const char* buf)
00210     {
00211         Int64 len = strlen(buf) + 1;
00212         char* writeBuf = new char[len];
00213         memcpy(writeBuf, buf, len);
00214         writeBuf[len-1] = '\n';
00215         Write(writeBuf, len);
00216         delete writeBuf;
00217         return 1;
00218     }
00219 
00220     virtual void
00221     NativeTypeWrite(Int8 val)
00222     {
00223         DoNativeTypeWrite(val);
00224     }
00225 
00226     virtual void
00227     NativeTypeWrite(UInt8 val)
00228     {
00229         DoNativeTypeWrite(val);
00230     }
00231 
00232     virtual void
00233     NativeTypeWrite(Int16 val)
00234     {
00235         DoNativeTypeWrite(val);
00236     }
00237 
00238     virtual void
00239     NativeTypeWrite(UInt16 val)
00240     {
00241         DoNativeTypeWrite(val);
00242     }
00243 
00244     virtual void
00245     NativeTypeWrite(Int32 val)
00246     {
00247         DoNativeTypeWrite(val);
00248     }
00249 
00250     virtual void
00251     NativeTypeWrite(UInt32 val)
00252     {
00253         DoNativeTypeWrite(val);
00254     }
00255 
00256     virtual void
00257     NativeTypeWrite(Real32 val)
00258     {
00259         DoNativeTypeWrite(val);
00260     }
00261 
00262     virtual void
00263     NativeTypeWrite(Real64 val)
00264     {
00265         DoNativeTypeWrite(val);
00266     }
00267 
00268     virtual void
00269     SetPosition(PositionType position)
00270     {
00271         if (GetPosition() == position)
00272             return;
00273 
00274         char* buf = mMyChannel->Buffer();
00275         sprintf(buf, "seekfilebuffer %d %lld %d\0", mId, position, SEEK_SET);
00276         mMyChannel->SendRequest(strlen(buf)+1);
00277         if (mMyChannel->LastSendHadError())
00278         {
00279             ILOG_ERROR("Failed to seek");
00280             return;
00281         }
00282         PositionType newPos;
00283         sscanf(buf, "%lld", &newPos);
00284         IOBuffer::SetPosition(newPos);
00285     }
00286 
00287 private:
00288 
00289     template <class NativeType>
00290     void
00291     DoNativeTypeRead(NativeType* ptr)
00292     {
00293         String fs = NativeTypeFormat<NativeType>(0);
00294         char buf[1024];
00295         PositionType start = GetPosition();
00296         Int64 nrRead = Read(buf, 1024); // does SetPosition
00297         Int64 i = 0;
00298         while ((buf[i] != ' ') && (i < nrRead))
00299             i++;
00300         if(nrRead == 0)
00301             ILOG_ERROR("DoNativeTypeRead read 0 bytes")
00302         sscanf(buf, fs.c_str(), ptr);
00303         SetPosition(start + i + 1);
00304     }
00305 
00306     template <class NativeType>
00307     void
00308     DoNativeTypeWrite(NativeType val)
00309     {
00310         String fs = NativeTypeFormat<NativeType>(0) + " ";
00311         char buf[1024];
00312         sprintf(buf, fs.c_str(), val);
00313         Int64 i = 0;
00314         while (buf[i] != ' ')
00315             i++;
00316         Write(buf, i+1);
00317     }
00318 
00319     bool     mReadMode;
00320     int      mId;
00321     Channel* mMyChannel; // watch out for IOBuffer::mDataChannel conflicts
00322 
00323     ILOG_VAR_DEC;
00324 };
00325 
00326 ILOG_VAR_INIT(IOBufferChannel, Impala.Util);
00327 
00328 } // namespace Util
00329 } // namespace Impala
00330 
00331 #endif

Generated on Fri Mar 19 09:31:47 2010 for ImpalaSrc by  doxygen 1.5.1