Home || Visual Search || Applications || Architecture || Important Messages || OGL || Src

Database.h

Go to the documentation of this file.
00001 #ifndef Impala_Util_Database_h
00002 #define Impala_Util_Database_h
00003 
00004 #ifndef REPOSITORY_USED // Here comes the deprecated stuff
00005 
00006 #ifdef unix
00007 #include <sys/stat.h>
00008 #include <sys/types.h>
00009 #include <sys/resource.h>
00010 #else
00011 #include <direct.h>
00012 #include <errno.h>
00013 #endif
00014 
00015 #include <vector>
00016 
00017 #include "Basis/CmdOptions.h"
00018 #include "Basis/FileNameTmp.h"
00019 #include "Util/ChannelPool.h"
00020 #include "Util/ReadIOBufferFromChannel.h"
00021 #include "Util/IOBufferFile.h"
00022 #include "Util/IOBufferChannel.h"
00023 
00024 namespace Impala
00025 {
00026 namespace Util
00027 {
00028 
00029 
00030 class Database
00031 {
00032 public:
00033 
00034     static Database&
00035     GetInstance()
00036     {
00037         static Database theDatabase;
00038         return theDatabase;
00039     }
00040 
00041     Database()
00042     {
00043         Init("");
00044     }
00045 
00046     Database(String setName)
00047     {
00048         Init(setName);
00049     }
00050 
00051     // For use in data server only : reset data path for new request.
00052     void
00053     SetDataPath(String data)
00054     {
00055         mDataString = data;
00056         mData.clear();
00057         mData = StringList(mDataString, ';');
00058     }
00059 
00060     String
00061     GetDataPath()
00062     {
00063         return mDataString;
00064     }
00065 
00066     // For use in data server only : reset override for new request.
00067     void
00068     SetOverride(bool override)
00069     {
00070         mOverride = override;
00071     }
00072 
00073     // Any data directory part
00074 
00075     String
00076     MakeDir(CString dir)
00077     {
00078         return DoMkDirPath(dir);
00079     }
00080 
00081     String
00082     MakeDir(CString dir1, CString dir2)
00083     {
00084         return MakeDir(ConcatDir(dir1, dir2));
00085     }
00086 
00087     String
00088     MakeDir(CString dir1, CString dir2, CString dir3)
00089     {
00090         return MakeDir(dir1, ConcatDir(dir2, dir3));
00091     }
00092 
00093     String
00094     MakeDir(CString dir1, CString dir2, CString dir3, CString dir4)
00095     {
00096         return MakeDir(dir1, dir2, ConcatDir(dir3, dir4));
00097     }
00098 
00099     String
00100     MakeDir(CString dir1, CString dir2, CString dir3, CString dir4, CString dir5)
00101     {
00102         return MakeDir(dir1, dir2, dir3, ConcatDir(dir4, dir5));
00103     }
00104 
00105     // Any data file part
00106 
00107     String
00108     GetFilePath(CString file, bool toWrite, bool silent)
00109     {
00110         CString path = file;
00111         if (toWrite)
00112             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00113         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00114     }
00115 
00116     String
00117     GetFilePath(CString dir, CString file, bool toWrite, bool silent)
00118     {
00119         String path = dir + "/" + file;
00120         if (toWrite)
00121             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00122         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00123     }
00124 
00125     String
00126     GetFilePath(CString dir1, CString dir2, CString file, bool toWrite,
00127                 bool silent)
00128     {
00129         String path = dir1 + "/" + dir2 + "/" + file;
00130         if (toWrite)
00131             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00132         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00133     }
00134 
00135     String
00136     GetFilePath(CString dir1, CString dir2, CString dir3, CString file,
00137                 bool toWrite, bool silent)
00138     {
00139         String path = dir1 + "/" + dir2 + "/" + dir3 + "/" + file;
00140         if (toWrite)
00141             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00142         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00143     }
00144 
00145     String
00146     GetFilePath(CString dir1, CString dir2, CString dir3, CString dir4,
00147                 CString file, bool toWrite, bool silent)
00148     {
00149         String path = dir1 + "/" + dir2 + "/" + dir3 + "/" + dir4 + "/" + file;
00150         if (toWrite)
00151             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00152         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00153     }
00154 
00155     String
00156     GetFilePath(CString dir1, CString dir2, CString dir3, CString dir4,
00157                 CString dir5, CString file, bool toWrite, bool silent)
00158     {
00159         String path = dir1 + "/" + dir2 + "/" + dir3 + "/" + dir4 + "/"
00160             + dir5 + "/" + file;
00161         if (toWrite)
00162             return GetWritableFile(mData.begin(), mData.end(), path, silent);
00163         return GetReadableFile(mData.begin(), mData.end(), path, silent);
00164     }
00165 
00166     void
00167     DeleteFile(CString path)
00168     {
00169         if (mDataChannel)
00170         {
00171             char* buf = mDataChannel->Buffer();
00172             sprintf(buf, "deletefile \"%s\"\0", path.c_str());
00173             mDataChannel->SendRequest(strlen(buf)+1);
00174         }
00175         else
00176         {
00177             unlink(path.c_str());
00178         }
00179     }
00180 
00200     IOBuffer*
00201     GetIOBuffer(CString fileName, bool readMode, bool useMemoryIfLocal,
00202                 String useLocalFileIfRemote, Int64 useLocalSizeIfRemote = 0,
00203                 bool useIOBufferChannel = false)
00204     {
00205         IOBuffer* res = 0;
00206         ILOG_DEBUG("GetIOBuffer [" << fileName << "]");
00207         if (mDataChannel)
00208         {
00209             if (readMode)
00210             {
00211                 if (useIOBufferChannel)
00212                 {
00213                     ILOG_DEBUG("GetIOBuffer: Read using Channel to " <<
00214                                mDataServer << ": " << fileName);
00215                     res = new IOBufferChannel(fileName, true, mDataChannel);
00216                 }
00217                 else
00218                 {
00219                     ILOG_DEBUG("GetIOBuffer: Read using File: " << fileName);
00220                     /* the race condition that occurs when someone passes in
00221                        "tmp" while reading a file is very difficult to track
00222                        down (only happens with MPI-jobs). Therefore, generate 
00223                        a temporary filename even in read mode */
00224                     if (useLocalFileIfRemote == "tmp")
00225                         useLocalFileIfRemote = FileNameTmp();
00226                     res = ReadIOBufferFromChannel(mDataChannel, fileName,
00227                                                   useLocalFileIfRemote);
00228                 }
00229             }
00230             else
00231             {
00232                 if (useIOBufferChannel)
00233                 {
00234                     ILOG_DEBUG("GetIOBuffer: Write using Channel " << fileName);
00235                     res = new IOBufferChannel(fileName, false, mDataChannel);
00236                 }
00237                 else if (useLocalFileIfRemote.empty())
00238                 {
00239                     ILOG_DEBUG("GetIOBuffer: Write using mem " << fileName);
00240                     res = new IOBuffer(useLocalSizeIfRemote);
00241                 }
00242                 else
00243                 {
00244                     ILOG_DEBUG("GetIOBuffer: Write using File " << fileName);
00245                     if (useLocalFileIfRemote == "tmp")
00246                         useLocalFileIfRemote = FileNameTmp();
00247                     res = new IOBufferFile(useLocalFileIfRemote, false, false);
00248                 }
00249                 if (!useIOBufferChannel)
00250                     res->SetUseChannel(mDataChannel, fileName,
00251                                        useLocalFileIfRemote);
00252             }
00253         }
00254         else
00255         {
00256             res = new IOBufferFile(fileName, readMode, useMemoryIfLocal);
00257         }
00258 
00259         if (res)
00260         {
00261             if (!res->Valid())
00262             {
00263                 delete res;
00264                 res = 0;
00265             }
00266         }
00267         return res;
00268     }
00269 
00270     Channel*
00271     GetDataChannel()
00272     {
00273         return mDataChannel;
00274     }
00275 
00276     void
00277     Dump() const
00278     {
00279         ILOG_INFO_HEADNODE("Dump: data=[" << mDataString << "] server=[" <<
00280                        mDataServer << "]");
00281     }
00282 
00283 private:
00284 
00285     void
00286     Init(String setName)
00287     {
00288         CmdOptions& options = CmdOptions::GetInstance();
00289         mDataString = StringReplaceAll(options.GetString("data"), ":", ";", false);
00290         ILOG_DEBUG("'data' option externally specified: " << mDataString);
00291         String additionalData = options.GetString("data::"+setName, "");
00292         if (additionalData.empty() && !setName.empty())
00293             additionalData = "../" + FileNameBase(setName);
00294         if (!additionalData.empty())
00295         {
00296             mDataString = additionalData + ";" + mDataString;
00297             ILOG_INFO_HEADNODE("'data' option for " << setName <<
00298                            " has been extended: " << mDataString);
00299         }
00300         mData = StringList(mDataString, ';');
00301         mDataServer = options.GetString("dataServer");
00302         String replaceDataServer = options.GetString("dataServer::"+setName, "");
00303         if (!replaceDataServer.empty())
00304         {
00305             mDataServer = replaceDataServer;
00306             ILOG_INFO_HEADNODE("server for " << setName << " is " << mDataServer);
00307         }
00308         mDataChannel = 0;
00309         if (!mDataServer.empty())
00310         {
00311             ILOG_INFO_HEADNODE("Retrieving channel to server " << mDataServer);
00312             String passFile = options.GetString("passwordFile");
00313             mDataChannel = ChannelPool::Instance().Get(mDataServer, passFile);
00314         }
00315         mOverride = options.GetBool("override");
00316 #ifdef WIN32
00317         // printf("%d\n", _getmaxstdio()); // normally 512
00318         _setmaxstdio(2048); // set it to the absolute maximum
00319 #endif
00320 #ifdef unix
00321         rlimit lim;
00322         getrlimit(RLIMIT_NOFILE, &lim);
00323         lim.rlim_cur = 2048; // will succeed on image, down to 1024(?) on linux
00324         setrlimit(RLIMIT_NOFILE, &lim);
00325 #endif
00326     }
00327 
00328     // Utilities
00329 
00330     void
00331     DoMkDir(CString dir)
00332     {
00333 #ifdef unix
00334         mode_t m = 0755;
00335         mkdir(dir.c_str(), m);
00336 #else
00337         if (_mkdir(StringReplaceAll(dir, "/", "\\").c_str()) != 0 && 
00338             errno != EEXIST)
00339             ILOG_ERROR("[DoMkDir] Failed to create directory: " << dir << 
00340                        " (" << strerror(errno) << ")");
00341 #endif
00342     }
00343 
00344     String
00345     DoMkDirPath(CString dir)
00346     {
00347         if (mDataChannel)
00348             return DoMkDirPathServer(dir);
00349 
00350         return DoMkDirPathLocal(ConcatDir(*mData.begin(), dir));
00351     }
00352 
00353     String
00354     DoMkDirPathLocal(CString dirWithBackSlashes)
00355     {
00356         String dir = StringReplaceAll(dirWithBackSlashes, "\\", "/");
00357         ILOG_DEBUG("DoMkDirPath: " << dir);
00358 
00359         String path = "";
00360         if (dir[0] == '/')
00361         {
00362 #ifdef WIN32
00363             if (dir.find("//") == 0)
00364             {
00365                 // construct path head upto and including fourth slash
00366                 int headEndPos = 1;
00367                 int nextSlashPos = dir.find("/", headEndPos + 1);
00368                 if (nextSlashPos != String::npos)
00369                 {
00370                     headEndPos = nextSlashPos;
00371                     nextSlashPos = dir.find("/", headEndPos + 1);
00372                     if (nextSlashPos != String::npos)
00373                         headEndPos = nextSlashPos;
00374                 }
00375                 path = dir.substr(0, headEndPos + 1);
00376                 dir = dir.substr(headEndPos + 1);
00377             }
00378             else
00379 #endif
00380                 path = String("/");
00381         }
00382         
00383         StringList subDirs(dir, '/');
00384         for (StringListCI i=subDirs.begin() ; i!=subDirs.end() ; i++)
00385         {
00386             CString subDir = *i;
00387             ILOG_DEBUG("  DoMkDirPath: subDir = " << subDir);
00388             if (subDir.size() == 0 || subDir == ".")
00389                 continue;
00390             if ((path.size() > 0) && (*path.rbegin() != '/'))
00391                 path = path + "/";
00392             path = path + *i;
00393             DoMkDir(path);
00394         }
00395         return path;
00396     }
00397 
00398     String
00399     DoMkDirPathServer(CString dir)
00400     {
00401         char* buf = mDataChannel->Buffer();
00402         sprintf(buf, "domkdirpath \"%s\" \"%s\"\0", dir.c_str(), mDataString.c_str());
00403         int len = mDataChannel->SendRequest(strlen(buf)+1);
00404         if (mDataChannel->LastSendHadError())
00405             return "";
00406         buf[len] = 0;
00407         String res(buf);
00408         ILOG_DEBUG("Got dir [" << res << "]");
00409         return res;
00410     }
00411 
00412     String
00413     GetReadableFile(StringListCI begin, StringListCI end, CString file,
00414                     bool silent)
00415     {
00416         if (mDataChannel)
00417             return GetReadableFileServer(file, silent);
00418         return GetReadableFileLocal(begin, end, file, silent);
00419     }
00420 
00421     String
00422     GetReadableFileLocal(StringListCI begin, StringListCI end, CString file,
00423                          bool silent)
00424     {
00425         StringListCI i;
00426         for (i=begin ; i!=end ; i++)
00427         {
00428             String d = StringResolveEnv(*i);
00429             String path = FileNameConcat(d, file);
00430             ILOG_DEBUG("GetReadableFile: trying " << path);
00431             if (IOBufferFile::FileExists(path))
00432                 return path;
00433         }
00434 
00435         if (!silent)
00436         {
00437             ILOG_ERROR("Unable to find " << file << " in path ");
00438             for (i=begin ; i!=end ; i++)
00439                 ILOG_INFO("    " << *i);
00440         }
00441         return String("");
00442     }
00443 
00444     String
00445     GetReadableFileServer(CString file, bool silent)
00446     {
00447         char* buf = mDataChannel->Buffer();
00448         sprintf(buf, "getreadablefile \"%s\" \"%s\" %d\0", file.c_str(),
00449                 mDataString.c_str(), silent);
00450         int len = mDataChannel->SendRequest(strlen(buf)+1);
00451         if (mDataChannel->LastSendHadError())
00452             return "";
00453         buf[len] = 0;
00454         String res(buf);
00455         ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00456         return res;
00457     }
00458 
00459     String
00460     GetWritableFile(StringListCI begin, StringListCI end, CString file,
00461                     bool silent)
00462     {
00463         if (mDataChannel)
00464             return GetWritableFileServer(file, silent);
00465         return GetWritableFileLocal(begin, end, file, silent);
00466     }
00467 
00468     String
00469     GetWritableFileLocal(StringListCI begin, StringListCI end, CString file,
00470                          bool silent)
00471     {
00472         if (begin == end)
00473         {
00474             ILOG_ERROR("GetWritableFile: no search path");
00475             return String("");
00476         }
00477         String d = StringResolveEnv(*begin);
00478         String path = FileNameConcat(d, file);
00479         ILOG_DEBUG("GetWritableFile: trying " << path);
00480         if (IOBufferFile::FileExists(path))
00481         {
00482             if (! (mOverride || silent))   // silent implies "just checking"
00483             {
00484                 if (!silent)
00485                     ILOG_ERROR(path << " already exists, will not override!");
00486                 return String("");
00487             }
00488         }
00489         return path;
00490     }
00491 
00492     String
00493     GetWritableFileServer(CString file, bool silent)
00494     {
00495         char* buf = mDataChannel->Buffer();
00496         sprintf(buf, "getwritablefile \"%s\" \"%s\" %d %d\0", file.c_str(),
00497                 mDataString.c_str(), silent, mOverride);
00498         int len = mDataChannel->SendRequest(strlen(buf)+1);
00499         if (mDataChannel->LastSendHadError())
00500             return "";
00501         buf[len] = 0;
00502         String res(buf);
00503         ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00504         return res;
00505     }
00506 
00507     String ConcatDir(CString dir, CString subDir)
00508     {
00509         return (subDir.empty() ? dir : (dir.empty() ? subDir : (dir + "/" + subDir)));
00510     }
00511 
00512     bool        mOverride;
00513     String      mDataString;
00514     StringList  mData;
00515     String      mDataServer;
00516     Channel*    mDataChannel;
00517 
00518     ILOG_VAR_DEC;
00519 };
00520 
00521 ILOG_VAR_INIT(Database, Impala.Util);
00522 
00523 } // namespace Util
00524 } // namespace Impala
00525 
00526 #endif // REPOSITORY_USED
00527 
00528 #endif

Generated on Thu Jan 13 09:05:14 2011 for ImpalaSrc by  doxygen 1.5.1