00001 #ifndef Impala_Util_ReadIOBufferFromChannel_h
00002 #define Impala_Util_ReadIOBufferFromChannel_h
00003
00004 #include "Util/IOBufferFile.h"
00005
00006 namespace Impala
00007 {
00008 namespace Util
00009 {
00010
00011
00016 inline IOBuffer*
00017 ReadIOBufferFromChannel(Channel* channel, std::string fileName,
00018 std::string useLocalFile)
00019 {
00020 ILOG_VAR(Impala.Util.ReadIOBufferFromChannel);
00021 char* buf = channel->Buffer();
00022 sprintf(buf, "openfilebuffer \"%s\" 1\0", fileName.c_str());
00023 channel->SendRequest(strlen(buf)+1);
00024 if (channel->LastSendHadError())
00025 return 0;
00026 int id = 0;
00027 Int64 bufSize = 0;
00028 sscanf(buf, "%d %lld", &id, &bufSize);
00029 ILOG_DEBUG(fileName << ", id=" << id << ", size=" << bufSize << " from "
00030 << channel->GetServerInfo());
00031
00032 IOBuffer* ioBuf = 0;
00033 if (useLocalFile.empty())
00034 ioBuf = new IOBuffer(bufSize);
00035 else
00036 ioBuf = new IOBufferFile(useLocalFile, false, false);
00037 Int64 nrRead = 0;
00038 while (nrRead < bufSize)
00039 {
00040 sprintf(buf, "readfilebuffer %d %d\0", id, Channel::DATA_BUFFER_SIZE);
00041 int len = channel->SendRequest(strlen(buf)+1);
00042 if (channel->LastSendHadError())
00043 break;
00044 ioBuf->Write(buf, len);
00045 nrRead += len;
00046 ILOG_DEBUG(" wrote " << len << " bytes, total = " << nrRead);
00047 }
00048 sprintf(buf, "closefilebuffer %d\0", id);
00049 channel->SendRequest(strlen(buf)+1);
00050 if (useLocalFile.empty())
00051 {
00052 ioBuf->SetPosition(0);
00053 }
00054 else
00055 {
00056 delete ioBuf;
00057 ioBuf = new IOBufferFile(useLocalFile, true, false);
00058 }
00059 return ioBuf;
00060 }
00061
00062 }
00063 }
00064
00065 #endif