00001 #ifndef Impala_Util_IOBuffer_h
00002 #define Impala_Util_IOBuffer_h
00003
00004 #include <stdio.h>
00005 #include <string>
00006 #include <deque>
00007 #include <iostream>
00008 #include "Basis/ILog.h"
00009 #include "Util/Channel.h"
00010 #include "Basis/NativeTypeFormats.h"
00011 #include "Link/Mpi/MpiFuncs.h"
00012
00013 namespace Impala
00014 {
00015 namespace Util
00016 {
00017
00041 class IOBuffer
00042 {
00043 public:
00044 typedef Int64 PositionType;
00045
00052 IOBuffer()
00053 {
00054 mBuffer = 0;
00055 mSize = 0;
00056 mPosition = 0;
00057 mDataChannel = 0;
00058 }
00059
00061 IOBuffer(PositionType length)
00062 {
00063 mBuffer = new unsigned char[length];
00064 mSize = length;
00065 mPosition = 0;
00066 mDataChannel = 0;
00067 }
00068
00072 IOBuffer(PositionType length, unsigned char* buffer)
00073 {
00074 mBuffer = buffer;
00075 mSize = length;
00076 mPosition = 0;
00077 mDataChannel = 0;
00078 }
00079
00080 virtual
00081 ~IOBuffer()
00082 {
00083 if (mDataChannel)
00084 WriteIOBufferToChannel();
00085 if (mBuffer)
00086 delete [] mBuffer;
00087 if (!mDataChannelTmp.empty())
00088 remove(mDataChannelTmp.c_str());
00089 }
00090
00091 virtual bool
00092 Valid()
00093 {
00094 return true;
00095 }
00096
00102 virtual PositionType
00103 Available()
00104 {
00105 PositionType res = mSize - mPosition;
00106 if (res < 0)
00107 {
00108 ILOG_ERROR("Available: position beyond size");
00109 return 0;
00110 }
00111 if (res == 1)
00112 return 0;
00113 return res;
00114 }
00115
00116 virtual int
00117 AvailableInt32()
00118 {
00119 return Min<Int64>(Available(), 0x7fffffff);
00120 }
00121
00123 virtual PositionType
00124 Size()
00125 {
00126 return mSize;
00127 }
00128
00129 virtual void
00130 SetSize(PositionType size)
00131 {
00132 mSize = size;
00133 }
00134
00135 template<class BackInsertIterator>
00136 void
00137 ReadStrings(BackInsertIterator bi, bool skipEC, int count)
00138 {
00139 if (! Valid())
00140 return;
00141
00142 int c=0;
00143 while (Available())
00144 {
00145 String line = ReadLine();
00146 if (line[0] || !skipEC)
00147 {
00148 if ((line[0] != '#') || !skipEC)
00149 *bi++ = line;
00150 }
00151 if (++c==count)
00152 break;
00153 }
00154 }
00155
00156 template <class Iterator>
00157 bool
00158 WriteStrings(Iterator begin, Iterator end, int count = -1)
00159 {
00160 if (! Valid())
00161 return false;
00162
00163 int c=0;
00164 for (Iterator it=begin ; it!=end ; it++)
00165 {
00166 String s = *it;
00167 Puts(s);
00168 if (++c == count)
00169 break;
00170 }
00171 return true;
00172 }
00173
00179
00180 virtual Int64
00181 Read(void* buf, Int64 bytesToRead)
00182 {
00183 Int64 available = Available();
00184 if (available < bytesToRead)
00185 bytesToRead = available;
00186 memcpy(buf, mBuffer + mPosition, bytesToRead);
00187 mPosition += bytesToRead;
00188 return bytesToRead;
00189 }
00190
00192 virtual String
00193 ReadLine()
00194 {
00195 Int64 start = GetPosition();
00196 Int64 i = 0;
00197 Int64 available = Available();
00198 while ((i < available) && (mBuffer[start+i] != '\n'))
00199 i++;
00200 Int64 skipN = (mBuffer[start+i] == '\n') ? 1 : 0;
00201 SetPosition(start + i + skipN);
00202 if (mBuffer[start+i-1] == '\r')
00203 i--;
00204 const char* p = reinterpret_cast<const char*>(mBuffer + start);
00205 return String(p, i);
00206 }
00207
00208 virtual Int64
00209 Gets(char* buf, Int64 bytesToRead)
00210 {
00211 Int64 available = Available();
00212 if (available < bytesToRead)
00213 bytesToRead = available;
00214 PositionType start = mPosition;
00215 Int64 i = 0;
00216 while ((i < bytesToRead) && (mBuffer[start+i] != '\n'))
00217 {
00218 buf[i] = mBuffer[start+i];
00219 i++;
00220 }
00221 bytesToRead = i;
00222 mPosition += bytesToRead;
00223 if (mBuffer[start+i] == '\n')
00224 mPosition++;
00225 return bytesToRead;
00226 }
00227
00228 virtual void
00229 NativeTypeRead(Int8* ptr)
00230 {
00231 DoNativeTypeRead(ptr);
00232 }
00233
00234 virtual void
00235 NativeTypeRead(UInt8* ptr)
00236 {
00237 DoNativeTypeRead(ptr);
00238 }
00239
00240 virtual void
00241 NativeTypeRead(Int16* ptr)
00242 {
00243 DoNativeTypeRead(ptr);
00244 }
00245
00246 virtual void
00247 NativeTypeRead(UInt16* ptr)
00248 {
00249 DoNativeTypeRead(ptr);
00250 }
00251
00252 virtual void
00253 NativeTypeRead(Int32* ptr)
00254 {
00255 DoNativeTypeRead(ptr);
00256 }
00257
00258 virtual void
00259 NativeTypeRead(UInt32* ptr)
00260 {
00261 DoNativeTypeRead(ptr);
00262 }
00263
00264 virtual void
00265 NativeTypeRead(Int64* ptr)
00266 {
00267 DoNativeTypeRead(ptr);
00268 }
00269
00270 virtual void
00271 NativeTypeRead(UInt64* ptr)
00272 {
00273 DoNativeTypeRead(ptr);
00274 }
00275
00276 virtual void
00277 NativeTypeRead(Real32* ptr)
00278 {
00279 DoNativeTypeRead(ptr);
00280 }
00281
00282 virtual void
00283 NativeTypeRead(Real64* ptr)
00284 {
00285 DoNativeTypeRead(ptr);
00286 }
00287
00288 virtual void
00289 Write(const void* buf, Int64 bytesToWrite)
00290 {
00291 Int64 available = Available();
00292 if (available < bytesToWrite)
00293 bytesToWrite = available;
00294 memcpy(GetBuffer() + GetPosition(), buf, bytesToWrite);
00295 SetPosition(GetPosition() + bytesToWrite);
00296 }
00297
00298
00300
00301
00302
00303
00304
00305
00306
00307
00308 virtual Int64
00309 Puts(const char* buf)
00310 {
00311 Int64 len = strlen(buf);
00312 Int64 available = Available();
00313 if (available < len)
00314 len = available;
00315 unsigned char* start = GetBuffer() + GetPosition();
00316 memcpy(start, buf, len);
00317 start[len] = '\n';
00318 SetPosition(GetPosition() + len + 1);
00319 return 1;
00320 }
00321
00322 virtual Int64
00323 Puts(const String& str)
00324 {
00325 return Puts(str.c_str());
00326 }
00327
00328 virtual void
00329 NativeTypeWrite(Int8 val)
00330 {
00331 DoNativeTypeWrite(val);
00332 }
00333
00334 virtual void
00335 NativeTypeWrite(UInt8 val)
00336 {
00337 DoNativeTypeWrite(val);
00338 }
00339
00340 virtual void
00341 NativeTypeWrite(Int16 val)
00342 {
00343 DoNativeTypeWrite(val);
00344 }
00345
00346 virtual void
00347 NativeTypeWrite(UInt16 val)
00348 {
00349 DoNativeTypeWrite(val);
00350 }
00351
00352 virtual void
00353 NativeTypeWrite(Int32 val)
00354 {
00355 DoNativeTypeWrite(val);
00356 }
00357
00358 virtual void
00359 NativeTypeWrite(UInt32 val)
00360 {
00361 DoNativeTypeWrite(val);
00362 }
00363
00364 virtual void
00365 NativeTypeWrite(Int64 val)
00366 {
00367 DoNativeTypeWrite(val);
00368 }
00369
00370 virtual void
00371 NativeTypeWrite(UInt64 val)
00372 {
00373 DoNativeTypeWrite(val);
00374 }
00375
00376 virtual void
00377 NativeTypeWrite(Real32 val)
00378 {
00379 DoNativeTypeWrite(val);
00380 }
00381
00382 virtual void
00383 NativeTypeWrite(Real64 val)
00384 {
00385 DoNativeTypeWrite(val);
00386 }
00387
00389
00395
00396 virtual void
00397 SetPosition(PositionType position)
00398 {
00399 mPosition = position;
00400 }
00401
00402
00403 virtual void
00404 SetPositionAndSize(PositionType position)
00405 {
00406 mPosition = position;
00407 if (mPosition > mSize)
00408 mSize = position;
00409 }
00410
00411 virtual PositionType
00412 GetPosition()
00413 {
00414 return mPosition;
00415 }
00416
00417 virtual void
00418 Rewind()
00419 {
00420 mPosition = 0;
00421 }
00422
00426 virtual void
00427 FastForward()
00428 {
00429 mPosition = mSize;
00430 }
00431
00432 virtual PositionType
00433 Seek(PositionType offset, int whence)
00434 {
00435 if (whence == SEEK_CUR)
00436 SetPosition(GetPosition() + offset);
00437 else if (whence == SEEK_END)
00438 SetPosition(Size() + offset);
00439 else if (whence == SEEK_SET)
00440 SetPosition(offset);
00441 else
00442 return -1;
00443
00444 return GetPosition();
00445 }
00446
00448
00452 unsigned char*
00453 GetBuffer()
00454 {
00455 return mBuffer;
00456 }
00457
00463 void
00464 SetBuffer(unsigned char* buf, PositionType size)
00465 {
00466 mBuffer = buf;
00467 mSize = size;
00468 }
00469
00472 void
00473 SetUseChannel(Channel* channel, String channelFile, String tmpFile)
00474 {
00475 mDataChannel = channel;
00476 mDataChannelFile = channelFile;
00477 mDataChannelTmp = tmpFile;
00478 }
00479
00480 protected:
00481
00482 template <class NativeType>
00483 void
00484 DoNativeTypeRead(NativeType* ptr)
00485 {
00486 String fs = NativeTypeFormat<NativeType>(0);
00487
00488
00489
00490
00491
00492
00493
00494 char buf[1024];
00495 unsigned char* src = GetBuffer() + GetPosition();
00496 Int64 i = 0;
00497 while (src[i] != ' ')
00498 buf[i++] = src[i];
00499 buf[i] = ' ';
00500 sscanf(buf, fs.c_str(), ptr);
00501 SetPosition(GetPosition() + i + 1);
00502 }
00503
00504 template <class NativeType>
00505 void
00506 DoNativeTypeWrite(NativeType val)
00507 {
00508 String fs = NativeTypeFormat<NativeType>(0) + " ";
00509 char buf[1024];
00510 sprintf(buf, fs.c_str(), val);
00511 Int64 i = 0;
00512 while (buf[i] != ' ')
00513 i++;
00514 Write(buf, i+1);
00515 }
00516
00517 bool
00518 WriteIOBufferToChannel()
00519 {
00520 SetSize(GetPosition());
00521 char* buf = mDataChannel->Buffer();
00522 sprintf(buf, "openfilebuffer \"%s\" 0\0", mDataChannelFile.c_str());
00523 mDataChannel->SendRequest(strlen(buf)+1);
00524 if (mDataChannel->LastSendHadError())
00525 {
00526 ILOG_ERROR("WriteIOBufferToChannel: open failed for [" <<
00527 mDataChannelFile << "]");
00528 return false;
00529 }
00530 int id;
00531 Int64 bufSize;
00532 sscanf(buf, "%d %lld", &id, &bufSize);
00533 ILOG_DEBUG(mDataChannelFile << ", id = " << id);
00534
00535 Int64 nrWritten = 0;
00536 SetPosition(0);
00537 while (Available() > 0)
00538 {
00539 sprintf(buf, "writefilebuffer %d\0", id);
00540 int used = strlen(buf) + 1;
00541 int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE-used, Available());
00542 Read(buf+used, nrData);
00543 int len = mDataChannel->SendRequest(used+nrData);
00544 if (mDataChannel->LastSendHadError())
00545 {
00546 ILOG_ERROR("WriteIOBufferToChannel: write failed for [" <<
00547 mDataChannelFile << "]");
00548 break;
00549 }
00550 nrWritten += nrData;
00551 ILOG_DEBUG(" wrote " << nrData << " bytes, total = " << nrWritten);
00552 }
00553 sprintf(buf, "closefilebuffer %d\0", id);
00554 mDataChannel->SendRequest(strlen(buf)+1);
00555 return true;
00556 }
00557
00558 void
00559 CheckDataChannelWrite()
00560 {
00561 if (mDataChannel != 0)
00562 {
00563 WriteIOBufferToChannel();
00564 mDataChannel = 0;
00565 }
00566 }
00567
00568 private:
00569
00570 unsigned char* mBuffer;
00571 PositionType mSize;
00572 PositionType mPosition;
00573
00574 Channel* mDataChannel;
00575 String mDataChannelFile;
00576 String mDataChannelTmp;
00577
00578 ILOG_VAR_DEC;
00579 };
00580
00581 ILOG_VAR_INIT(IOBuffer, Impala.Util);
00582
00583 inline IOBuffer*
00584 CreateIOBufferFromFile(CString filename)
00585 {
00586
00587 FILE* file = fopen(filename.c_str(), "r");
00588 if (!file)
00589 {
00590 std::cout << "[CreateIOBufferFromFile] failed to open " << filename
00591 << std::endl;
00592 return 0;
00593 }
00594
00595 int blockSize = 0x1000;
00596 std::deque<unsigned char*> bufferList;
00597 int read;
00598 do
00599 {
00600 unsigned char* buffer = new unsigned char[blockSize];
00601 read = fread(buffer,1,blockSize,file);
00602 bufferList.push_back(buffer);
00603 }
00604 while (read == blockSize);
00605 fclose(file);
00606
00607
00608 int total = ((bufferList.size()-1)*blockSize) + read;
00609 unsigned char* buffer = new unsigned char[total];
00610 int i;
00611 for(i=0 ; i<bufferList.size() - 1 ; i++)
00612 {
00613 memcpy(buffer + (i*blockSize), bufferList[i], blockSize);
00614 delete [] bufferList[i];
00615 bufferList[i] = 0;
00616 }
00617 memcpy(buffer + (i*blockSize), bufferList[i], read);
00618 delete [] bufferList[i];
00619 bufferList[i] = 0;
00620
00621 return new IOBuffer(total, buffer);
00622 }
00623
00625 void
00626 Broadcast(IOBuffer* buffer)
00627 {
00628 #ifdef MPI_USED
00629 ILOG_VAR(Link.Mpi.Broadcast);
00630 MPI_Barrier(MPI_COMM_WORLD);
00631 int size = buffer->Size();
00632 MPI_Bcast(&size, 1, MPI_INT, 0, MPI_COMM_WORLD);
00633 unsigned char* sendBuffer;
00634 if(Link::Mpi::MyId() == 0)
00635 sendBuffer = buffer->GetBuffer();
00636 else
00637 sendBuffer = new unsigned char[size];
00638 ILOG_DEBUG(Link::Mpi::MyId() << ": about to broadcast, buf = " << (void*)sendBuffer << "size = " << size);
00639 MPI_Bcast(sendBuffer, size, MPI_CHAR, 0, MPI_COMM_WORLD);
00640 if(Link::Mpi::MyId() != 0)
00641 {
00642 buffer->SetBuffer(sendBuffer, size);
00643 }
00644 ILOG_DEBUG("done");
00645 #endif // MPI_USED
00646 }
00647
00648 }
00649 }
00650
00651 #endif // Impala_Util_IOBuffer_h