Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

IOBuffer.h

Go to the documentation of this file.
00001 #ifndef Impala_Util_IOBuffer_h
00002 #define Impala_Util_IOBuffer_h
00003 
00004 #include <stdio.h>
00005 #include <string>
00006 #include <deque>
00007 #include <iostream>
00008 #include "Basis/ILog.h"
00009 #include "Util/Channel.h"
00010 #include "Basis/NativeTypeFormats.h"
00011 #include "Link/Mpi/MpiFuncs.h"
00012 
00013 namespace Impala
00014 {
00015 namespace Util
00016 {
00017 
00041 class IOBuffer
00042 {
00043 public:
00044     typedef Int64 PositionType;
00045 
00052     IOBuffer()
00053     {
00054         mBuffer = 0;
00055         mSize = 0;
00056         mPosition = 0;
00057         mDataChannel = 0;
00058     }
00059  
00061     IOBuffer(PositionType length)
00062     {
00063         mBuffer = new unsigned char[length];
00064         mSize = length;
00065         mPosition = 0;
00066         mDataChannel = 0;
00067     }
00068  
00072     IOBuffer(PositionType length, unsigned char* buffer)
00073     {
00074         mBuffer = buffer;
00075         mSize = length;
00076         mPosition = 0;
00077         mDataChannel = 0;
00078     }
00079 
00080     virtual
00081     ~IOBuffer()
00082     {
00083         if (mDataChannel)
00084             WriteIOBufferToChannel();
00085         if (mBuffer)
00086             delete [] mBuffer;
00087         if (!mDataChannelTmp.empty())
00088             remove(mDataChannelTmp.c_str());
00089     }
00090 
00091     virtual bool
00092     Valid()
00093     {
00094         return true;
00095     }
00096 
00102     virtual PositionType
00103     Available()
00104     {
00105         PositionType res = mSize - mPosition;
00106         if (res < 0)
00107         {
00108             ILOG_ERROR("Available: position beyond size");
00109             return 0;
00110         }
00111         if (res == 1)
00112             return 0;
00113         return res;
00114     }
00115 
00116     virtual int
00117     AvailableInt32()
00118     {
00119         return Min<Int64>(Available(), 0x7fffffff);
00120     }
00121 
00123     virtual PositionType
00124     Size()
00125     {
00126         return mSize;
00127     }
00128 
00129     virtual void
00130     SetSize(PositionType size)
00131     {
00132         mSize = size;
00133     }
00134 
00135     template<class BackInsertIterator>
00136     void
00137     ReadStrings(BackInsertIterator bi, bool skipEC, int count)
00138     {
00139         if (! Valid())
00140             return;
00141 
00142         int c=0;
00143         while (Available())
00144         {
00145             String line = ReadLine();
00146             if (line[0] || !skipEC)
00147             {
00148                 if ((line[0] != '#') || !skipEC)
00149                     *bi++ = line;
00150             }
00151             if (++c==count)
00152                 break;
00153         }
00154     }
00155 
00156     template <class Iterator>
00157     bool
00158     WriteStrings(Iterator begin, Iterator end, int count = -1)
00159     {
00160         if (! Valid())
00161             return false;
00162 
00163         int c=0;
00164         for (Iterator it=begin ; it!=end ; it++)
00165         {
00166             String s = *it;
00167             Puts(s);
00168             if (++c == count)
00169                 break;
00170         }
00171         return true;
00172     }
00173 
00179 
00180     virtual Int64
00181     Read(void* buf, Int64 bytesToRead)
00182     {
00183         Int64 available = Available();
00184         if (available < bytesToRead)
00185             bytesToRead = available;
00186         memcpy(buf, mBuffer + mPosition, bytesToRead);
00187         mPosition += bytesToRead;
00188         return bytesToRead;
00189     }
00190 
00192     virtual String
00193     ReadLine()
00194     {
00195         Int64 start = GetPosition();
00196         Int64 i = 0;
00197         Int64 available = Available();
00198         while ((i < available) && (mBuffer[start+i] != '\n'))
00199             i++;
00200         Int64 skipN = (mBuffer[start+i] == '\n') ? 1 : 0;
00201         SetPosition(start + i + skipN);
00202         if (mBuffer[start+i-1] == '\r') // strip '\r' if present
00203             i--;
00204         const char* p = reinterpret_cast<const char*>(mBuffer + start);
00205         return String(p, i);
00206     }
00207 
00208     virtual Int64
00209     Gets(char* buf, Int64 bytesToRead)
00210     {
00211         Int64 available = Available();
00212         if (available < bytesToRead)
00213             bytesToRead = available;
00214         PositionType start = mPosition;
00215         Int64 i = 0;
00216         while ((i < bytesToRead) && (mBuffer[start+i] != '\n'))
00217         {
00218             buf[i] = mBuffer[start+i];
00219             i++;
00220         }
00221         bytesToRead = i;
00222         mPosition += bytesToRead;
00223         if (mBuffer[start+i] == '\n')
00224             mPosition++;
00225         return bytesToRead;
00226     }
00227 
00228     virtual void
00229     NativeTypeRead(Int8* ptr)
00230     {
00231         DoNativeTypeRead(ptr);
00232     }
00233 
00234     virtual void
00235     NativeTypeRead(UInt8* ptr)
00236     {
00237         DoNativeTypeRead(ptr);
00238     }
00239 
00240     virtual void
00241     NativeTypeRead(Int16* ptr)
00242     {
00243         DoNativeTypeRead(ptr);
00244     }
00245 
00246     virtual void
00247     NativeTypeRead(UInt16* ptr)
00248     {
00249         DoNativeTypeRead(ptr);
00250     }
00251 
00252     virtual void
00253     NativeTypeRead(Int32* ptr)
00254     {
00255         DoNativeTypeRead(ptr);
00256     }
00257 
00258     virtual void
00259     NativeTypeRead(UInt32* ptr)
00260     {
00261         DoNativeTypeRead(ptr);
00262     }
00263     
00264     virtual void
00265     NativeTypeRead(Int64* ptr)
00266     {
00267         DoNativeTypeRead(ptr);
00268     }
00269 
00270     virtual void
00271     NativeTypeRead(UInt64* ptr)
00272     {
00273         DoNativeTypeRead(ptr);
00274     }
00275 
00276     virtual void
00277     NativeTypeRead(Real32* ptr)
00278     {
00279         DoNativeTypeRead(ptr);
00280     }
00281 
00282     virtual void
00283     NativeTypeRead(Real64* ptr)
00284     {
00285         DoNativeTypeRead(ptr);
00286     }
00287 
00288     virtual void
00289     Write(const void* buf, Int64 bytesToWrite)
00290     {
00291         Int64 available = Available();
00292         if (available < bytesToWrite)
00293             bytesToWrite = available;
00294         memcpy(GetBuffer() + GetPosition(), buf, bytesToWrite);
00295         SetPosition(GetPosition() + bytesToWrite);
00296     }
00297 
00298 /*
00300     virtual void
00301     WriteLine(const String& line)
00302     {
00303         ILOG_ERROR("Not implemented yet, involves reallocing..."
00304                    << "this is duplication of available code");
00305     }
00306 */
00307 
00308     virtual Int64
00309     Puts(const char* buf)
00310     {
00311         Int64 len = strlen(buf);
00312         Int64 available = Available();
00313         if (available < len)
00314             len = available;
00315         unsigned char* start = GetBuffer() + GetPosition();
00316         memcpy(start, buf, len);
00317         start[len] = '\n';
00318         SetPosition(GetPosition() + len + 1);
00319         return 1;
00320     }
00321 
00322     virtual Int64
00323     Puts(const String& str)
00324     {
00325         return Puts(str.c_str());
00326     }
00327 
00328     virtual void
00329     NativeTypeWrite(Int8 val)
00330     {
00331         DoNativeTypeWrite(val);
00332     }
00333 
00334     virtual void
00335     NativeTypeWrite(UInt8 val)
00336     {
00337         DoNativeTypeWrite(val);
00338     }
00339 
00340     virtual void
00341     NativeTypeWrite(Int16 val)
00342     {
00343         DoNativeTypeWrite(val);
00344     }
00345 
00346     virtual void
00347     NativeTypeWrite(UInt16 val)
00348     {
00349         DoNativeTypeWrite(val);
00350     }
00351 
00352     virtual void
00353     NativeTypeWrite(Int32 val)
00354     {
00355         DoNativeTypeWrite(val);
00356     }
00357 
00358     virtual void
00359     NativeTypeWrite(UInt32 val)
00360     {
00361         DoNativeTypeWrite(val);
00362     }
00363     
00364     virtual void
00365     NativeTypeWrite(Int64 val)
00366     {
00367         DoNativeTypeWrite(val);
00368     }
00369 
00370     virtual void
00371     NativeTypeWrite(UInt64 val)
00372     {
00373         DoNativeTypeWrite(val);
00374     }
00375 
00376     virtual void
00377     NativeTypeWrite(Real32 val)
00378     {
00379         DoNativeTypeWrite(val);
00380     }
00381 
00382     virtual void
00383     NativeTypeWrite(Real64 val)
00384     {
00385         DoNativeTypeWrite(val);
00386     }
00387 
00389 
00395 
00396     virtual void
00397     SetPosition(PositionType position)
00398     {
00399         mPosition = position;
00400     }
00401 
00402     // For specializations that allow for a growing buffer
00403     virtual void
00404     SetPositionAndSize(PositionType position)
00405     {
00406         mPosition = position;
00407         if (mPosition > mSize)
00408             mSize = position;
00409     }
00410 
00411     virtual PositionType
00412     GetPosition()
00413     {
00414         return mPosition;
00415     }
00416 
00417     virtual void
00418     Rewind()
00419     {
00420         mPosition = 0;
00421     }
00422 
00426     virtual void
00427     FastForward()
00428     {
00429         mPosition = mSize;
00430     }
00431 
00432     virtual PositionType
00433     Seek(PositionType offset, int whence)
00434     {
00435         if (whence == SEEK_CUR)
00436             SetPosition(GetPosition() + offset);
00437         else if (whence == SEEK_END)
00438             SetPosition(Size() + offset);
00439         else if (whence == SEEK_SET)
00440             SetPosition(offset);
00441         else
00442             return -1;
00443 
00444         return GetPosition();
00445     }
00446 
00448 
00452     unsigned char*
00453     GetBuffer()
00454     {
00455         return mBuffer;
00456     }
00457 
00463     void
00464     SetBuffer(unsigned char* buf, PositionType size)
00465     {
00466         mBuffer = buf;
00467         mSize = size;
00468     }
00469 
00472     void
00473     SetUseChannel(Channel* channel, String channelFile, String tmpFile)
00474     {
00475         mDataChannel = channel;
00476         mDataChannelFile = channelFile;
00477         mDataChannelTmp = tmpFile;
00478     }
00479 
00480 protected:
00481 
00482     template <class NativeType>
00483     void
00484     DoNativeTypeRead(NativeType* ptr)
00485     {
00486         String fs = NativeTypeFormat<NativeType>(0);
00487         /* This seems to copy all of the remaining buffer :-(
00488         const char* start = (const char*) (GetBuffer() + GetPosition());
00489         sscanf(start, fs.c_str(), ptr);
00490         int i = 0;
00491         while (start[i] != ' ')
00492             i++;
00493         */
00494         char buf[1024];
00495         unsigned char* src = GetBuffer() + GetPosition();
00496         Int64 i = 0;
00497         while (src[i] != ' ')
00498             buf[i++] = src[i];
00499         buf[i] = ' ';
00500         sscanf(buf, fs.c_str(), ptr);
00501         SetPosition(GetPosition() + i + 1);
00502     }
00503 
00504     template <class NativeType>
00505     void
00506     DoNativeTypeWrite(NativeType val)
00507     {
00508         String fs = NativeTypeFormat<NativeType>(0) + " ";
00509         char buf[1024];
00510         sprintf(buf, fs.c_str(), val);
00511         Int64 i = 0;
00512         while (buf[i] != ' ')
00513             i++;
00514         Write(buf, i+1);
00515     }
00516 
00517     bool
00518     WriteIOBufferToChannel()
00519     {
00520         SetSize(GetPosition()); // assume the last write designates the end
00521         char* buf = mDataChannel->Buffer();
00522         sprintf(buf, "openfilebuffer \"%s\" 0\0", mDataChannelFile.c_str());
00523         mDataChannel->SendRequest(strlen(buf)+1);
00524         if (mDataChannel->LastSendHadError())
00525         {
00526             ILOG_ERROR("WriteIOBufferToChannel: open failed for [" <<
00527                        mDataChannelFile << "]");
00528             return false;
00529         }
00530         int id;
00531         Int64 bufSize;
00532         sscanf(buf, "%d %lld", &id, &bufSize); // bufSize is dummy when writing
00533         ILOG_DEBUG(mDataChannelFile << ", id = " << id);
00534 
00535         Int64 nrWritten = 0;
00536         SetPosition(0);
00537         while (Available() > 0)
00538         {
00539             sprintf(buf, "writefilebuffer %d\0", id);
00540             int used = strlen(buf) + 1;
00541             int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE-used, Available());
00542             Read(buf+used, nrData);
00543             int len = mDataChannel->SendRequest(used+nrData);
00544             if (mDataChannel->LastSendHadError())
00545             {
00546                 ILOG_ERROR("WriteIOBufferToChannel: write failed for [" <<
00547                            mDataChannelFile << "]");
00548                 break;
00549             }
00550             nrWritten += nrData;
00551             ILOG_DEBUG("  wrote " << nrData << " bytes, total = " << nrWritten);
00552         }
00553         sprintf(buf, "closefilebuffer %d\0", id);
00554         mDataChannel->SendRequest(strlen(buf)+1);
00555         return true;
00556     }
00557 
00558     void
00559     CheckDataChannelWrite()
00560     {
00561         if (mDataChannel != 0)
00562         {
00563             WriteIOBufferToChannel();
00564             mDataChannel = 0;
00565         }
00566     }
00567 
00568 private:
00569 
00570     unsigned char* mBuffer;
00571     PositionType   mSize;
00572     PositionType   mPosition;
00573 
00574     Channel*       mDataChannel;
00575     String         mDataChannelFile; // "logical" name of the file at the server
00576     String         mDataChannelTmp; // name of the temporary file
00577 
00578     ILOG_VAR_DEC;
00579 };
00580 
00581 ILOG_VAR_INIT(IOBuffer, Impala.Util);
00582 
00583 inline IOBuffer*
00584 CreateIOBufferFromFile(CString filename)
00585 {
00586     // read all contents of file and put in the IOBuffer
00587     FILE* file = fopen(filename.c_str(), "r");
00588     if (!file)
00589     {
00590         std::cout << "[CreateIOBufferFromFile] failed to open " << filename
00591                   << std::endl;
00592         return 0;
00593     }
00594 
00595     int blockSize = 0x1000;
00596     std::deque<unsigned char*> bufferList;
00597     int read;
00598     do
00599     {
00600         unsigned char* buffer = new unsigned char[blockSize];
00601         read = fread(buffer,1,blockSize,file);
00602         bufferList.push_back(buffer);
00603     }
00604     while (read == blockSize);
00605     fclose(file);
00606 
00607     // create total buffer
00608     int total = ((bufferList.size()-1)*blockSize) + read;
00609     unsigned char* buffer = new unsigned char[total];
00610     int i;
00611     for(i=0 ; i<bufferList.size() - 1 ; i++)
00612     {
00613         memcpy(buffer + (i*blockSize), bufferList[i], blockSize);
00614         delete [] bufferList[i];
00615         bufferList[i] = 0;
00616     }
00617     memcpy(buffer + (i*blockSize), bufferList[i], read); // last block only #read bytes
00618     delete [] bufferList[i];
00619     bufferList[i] = 0;
00620 
00621     return new IOBuffer(total, buffer);
00622 }
00623 
00625 void
00626 Broadcast(IOBuffer* buffer)
00627 {
00628 #ifdef MPI_USED
00629     ILOG_VAR(Link.Mpi.Broadcast);
00630     MPI_Barrier(MPI_COMM_WORLD);
00631     int size = buffer->Size();
00632     MPI_Bcast(&size, 1, MPI_INT, 0, MPI_COMM_WORLD);
00633     unsigned char* sendBuffer;
00634     if(Link::Mpi::MyId() == 0)
00635         sendBuffer = buffer->GetBuffer();
00636     else
00637         sendBuffer = new unsigned char[size];
00638     ILOG_DEBUG(Link::Mpi::MyId() << ": about to broadcast, buf = " << (void*)sendBuffer << "size = " << size);
00639     MPI_Bcast(sendBuffer, size, MPI_CHAR, 0, MPI_COMM_WORLD);
00640     if(Link::Mpi::MyId() != 0)
00641     {
00642         buffer->SetBuffer(sendBuffer, size);
00643     }
00644     ILOG_DEBUG("done");
00645 #endif // MPI_USED
00646 }
00647 
00648 } // namespace Util
00649 } // namespace Impala
00650 
00651 #endif // Impala_Util_IOBuffer_h

Generated on Fri Mar 19 09:31:46 2010 for ImpalaSrc by  doxygen 1.5.1