00001 #ifndef Impala_Persistency_FileSystem_h
00002 #define Impala_Persistency_FileSystem_h
00003
00004 #ifdef unix
00005 #include <sys/stat.h>
00006 #include <sys/types.h>
00007 #else
00008 #include <direct.h>
00009 #include <errno.h>
00010 #endif
00011
00012 #include <vector>
00013
00014 #include "Basis/FileName.h"
00015 #include "Basis/FileNameTmp.h"
00016 #include "Util/ChannelPool.h"
00017 #include "Util/ReadIOBufferFromChannel.h"
00018 #include "Util/IOBufferFile.h"
00019 #include "Util/IOBufferChannel.h"
00020
00021 namespace Impala
00022 {
00023 namespace Persistency
00024 {
00025
00026
00035 class FileSystem
00036 {
00037 public:
00038
00039 FileSystem(CString data, CString dataServer)
00040 {
00041 CmdOptions& options = CmdOptions::GetInstance();
00042 mDataString = data;
00043 mData = StringList(mDataString, ';');
00044 mDataServer = dataServer;
00045 mDataChannel = 0;
00046 if (!mDataServer.empty())
00047 {
00048 ILOG_DEBUG_HEADNODE("Retrieving channel to server " << mDataServer);
00049 String passFile = options.GetString("passwordFile");
00050 mDataChannel = Util::ChannelPool::Instance().Get(mDataServer,
00051 passFile);
00052 }
00053 mOverride = options.GetBool("override");
00054 #ifdef WIN32
00055
00056 _setmaxstdio(2048);
00057 #endif
00058 }
00059
00060 bool
00061 Valid() const
00062 {
00063 return (mDataServer.empty() || mDataChannel);
00064 }
00065
00066
00067 void
00068 SetDataPath(String data)
00069 {
00070 mDataString = data;
00071 mData.clear();
00072 mData = StringList(mDataString, ';');
00073 }
00074
00075 String
00076 GetDataPath()
00077 {
00078 return mDataString;
00079 }
00080
00081
00082 void
00083 SetOverride(bool override)
00084 {
00085 mOverride = override;
00086 }
00087
00088
00089
00090 String
00091 MakeDir(CString dir)
00092 {
00093 return DoMkDirPath(dir);
00094 }
00095
00096 void
00097 DeleteDir(CString dir)
00098 {
00099 if (mDataChannel)
00100 {
00101 char* buf = mDataChannel->Buffer();
00102 sprintf(buf, "deletedir \"%s\" \"%s\"\0", dir.c_str(),
00103 mDataString.c_str());
00104 mDataChannel->SendRequest(strlen(buf)+1);
00105 }
00106 else
00107 {
00108 DoDeleteDir(dir);
00109 }
00110 }
00111
00112
00113
00114 String
00115 GetFilePath(CString file, bool toWrite, bool silent)
00116 {
00117 if (! Valid())
00118 return "";
00119
00120 CString path = file;
00121 if (toWrite)
00122 return GetWritableFile(mData.begin(), mData.end(), path, silent);
00123 return GetReadableFile(mData.begin(), mData.end(), path, silent);
00124 }
00125
00126 String
00127 GetFilePath(CString dir, CString file, bool toWrite, bool silent)
00128 {
00129 if (! Valid())
00130 return "";
00131
00132 String path = FileNameConcat(dir, file);
00133 if (toWrite)
00134 return GetWritableFile(mData.begin(), mData.end(), path, silent);
00135 return GetReadableFile(mData.begin(), mData.end(), path, silent);
00136 }
00137
00138 void
00139 DeleteFile(CString path)
00140 {
00141 if (mDataChannel)
00142 {
00143 char* buf = mDataChannel->Buffer();
00144 sprintf(buf, "deletefile \"%s\"\0", path.c_str());
00145 mDataChannel->SendRequest(strlen(buf)+1);
00146 }
00147 else
00148 {
00149 unlink(path.c_str());
00150 }
00151 }
00152
00173 Util::IOBuffer*
00174 GetIOBuffer(CString fileName, bool readMode, bool useMemoryIfLocal,
00175 String useLocalFileIfRemote, Int64 useLocalSizeIfRemote = 0,
00176 bool useIOBufferChannel = false)
00177 {
00178 Util::IOBuffer* res = 0;
00179 ILOG_DEBUG("GetIOBuffer [" << fileName << "]");
00180 if (! Valid())
00181 return res;
00182
00183 if (mDataChannel)
00184 {
00185 if (readMode)
00186 {
00187 if (useIOBufferChannel)
00188 {
00189 ILOG_DEBUG("GetIOBuffer: Read using Channel to " <<
00190 mDataServer << ": " << fileName);
00191 res = new Util::IOBufferChannel(fileName, true,
00192 mDataChannel);
00193 }
00194 else
00195 {
00196 ILOG_DEBUG("GetIOBuffer: Read using File: " << fileName);
00197
00198
00199
00200
00201 if (useLocalFileIfRemote == "tmp")
00202 useLocalFileIfRemote = FileNameTmp();
00203 res = Util::ReadIOBufferFromChannel(mDataChannel, fileName,
00204 useLocalFileIfRemote);
00205 }
00206 }
00207 else
00208 {
00209 if (useIOBufferChannel)
00210 {
00211 ILOG_DEBUG("GetIOBuffer: Write using Channel " << fileName);
00212 res = new Util::IOBufferChannel(fileName, false,
00213 mDataChannel);
00214 }
00215 else if (useLocalFileIfRemote.empty())
00216 {
00217 ILOG_DEBUG("GetIOBuffer: Write using mem " << fileName);
00218 res = new Util::IOBuffer(useLocalSizeIfRemote);
00219 }
00220 else
00221 {
00222 ILOG_DEBUG("GetIOBuffer: Write using File " << fileName);
00223 if (useLocalFileIfRemote == "tmp")
00224 useLocalFileIfRemote = FileNameTmp();
00225 res = new Util::IOBufferFile(useLocalFileIfRemote, false,
00226 false);
00227 }
00228 if (!useIOBufferChannel)
00229 res->SetUseChannel(mDataChannel, fileName,
00230 useLocalFileIfRemote);
00231 }
00232 }
00233 else
00234 {
00235 res = new Util::IOBufferFile(fileName, readMode, useMemoryIfLocal);
00236 }
00237
00238 if (res)
00239 {
00240 if (!res->Valid())
00241 {
00242 delete res;
00243 res = 0;
00244 }
00245 }
00246 return res;
00247 }
00248
00249 Util::Channel*
00250 GetDataChannel()
00251 {
00252 return mDataChannel;
00253 }
00254
00255 void
00256 Dump() const
00257 {
00258 ILOG_INFO_HEADNODE("Dump: data=[" << mDataString << "] server=[" <<
00259 mDataServer << "]");
00260 }
00261
00262 private:
00263
00264
00265
00266 void
00267 DoMkDir(CString dir)
00268 {
00269 #ifdef unix
00270 mode_t m = 0755;
00271 mkdir(dir.c_str(), m);
00272 #else
00273 if ((_mkdir(StringReplaceAll(dir, "/", "\\").c_str()) != 0) &&
00274 (errno != EEXIST))
00275 {
00276 ILOG_ERROR("[DoMkDir] Failed to create directory: " << dir <<
00277 " (" << strerror(errno) << ")");
00278 }
00279 #endif
00280 }
00281
00282 String
00283 DoMkDirPath(CString dir)
00284 {
00285 if (mDataChannel)
00286 return DoMkDirPathServer(dir);
00287
00288 return DoMkDirPathLocal(ConcatDir(*mData.begin(), dir));
00289 }
00290
00291 String
00292 DoMkDirPathLocal(CString dirWithBackSlashes)
00293 {
00294 String dir = StringReplaceAll(dirWithBackSlashes, "\\", "/");
00295 ILOG_DEBUG("DoMkDirPath: " << dir);
00296
00297 String path = "";
00298 if (dir[0] == '/')
00299 {
00300 #ifdef WIN32
00301 if (dir.find("//") == 0)
00302 {
00303
00304 int headEndPos = 1;
00305 int nextSlashPos = dir.find("/", headEndPos + 1);
00306 if (nextSlashPos != String::npos)
00307 {
00308 headEndPos = nextSlashPos;
00309 nextSlashPos = dir.find("/", headEndPos + 1);
00310 if (nextSlashPos != String::npos)
00311 headEndPos = nextSlashPos;
00312 }
00313 path = dir.substr(0, headEndPos + 1);
00314 dir = dir.substr(headEndPos + 1);
00315 }
00316 else
00317 #endif
00318 path = String("/");
00319 }
00320
00321 StringList subDirs(dir, '/');
00322 for (StringListCI i=subDirs.begin() ; i!=subDirs.end() ; i++)
00323 {
00324 CString subDir = *i;
00325 ILOG_DEBUG(" DoMkDirPath: subDir = " << subDir);
00326 if (subDir.size() == 0 || subDir == ".")
00327 continue;
00328 if ((path.size() > 0) && (*path.rbegin() != '/'))
00329 path = path + "/";
00330 path = path + *i;
00331 DoMkDir(path);
00332 }
00333 return path;
00334 }
00335
00336 String
00337 DoMkDirPathServer(CString dir)
00338 {
00339 char* buf = mDataChannel->Buffer();
00340 sprintf(buf, "domkdirpath \"%s\" \"%s\"\0", dir.c_str(),
00341 mDataString.c_str());
00342 int len = mDataChannel->SendRequest(strlen(buf)+1);
00343 if (mDataChannel->LastSendHadError())
00344 return "";
00345 buf[len] = 0;
00346 String res(buf);
00347 ILOG_DEBUG("Got dir [" << res << "]");
00348 return res;
00349 }
00350
00351 void
00352 DoDeleteDir(String dir)
00353 {
00354 dir = ConcatDir(*mData.begin(), dir);
00355 #ifdef WIN32
00356 dir = StringReplaceAll(dir, "/", "\\", true);
00357 String cmdLine("if exist " + dir + " rmdir /s/q " + dir);
00358 #else
00359 while (StringEndsWith(dir, "/."))
00360 dir = dir.substr(0, dir.size()-2);
00361 String cmdLine("rm -rf " + dir);
00362 #endif
00363 char sysBuf[2048];
00364 ILOG_INFO("[DoDeleteDir] " << cmdLine);
00365 sprintf(sysBuf, "%s", cmdLine.c_str());
00366
00367 int sysRes = system(sysBuf);
00368 if (sysRes != 0)
00369 {
00370 ILOG_ERROR("[DoDeleteDir] System call failed: [" << sysBuf << "]");
00371 }
00372
00373 }
00374
00375 String
00376 GetReadableFile(StringListCI begin, StringListCI end, CString file,
00377 bool silent)
00378 {
00379 if (mDataChannel)
00380 return GetReadableFileServer(file, silent);
00381 return GetReadableFileLocal(begin, end, file, silent);
00382 }
00383
00384 String
00385 GetReadableFileLocal(StringListCI begin, StringListCI end, CString file,
00386 bool silent)
00387 {
00388 StringListCI i;
00389 for (i=begin ; i!=end ; i++)
00390 {
00391 String d = StringResolveEnv(*i);
00392 String path = FileNameConcat(d, file);
00393 ILOG_DEBUG("GetReadableFile: trying " << path);
00394 if (Util::IOBufferFile::FileExists(path))
00395 return path;
00396 }
00397
00398 if (!silent)
00399 {
00400 ILOG_ERROR("Unable to find " << file << " in path ");
00401 for (i=begin ; i!=end ; i++)
00402 ILOG_INFO(" " << *i);
00403 }
00404 return String("");
00405 }
00406
00407 String
00408 GetReadableFileServer(CString file, bool silent)
00409 {
00410 char* buf = mDataChannel->Buffer();
00411 sprintf(buf, "getreadablefile \"%s\" \"%s\" %d\0", file.c_str(),
00412 mDataString.c_str(), silent);
00413 int len = mDataChannel->SendRequest(strlen(buf)+1);
00414 if (mDataChannel->LastSendHadError())
00415 return "";
00416 buf[len] = 0;
00417 String res(buf);
00418 ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00419 return res;
00420 }
00421
00422 String
00423 GetWritableFile(StringListCI begin, StringListCI end, CString file,
00424 bool silent)
00425 {
00426 if (mDataChannel)
00427 return GetWritableFileServer(file, silent);
00428 return GetWritableFileLocal(begin, end, file, silent);
00429 }
00430
00431 String
00432 GetWritableFileLocal(StringListCI begin, StringListCI end, CString file,
00433 bool silent)
00434 {
00435 if (begin == end)
00436 {
00437 ILOG_ERROR("GetWritableFile: no search path");
00438 return String("");
00439 }
00440 String d = StringResolveEnv(*begin);
00441 String path = FileNameConcat(d, file);
00442 ILOG_DEBUG("GetWritableFile: trying " << path);
00443 if (Util::IOBufferFile::FileExists(path))
00444 {
00445 if (! (mOverride || silent))
00446 {
00447 if (!silent)
00448 ILOG_ERROR(path << " already exists, will not override!");
00449 return String("");
00450 }
00451 }
00452 return path;
00453 }
00454
00455 String
00456 GetWritableFileServer(CString file, bool silent)
00457 {
00458 char* buf = mDataChannel->Buffer();
00459 sprintf(buf, "getwritablefile \"%s\" \"%s\" %d %d\0", file.c_str(),
00460 mDataString.c_str(), silent, mOverride);
00461 int len = mDataChannel->SendRequest(strlen(buf)+1);
00462 if (mDataChannel->LastSendHadError())
00463 return "";
00464 buf[len] = 0;
00465 String res(buf);
00466 ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00467 return res;
00468 }
00469
00470 String
00471 ConcatDir(CString dir, CString subDir)
00472 {
00473 return (subDir.empty() ? dir : (dir.empty() ? subDir
00474 : (dir + "/" + subDir)));
00475 }
00476
00477 bool mOverride;
00478 String mDataString;
00479 StringList mData;
00480 String mDataServer;
00481 Util::Channel* mDataChannel;
00482
00483 ILOG_VAR_DEC;
00484 };
00485
// Counterpart of the ILOG_VAR_DEC inside FileSystem; presumably defines that
// class's logger variable for "Impala.Persistency" - confirm against the
// ILOG macro definitions.
00486 ILOG_VAR_INIT(FileSystem, Impala.Persistency);
00487
00488 }
00489 }
00490
00491 #endif