00001 #ifndef Impala_Util_IOBufferChannel_h
00002 #define Impala_Util_IOBufferChannel_h
00003
00004 #include "Util/IOBuffer.h"
00005 #include "Basis/CmdOptions.h"
00006 #include "Util/ChannelPool.h"
00007 #include "Basis/NativeTypeFormats.h"
00008
00009 namespace Impala
00010 {
00011 namespace Util
00012 {
00013
00014
00020 class IOBufferChannel : public IOBuffer
00021 {
00022 public:
00023
00024 IOBufferChannel(CString fileName, bool readMode, Channel* channel = 0)
00025 {
00026 mReadMode = readMode;
00027 mId = -1;
00028 if (channel == 0)
00029 {
00030 CmdOptions& options = CmdOptions::GetInstance();
00031 String dataServer = options.GetString("dataServer");
00032 String passwordFile = options.GetString("passwordFile");
00033 channel = ChannelPool::Instance().Get(dataServer, passwordFile);
00034 }
00035 mMyChannel = channel;
00036 char* buf = mMyChannel->Buffer();
00037 sprintf(buf, "openfilebuffer \"%s\" %d\0", fileName.c_str(), readMode);
00038 mMyChannel->SendRequest(strlen(buf)+1);
00039 if (mMyChannel->LastSendHadError())
00040 {
00041 ILOG_ERROR("Failed to open " << fileName);
00042 return;
00043 }
00044 Int64 bufSize;
00045 sscanf(buf, "%d %lld", &mId, &bufSize);
00046 if (mReadMode)
00047 {
00048 SetSize(bufSize);
00049 ILOG_DEBUG(fileName << ", id = " << mId << ", size = " << bufSize);
00050 }
00051 else
00052 {
00053 ILOG_DEBUG(fileName << ", id = " << mId);
00054 }
00055 }
00056
00057 virtual
00058 ~IOBufferChannel()
00059 {
00060 if (mId != -1)
00061 {
00062 char* buf = mMyChannel->Buffer();
00063 sprintf(buf, "closefilebuffer %d\0", mId);
00064 mMyChannel->SendRequest(strlen(buf)+1);
00065 mId = -1;
00066 }
00067 }
00068
00069 virtual bool
00070 Valid()
00071 {
00072 return (mId != -1);
00073 }
00074
00075 virtual Int64
00076 Read(void* buf, Int64 bytesToRead)
00077 {
00078 char* chBuf = mMyChannel->Buffer();
00079 Int64 nrRead = 0;
00080 char* bufPtr = (char*) buf;
00081 while (nrRead < bytesToRead)
00082 {
00083 int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE,
00084 bytesToRead - nrRead);
00085 sprintf(chBuf, "readfilebuffer %d %d\0", mId, nrData);
00086 int len = mMyChannel->SendRequest(strlen(chBuf)+1);
00087 if (mMyChannel->LastSendHadError())
00088 return 0;
00089 memcpy(bufPtr + nrRead, chBuf, len);
00090 if (len == 0)
00091 break;
00092 nrRead += len;
00093 }
00094 SetPosition(GetPosition() + nrRead);
00095 return nrRead;
00096 }
00097
00098 virtual String
00099 ReadLine()
00100 {
00101
00102 Int64 available = Available();
00103 Int64 maxLineSize = 16384;
00104 if (available < maxLineSize)
00105 maxLineSize = available + 1;
00106 char* buf = new char[maxLineSize];
00107 PositionType start = GetPosition();
00108 Int64 nrRead = Read(buf, maxLineSize);
00109 Int64 i = 0;
00110 while ((i < maxLineSize) && (buf[i] != '\n'))
00111 i++;
00112 Int64 skipN = (buf[i] == '\n') ? 1 : 0;
00113 SetPosition(start + i + skipN);
00114 if ((i > 0) && (buf[i-1] == '\r'))
00115 i--;
00116 buf[i] = '\0';
00117 String res(buf);
00118 delete buf;
00119 return res;
00120 }
00121
00122 virtual Int64
00123 Gets(char* buf, Int64 bytesToRead)
00124 {
00125 Int64 available = Available();
00126 if (available < bytesToRead)
00127 bytesToRead = available;
00128 PositionType start = GetPosition();
00129 Int64 nrRead = Read(buf, bytesToRead);
00130 Int64 i = 0;
00131 while ((i < bytesToRead) && (buf[i] != '\n'))
00132 i++;
00133 bytesToRead = i;
00134 if (buf[i] == '\n')
00135 i++;
00136 SetPosition(start + i);
00137 return bytesToRead;
00138 }
00139
00140 virtual void
00141 NativeTypeRead(Int8* ptr)
00142 {
00143 DoNativeTypeRead(ptr);
00144 }
00145
00146 virtual void
00147 NativeTypeRead(UInt8* ptr)
00148 {
00149 DoNativeTypeRead(ptr);
00150 }
00151
00152 virtual void
00153 NativeTypeRead(Int16* ptr)
00154 {
00155 DoNativeTypeRead(ptr);
00156 }
00157
00158 virtual void
00159 NativeTypeRead(UInt16* ptr)
00160 {
00161 DoNativeTypeRead(ptr);
00162 }
00163
00164 virtual void
00165 NativeTypeRead(Int32* ptr)
00166 {
00167 DoNativeTypeRead(ptr);
00168 }
00169
00170 virtual void
00171 NativeTypeRead(UInt32* ptr)
00172 {
00173 DoNativeTypeRead(ptr);
00174 }
00175
00176 virtual void
00177 NativeTypeRead(Int64* ptr)
00178 {
00179 DoNativeTypeRead(ptr);
00180 }
00181
00182 virtual void
00183 NativeTypeRead(UInt64* ptr)
00184 {
00185 DoNativeTypeRead(ptr);
00186 }
00187
00188 virtual void
00189 NativeTypeRead(Real32* ptr)
00190 {
00191 DoNativeTypeRead(ptr);
00192 }
00193
00194 virtual void
00195 NativeTypeRead(Real64* ptr)
00196 {
00197 DoNativeTypeRead(ptr);
00198 }
00199
00200 virtual Int64
00201 Write(const void* buf, Int64 bytesToWrite)
00202 {
00203 char* chBuf = mMyChannel->Buffer();
00204 Int64 nrWritten = 0;
00205 char* bufPtr = (char*) buf;
00206 while (nrWritten < bytesToWrite)
00207 {
00208 sprintf(chBuf, "writefilebuffer %d\0", mId);
00209 int used = strlen(chBuf) + 1;
00210 int nrData = Min<Int64>(Channel::DATA_BUFFER_SIZE - used,
00211 bytesToWrite - nrWritten);
00212 memcpy(chBuf + used, bufPtr + nrWritten, nrData);
00213 int len = mMyChannel->SendRequest(used + nrData);
00214 if (mMyChannel->LastSendHadError())
00215 break;
00216 nrWritten += nrData;
00217 }
00218 SetPosition(GetPosition() + bytesToWrite);
00219 return nrWritten;
00220 }
00221
00222 virtual Int64
00223 Puts(const char* buf)
00224 {
00225 Int64 len = strlen(buf) + 1;
00226 char* writeBuf = new char[len];
00227 memcpy(writeBuf, buf, len);
00228 writeBuf[len-1] = '\n';
00229 Write(writeBuf, len);
00230 delete writeBuf;
00231 return 1;
00232 }
00233
00234 virtual void
00235 NativeTypeWrite(Int8 val)
00236 {
00237 DoNativeTypeWrite(val);
00238 }
00239
00240 virtual void
00241 NativeTypeWrite(UInt8 val)
00242 {
00243 DoNativeTypeWrite(val);
00244 }
00245
00246 virtual void
00247 NativeTypeWrite(Int16 val)
00248 {
00249 DoNativeTypeWrite(val);
00250 }
00251
00252 virtual void
00253 NativeTypeWrite(UInt16 val)
00254 {
00255 DoNativeTypeWrite(val);
00256 }
00257
00258 virtual void
00259 NativeTypeWrite(Int32 val)
00260 {
00261 DoNativeTypeWrite(val);
00262 }
00263
00264 virtual void
00265 NativeTypeWrite(UInt32 val)
00266 {
00267 DoNativeTypeWrite(val);
00268 }
00269
00270 virtual void
00271 NativeTypeWrite(Int64 val)
00272 {
00273 DoNativeTypeWrite(val);
00274 }
00275
00276 virtual void
00277 NativeTypeWrite(UInt64 val)
00278 {
00279 DoNativeTypeWrite(val);
00280 }
00281
00282 virtual void
00283 NativeTypeWrite(Real32 val)
00284 {
00285 DoNativeTypeWrite(val);
00286 }
00287
00288 virtual void
00289 NativeTypeWrite(Real64 val)
00290 {
00291 DoNativeTypeWrite(val);
00292 }
00293
00294 virtual void
00295 SetPosition(PositionType position)
00296 {
00297 if (GetPosition() == position)
00298 return;
00299
00300 char* buf = mMyChannel->Buffer();
00301 sprintf(buf, "seekfilebuffer %d %lld %d\0", mId, position, SEEK_SET);
00302 mMyChannel->SendRequest(strlen(buf)+1);
00303 if (mMyChannel->LastSendHadError())
00304 {
00305 ILOG_ERROR("Failed to seek");
00306 return;
00307 }
00308 PositionType newPos;
00309 sscanf(buf, "%lld", &newPos);
00310 IOBuffer::SetPosition(newPos);
00311 }
00312
00313 private:
00314
00315 template <class NativeType>
00316 void
00317 DoNativeTypeRead(NativeType* ptr)
00318 {
00319 String fs = NativeTypeFormat<NativeType>(0);
00320 char buf[1024];
00321 PositionType start = GetPosition();
00322 Int64 nrRead = Read(buf, 1024);
00323 Int64 i = 0;
00324 while ((buf[i] != ' ') && (i < nrRead))
00325 i++;
00326 if(nrRead == 0)
00327 ILOG_ERROR("DoNativeTypeRead read 0 bytes")
00328 sscanf(buf, fs.c_str(), ptr);
00329 SetPosition(start + i + 1);
00330 }
00331
00332 template <class NativeType>
00333 void
00334 DoNativeTypeWrite(NativeType val)
00335 {
00336 String fs = NativeTypeFormat<NativeType>(0) + " ";
00337 char buf[1024];
00338 sprintf(buf, fs.c_str(), val);
00339 Int64 i = 0;
00340 while (buf[i] != ' ')
00341 i++;
00342 Write(buf, i+1);
00343 }
00344
// Copy of the constructor's readMode argument; true when opened for reading.
00345 bool mReadMode;
// Buffer id parsed from the server's reply to "openfilebuffer"; -1 while no
// buffer is open (see Valid()).
00346 int mId;
// Channel used for every request: the one passed to the constructor, or one
// obtained from ChannelPool. This class never deletes it.
00347 Channel* mMyChannel;
00348
// NOTE(review): presumably declares the logger used by the ILOG_ERROR and
// ILOG_DEBUG calls in this class; confirm against the logging header.
00349 ILOG_VAR_DEC;
00350 };
00351
// NOTE(review): presumably defines the logger declared by ILOG_VAR_DEC inside
// IOBufferChannel, under the name "Impala.Util"; confirm against the logging
// header.
00352 ILOG_VAR_INIT(IOBufferChannel, Impala.Util);
00353
00354 }
00355 }
00356
00357 #endif