Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src


Go to the documentation of this file.
00001 #ifndef Impala_Persistency_FileSystem_h
00002 #define Impala_Persistency_FileSystem_h
00004 #ifdef unix
00005 #include <sys/stat.h>
00006 #include <sys/types.h>
00007 #else
00008 #include <direct.h>
00009 #endif
00011 #include <vector>
00012 #include "Basis/FileName.h"
00013 #include "Basis/FileNameTmp.h"
00014 #include "Util/ChannelPool.h"
00015 #include "Util/ReadIOBufferFromChannel.h"
00016 #include "Util/IOBufferFile.h"
00017 #include "Util/IOBufferChannel.h"
00019 namespace Impala
00020 {
00021 namespace Persistency
00022 {
00033 class FileSystem
00034 {
00035 public:
00037     FileSystem(CString data, CString dataServer)
00038     {
00039         CmdOptions& options = CmdOptions::GetInstance();
00040         mDataString = data;
00041         mData = StringList(mDataString, ';');
00042         mDataServer = dataServer;
00043         mDataChannel = 0;
00044         if (!mDataServer.empty())
00045         {
00046             ILOG_INFO_ONCE("Retrieving channel to server " << mDataServer);
00047             String passFile = options.GetString("passwordFile");
00048             mDataChannel = Util::ChannelPool::Instance().Get(mDataServer,
00049                                                              passFile);
00050         }
00051         mOverride = options.GetBool("override");
00052 #ifdef WIN32
00053         // printf("%d\n", _getmaxstdio()); // normally 512
00054         _setmaxstdio(2048); // set it to the absolute maximum
00055 #endif
00056     }
00058     // For use in data server only : reset data path for new request.
00059     void
00060     SetDataPath(String data)
00061     {
00062         mDataString = data;
00063         mData.clear();
00064         mData = StringList(mDataString, ';');
00065     }
00067     String
00068     GetDataPath()
00069     {
00070         return mDataString;
00071     }
00073     // For use in data server only : reset override for new request.
00074     void
00075     SetOverride(bool override)
00076     {
00077         mOverride = override;
00078     }
00080     // Directory part
00082     String
00083     MakeDir(CString dir)
00084     {
00085         return DoMkDirPath(dir);
00086     }
00088     // File part
00090     String
00091     GetFilePath(CString file, bool toWrite, bool silent)
00092     {
00093         CString path = file;
00094         if (toWrite)
00095             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00096         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00097     }
00099     String
00100     GetFilePath(CString dir, CString file, bool toWrite, bool silent)
00101     {
00102         String path = FileNameConcat(dir, file);
00103         if (toWrite)
00104             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00105         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00106     }
00128     Util::IOBuffer*
00129     GetIOBuffer(CString fileName, bool readMode, bool useMemoryIfLocal,
00130                 String useLocalFileIfRemote, Int64 useLocalSizeIfRemote = 0,
00131                 bool useIOBufferChannel = false)
00132     {
00133         Util::IOBuffer* res = 0;
00134         ILOG_DEBUG("GetIOBuffer [" << fileName << "]");
00135         if (mDataChannel)
00136         {
00137             if (readMode)
00138             {
00139                 if (useIOBufferChannel)
00140                 {
00141                     ILOG_DEBUG("GetIOBuffer: Read using Channel to " <<
00142                                mDataServer << ": " << fileName);
00143                     res = new Util::IOBufferChannel(fileName, true,
00144                                                     mDataChannel);
00145                 }
00146                 else
00147                 {
00148                     ILOG_DEBUG("GetIOBuffer: Read using File: " << fileName);
00149                     /* the race condition that occurs when someone passes in
00150                        "tmp" while reading a file is very difficult to track
00151                        down (only happens with MPI-jobs). Therefore, generate 
00152                        a temporary filename even in read mode */
00153                     if (useLocalFileIfRemote == "tmp")
00154                         useLocalFileIfRemote = FileNameTmp();
00155                     res = Util::ReadIOBufferFromChannel(mDataChannel, fileName,
00156                                                         useLocalFileIfRemote);
00157                 }
00158             }
00159             else
00160             {
00161                 if (useIOBufferChannel)
00162                 {
00163                     ILOG_DEBUG("GetIOBuffer: Write using Channel " << fileName);
00164                     res = new Util::IOBufferChannel(fileName, false,
00165                                                     mDataChannel);
00166                 }
00167                 else if (useLocalFileIfRemote.empty())
00168                 {
00169                     ILOG_DEBUG("GetIOBuffer: Write using mem " << fileName);
00170                     res = new Util::IOBuffer(useLocalSizeIfRemote);
00171                 }
00172                 else
00173                 {
00174                     ILOG_DEBUG("GetIOBuffer: Write using File " << fileName);
00175                     if (useLocalFileIfRemote == "tmp")
00176                         useLocalFileIfRemote = FileNameTmp();
00177                     res = new Util::IOBufferFile(useLocalFileIfRemote, false,
00178                                                  false);
00179                 }
00180                 if (!useIOBufferChannel)
00181                     res->SetUseChannel(mDataChannel, fileName,
00182                                        useLocalFileIfRemote);
00183             }
00184         }
00185         else
00186         {
00187             res = new Util::IOBufferFile(fileName, readMode, useMemoryIfLocal);
00188         }
00190         if (res)
00191         {
00192             if (!res->Valid())
00193             {
00194                 delete res;
00195                 res = 0;
00196             }
00197         }
00198         return res;
00199     }
00201     Util::Channel*
00202     GetDataChannel()
00203     {
00204         return mDataChannel;
00205     }
00207     void
00208     Dump() const
00209     {
00210         ILOG_INFO_ONCE("Dump: data=[" << mDataString << "] server=[" <<
00211                        mDataServer << "]");
00212     }
00214 private:
00216     // Utilities
00218     void
00219     DoMkDir(CString dir)
00220     {
00221 #ifdef unix
00222         mode_t m = 0755;
00223         mkdir(dir.c_str(), m);
00224 #else
00225         mkdir(dir.c_str());
00226 #endif
00227     }
00229     String
00230     DoMkDirPath(CString dir)
00231     {
00232         if (mDataChannel)
00233             return DoMkDirPathServer(dir);
00235         return DoMkDirPathLocal(ConcatDir(*mData.begin(), dir));
00236     }
00238     String
00239     DoMkDirPathLocal(CString dirWithBackSlashes)
00240     {
00241         String dir = StringReplaceAll(dirWithBackSlashes, "\\", "/");
00242         ILOG_DEBUG("DoMkDirPath: " << dir);
00243         StringList subDirs(dir, '/');
00244         String path("");
00245         if (dir[0] == '/')
00246             path = String("/");
00247         for (StringListCI i=subDirs.begin() ; i!=subDirs.end() ; i++)
00248         {
00249             CString subDir = *i;
00250             ILOG_DEBUG("  DoMkDirPath: subDir = " << subDir);
00251             if (subDir.size() == 0)
00252                 continue;
00253             if ((path.size() > 0) && (*path.rbegin() != '/'))
00254                 path = path + "/";
00255             path = path + *i;
00256             DoMkDir(path);
00257         }
00258         return path;
00259     }
00261     String
00262     DoMkDirPathServer(CString dir)
00263     {
00264         char* buf = mDataChannel->Buffer();
00265         sprintf(buf, "domkdirpath \"%s\" \"%s\"\0", dir.c_str(),
00266                 mDataString.c_str());
00267         int len = mDataChannel->SendRequest(strlen(buf)+1);
00268         if (mDataChannel->LastSendHadError())
00269             return "";
00270         buf[len] = 0;
00271         String res(buf);
00272         ILOG_DEBUG("Got dir [" << res << "]");
00273         return res;
00274     }
00276     String
00277     GetReadableFile(StringListCI begin, StringListCI end, CString file,
00278                     bool silent)
00279     {
00280         if (mDataChannel)
00281             return GetReadableFileServer(file, silent);
00282         return GetReadableFileLocal(begin, end, file, silent);
00283     }
00285     String
00286     GetReadableFileLocal(StringListCI begin, StringListCI end, CString file,
00287                          bool silent)
00288     {
00289         StringListCI i;
00290         for (i=begin ; i!=end ; i++)
00291         {
00292             String path = FileNameConcat(*i, file);
00293             ILOG_DEBUG("GetReadableFile: trying " << path);
00294             if (Util::IOBufferFile::FileExists(path))
00295                 return path;
00296         }
00298         if (!silent)
00299         {
00300             ILOG_ERROR("Unable to find " << file << " in path ");
00301             for (i=begin ; i!=end ; i++)
00302                 ILOG_INFO("    " << *i);
00303         }
00304         return String("");
00305     }
00307     String
00308     GetReadableFileServer(CString file, bool silent)
00309     {
00310         char* buf = mDataChannel->Buffer();
00311         sprintf(buf, "getreadablefile \"%s\" \"%s\" %d\0", file.c_str(),
00312                 mDataString.c_str(), silent);
00313         int len = mDataChannel->SendRequest(strlen(buf)+1);
00314         if (mDataChannel->LastSendHadError())
00315             return "";
00316         buf[len] = 0;
00317         String res(buf);
00318         ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00319         return res;
00320     }
00322     String
00323     GetWritableFile(StringListCI begin, StringListCI end, CString file,
00324                     bool silent)
00325     {
00326         if (mDataChannel)
00327             return GetWritableFileServer(file, silent);
00328         return GetWritableFileLocal(begin, end, file, silent);
00329     }
00331     String
00332     GetWritableFileLocal(StringListCI begin, StringListCI end, CString file,
00333                          bool silent)
00334     {
00335         if (begin == end)
00336         {
00337             ILOG_ERROR("GetWritableFile: no search path");
00338             return String("");
00339         }
00340         String path = FileNameConcat(*begin, file);
00341         ILOG_DEBUG("GetWritableFile: trying " << path);
00342         if (Util::IOBufferFile::FileExists(path))
00343         {
00344             if (! mOverride)
00345             {
00346                 if (!silent)
00347                     ILOG_ERROR(path << " already exists, will not override!");
00348                 return String("");
00349             }
00350         }
00351         return path;
00352     }
00354     String
00355     GetWritableFileServer(CString file, bool silent)
00356     {
00357         char* buf = mDataChannel->Buffer();
00358         sprintf(buf, "getwritablefile \"%s\" \"%s\" %d %d\0", file.c_str(),
00359                 mDataString.c_str(), silent, mOverride);
00360         int len = mDataChannel->SendRequest(strlen(buf)+1);
00361         if (mDataChannel->LastSendHadError())
00362             return "";
00363         buf[len] = 0;
00364         String res(buf);
00365         ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00366         return res;
00367     }
00369     String
00370     ConcatDir(CString dir, CString subDir)
00371     {
00372         return (subDir.empty() ? dir : (dir.empty() ? subDir
00373                                                     : (dir + "/" + subDir)));
00374     }
00376     bool           mOverride;
00377     String         mDataString;
00378     StringList     mData;
00379     String         mDataServer;
00380     Util::Channel* mDataChannel;
00382     ILOG_VAR_DEC;
00383 };
00385 ILOG_VAR_INIT(FileSystem, Impala.Persistency);
00387 } // namespace Persistency
00388 } // namespace Impala
00390 #endif

Generated on Fri Mar 19 09:31:43 2010 for ImpalaSrc by  doxygen 1.5.1