00001 #ifndef Impala_Persistency_FileSystem_h
00002 #define Impala_Persistency_FileSystem_h
00003
00004 #ifdef unix
00005 #include <sys/stat.h>
00006 #include <sys/types.h>
00007 #else
00008 #include <direct.h>
00009 #endif
00010
00011 #include <vector>
00012 #include "Basis/FileName.h"
00013 #include "Basis/FileNameTmp.h"
00014 #include "Util/ChannelPool.h"
00015 #include "Util/ReadIOBufferFromChannel.h"
00016 #include "Util/IOBufferFile.h"
00017 #include "Util/IOBufferChannel.h"
00018
00019 namespace Impala
00020 {
00021 namespace Persistency
00022 {
00023
00024
00033 class FileSystem
00034 {
00035 public:
00036
00037 FileSystem(CString data, CString dataServer)
00038 {
00039 CmdOptions& options = CmdOptions::GetInstance();
00040 mDataString = data;
00041 mData = StringList(mDataString, ';');
00042 mDataServer = dataServer;
00043 mDataChannel = 0;
00044 if (!mDataServer.empty())
00045 {
00046 ILOG_INFO_ONCE("Retrieving channel to server " << mDataServer);
00047 String passFile = options.GetString("passwordFile");
00048 mDataChannel = Util::ChannelPool::Instance().Get(mDataServer,
00049 passFile);
00050 }
00051 mOverride = options.GetBool("override");
00052 #ifdef WIN32
00053
00054 _setmaxstdio(2048);
00055 #endif
00056 }
00057
00058
00059 void
00060 SetDataPath(String data)
00061 {
00062 mDataString = data;
00063 mData.clear();
00064 mData = StringList(mDataString, ';');
00065 }
00066
00067 String
00068 GetDataPath()
00069 {
00070 return mDataString;
00071 }
00072
00073
00074 void
00075 SetOverride(bool override)
00076 {
00077 mOverride = override;
00078 }
00079
00080
00081
00082 String
00083 MakeDir(CString dir)
00084 {
00085 return DoMkDirPath(dir);
00086 }
00087
00088
00089
00090 String
00091 GetFilePath(CString file, bool toWrite, bool silent)
00092 {
00093 CString path = file;
00094 if (toWrite)
00095 return GetWritableFile(mData.begin(), mData.end(), path, silent);
00096 return GetReadableFile(mData.begin(), mData.end(), path, silent);
00097 }
00098
00099 String
00100 GetFilePath(CString dir, CString file, bool toWrite, bool silent)
00101 {
00102 String path = FileNameConcat(dir, file);
00103 if (toWrite)
00104 return GetWritableFile(mData.begin(), mData.end(), path, silent);
00105 return GetReadableFile(mData.begin(), mData.end(), path, silent);
00106 }
00107
00128 Util::IOBuffer*
00129 GetIOBuffer(CString fileName, bool readMode, bool useMemoryIfLocal,
00130 String useLocalFileIfRemote, Int64 useLocalSizeIfRemote = 0,
00131 bool useIOBufferChannel = false)
00132 {
00133 Util::IOBuffer* res = 0;
00134 ILOG_DEBUG("GetIOBuffer [" << fileName << "]");
00135 if (mDataChannel)
00136 {
00137 if (readMode)
00138 {
00139 if (useIOBufferChannel)
00140 {
00141 ILOG_DEBUG("GetIOBuffer: Read using Channel to " <<
00142 mDataServer << ": " << fileName);
00143 res = new Util::IOBufferChannel(fileName, true,
00144 mDataChannel);
00145 }
00146 else
00147 {
00148 ILOG_DEBUG("GetIOBuffer: Read using File: " << fileName);
00149
00150
00151
00152
00153 if (useLocalFileIfRemote == "tmp")
00154 useLocalFileIfRemote = FileNameTmp();
00155 res = Util::ReadIOBufferFromChannel(mDataChannel, fileName,
00156 useLocalFileIfRemote);
00157 }
00158 }
00159 else
00160 {
00161 if (useIOBufferChannel)
00162 {
00163 ILOG_DEBUG("GetIOBuffer: Write using Channel " << fileName);
00164 res = new Util::IOBufferChannel(fileName, false,
00165 mDataChannel);
00166 }
00167 else if (useLocalFileIfRemote.empty())
00168 {
00169 ILOG_DEBUG("GetIOBuffer: Write using mem " << fileName);
00170 res = new Util::IOBuffer(useLocalSizeIfRemote);
00171 }
00172 else
00173 {
00174 ILOG_DEBUG("GetIOBuffer: Write using File " << fileName);
00175 if (useLocalFileIfRemote == "tmp")
00176 useLocalFileIfRemote = FileNameTmp();
00177 res = new Util::IOBufferFile(useLocalFileIfRemote, false,
00178 false);
00179 }
00180 if (!useIOBufferChannel)
00181 res->SetUseChannel(mDataChannel, fileName,
00182 useLocalFileIfRemote);
00183 }
00184 }
00185 else
00186 {
00187 res = new Util::IOBufferFile(fileName, readMode, useMemoryIfLocal);
00188 }
00189
00190 if (res)
00191 {
00192 if (!res->Valid())
00193 {
00194 delete res;
00195 res = 0;
00196 }
00197 }
00198 return res;
00199 }
00200
00201 Util::Channel*
00202 GetDataChannel()
00203 {
00204 return mDataChannel;
00205 }
00206
00207 void
00208 Dump() const
00209 {
00210 ILOG_INFO_ONCE("Dump: data=[" << mDataString << "] server=[" <<
00211 mDataServer << "]");
00212 }
00213
00214 private:
00215
00216
00217
00218 void
00219 DoMkDir(CString dir)
00220 {
00221 #ifdef unix
00222 mode_t m = 0755;
00223 mkdir(dir.c_str(), m);
00224 #else
00225 mkdir(dir.c_str());
00226 #endif
00227 }
00228
00229 String
00230 DoMkDirPath(CString dir)
00231 {
00232 if (mDataChannel)
00233 return DoMkDirPathServer(dir);
00234
00235 return DoMkDirPathLocal(ConcatDir(*mData.begin(), dir));
00236 }
00237
00238 String
00239 DoMkDirPathLocal(CString dirWithBackSlashes)
00240 {
00241 String dir = StringReplaceAll(dirWithBackSlashes, "\\", "/");
00242 ILOG_DEBUG("DoMkDirPath: " << dir);
00243 StringList subDirs(dir, '/');
00244 String path("");
00245 if (dir[0] == '/')
00246 path = String("/");
00247 for (StringListCI i=subDirs.begin() ; i!=subDirs.end() ; i++)
00248 {
00249 CString subDir = *i;
00250 ILOG_DEBUG(" DoMkDirPath: subDir = " << subDir);
00251 if (subDir.size() == 0)
00252 continue;
00253 if ((path.size() > 0) && (*path.rbegin() != '/'))
00254 path = path + "/";
00255 path = path + *i;
00256 DoMkDir(path);
00257 }
00258 return path;
00259 }
00260
00261 String
00262 DoMkDirPathServer(CString dir)
00263 {
00264 char* buf = mDataChannel->Buffer();
00265 sprintf(buf, "domkdirpath \"%s\" \"%s\"\0", dir.c_str(),
00266 mDataString.c_str());
00267 int len = mDataChannel->SendRequest(strlen(buf)+1);
00268 if (mDataChannel->LastSendHadError())
00269 return "";
00270 buf[len] = 0;
00271 String res(buf);
00272 ILOG_DEBUG("Got dir [" << res << "]");
00273 return res;
00274 }
00275
00276 String
00277 GetReadableFile(StringListCI begin, StringListCI end, CString file,
00278 bool silent)
00279 {
00280 if (mDataChannel)
00281 return GetReadableFileServer(file, silent);
00282 return GetReadableFileLocal(begin, end, file, silent);
00283 }
00284
00285 String
00286 GetReadableFileLocal(StringListCI begin, StringListCI end, CString file,
00287 bool silent)
00288 {
00289 StringListCI i;
00290 for (i=begin ; i!=end ; i++)
00291 {
00292 String path = FileNameConcat(*i, file);
00293 ILOG_DEBUG("GetReadableFile: trying " << path);
00294 if (Util::IOBufferFile::FileExists(path))
00295 return path;
00296 }
00297
00298 if (!silent)
00299 {
00300 ILOG_ERROR("Unable to find " << file << " in path ");
00301 for (i=begin ; i!=end ; i++)
00302 ILOG_INFO(" " << *i);
00303 }
00304 return String("");
00305 }
00306
00307 String
00308 GetReadableFileServer(CString file, bool silent)
00309 {
00310 char* buf = mDataChannel->Buffer();
00311 sprintf(buf, "getreadablefile \"%s\" \"%s\" %d\0", file.c_str(),
00312 mDataString.c_str(), silent);
00313 int len = mDataChannel->SendRequest(strlen(buf)+1);
00314 if (mDataChannel->LastSendHadError())
00315 return "";
00316 buf[len] = 0;
00317 String res(buf);
00318 ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00319 return res;
00320 }
00321
00322 String
00323 GetWritableFile(StringListCI begin, StringListCI end, CString file,
00324 bool silent)
00325 {
00326 if (mDataChannel)
00327 return GetWritableFileServer(file, silent);
00328 return GetWritableFileLocal(begin, end, file, silent);
00329 }
00330
00331 String
00332 GetWritableFileLocal(StringListCI begin, StringListCI end, CString file,
00333 bool silent)
00334 {
00335 if (begin == end)
00336 {
00337 ILOG_ERROR("GetWritableFile: no search path");
00338 return String("");
00339 }
00340 String path = FileNameConcat(*begin, file);
00341 ILOG_DEBUG("GetWritableFile: trying " << path);
00342 if (Util::IOBufferFile::FileExists(path))
00343 {
00344 if (! mOverride)
00345 {
00346 if (!silent)
00347 ILOG_ERROR(path << " already exists, will not override!");
00348 return String("");
00349 }
00350 }
00351 return path;
00352 }
00353
00354 String
00355 GetWritableFileServer(CString file, bool silent)
00356 {
00357 char* buf = mDataChannel->Buffer();
00358 sprintf(buf, "getwritablefile \"%s\" \"%s\" %d %d\0", file.c_str(),
00359 mDataString.c_str(), silent, mOverride);
00360 int len = mDataChannel->SendRequest(strlen(buf)+1);
00361 if (mDataChannel->LastSendHadError())
00362 return "";
00363 buf[len] = 0;
00364 String res(buf);
00365 ILOG_DEBUG("Got file [" << res << "] from " << mDataServer);
00366 return res;
00367 }
00368
00369 String
00370 ConcatDir(CString dir, CString subDir)
00371 {
00372 return (subDir.empty() ? dir : (dir.empty() ? subDir
00373 : (dir + "/" + subDir)));
00374 }
00375
00376 bool mOverride;
00377 String mDataString;
00378 StringList mData;
00379 String mDataServer;
00380 Util::Channel* mDataChannel;
00381
00382 ILOG_VAR_DEC;
00383 };
00384
00385 ILOG_VAR_INIT(FileSystem, Impala.Persistency);
00386
00387 }
00388 }
00389
00390 #endif