00001 #ifndef Impala_Util_IOBuffer_h
00002 #define Impala_Util_IOBuffer_h
00003
00004 #include <stdio.h>
00005 #include <string>
00006 #include <deque>
00007 #include <iostream>
00008 #include "Basis/ILog.h"
00009 #include "Util/Channel.h"
00010 #include "Basis/NativeTypeFormats.h"
00011 #include "Link/Mpi/MpiFuncs.h"
00012 #ifdef BZIP2_USED
00013 #include "bzlib.h"
00014 #endif // BZIP2_USED
00015 #ifdef ZLIB_USED
00016 #ifdef WIN32
00017 #include "Png/zlib.h"
00018 #else
00019 #include "zlib.h"
00020 #endif
00021 #endif // ZLIB_USED
00022
00023
00024 namespace Impala
00025 {
00026 namespace Util
00027 {
00028
00052 class IOBuffer
00053 {
00054 public:
00055 typedef Int64 PositionType;
00056
00057 static const UInt32 cBzipBlockSize = 16 * 1024 * 1024;
00058
00065 IOBuffer()
00066 {
00067 mBuffer = 0;
00068 mSize = 0;
00069 mPosition = 0;
00070 mDataChannel = 0;
00071 }
00072
00074 IOBuffer(PositionType length)
00075 {
00076 mBuffer = new unsigned char[length];
00077 mSize = length;
00078 mPosition = 0;
00079 mDataChannel = 0;
00080 }
00081
00085 IOBuffer(PositionType length, unsigned char* buffer)
00086 {
00087 mBuffer = buffer;
00088 mSize = length;
00089 mPosition = 0;
00090 mDataChannel = 0;
00091 }
00092
00095 IOBuffer(String s)
00096 {
00097 mSize = s.size();
00098 mBuffer = new unsigned char [mSize+1];
00099 std::memcpy(mBuffer, s.c_str(), mSize);
00100 mBuffer[mSize] = 0;
00101 mPosition = 0;
00102 mDataChannel = 0;
00103 }
00104
00105 virtual
00106 ~IOBuffer()
00107 {
00108 if (mDataChannel)
00109 WriteIOBufferToChannel();
00110 if (mBuffer)
00111 delete [] mBuffer;
00112 if (!mDataChannelTmp.empty())
00113 remove(mDataChannelTmp.c_str());
00114 }
00115
00116 virtual bool
00117 Valid()
00118 {
00119 return true;
00120 }
00121
00127 virtual PositionType
00128 Available()
00129 {
00130 PositionType res = mSize - mPosition;
00131 if (res < 0)
00132 {
00133 ILOG_ERROR("Available: position beyond size");
00134 return 0;
00135 }
00136 if (res == 1)
00137 return 0;
00138 return res;
00139 }
00140
00141 virtual int
00142 AvailableInt32()
00143 {
00144 return Min<Int64>(Available(), 0x7fffffff);
00145 }
00146
00148 virtual PositionType
00149 Size()
00150 {
00151 return mSize;
00152 }
00153
00154 virtual void
00155 SetSize(PositionType size)
00156 {
00157 mSize = size;
00158 }
00159
00160 template<class BackInsertIterator>
00161 void
00162 ReadStrings(BackInsertIterator bi, bool skipComments, int count)
00163 {
00164 if (! Valid())
00165 return;
00166
00167 int c=0;
00168 while (Available())
00169 {
00170 String line = ReadLine();
00171 if (line[0] || !skipComments)
00172 {
00173 if ((line[0] != '#') || !skipComments)
00174 *bi++ = line;
00175 }
00176 if (++c==count)
00177 break;
00178 }
00179 }
00180
00181 template <class Iterator>
00182 bool
00183 WriteStrings(Iterator begin, Iterator end, int count = -1)
00184 {
00185 if (! Valid())
00186 return false;
00187
00188 int c=0;
00189 for (Iterator it=begin ; it!=end ; it++)
00190 {
00191 String s = *it;
00192 Puts(s);
00193 if (++c == count)
00194 break;
00195 }
00196 return true;
00197 }
00198
00204
00205 virtual Int64
00206 Read(void* buf, Int64 bytesToRead)
00207 {
00208 Int64 available = Available();
00209 if (available < bytesToRead)
00210 bytesToRead = available;
00211 memcpy(buf, mBuffer + mPosition, bytesToRead);
00212 mPosition += bytesToRead;
00213 return bytesToRead;
00214 }
00215
00216 #ifdef BZIP2_USED
00217 Int64
00218 ReadBzip(void* buf, Int64 bytesToRead)
00219 {
00220 UInt32 nrBlocks = 0;
00221 Int64 nrBytes = 0;
00222 Read(&nrBlocks, sizeof(UInt32));
00223 Read(&nrBytes, sizeof(Int64));
00224 if (nrBytes > bytesToRead)
00225 {
00226 ILOG_ERROR("ReadBzip: Buffer to small");
00227 return 0;
00228 }
00229
00230 char* bufPtr = (char *) buf;
00231 char* bzipBuf = new char[cBzipBlockSize];
00232 Int64 left = nrBytes;
00233 Int64 nrRead = 0;
00234 for (int i=0 ; i<nrBlocks ; i++)
00235 {
00236 UInt32 bzipSize = 0;
00237 Read(&bzipSize, sizeof(UInt32));
00238 Read(bzipBuf, bzipSize);
00239
00240 UInt32 nr = (left > cBzipBlockSize) ? cBzipBlockSize : left;
00241 int res = BZ2_bzBuffToBuffDecompress(bufPtr, &nr, bzipBuf,
00242 bzipSize, 0, 0);
00243 if (res == BZ_OK)
00244 nrRead += nr;
00245 else
00246 ILOG_ERROR("ReadBzip: Decompression failed");
00247 bufPtr += nr;
00248 left -= nr;
00249 }
00250 delete bzipBuf;
00251 return nrRead;
00252 }
00253 #else
00254 Int64
00255 ReadBzip(void* buf, Int64 bytesToRead)
00256 {
00257 ILOG_ERROR("ReadBzip not supported");
00258 return 0;
00259 }
00260 #endif // BZIP2_USED
00261
00262 #ifdef ZLIB_USED
00263 Int64
00264 ReadZlib(void* buf, Int64 bytesToRead)
00265 {
00266 UInt32 nrBlocks = 0;
00267 Int64 nrBytes = 0;
00268 Read(&nrBlocks, sizeof(UInt32));
00269 Read(&nrBytes, sizeof(Int64));
00270 if (nrBytes > bytesToRead)
00271 {
00272 ILOG_ERROR("ReadZlib: Buffer to small");
00273 return 0;
00274 }
00275
00276 unsigned char* bufPtr = (unsigned char *) buf;
00277 unsigned char* bzipBuf = new unsigned char[cBzipBlockSize];
00278 Int64 left = nrBytes;
00279 Int64 nrRead = 0;
00280 for (int i=0 ; i<nrBlocks ; i++)
00281 {
00282 UInt32 bzipSize = 0;
00283 Read(&bzipSize, sizeof(UInt32));
00284 Read(bzipBuf, bzipSize);
00285
00286 uLongf nr = (left > cBzipBlockSize) ? cBzipBlockSize : left;
00287 int res = uncompress(bufPtr, &nr, bzipBuf, bzipSize);
00288 if (res == Z_OK)
00289 nrRead += nr;
00290 else
00291 ILOG_ERROR("ReadZlib: Decompression failed");
00292 bufPtr += nr;
00293 left -= nr;
00294 }
00295 delete bzipBuf;
00296 return nrRead;
00297 }
00298 #else
00299 Int64
00300 ReadZlib(void* buf, Int64 bytesToRead)
00301 {
00302 ILOG_ERROR("ReadZlib not supported");
00303 return 0;
00304 }
00305 #endif // ZLIB_USED
00306
00308 virtual String
00309 ReadLine()
00310 {
00311 Int64 start = GetPosition();
00312 Int64 i = 0;
00313 Int64 available = Available();
00314 while ((i < available) && (mBuffer[start+i] != '\n'))
00315 i++;
00316 Int64 skipN = (mBuffer[start+i] == '\n') ? 1 : 0;
00317 SetPosition(start + i + skipN);
00318 if ((i > 0) && (mBuffer[start+i-1] == '\r'))
00319 i--;
00320 const char* p = reinterpret_cast<const char*>(mBuffer + start);
00321 return String(p, i);
00322 }
00323
00324 virtual Int64
00325 Gets(char* buf, Int64 bytesToRead)
00326 {
00327 Int64 available = Available();
00328 if (available < bytesToRead)
00329 bytesToRead = available;
00330 PositionType start = mPosition;
00331 Int64 i = 0;
00332 while ((i < bytesToRead) && (mBuffer[start+i] != '\n'))
00333 {
00334 buf[i] = mBuffer[start+i];
00335 i++;
00336 }
00337 bytesToRead = i;
00338 mPosition += bytesToRead;
00339 if (mBuffer[start+i] == '\n')
00340 mPosition++;
00341 return bytesToRead;
00342 }
00343
00344 virtual void
00345 NativeTypeRead(Int8* ptr)
00346 {
00347 DoNativeTypeRead(ptr);
00348 }
00349
00350 virtual void
00351 NativeTypeRead(UInt8* ptr)
00352 {
00353 DoNativeTypeRead(ptr);
00354 }
00355
00356 virtual void
00357 NativeTypeRead(Int16* ptr)
00358 {
00359 DoNativeTypeRead(ptr);
00360 }
00361
00362 virtual void
00363 NativeTypeRead(UInt16* ptr)
00364 {
00365 DoNativeTypeRead(ptr);
00366 }
00367
00368 virtual void
00369 NativeTypeRead(Int32* ptr)
00370 {
00371 DoNativeTypeRead(ptr);
00372 }
00373
00374 virtual void
00375 NativeTypeRead(UInt32* ptr)
00376 {
00377 DoNativeTypeRead(ptr);
00378 }
00379
00380 virtual void
00381 NativeTypeRead(Int64* ptr)
00382 {
00383 DoNativeTypeRead(ptr);
00384 }
00385
00386 virtual void
00387 NativeTypeRead(UInt64* ptr)
00388 {
00389 DoNativeTypeRead(ptr);
00390 }
00391
00392 virtual void
00393 NativeTypeRead(Real32* ptr)
00394 {
00395 DoNativeTypeRead(ptr);
00396 }
00397
00398 virtual void
00399 NativeTypeRead(Real64* ptr)
00400 {
00401 DoNativeTypeRead(ptr);
00402 }
00403
00404 virtual Int64
00405 Write(const void* buf, Int64 bytesToWrite)
00406 {
00407 Int64 available = Available();
00408 if (available < bytesToWrite)
00409 bytesToWrite = available;
00410 memcpy(GetBuffer() + GetPosition(), buf, bytesToWrite);
00411 SetPosition(GetPosition() + bytesToWrite);
00412 return bytesToWrite;
00413 }
00414
00415 #ifdef BZIP2_USED
00416 void
00417 WriteBzip(void* srcBuf, Int64 bytesToWrite)
00418 {
00419 UInt32 nrBlocks = bytesToWrite / cBzipBlockSize;
00420 if (bytesToWrite % cBzipBlockSize != 0)
00421 nrBlocks++;
00422 Write(&nrBlocks, sizeof(UInt32));
00423 Write(&bytesToWrite, sizeof(Int64));
00424
00425
00426
00427
00428 const UInt32 cBzipBufCap = cBzipBlockSize * 1.01 + 600;
00429 char* srcBufPtr = (char *) srcBuf;
00430 char* bzipBuf = new char[cBzipBufCap];
00431 Int64 left = bytesToWrite;
00432 for (int i=0 ; i<nrBlocks ; i++)
00433 {
00434 Int64 nr = left;
00435 if (nr > cBzipBlockSize)
00436 nr = cBzipBlockSize;
00437 UInt32 bzipSize = cBzipBufCap;
00438 int res = BZ2_bzBuffToBuffCompress(bzipBuf, &bzipSize, srcBufPtr,
00439 nr, 9, 1, 30);
00440 if (res == BZ_OK)
00441 {
00442 Write(&bzipSize, sizeof(UInt32));
00443 Write(bzipBuf, bzipSize);
00444 }
00445 else
00446 {
00447 ILOG_ERROR("WriteBzip: Compression failed");
00448 }
00449 srcBufPtr += nr;
00450 left -= nr;
00451 }
00452 delete bzipBuf;
00453 }
00454 #else
00455 void
00456 WriteBzip(void* srcBuf, Int64 bytesToWrite)
00457 {
00458 ILOG_ERROR("WriteBzip not supported");
00459 }
00460 #endif // BZIP2_USED
00461
00462 #ifdef ZLIB_USED
00463 void
00464 WriteZlib(void* srcBuf, Int64 bytesToWrite)
00465 {
00466 UInt32 nrBlocks = bytesToWrite / cBzipBlockSize;
00467 if (bytesToWrite % cBzipBlockSize != 0)
00468 nrBlocks++;
00469 Write(&nrBlocks, sizeof(UInt32));
00470 Write(&bytesToWrite, sizeof(Int64));
00471
00472
00473
00474
00475
00476 const UInt32 cBzipBufCap = compressBound(cBzipBlockSize) + 32;
00477 unsigned char* srcBufPtr = (unsigned char *) srcBuf;
00478 unsigned char* bzipBuf = new unsigned char[cBzipBufCap];
00479 Int64 left = bytesToWrite;
00480 for (int i=0 ; i<nrBlocks ; i++)
00481 {
00482 Int64 nr = left;
00483 if (nr > cBzipBlockSize)
00484 nr = cBzipBlockSize;
00485 uLongf bzipSize = cBzipBufCap;
00486 int res = compress(bzipBuf, &bzipSize, srcBufPtr, nr);
00487 if (res == Z_OK)
00488 {
00489 Write(&bzipSize, sizeof(UInt32));
00490 Write(bzipBuf, bzipSize);
00491 }
00492 else
00493 {
00494 ILOG_ERROR("WriteZlib: Compression failed");
00495 }
00496 srcBufPtr += nr;
00497 left -= nr;
00498 }
00499 delete bzipBuf;
00500 }
00501 #else
00502 void
00503 WriteZlib(void* srcBuf, Int64 bytesToWrite)
00504 {
00505 ILOG_ERROR("WriteZlib not supported");
00506 }
00507 #endif // BZIP2_USED
00508
00509
00511
00512
00513
00514
00515
00516
00517
00518
00519 virtual Int64
00520 Puts(const char* buf)
00521 {
00522 Int64 len = strlen(buf);
00523 Int64 available = Available();
00524 if (available < len)
00525 len = available;
00526 unsigned char* start = GetBuffer() + GetPosition();
00527 memcpy(start, buf, len);
00528 start[len] = '\n';
00529 SetPosition(GetPosition() + len + 1);
00530 return 1;
00531 }
00532
00533 virtual Int64
00534 Puts(const String& str)
00535 {
00536 return Puts(str.c_str());
00537 }
00538
00539 virtual void
00540 NativeTypeWrite(Int8 val)
00541 {
00542 DoNativeTypeWrite(val);
00543 }
00544
00545 virtual void
00546 NativeTypeWrite(UInt8 val)
00547 {
00548 DoNativeTypeWrite(val);
00549 }
00550
00551 virtual void
00552 NativeTypeWrite(Int16 val)
00553 {
00554 DoNativeTypeWrite(val);
00555 }
00556
00557 virtual void
00558 NativeTypeWrite(UInt16 val)
00559 {
00560 DoNativeTypeWrite(val);
00561 }
00562
00563 virtual void
00564 NativeTypeWrite(Int32 val)
00565 {
00566 DoNativeTypeWrite(val);
00567 }
00568
00569 virtual void
00570 NativeTypeWrite(UInt32 val)
00571 {
00572 DoNativeTypeWrite(val);
00573 }
00574
00575 virtual void
00576 NativeTypeWrite(Int64 val)
00577 {
00578 DoNativeTypeWrite(val);
00579 }
00580
00581 virtual void
00582 NativeTypeWrite(UInt64 val)
00583 {
00584 DoNativeTypeWrite(val);
00585 }
00586
00587 virtual void
00588 NativeTypeWrite(Real32 val)
00589 {
00590 DoNativeTypeWrite(val);
00591 }
00592
00593 virtual void
00594 NativeTypeWrite(Real64 val)
00595 {
00596 DoNativeTypeWrite(val);
00597 }
00598
00600
00606
00607 virtual void
00608 SetPosition(PositionType position)
00609 {
00610 mPosition = position;
00611 }
00612
00613
00614 virtual void
00615 SetPositionAndSize(PositionType position)
00616 {
00617 mPosition = position;
00618 if (mPosition > mSize)
00619 mSize = position;
00620 }
00621
00622 virtual PositionType
00623 GetPosition()
00624 {
00625 return mPosition;
00626 }
00627
00628 virtual void
00629 Rewind()
00630 {
00631 mPosition = 0;
00632 }
00633
00637 virtual void
00638 FastForward()
00639 {
00640 mPosition = mSize;
00641 }
00642
00643 virtual PositionType
00644 Seek(PositionType offset, int whence)
00645 {
00646 if (whence == SEEK_CUR)
00647 SetPosition(GetPosition() + offset);
00648 else if (whence == SEEK_END)
00649 SetPosition(Size() + offset);
00650 else if (whence == SEEK_SET)
00651 SetPosition(offset);
00652 else
00653 return -1;
00654
00655 return GetPosition();
00656 }
00657
00659
00663 unsigned char*
00664 GetBuffer()
00665 {
00666 return mBuffer;
00667 }
00668
00674 void
00675 SetBuffer(unsigned char* buf, PositionType size)
00676 {
00677 mBuffer = buf;
00678 mSize = size;
00679 }
00680
00683 void
00684 SetUseChannel(Channel* channel, String channelFile, String tmpFile)
00685 {
00686 mDataChannel = channel;
00687 mDataChannelFile = channelFile;
00688 mDataChannelTmp = tmpFile;
00689 }
00690
00691 protected:
00692
00693 template <class NativeType>
00694 void
00695 DoNativeTypeRead(NativeType* ptr)
00696 {
00697 String fs = NativeTypeFormat<NativeType>(0);
00698
00699
00700
00701
00702
00703
00704
00705 char buf[1024];
00706 unsigned char* src = GetBuffer() + GetPosition();
00707 Int64 i = 0;
00708 while (src[i] != ' ')
00709 buf[i++] = src[i];
00710 buf[i] = ' ';
00711 sscanf(buf, fs.c_str(), ptr);
00712 SetPosition(GetPosition() + i + 1);
00713 }
00714
00715 template <class NativeType>
00716 void
00717 DoNativeTypeWrite(NativeType val)
00718 {
00719 String fs = NativeTypeFormat<NativeType>(0) + " ";
00720 char buf[1024];
00721 sprintf(buf, fs.c_str(), val);
00722 Int64 i = 0;
00723 while (buf[i] != ' ')
00724 i++;
00725 Write(buf, i+1);
00726 }
00727
00728 bool
00729 WriteIOBufferToChannel()
00730 {
00731 SetSize(GetPosition());
00732 char* buf = mDataChannel->Buffer();
00733 sprintf(buf, "openfilebuffer \"%s\" 0\0", mDataChannelFile.c_str());
00734 mDataChannel->SendRequest(strlen(buf)+1);
00735 if (mDataChannel->LastSendHadError())
00736 {
00737 ILOG_ERROR("WriteIOBufferToChannel: open failed for [" <<
00738 mDataChannelFile << "]");
00739 return false;
00740 }
00741 int id;
00742 Int64 bufSize;
00743 sscanf(buf, "%d %lld", &id, &bufSize);
00744 ILOG_DEBUG(mDataChannelFile << ", id = " << id);
00745
00746 Int64 nrWritten = 0;
00747 SetPosition(0);
00748 while (Available() > 0)
00749 {
00750 sprintf(buf, "writefilebuffer %d\0", id);
00751 int used = strlen(buf) + 1;
00752 int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE-used, Available());
00753 Read(buf+used, nrData);
00754 int len = mDataChannel->SendRequest(used+nrData);
00755 if (mDataChannel->LastSendHadError())
00756 {
00757 ILOG_ERROR("WriteIOBufferToChannel: write failed for [" <<
00758 mDataChannelFile << "]");
00759 break;
00760 }
00761 nrWritten += nrData;
00762 ILOG_DEBUG(" wrote " << nrData << " bytes, total = " << nrWritten);
00763 }
00764 sprintf(buf, "closefilebuffer %d\0", id);
00765 mDataChannel->SendRequest(strlen(buf)+1);
00766 return true;
00767 }
00768
00769 void
00770 CheckDataChannelWrite()
00771 {
00772 if (mDataChannel != 0)
00773 {
00774 WriteIOBufferToChannel();
00775 mDataChannel = 0;
00776 }
00777 }
00778
00779 private:
00780
00781 unsigned char* mBuffer;
00782 PositionType mSize;
00783 PositionType mPosition;
00784
00785 Channel* mDataChannel;
00786 String mDataChannelFile;
00787 String mDataChannelTmp;
00788
00789 ILOG_VAR_DEC;
00790 };
00791
00792 ILOG_VAR_INIT(IOBuffer, Impala.Util);
00793
00794 inline IOBuffer*
00795 CreateIOBufferFromFile(CString filename)
00796 {
00797
00798 FILE* file = fopen(filename.c_str(), "r");
00799 if (!file)
00800 {
00801 std::cout << "[CreateIOBufferFromFile] failed to open " << filename
00802 << std::endl;
00803 return 0;
00804 }
00805
00806 int blockSize = 0x1000;
00807 std::deque<unsigned char*> bufferList;
00808 int read;
00809 do
00810 {
00811 unsigned char* buffer = new unsigned char[blockSize];
00812 read = fread(buffer,1,blockSize,file);
00813 bufferList.push_back(buffer);
00814 }
00815 while (read == blockSize);
00816 fclose(file);
00817
00818
00819 int total = ((bufferList.size()-1)*blockSize) + read;
00820 unsigned char* buffer = new unsigned char[total];
00821 int i;
00822 for(i=0 ; i<bufferList.size() - 1 ; i++)
00823 {
00824 memcpy(buffer + (i*blockSize), bufferList[i], blockSize);
00825 delete [] bufferList[i];
00826 bufferList[i] = 0;
00827 }
00828 memcpy(buffer + (i*blockSize), bufferList[i], read);
00829 delete [] bufferList[i];
00830 bufferList[i] = 0;
00831
00832 return new IOBuffer(total, buffer);
00833 }
00834
00836 inline void
00837 Broadcast(IOBuffer* buffer, int root)
00838 {
00839 #ifdef MPI_USED
00840 ILOG_VAR(Impala.Util.Broadcast);
00841 MPI_Barrier(MPI_COMM_WORLD);
00842 Int64 size = buffer->Size();
00843 MPI_Bcast(&size, 1, MPI_LONG_LONG, root, MPI_COMM_WORLD);
00844 unsigned char* sendBuffer;
00845 if (Link::Mpi::MyId() == root)
00846 sendBuffer = buffer->GetBuffer();
00847 else
00848 sendBuffer = new unsigned char[size];
00849 ILOG_DEBUG(Link::Mpi::MyId() << ": about to broadcast, buf = " <<
00850 (void*)sendBuffer << "size = " << size);
00851 unsigned char* curPos = sendBuffer;
00852 Int64 transmitted = 0;
00853 while (transmitted < size)
00854 {
00855 int maxBlock = 1024 * 1024 * 1024;
00856 int blockSize = (size - transmitted > maxBlock) ? maxBlock
00857 : size - transmitted;
00858 MPI_Bcast(curPos, blockSize, MPI_CHAR, root, MPI_COMM_WORLD);
00859 curPos += blockSize;
00860 transmitted += blockSize;
00861 }
00862 if (Link::Mpi::MyId() != root)
00863 {
00864 buffer->SetBuffer(sendBuffer, size);
00865 }
00866 ILOG_DEBUG("done");
00867 #endif // MPI_USED
00868 }
00869
00870 }
00871 }
00872
00873 #endif // Impala_Util_IOBuffer_h