Home || Visual Search || Applications || Architecture || Important Messages || OGL || Src

FileSystem.h

Go to the documentation of this file.
00001 #ifndef Impala_Persistency_FileSystem_h
00002 #define Impala_Persistency_FileSystem_h
00003 
#ifdef unix
#include <sys/stat.h>
#include <sys/types.h>
#else
#include <direct.h>
#endif

#include <errno.h>
#include <string.h>
#include <vector>
00013 
00014 #include "Basis/FileName.h"
00015 #include "Basis/FileNameTmp.h"
00016 #include "Util/ChannelPool.h"
00017 #include "Util/ReadIOBufferFromChannel.h"
00018 #include "Util/IOBufferFile.h"
00019 #include "Util/IOBufferChannel.h"
00020 
00021 namespace Impala
00022 {
00023 namespace Persistency
00024 {
00025 
00026 
00035 class FileSystem
00036 {
00037 public:
00038 
00039     FileSystem(CString data, CString dataServer)
00040     {
00041         CmdOptions& options = CmdOptions::GetInstance();
00042         mDataString = data;
00043         mData = StringList(mDataString, ';');
00044         mDataServer = dataServer;
00045         mDataChannel = 0;
00046         if (!mDataServer.empty())
00047         {
00048             ILOG_DEBUG_HEADNODE("Retrieving channel to server " << mDataServer);
00049             String passFile = options.GetString("passwordFile");
00050             mDataChannel = Util::ChannelPool::Instance().Get(mDataServer,
00051                                                              passFile);
00052         }
00053         mOverride = options.GetBool("override");
00054 #ifdef WIN32
00055         // printf("%d\n", _getmaxstdio()); // normally 512
00056         _setmaxstdio(2048); // set it to the absolute maximum
00057 #endif
00058     }
00059 
00060     bool
00061     Valid() const
00062     {
00063         return (mDataServer.empty() || mDataChannel);
00064     }
00065 
00066     // For use in data server only : reset data path for new request.
00067     void
00068     SetDataPath(String data)
00069     {
00070         mDataString = data;
00071         mData.clear();
00072         mData = StringList(mDataString, ';');
00073     }
00074 
00075     String
00076     GetDataPath()
00077     {
00078         return mDataString;
00079     }
00080 
00081     // For use in data server only : reset override for new request.
00082     void
00083     SetOverride(bool override)
00084     {
00085         mOverride = override;
00086     }
00087 
00088     // Directory part
00089 
00090     String
00091     MakeDir(CString dir)
00092     {
00093         return DoMkDirPath(dir);
00094     }
00095 
00096     void
00097     DeleteDir(CString dir)
00098     {
00099         if (mDataChannel)
00100         {
00101             char* buf = mDataChannel->Buffer();
00102             sprintf(buf, "deletedir \"%s\" \"%s\"\0", dir.c_str(),
00103                     mDataString.c_str());
00104             mDataChannel->SendRequest(strlen(buf)+1);
00105         }
00106         else
00107         {
00108             DoDeleteDir(dir);
00109         }
00110     }
00111 
00112     // File part
00113 
00114     String
00115     GetFilePath(CString file, bool toWrite, bool silent)
00116     {
00117         if (! Valid())
00118             return "";
00119 
00120         CString path = file;
00121         if (toWrite)
00122             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00123         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00124     }
00125 
00126     String
00127     GetFilePath(CString dir, CString file, bool toWrite, bool silent)
00128     {
00129         if (! Valid())
00130             return "";
00131 
00132         String path = FileNameConcat(dir, file);
00133         if (toWrite)
00134             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00135         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00136     }
00137 
00138     void
00139     DeleteFile(CString path)
00140     {
00141         if (mDataChannel)
00142         {
00143             char* buf = mDataChannel->Buffer();
00144             sprintf(buf, "deletefile \"%s\"\0", path.c_str());
00145             mDataChannel->SendRequest(strlen(buf)+1);
00146         }
00147         else
00148         {
00149             unlink(path.c_str());
00150         }
00151     }
00152 
00173     Util::IOBuffer*
00174     GetIOBuffer(CString fileName, bool readMode, bool useMemoryIfLocal,
00175                 String useLocalFileIfRemote, Int64 useLocalSizeIfRemote = 0,
00176                 bool useIOBufferChannel = false)
00177     {
00178         Util::IOBuffer* res = 0;
00179         ILOG_DEBUG("GetIOBuffer [" << fileName << "]");
00180         if (! Valid())
00181             return res;
00182 
00183         if (mDataChannel)
00184         {
00185             if (readMode)
00186             {
00187                 if (useIOBufferChannel)
00188                 {
00189                     ILOG_DEBUG("GetIOBuffer: Read using Channel to " <<
00190                                mDataServer << ": " << fileName);
00191                     res = new Util::IOBufferChannel(fileName, true,
00192                                                     mDataChannel);
00193                 }
00194                 else
00195                 {
00196                     ILOG_DEBUG("GetIOBuffer: Read using File: " << fileName);
00197                     /* the race condition that occurs when someone passes in
00198                        "tmp" while reading a file is very difficult to track
00199                        down (only happens with MPI-jobs). Therefore, generate 
00200                        a temporary filename even in read mode */
00201                     if (useLocalFileIfRemote == "tmp")
00202                         useLocalFileIfRemote = FileNameTmp();
00203                     res = Util::ReadIOBufferFromChannel(mDataChannel, fileName,
00204                                                         useLocalFileIfRemote);
00205                 }
00206             }
00207             else
00208             {
00209                 if (useIOBufferChannel)
00210                 {
00211                     ILOG_DEBUG("GetIOBuffer: Write using Channel " << fileName);
00212                     res = new Util::IOBufferChannel(fileName, false,
00213                                                     mDataChannel);
00214                 }
00215                 else if (useLocalFileIfRemote.empty())
00216                 {
00217                     ILOG_DEBUG("GetIOBuffer: Write using mem " << fileName);
00218                     res = new Util::IOBuffer(useLocalSizeIfRemote);
00219                 }
00220                 else
00221                 {
00222                     ILOG_DEBUG("GetIOBuffer: Write using File " << fileName);
00223                     if (useLocalFileIfRemote == "tmp")
00224                         useLocalFileIfRemote = FileNameTmp();
00225                     res = new Util::IOBufferFile(useLocalFileIfRemote, false,
00226                                                  false);
00227                 }
00228                 if (!useIOBufferChannel)
00229                     res->SetUseChannel(mDataChannel, fileName,
00230                                        useLocalFileIfRemote);
00231             }
00232         }
00233         else
00234         {
00235             res = new Util::IOBufferFile(fileName, readMode, useMemoryIfLocal);
00236         }
00237 
00238         if (res)
00239         {
00240             if (!res->Valid())
00241             {
00242                 delete res;
00243                 res = 0;
00244             }
00245         }
00246         return res;
00247     }
00248 
00249     Util::Channel*
00250     GetDataChannel()
00251     {
00252         return mDataChannel;
00253     }
00254 
00255     void
00256     Dump() const
00257     {
00258         ILOG_INFO_HEADNODE("Dump: data=[" << mDataString << "] server=[" <<
00259                        mDataServer << "]");
00260     }
00261 
00262 private:
00263 
00264     // Utilities
00265 
00266     void
00267     DoMkDir(CString dir)
00268     {
00269 #ifdef unix
00270         mode_t m = 0755;
00271         mkdir(dir.c_str(), m);
00272 #else
00273         if ((_mkdir(StringReplaceAll(dir, "/", "\\").c_str()) != 0) && 
00274             (errno != EEXIST))
00275         {
00276             ILOG_ERROR("[DoMkDir] Failed to create directory: " << dir << 
00277                        " (" << strerror(errno) << ")");
00278         }
00279 #endif
00280     }
00281 
00282     String
00283     DoMkDirPath(CString dir)
00284     {
00285         if (mDataChannel)
00286             return DoMkDirPathServer(dir);
00287 
00288         return DoMkDirPathLocal(ConcatDir(*mData.begin(), dir));
00289     }
00290 
00291     String
00292     DoMkDirPathLocal(CString dirWithBackSlashes)
00293     {
00294         String dir = StringReplaceAll(dirWithBackSlashes, "\\", "/");
00295         ILOG_DEBUG("DoMkDirPath: " << dir);
00296 
00297         String path = "";
00298         if (dir[0] == '/')
00299         {
00300 #ifdef WIN32
00301             if (dir.find("//") == 0)
00302             {
00303                 // construct path head upto and including fourth slash
00304                 int headEndPos = 1;
00305                 int nextSlashPos = dir.find("/", headEndPos + 1);
00306                 if (nextSlashPos != String::npos)
00307                 {
00308                     headEndPos = nextSlashPos;
00309                     nextSlashPos = dir.find("/", headEndPos + 1);
00310                     if (nextSlashPos != String::npos)
00311                         headEndPos = nextSlashPos;
00312                 }
00313                 path = dir.substr(0, headEndPos + 1);
00314                 dir = dir.substr(headEndPos + 1);
00315             }
00316             else
00317 #endif
00318                 path = String("/");
00319         }
00320         
00321         StringList subDirs(dir, '/');
00322         for (StringListCI i=subDirs.begin() ; i!=subDirs.end() ; i++)
00323         {
00324             CString subDir = *i;
00325             ILOG_DEBUG("  DoMkDirPath: subDir = " << subDir);
00326             if (subDir.size() == 0 || subDir == ".")
00327                 continue;
00328             if ((path.size() > 0) && (*path.rbegin() != '/'))
00329                 path = path + "/";
00330             path = path + *i;
00331             DoMkDir(path);
00332         }
00333         return path;
00334     }
00335 
00336     String
00337     DoMkDirPathServer(CString dir)
00338     {
00339         char* buf = mDataChannel->Buffer();
00340         sprintf(buf, "domkdirpath \"%s\" \"%s\"\0", dir.c_str(),
00341                 mDataString.c_str());
00342         int len = mDataChannel->SendRequest(strlen(buf)+1);
00343         if (mDataChannel->LastSendHadError())
00344             return "";
00345         buf[len] = 0;
00346         String res(buf);
00347         ILOG_DEBUG("Got dir [" << res << "]");
00348         return res;
00349     }
00350 
00351     void
00352     DoDeleteDir(String dir)
00353     {
00354         dir = ConcatDir(*mData.begin(), dir);
00355 #ifdef WIN32
00356         dir = StringReplaceAll(dir, "/", "\\", true);
00357         String cmdLine("if exist " + dir + " rmdir /s/q " + dir);
00358 #else
00359         while (StringEndsWith(dir, "/."))
00360             dir = dir.substr(0, dir.size()-2);
00361         String cmdLine("rm -rf " + dir);
00362 #endif
00363         char sysBuf[2048];
00364         ILOG_INFO("[DoDeleteDir] " << cmdLine);
00365         sprintf(sysBuf, "%s", cmdLine.c_str());
00366         //
00367         int sysRes = system(sysBuf);
00368         if (sysRes != 0)
00369         {
00370             ILOG_ERROR("[DoDeleteDir] System call failed: ["  << sysBuf << "]");
00371         }
00372         //
00373     }
00374 
00375     String
00376     GetReadableFile(StringListCI begin, StringListCI end, CString file,
00377                     bool silent)
00378     {
00379         if (mDataChannel)
00380             return GetReadableFileServer(file, silent);
00381         return GetReadableFileLocal(begin, end, file, silent);
00382     }
00383 
00384     String
00385     GetReadableFileLocal(StringListCI begin, StringListCI end, CString file,
00386                          bool silent)
00387     {
00388         StringListCI i;
00389         for (i=begin ; i!=end ; i++)
00390         {
00391             String d = StringResolveEnv(*i);
00392             String path = FileNameConcat(d, file);
00393             ILOG_DEBUG("GetReadableFile: trying " << path);
00394             if (Util::IOBufferFile::FileExists(path))
00395                 return path;
00396         }
00397 
00398         if (!silent)
00399         {
00400             ILOG_ERROR("Unable to find " << file << " in path ");
00401             for (i=begin ; i!=end ; i++)
00402                 ILOG_INFO("    " << *i);
00403         }
00404         return String("");
00405     }
00406 
00407     String
00408     GetReadableFileServer(CString file, bool silent)
00409     {
00410         char* buf = mDataChannel->Buffer();
00411         sprintf(buf, "getreadablefile \"%s\" \"%s\" %d\0", file.c_str(),
00412                 mDataString.c_str(), silent);
00413         int len = mDataChannel->SendRequest(strlen(buf)+1);
00414         if (mDataChannel->LastSendHadError())
00415             return "";
00416         buf[len] = 0;
00417         String res(buf);
00418         ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00419         return res;
00420     }
00421 
00422     String
00423     GetWritableFile(StringListCI begin, StringListCI end, CString file,
00424                     bool silent)
00425     {
00426         if (mDataChannel)
00427             return GetWritableFileServer(file, silent);
00428         return GetWritableFileLocal(begin, end, file, silent);
00429     }
00430 
00431     String
00432     GetWritableFileLocal(StringListCI begin, StringListCI end, CString file,
00433                          bool silent)
00434     {
00435         if (begin == end)
00436         {
00437             ILOG_ERROR("GetWritableFile: no search path");
00438             return String("");
00439         }
00440         String d = StringResolveEnv(*begin);
00441         String path = FileNameConcat(d, file);
00442         ILOG_DEBUG("GetWritableFile: trying " << path);
00443         if (Util::IOBufferFile::FileExists(path))
00444         {
00445             if (! (mOverride || silent))   // silent implies "just checking"
00446             {
00447                 if (!silent)
00448                     ILOG_ERROR(path << " already exists, will not override!");
00449                 return String("");
00450             }
00451         }
00452         return path;
00453     }
00454 
00455     String
00456     GetWritableFileServer(CString file, bool silent)
00457     {
00458         char* buf = mDataChannel->Buffer();
00459         sprintf(buf, "getwritablefile \"%s\" \"%s\" %d %d\0", file.c_str(),
00460                 mDataString.c_str(), silent, mOverride);
00461         int len = mDataChannel->SendRequest(strlen(buf)+1);
00462         if (mDataChannel->LastSendHadError())
00463             return "";
00464         buf[len] = 0;
00465         String res(buf);
00466         ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00467         return res;
00468     }
00469 
00470     String
00471     ConcatDir(CString dir, CString subDir)
00472     {
00473         return (subDir.empty() ? dir : (dir.empty() ? subDir
00474                                                     : (dir + "/" + subDir)));
00475     }
00476 
00477     bool           mOverride;     // accept existing files when a writable file is requested; from the "override" option or SetOverride()
00478     String         mDataString;   // data path as given, entries separated by ';'
00479     StringList     mData;         // mDataString split on ';'
00480     String         mDataServer;   // name of the data server; empty means local file access
00481     Util::Channel* mDataChannel;  // channel to mDataServer; 0 when no server is used (and presumably when none could be obtained, see Valid())
00482 
00483     ILOG_VAR_DEC;
00484 };
00485 
00486 ILOG_VAR_INIT(FileSystem, Impala.Persistency);
00487 
00488 } // namespace Persistency
00489 } // namespace Impala
00490 
00491 #endif

Generated on Thu Jan 13 09:05:03 2011 for ImpalaSrc by  doxygen 1.5.1