00001 #ifndef Impala_Util_IOBufferChannel_h
00002 #define Impala_Util_IOBufferChannel_h
00003
00004 #include "Util/IOBuffer.h"
00005 #include "Basis/CmdOptions.h"
00006 #include "Util/ChannelPool.h"
00007 #include "Basis/NativeTypeFormats.h"
00008
00009 namespace Impala
00010 {
00011 namespace Util
00012 {
00013
00014
00020 class IOBufferChannel : public IOBuffer
00021 {
00022 public:
00023
00024 IOBufferChannel(CString fileName, bool readMode, Channel* channel = 0)
00025 {
00026 mReadMode = readMode;
00027 mId = -1;
00028 if (channel == 0)
00029 {
00030 CmdOptions& options = CmdOptions::GetInstance();
00031 String dataServer = options.GetString("dataServer");
00032 String passwordFile = options.GetString("passwordFile");
00033 channel = ChannelPool::Instance().Get(dataServer, passwordFile);
00034 }
00035 mMyChannel = channel;
00036 char* buf = mMyChannel->Buffer();
00037 sprintf(buf, "openfilebuffer \"%s\" %d\0", fileName.c_str(), readMode);
00038 mMyChannel->SendRequest(strlen(buf)+1);
00039 if (mMyChannel->LastSendHadError())
00040 {
00041 ILOG_ERROR("Failed to open " << fileName);
00042 return;
00043 }
00044 Int64 bufSize;
00045 sscanf(buf, "%d %lld", &mId, &bufSize);
00046 if (mReadMode)
00047 {
00048 SetSize(bufSize);
00049 ILOG_DEBUG(fileName << ", id = " << mId << ", size = " << bufSize);
00050 }
00051 else
00052 {
00053 ILOG_DEBUG(fileName << ", id = " << mId);
00054 }
00055 }
00056
00057 virtual
00058 ~IOBufferChannel()
00059 {
00060 if (mId != -1)
00061 {
00062 char* buf = mMyChannel->Buffer();
00063 sprintf(buf, "closefilebuffer %d\0", mId);
00064 mMyChannel->SendRequest(strlen(buf)+1);
00065 mId = -1;
00066 }
00067 }
00068
00069 virtual bool
00070 Valid()
00071 {
00072 return (mId != -1);
00073 }
00074
00075 virtual Int64
00076 Read(void* buf, Int64 bytesToRead)
00077 {
00078 char* chBuf = mMyChannel->Buffer();
00079 Int64 nrRead = 0;
00080 char* bufPtr = (char*) buf;
00081 while (nrRead < bytesToRead)
00082 {
00083 int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE, bytesToRead);
00084 sprintf(chBuf, "readfilebuffer %d %d\0", mId, nrData);
00085 int len = mMyChannel->SendRequest(strlen(chBuf)+1);
00086 if (mMyChannel->LastSendHadError())
00087 return 0;
00088 memcpy(bufPtr + nrRead, chBuf, len);
00089 if (len == 0)
00090 break;
00091 nrRead += len;
00092 }
00093 SetPosition(GetPosition() + nrRead);
00094 return nrRead;
00095 }
00096
00097 virtual String
00098 ReadLine()
00099 {
00100
00101 Int64 available = Available();
00102 Int64 maxLineSize = 16384;
00103 if (available < maxLineSize)
00104 maxLineSize = available + 1;
00105 char* buf = new char[maxLineSize];
00106 PositionType start = GetPosition();
00107 Int64 nrRead = Read(buf, maxLineSize);
00108 Int64 i = 0;
00109 while ((i < maxLineSize) && (buf[i] != '\n'))
00110 i++;
00111 Int64 skipN = (buf[i] == '\n') ? 1 : 0;
00112 SetPosition(start + i + skipN);
00113 if (buf[i-1] == '\r')
00114 i--;
00115 buf[i] = '\0';
00116 String res(buf);
00117 delete buf;
00118 return res;
00119 }
00120
00121 virtual Int64
00122 Gets(char* buf, Int64 bytesToRead)
00123 {
00124 Int64 available = Available();
00125 if (available < bytesToRead)
00126 bytesToRead = available;
00127 PositionType start = GetPosition();
00128 Int64 nrRead = Read(buf, bytesToRead);
00129 Int64 i = 0;
00130 while ((i < bytesToRead) && (buf[i] != '\n'))
00131 i++;
00132 bytesToRead = i;
00133 if (buf[i] == '\n')
00134 i++;
00135 SetPosition(start + i);
00136 return bytesToRead;
00137 }
00138
00139 virtual void
00140 NativeTypeRead(Int8* ptr)
00141 {
00142 DoNativeTypeRead(ptr);
00143 }
00144
00145 virtual void
00146 NativeTypeRead(UInt8* ptr)
00147 {
00148 DoNativeTypeRead(ptr);
00149 }
00150
00151 virtual void
00152 NativeTypeRead(Int16* ptr)
00153 {
00154 DoNativeTypeRead(ptr);
00155 }
00156
00157 virtual void
00158 NativeTypeRead(UInt16* ptr)
00159 {
00160 DoNativeTypeRead(ptr);
00161 }
00162
00163 virtual void
00164 NativeTypeRead(Int32* ptr)
00165 {
00166 DoNativeTypeRead(ptr);
00167 }
00168
00169 virtual void
00170 NativeTypeRead(UInt32* ptr)
00171 {
00172 DoNativeTypeRead(ptr);
00173 }
00174
00175 virtual void
00176 NativeTypeRead(Real32* ptr)
00177 {
00178 DoNativeTypeRead(ptr);
00179 }
00180
00181 virtual void
00182 NativeTypeRead(Real64* ptr)
00183 {
00184 DoNativeTypeRead(ptr);
00185 }
00186
00187 virtual void
00188 Write(const void* buf, Int64 bytesToWrite)
00189 {
00190 char* chBuf = mMyChannel->Buffer();
00191 Int64 nrWritten = 0;
00192 char* bufPtr = (char*) buf;
00193 while (nrWritten < bytesToWrite)
00194 {
00195 sprintf(chBuf, "writefilebuffer %d\0", mId);
00196 int used = strlen(chBuf) + 1;
00197 int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE - used,
00198 bytesToWrite - nrWritten);
00199 memcpy(chBuf + used, bufPtr + nrWritten, nrData);
00200 int len = mMyChannel->SendRequest(used + nrData);
00201 if (mMyChannel->LastSendHadError())
00202 break;
00203 nrWritten += nrData;
00204 }
00205 SetPosition(GetPosition() + bytesToWrite);
00206 }
00207
00208 virtual Int64
00209 Puts(const char* buf)
00210 {
00211 Int64 len = strlen(buf) + 1;
00212 char* writeBuf = new char[len];
00213 memcpy(writeBuf, buf, len);
00214 writeBuf[len-1] = '\n';
00215 Write(writeBuf, len);
00216 delete writeBuf;
00217 return 1;
00218 }
00219
00220 virtual void
00221 NativeTypeWrite(Int8 val)
00222 {
00223 DoNativeTypeWrite(val);
00224 }
00225
00226 virtual void
00227 NativeTypeWrite(UInt8 val)
00228 {
00229 DoNativeTypeWrite(val);
00230 }
00231
00232 virtual void
00233 NativeTypeWrite(Int16 val)
00234 {
00235 DoNativeTypeWrite(val);
00236 }
00237
00238 virtual void
00239 NativeTypeWrite(UInt16 val)
00240 {
00241 DoNativeTypeWrite(val);
00242 }
00243
00244 virtual void
00245 NativeTypeWrite(Int32 val)
00246 {
00247 DoNativeTypeWrite(val);
00248 }
00249
00250 virtual void
00251 NativeTypeWrite(UInt32 val)
00252 {
00253 DoNativeTypeWrite(val);
00254 }
00255
00256 virtual void
00257 NativeTypeWrite(Real32 val)
00258 {
00259 DoNativeTypeWrite(val);
00260 }
00261
00262 virtual void
00263 NativeTypeWrite(Real64 val)
00264 {
00265 DoNativeTypeWrite(val);
00266 }
00267
00268 virtual void
00269 SetPosition(PositionType position)
00270 {
00271 if (GetPosition() == position)
00272 return;
00273
00274 char* buf = mMyChannel->Buffer();
00275 sprintf(buf, "seekfilebuffer %d %lld %d\0", mId, position, SEEK_SET);
00276 mMyChannel->SendRequest(strlen(buf)+1);
00277 if (mMyChannel->LastSendHadError())
00278 {
00279 ILOG_ERROR("Failed to seek");
00280 return;
00281 }
00282 PositionType newPos;
00283 sscanf(buf, "%lld", &newPos);
00284 IOBuffer::SetPosition(newPos);
00285 }
00286
00287 private:
00288
00289 template <class NativeType>
00290 void
00291 DoNativeTypeRead(NativeType* ptr)
00292 {
00293 String fs = NativeTypeFormat<NativeType>(0);
00294 char buf[1024];
00295 PositionType start = GetPosition();
00296 Int64 nrRead = Read(buf, 1024);
00297 Int64 i = 0;
00298 while ((buf[i] != ' ') && (i < nrRead))
00299 i++;
00300 if(nrRead == 0)
00301 ILOG_ERROR("DoNativeTypeRead read 0 bytes")
00302 sscanf(buf, fs.c_str(), ptr);
00303 SetPosition(start + i + 1);
00304 }
00305
00306 template <class NativeType>
00307 void
00308 DoNativeTypeWrite(NativeType val)
00309 {
00310 String fs = NativeTypeFormat<NativeType>(0) + " ";
00311 char buf[1024];
00312 sprintf(buf, fs.c_str(), val);
00313 Int64 i = 0;
00314 while (buf[i] != ' ')
00315 i++;
00316 Write(buf, i+1);
00317 }
00318
00319 bool mReadMode;
00320 int mId;
00321 Channel* mMyChannel;
00322
00323 ILOG_VAR_DEC;
00324 };
00325
00326 ILOG_VAR_INIT(IOBufferChannel, Impala.Util);
00327
00328 }
00329 }
00330
00331 #endif