Home || Visual Search || Applications || Architecture || Important Messages || OGL || Src

IOBuffer.h

Go to the documentation of this file.
00001 #ifndef Impala_Util_IOBuffer_h
00002 #define Impala_Util_IOBuffer_h
00003 
00004 #include <stdio.h>
00005 #include <string>
00006 #include <deque>
00007 #include <iostream>
00008 #include "Basis/ILog.h"
00009 #include "Util/Channel.h"
00010 #include "Basis/NativeTypeFormats.h"
00011 #include "Link/Mpi/MpiFuncs.h"
00012 #ifdef BZIP2_USED
00013 #include "bzlib.h"
00014 #endif // BZIP2_USED
00015 #ifdef ZLIB_USED
00016 #ifdef WIN32
00017 #include "Png/zlib.h"
00018 #else
00019 #include "zlib.h"
00020 #endif
00021 #endif // ZLIB_USED
00022 
00023 
00024 namespace Impala
00025 {
00026 namespace Util
00027 {
00028 
00052 class IOBuffer
00053 {
00054 public:
00055     typedef Int64 PositionType;
00056 
00057     static const UInt32 cBzipBlockSize = 16 * 1024 * 1024;
00058 
00065     IOBuffer()
00066     {
00067         mBuffer = 0;
00068         mSize = 0;
00069         mPosition = 0;
00070         mDataChannel = 0;
00071     }
00072  
00074     IOBuffer(PositionType length)
00075     {
00076         mBuffer = new unsigned char[length];
00077         mSize = length;
00078         mPosition = 0;
00079         mDataChannel = 0;
00080     }
00081  
00085     IOBuffer(PositionType length, unsigned char* buffer)
00086     {
00087         mBuffer = buffer;
00088         mSize = length;
00089         mPosition = 0;
00090         mDataChannel = 0;
00091     }
00092 
00095     IOBuffer(String s)
00096     {
00097         mSize = s.size();
00098         mBuffer = new unsigned char [mSize+1];
00099         std::memcpy(mBuffer, s.c_str(), mSize);
00100         mBuffer[mSize] = 0;
00101         mPosition = 0;
00102         mDataChannel = 0;
00103     }
00104 
00105     virtual
00106     ~IOBuffer()
00107     {
00108         if (mDataChannel)
00109             WriteIOBufferToChannel();
00110         if (mBuffer)
00111             delete [] mBuffer;
00112         if (!mDataChannelTmp.empty())
00113             remove(mDataChannelTmp.c_str());
00114     }
00115 
00116     virtual bool
00117     Valid()
00118     {
00119         return true;
00120     }
00121 
00127     virtual PositionType
00128     Available()
00129     {
00130         PositionType res = mSize - mPosition;
00131         if (res < 0)
00132         {
00133             ILOG_ERROR("Available: position beyond size");
00134             return 0;
00135         }
00136         if (res == 1)
00137             return 0;
00138         return res;
00139     }
00140 
00141     virtual int
00142     AvailableInt32()
00143     {
00144         return Min<Int64>(Available(), 0x7fffffff);
00145     }
00146 
00148     virtual PositionType
00149     Size()
00150     {
00151         return mSize;
00152     }
00153 
00154     virtual void
00155     SetSize(PositionType size)
00156     {
00157         mSize = size;
00158     }
00159 
00160     template<class BackInsertIterator>
00161     void
00162     ReadStrings(BackInsertIterator bi, bool skipComments, int count)
00163     {
00164         if (! Valid())
00165             return;
00166 
00167         int c=0;
00168         while (Available())
00169         {
00170             String line = ReadLine();
00171             if (line[0] || !skipComments)
00172             {
00173                 if ((line[0] != '#') || !skipComments)
00174                     *bi++ = line;
00175             }
00176             if (++c==count)
00177                 break;
00178         }
00179     }
00180 
00181     template <class Iterator>
00182     bool
00183     WriteStrings(Iterator begin, Iterator end, int count = -1)
00184     {
00185         if (! Valid())
00186             return false;
00187 
00188         int c=0;
00189         for (Iterator it=begin ; it!=end ; it++)
00190         {
00191             String s = *it;
00192             Puts(s);
00193             if (++c == count)
00194                 break;
00195         }
00196         return true;
00197     }
00198 
00204 
00205     virtual Int64
00206     Read(void* buf, Int64 bytesToRead)
00207     {
00208         Int64 available = Available();
00209         if (available < bytesToRead)
00210             bytesToRead = available;
00211         memcpy(buf, mBuffer + mPosition, bytesToRead);
00212         mPosition += bytesToRead;
00213         return bytesToRead;
00214     }
00215 
00216 #ifdef BZIP2_USED
00217     Int64
00218     ReadBzip(void* buf, Int64 bytesToRead)
00219     {
00220         UInt32 nrBlocks = 0;
00221         Int64 nrBytes = 0;
00222         Read(&nrBlocks, sizeof(UInt32));
00223         Read(&nrBytes, sizeof(Int64));
00224         if (nrBytes > bytesToRead)
00225         {
00226             ILOG_ERROR("ReadBzip: Buffer to small");
00227             return 0;
00228         }
00229 
00230         char* bufPtr = (char *) buf;
00231         char* bzipBuf = new char[cBzipBlockSize];
00232         Int64 left = nrBytes;
00233         Int64 nrRead = 0;
00234         for (int i=0 ; i<nrBlocks ; i++)
00235         {
00236             UInt32 bzipSize = 0;
00237             Read(&bzipSize, sizeof(UInt32));
00238             Read(bzipBuf, bzipSize);
00239 
00240             UInt32 nr = (left > cBzipBlockSize) ? cBzipBlockSize : left;
00241             int res = BZ2_bzBuffToBuffDecompress(bufPtr, &nr, bzipBuf,
00242                                                  bzipSize, 0, 0);
00243             if (res == BZ_OK)
00244                 nrRead += nr;
00245             else
00246                 ILOG_ERROR("ReadBzip: Decompression failed");
00247             bufPtr += nr;
00248             left -= nr;
00249         }
00250         delete bzipBuf;
00251         return nrRead;
00252     }
00253 #else
00254     Int64
00255     ReadBzip(void* buf, Int64 bytesToRead)
00256     {
00257         ILOG_ERROR("ReadBzip not supported");
00258         return 0;
00259     }
00260 #endif // BZIP2_USED
00261 
00262 #ifdef ZLIB_USED
00263     Int64
00264     ReadZlib(void* buf, Int64 bytesToRead)
00265     {
00266         UInt32 nrBlocks = 0;
00267         Int64 nrBytes = 0;
00268         Read(&nrBlocks, sizeof(UInt32));
00269         Read(&nrBytes, sizeof(Int64));
00270         if (nrBytes > bytesToRead)
00271         {
00272             ILOG_ERROR("ReadZlib: Buffer to small");
00273             return 0;
00274         }
00275 
00276         unsigned char* bufPtr = (unsigned char *) buf;
00277         unsigned char* bzipBuf = new unsigned char[cBzipBlockSize];
00278         Int64 left = nrBytes;
00279         Int64 nrRead = 0;
00280         for (int i=0 ; i<nrBlocks ; i++)
00281         {
00282             UInt32 bzipSize = 0;
00283             Read(&bzipSize, sizeof(UInt32));
00284             Read(bzipBuf, bzipSize);
00285 
00286             uLongf nr = (left > cBzipBlockSize) ? cBzipBlockSize : left;
00287             int res = uncompress(bufPtr, &nr, bzipBuf, bzipSize);
00288             if (res == Z_OK)
00289                 nrRead += nr;
00290             else
00291                 ILOG_ERROR("ReadZlib: Decompression failed");
00292             bufPtr += nr;
00293             left -= nr;
00294         }
00295         delete bzipBuf;
00296         return nrRead;
00297     }
00298 #else
00299     Int64
00300     ReadZlib(void* buf, Int64 bytesToRead)
00301     {
00302         ILOG_ERROR("ReadZlib not supported");
00303         return 0;
00304     }
00305 #endif // ZLIB_USED
00306 
00308     virtual String
00309     ReadLine()
00310     {
00311         Int64 start = GetPosition();
00312         Int64 i = 0;
00313         Int64 available = Available();
00314         while ((i < available) && (mBuffer[start+i] != '\n'))
00315             i++;
00316         Int64 skipN = (mBuffer[start+i] == '\n') ? 1 : 0;
00317         SetPosition(start + i + skipN);
00318         if ((i > 0) && (mBuffer[start+i-1] == '\r')) // strip '\r' if present
00319             i--;
00320         const char* p = reinterpret_cast<const char*>(mBuffer + start);
00321         return String(p, i);
00322     }
00323 
00324     virtual Int64
00325     Gets(char* buf, Int64 bytesToRead)
00326     {
00327         Int64 available = Available();
00328         if (available < bytesToRead)
00329             bytesToRead = available;
00330         PositionType start = mPosition;
00331         Int64 i = 0;
00332         while ((i < bytesToRead) && (mBuffer[start+i] != '\n'))
00333         {
00334             buf[i] = mBuffer[start+i];
00335             i++;
00336         }
00337         bytesToRead = i;
00338         mPosition += bytesToRead;
00339         if (mBuffer[start+i] == '\n')
00340             mPosition++;
00341         return bytesToRead;
00342     }
00343 
00344     virtual void
00345     NativeTypeRead(Int8* ptr)
00346     {
00347         DoNativeTypeRead(ptr);
00348     }
00349 
00350     virtual void
00351     NativeTypeRead(UInt8* ptr)
00352     {
00353         DoNativeTypeRead(ptr);
00354     }
00355 
00356     virtual void
00357     NativeTypeRead(Int16* ptr)
00358     {
00359         DoNativeTypeRead(ptr);
00360     }
00361 
00362     virtual void
00363     NativeTypeRead(UInt16* ptr)
00364     {
00365         DoNativeTypeRead(ptr);
00366     }
00367 
00368     virtual void
00369     NativeTypeRead(Int32* ptr)
00370     {
00371         DoNativeTypeRead(ptr);
00372     }
00373 
00374     virtual void
00375     NativeTypeRead(UInt32* ptr)
00376     {
00377         DoNativeTypeRead(ptr);
00378     }
00379     
00380     virtual void
00381     NativeTypeRead(Int64* ptr)
00382     {
00383         DoNativeTypeRead(ptr);
00384     }
00385 
00386     virtual void
00387     NativeTypeRead(UInt64* ptr)
00388     {
00389         DoNativeTypeRead(ptr);
00390     }
00391 
00392     virtual void
00393     NativeTypeRead(Real32* ptr)
00394     {
00395         DoNativeTypeRead(ptr);
00396     }
00397 
00398     virtual void
00399     NativeTypeRead(Real64* ptr)
00400     {
00401         DoNativeTypeRead(ptr);
00402     }
00403 
00404     virtual Int64
00405     Write(const void* buf, Int64 bytesToWrite)
00406     {
00407         Int64 available = Available();
00408         if (available < bytesToWrite)
00409             bytesToWrite = available;
00410         memcpy(GetBuffer() + GetPosition(), buf, bytesToWrite);
00411         SetPosition(GetPosition() + bytesToWrite);
00412         return bytesToWrite;
00413     }
00414 
00415 #ifdef BZIP2_USED
00416     void
00417     WriteBzip(void* srcBuf, Int64 bytesToWrite)
00418     {
00419         UInt32 nrBlocks = bytesToWrite / cBzipBlockSize;
00420         if (bytesToWrite % cBzipBlockSize != 0)
00421             nrBlocks++;
00422         Write(&nrBlocks, sizeof(UInt32));
00423         Write(&bytesToWrite, sizeof(Int64));
00424 
00425         // BZ man : To guarantee that the compressed data will fit in its buffer,
00426         // allocate an output buffer of size 1% larger than the uncompressed
00427         // data, plus six hundred extra bytes.
00428         const UInt32 cBzipBufCap = cBzipBlockSize * 1.01 + 600;
00429         char* srcBufPtr = (char *) srcBuf;
00430         char* bzipBuf = new char[cBzipBufCap];
00431         Int64 left = bytesToWrite;
00432         for (int i=0 ; i<nrBlocks ; i++)
00433         {
00434             Int64 nr = left;
00435             if (nr > cBzipBlockSize)
00436                 nr = cBzipBlockSize;
00437             UInt32 bzipSize = cBzipBufCap;
00438             int res = BZ2_bzBuffToBuffCompress(bzipBuf, &bzipSize, srcBufPtr,
00439                                                nr, 9, 1, 30);
00440             if (res == BZ_OK)
00441             {
00442                 Write(&bzipSize, sizeof(UInt32));
00443                 Write(bzipBuf, bzipSize);
00444             }
00445             else
00446             {
00447                 ILOG_ERROR("WriteBzip: Compression failed");
00448             }
00449             srcBufPtr += nr;
00450             left -= nr;
00451         }
00452         delete bzipBuf;
00453     }
00454 #else
00455     void
00456     WriteBzip(void* srcBuf, Int64 bytesToWrite)
00457     {
00458         ILOG_ERROR("WriteBzip not supported");
00459     }
00460 #endif // BZIP2_USED
00461 
00462 #ifdef ZLIB_USED
00463     void
00464     WriteZlib(void* srcBuf, Int64 bytesToWrite)
00465     {
00466         UInt32 nrBlocks = bytesToWrite / cBzipBlockSize;
00467         if (bytesToWrite % cBzipBlockSize != 0)
00468             nrBlocks++;
00469         Write(&nrBlocks, sizeof(UInt32));
00470         Write(&bytesToWrite, sizeof(Int64));
00471 
00472         // BZ man : To guarantee that the compressed data will fit in its buffer,
00473         // allocate an output buffer of size 1% larger than the uncompressed
00474         // data, plus six hundred extra bytes.
00475         //const UInt32 cBzipBufCap = cBzipBlockSize * 1.01 + 600;
00476         const UInt32 cBzipBufCap = compressBound(cBzipBlockSize) + 32;
00477         unsigned char* srcBufPtr = (unsigned char *) srcBuf;
00478         unsigned char* bzipBuf = new unsigned char[cBzipBufCap];
00479         Int64 left = bytesToWrite;
00480         for (int i=0 ; i<nrBlocks ; i++)
00481         {
00482             Int64 nr = left;
00483             if (nr > cBzipBlockSize)
00484                 nr = cBzipBlockSize;
00485             uLongf bzipSize = cBzipBufCap;
00486             int res = compress(bzipBuf, &bzipSize, srcBufPtr, nr);
00487             if (res == Z_OK)
00488             {
00489                 Write(&bzipSize, sizeof(UInt32));
00490                 Write(bzipBuf, bzipSize);
00491             }
00492             else
00493             {
00494                 ILOG_ERROR("WriteZlib: Compression failed");
00495             }
00496             srcBufPtr += nr;
00497             left -= nr;
00498         }
00499         delete bzipBuf;
00500     }
00501 #else
00502     void
00503     WriteZlib(void* srcBuf, Int64 bytesToWrite)
00504     {
00505         ILOG_ERROR("WriteZlib not supported");
00506     }
00507 #endif // BZIP2_USED
00508 
00509 /*
00511     virtual void
00512     WriteLine(const String& line)
00513     {
00514         ILOG_ERROR("Not implemented yet, involves reallocing..."
00515                    << "this is duplication of available code");
00516     }
00517 */
00518 
00519     virtual Int64
00520     Puts(const char* buf)
00521     {
00522         Int64 len = strlen(buf);
00523         Int64 available = Available();
00524         if (available < len)
00525             len = available;
00526         unsigned char* start = GetBuffer() + GetPosition();
00527         memcpy(start, buf, len);
00528         start[len] = '\n';
00529         SetPosition(GetPosition() + len + 1);
00530         return 1;
00531     }
00532 
00533     virtual Int64
00534     Puts(const String& str)
00535     {
00536         return Puts(str.c_str());
00537     }
00538 
00539     virtual void
00540     NativeTypeWrite(Int8 val)
00541     {
00542         DoNativeTypeWrite(val);
00543     }
00544 
00545     virtual void
00546     NativeTypeWrite(UInt8 val)
00547     {
00548         DoNativeTypeWrite(val);
00549     }
00550 
00551     virtual void
00552     NativeTypeWrite(Int16 val)
00553     {
00554         DoNativeTypeWrite(val);
00555     }
00556 
00557     virtual void
00558     NativeTypeWrite(UInt16 val)
00559     {
00560         DoNativeTypeWrite(val);
00561     }
00562 
00563     virtual void
00564     NativeTypeWrite(Int32 val)
00565     {
00566         DoNativeTypeWrite(val);
00567     }
00568 
00569     virtual void
00570     NativeTypeWrite(UInt32 val)
00571     {
00572         DoNativeTypeWrite(val);
00573     }
00574     
00575     virtual void
00576     NativeTypeWrite(Int64 val)
00577     {
00578         DoNativeTypeWrite(val);
00579     }
00580 
00581     virtual void
00582     NativeTypeWrite(UInt64 val)
00583     {
00584         DoNativeTypeWrite(val);
00585     }
00586 
00587     virtual void
00588     NativeTypeWrite(Real32 val)
00589     {
00590         DoNativeTypeWrite(val);
00591     }
00592 
00593     virtual void
00594     NativeTypeWrite(Real64 val)
00595     {
00596         DoNativeTypeWrite(val);
00597     }
00598 
00600 
00606 
00607     virtual void
00608     SetPosition(PositionType position)
00609     {
00610         mPosition = position;
00611     }
00612 
00613     // For specializations that allow for a growing buffer
00614     virtual void
00615     SetPositionAndSize(PositionType position)
00616     {
00617         mPosition = position;
00618         if (mPosition > mSize)
00619             mSize = position;
00620     }
00621 
00622     virtual PositionType
00623     GetPosition()
00624     {
00625         return mPosition;
00626     }
00627 
00628     virtual void
00629     Rewind()
00630     {
00631         mPosition = 0;
00632     }
00633 
00637     virtual void
00638     FastForward()
00639     {
00640         mPosition = mSize;
00641     }
00642 
00643     virtual PositionType
00644     Seek(PositionType offset, int whence)
00645     {
00646         if (whence == SEEK_CUR)
00647             SetPosition(GetPosition() + offset);
00648         else if (whence == SEEK_END)
00649             SetPosition(Size() + offset);
00650         else if (whence == SEEK_SET)
00651             SetPosition(offset);
00652         else
00653             return -1;
00654 
00655         return GetPosition();
00656     }
00657 
00659 
00663     unsigned char*
00664     GetBuffer()
00665     {
00666         return mBuffer;
00667     }
00668 
00674     void
00675     SetBuffer(unsigned char* buf, PositionType size)
00676     {
00677         mBuffer = buf;
00678         mSize = size;
00679     }
00680 
00683     void
00684     SetUseChannel(Channel* channel, String channelFile, String tmpFile)
00685     {
00686         mDataChannel = channel;
00687         mDataChannelFile = channelFile;
00688         mDataChannelTmp = tmpFile;
00689     }
00690 
00691 protected:
00692 
00693     template <class NativeType>
00694     void
00695     DoNativeTypeRead(NativeType* ptr)
00696     {
00697         String fs = NativeTypeFormat<NativeType>(0);
00698         /* This seems to copy all of the remaining buffer :-(
00699         const char* start = (const char*) (GetBuffer() + GetPosition());
00700         sscanf(start, fs.c_str(), ptr);
00701         int i = 0;
00702         while (start[i] != ' ')
00703             i++;
00704         */
00705         char buf[1024];
00706         unsigned char* src = GetBuffer() + GetPosition();
00707         Int64 i = 0;
00708         while (src[i] != ' ')
00709             buf[i++] = src[i];
00710         buf[i] = ' ';
00711         sscanf(buf, fs.c_str(), ptr);
00712         SetPosition(GetPosition() + i + 1);
00713     }
00714 
00715     template <class NativeType>
00716     void
00717     DoNativeTypeWrite(NativeType val)
00718     {
00719         String fs = NativeTypeFormat<NativeType>(0) + " ";
00720         char buf[1024];
00721         sprintf(buf, fs.c_str(), val);
00722         Int64 i = 0;
00723         while (buf[i] != ' ')
00724             i++;
00725         Write(buf, i+1);
00726     }
00727 
00728     bool
00729     WriteIOBufferToChannel()
00730     {
00731         SetSize(GetPosition()); // assume the last write designates the end
00732         char* buf = mDataChannel->Buffer();
00733         sprintf(buf, "openfilebuffer \"%s\" 0\0", mDataChannelFile.c_str());
00734         mDataChannel->SendRequest(strlen(buf)+1);
00735         if (mDataChannel->LastSendHadError())
00736         {
00737             ILOG_ERROR("WriteIOBufferToChannel: open failed for [" <<
00738                        mDataChannelFile << "]");
00739             return false;
00740         }
00741         int id;
00742         Int64 bufSize;
00743         sscanf(buf, "%d %lld", &id, &bufSize); // bufSize is dummy when writing
00744         ILOG_DEBUG(mDataChannelFile << ", id = " << id);
00745 
00746         Int64 nrWritten = 0;
00747         SetPosition(0);
00748         while (Available() > 0)
00749         {
00750             sprintf(buf, "writefilebuffer %d\0", id);
00751             int used = strlen(buf) + 1;
00752             int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE-used, Available());
00753             Read(buf+used, nrData);
00754             int len = mDataChannel->SendRequest(used+nrData);
00755             if (mDataChannel->LastSendHadError())
00756             {
00757                 ILOG_ERROR("WriteIOBufferToChannel: write failed for [" <<
00758                            mDataChannelFile << "]");
00759                 break;
00760             }
00761             nrWritten += nrData;
00762             ILOG_DEBUG("  wrote " << nrData << " bytes, total = " << nrWritten);
00763         }
00764         sprintf(buf, "closefilebuffer %d\0", id);
00765         mDataChannel->SendRequest(strlen(buf)+1);
00766         return true;
00767     }
00768 
00769     void
00770     CheckDataChannelWrite()
00771     {
00772         if (mDataChannel != 0)
00773         {
00774             WriteIOBufferToChannel();
00775             mDataChannel = 0;
00776         }
00777     }
00778 
00779 private:
00780 
00781     unsigned char* mBuffer;
00782     PositionType   mSize;
00783     PositionType   mPosition;
00784 
00785     Channel*       mDataChannel;
00786     String         mDataChannelFile; // "logical" name of the file at the server
00787     String         mDataChannelTmp; // name of the temporary file
00788 
00789     ILOG_VAR_DEC;
00790 };
00791 
00792 ILOG_VAR_INIT(IOBuffer, Impala.Util);
00793 
00794 inline IOBuffer*
00795 CreateIOBufferFromFile(CString filename)
00796 {
00797     // read all contents of file and put in the IOBuffer
00798     FILE* file = fopen(filename.c_str(), "r");
00799     if (!file)
00800     {
00801         std::cout << "[CreateIOBufferFromFile] failed to open " << filename
00802                   << std::endl;
00803         return 0;
00804     }
00805 
00806     int blockSize = 0x1000;
00807     std::deque<unsigned char*> bufferList;
00808     int read;
00809     do
00810     {
00811         unsigned char* buffer = new unsigned char[blockSize];
00812         read = fread(buffer,1,blockSize,file);
00813         bufferList.push_back(buffer);
00814     }
00815     while (read == blockSize);
00816     fclose(file);
00817 
00818     // create total buffer
00819     int total = ((bufferList.size()-1)*blockSize) + read;
00820     unsigned char* buffer = new unsigned char[total];
00821     int i;
00822     for(i=0 ; i<bufferList.size() - 1 ; i++)
00823     {
00824         memcpy(buffer + (i*blockSize), bufferList[i], blockSize);
00825         delete [] bufferList[i];
00826         bufferList[i] = 0;
00827     }
00828     memcpy(buffer + (i*blockSize), bufferList[i], read); // last block only #read bytes
00829     delete [] bufferList[i];
00830     bufferList[i] = 0;
00831 
00832     return new IOBuffer(total, buffer);
00833 }
00834 
00836 inline void
00837 Broadcast(IOBuffer* buffer, int root)
00838 {
00839 #ifdef MPI_USED
00840     ILOG_VAR(Impala.Util.Broadcast);
00841     MPI_Barrier(MPI_COMM_WORLD);
00842     Int64 size = buffer->Size();
00843     MPI_Bcast(&size, 1, MPI_LONG_LONG, root, MPI_COMM_WORLD);
00844     unsigned char* sendBuffer;
00845     if (Link::Mpi::MyId() == root)
00846         sendBuffer = buffer->GetBuffer();
00847     else
00848         sendBuffer = new unsigned char[size];
00849     ILOG_DEBUG(Link::Mpi::MyId() << ": about to broadcast, buf = " <<
00850                (void*)sendBuffer << "size = " << size);
00851     unsigned char* curPos = sendBuffer;
00852     Int64 transmitted = 0;
00853     while (transmitted < size)
00854     {
00855         int maxBlock = 1024 * 1024 * 1024;
00856         int blockSize = (size - transmitted > maxBlock) ? maxBlock 
00857                                                         : size - transmitted;
00858         MPI_Bcast(curPos, blockSize, MPI_CHAR, root, MPI_COMM_WORLD);
00859         curPos += blockSize;
00860         transmitted += blockSize;
00861     }
00862     if (Link::Mpi::MyId() != root)
00863     {
00864         buffer->SetBuffer(sendBuffer, size);
00865     }
00866     ILOG_DEBUG("done");
00867 #endif // MPI_USED
00868 }
00869 
00870 } // namespace Util
00871 } // namespace Impala
00872 
00873 #endif // Impala_Util_IOBuffer_h

Generated on Thu Jan 13 09:05:15 2011 for ImpalaSrc by  doxygen 1.5.1