Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

Runner.h

Go to the documentation of this file.
00001 #ifndef Impala_Job_Runner_h
00002 #define Impala_Job_Runner_h
00003 
00004 #include <sstream>
00005 
00006 #include "Basis/ILog.h"
00007 #include "Basis/Timer.h"
00008 #include "Basis/FileReadString.h"
00009 
00010 #include "Util/StringParser.h"
00011 #include "Util/ChannelProxy.h"
00012 #include "Job/State.h"
00013 #include "Process/ManagerFactory.h"
00014 #include "Process/Manager.h"
00015 
00016 namespace Impala
00017 {
00018 namespace Job
00019 {
00020 
00021 /***********************************************************************
00022 Instances of this class can run jobs served by a job server. 
00023 ************************************************************************/
00024 
00025 class Runner
00026 {
00027 
00028 public:
00029 
00030     Runner()
00031     {
00032         mProcessManager = Process::ManagerFactory::Create();
00033         mJobServerProxy = 0;
00034     }
00035 
00036     virtual
00037     ~Runner()
00038     {
00039         Disconnect();
00040 
00041         if (mProcessManager)
00042             delete mProcessManager;
00043     }
00044 
00045     bool
00046     IsConnected()
00047     {
00048         return (mJobServerProxy && mJobServerProxy->ChannelIsValid());
00049     }
00050 
00051     void
00052     Disconnect()
00053     {
00054         if (mJobServerProxy)
00055         {
00056             delete mJobServerProxy;
00057             mJobServerProxy = 0;
00058         }
00059     }
00060 
00061     bool
00062     Connect(CString serverName, int port, CString passwordFile)
00063     {
00064         mJobServerProxy = new Util::ChannelProxy(serverName, port, passwordFile);
00065         if (!mJobServerProxy->GetChannel())
00066         {
00067             ILOG_ERROR("Failed to connect to job server");
00068             Disconnect();
00069             return false;
00070         }
00071 
00072         return true;
00073     }
00074 
00075     void
00076     PoisonServer()
00077     {
00078         ILOG_WARN("Requesting job server to shut down all runners " <<
00079                   "after they finish their current job");
00080         mJobServerProxy->Send("Poison");
00081     }
00082 
00083     void
00084     StopServer()
00085     {
00086         ILOG_WARN("Requesting job server to shut down");
00087         mJobServerProxy->Send("ShutDown");
00088     }
00089 
00090     void
00091     RunJobs(int maxJobs, int maxIdleTimeSec, int maxJobTimeMinutes,
00092             int sleepIntervalSec, String cmdLineExtension)
00093     {
00094         if (!IsConnected())
00095         {
00096             ILOG_ERROR("Not connected to a job server");
00097             return;
00098         }
00099 
00100         mMaxJobs = maxJobs;
00101         mMaxIdleTimeSec = maxIdleTimeSec;
00102         mMaxJobTimeMinutes = maxJobTimeMinutes;
00103         mSleepIntervalSec = sleepIntervalSec;
00104 
00105         while (true)
00106         {
00107             if (mMaxJobs > 0 && mJobCmdLineList.size() >= mMaxJobs)
00108             {
00109                 ILOG_INFO("Maximum of jobs to run was reached (" <<
00110                           mJobCmdLineList.size() << ")");
00111                 break;
00112             }
00113             
00114             int jobId = 0;
00115             String cmdLine = "";
00116             if (!WaitForJob(jobId, cmdLine) ||
00117                 !RunJob(jobId, cmdLine, cmdLineExtension))
00118             {
00119                 break;
00120             }
00121         }
00122     }
00123 
00124     const std::list<String>&
00125     GetJobList() const
00126     {
00127         return mJobCmdLineList;
00128     }
00129 
00130 private:
00131 
00132     bool
00133     WaitForJob(int& jobId, String& cmdLine)
00134     {
00135         jobId = 0;
00136         int secWaiting = 0;
00137         String response = "";
00138         bool reportedWaiting = false;
00139         while (true)
00140         {
00141             response = mJobServerProxy->Send("ProvideJob");
00142             if (response == "NoJob")
00143             {
00144                 if (mMaxIdleTimeSec > 0 && secWaiting >= mMaxIdleTimeSec)
00145                 {
00146                     ILOG_INFO("Maximum idle time reached (" << secWaiting <<
00147                               ")");
00148                     return false;
00149                 }
00150                 if (!reportedWaiting)
00151                 {
00152                     ILOG_INFO(mJobCmdLineList.size() <<
00153                               " job(s) done; waiting for new job");
00154                     reportedWaiting = true;
00155                 }
00156                 Sleep(mSleepIntervalSec);
00157                 secWaiting += mSleepIntervalSec;
00158             }
00159             else if (response.substr(0, 7) == "RunJob:")
00160             {
00161                 if (response.substr(7, 6) == "jobId=")
00162                 {
00163                     Util::StringParser parser(response, 13);
00164                     jobId = parser.GetInt(';');
00165                     if (jobId > 0)
00166                     {
00167                         String rest = parser.GetString(';');
00168                         if (rest.substr(0, 8) == "cmdLine=")
00169                         {
00170                             cmdLine = rest.substr(8);
00171                             if (!cmdLine.empty())
00172                                 return true;
00173                         }
00174                     }
00175                 }
00176                 ILOG_WARN("Received invalid job specification: " << response);
00177                 return false;
00178             }
00179             else if (response.substr(0, 6) == "Poison")
00180             {
00181                 // exit the job runner
00182                 return false;
00183             }
00184             else
00185             {
00186                 ILOG_ERROR("Unexpected job request response: " << response);
00187                 return false;
00188             }
00189         }
00190     }
00191 
00192     bool
00193     RunJob(int jobId, String cmdLine, String cmdLineExtension)
00194     {
00195         ILOG_INFO("Received job with ID: " << jobId);
00196         mJobIdList.push_back(jobId);
00197         cmdLine += " " + cmdLineExtension;
00198         cmdLine += " --jobErrorLog job_" + MakeString(jobId) + "_errors.log";
00199         mJobCmdLineList.push_back(cmdLine);
00200         int procHandle = mProcessManager->Create(cmdLine);
00201         if (!procHandle)
00202             return false; // an error was already reported
00203 
00204         Timer jobTimer(0); // starts the timer implicitly
00205         bool licensedToKill = false;
00206         while (mProcessManager->IsRunning(procHandle))
00207         {
00208             // report to job server
00209             std::ostringstream oss;
00210             oss << "AcceptStatus:jobId=" << jobId << ";state="
00211                 << Job::State::RUNNING;
00212             String response = mJobServerProxy->Send(oss.str());
00213 
00214             if (response == "OK")
00215             {
00216                 if (licensedToKill)
00217                     ILOG_INFO("Waiting for job " << jobId << " to get killed");
00218 
00219                 double elapsedSec = jobTimer.SplitTime();
00220                 int elapsedMinutes = (int) (elapsedSec / 60.0);
00221 
00222                 if (mMaxJobTimeMinutes > 0 &&
00223                     elapsedMinutes >= mMaxJobTimeMinutes && !licensedToKill)
00224                 {
00225                     ILOG_ERROR("Job " << jobId << " runs too long (over " <<
00226                                mMaxJobTimeMinutes <<
00227                                " minutes); requesting to kill it");
00228                     licensedToKill = true;
00229                     mProcessManager->Kill(procHandle);
00230                 }
00231 
00232                 Sleep(mSleepIntervalSec);
00233             }
00234 
00235             else if (response == "Kill")
00236             {
00237                 if (licensedToKill)
00238                 {
00239                     ILOG_WARN("Job " << jobId <<
00240                               " was already requested to be killed");
00241                 }
00242                 else
00243                 {
00244                     ILOG_WARN("Received kill request for job " << jobId <<
00245                               "; forwarding request to process manager");
00246                     licensedToKill = true;
00247                     mProcessManager->Kill(procHandle);
00248                 }
00249             }
00250 
00251             else
00252             {
00253                 ILOG_ERROR("Reporting on job " << jobId << " failed: " <<
00254                            response);
00255                 return false;
00256             }
00257         }
00258 
00259         // report job termination to job server
00260         int pmState = mProcessManager->GetState(procHandle);
00261         ILOG_DEBUG("pmState = " << pmState);
00262         int jobState = Job::State::UNDEFINED;
00263         int exitCode = mProcessManager->GetExitCode(procHandle);
00264 
00265         if (licensedToKill)
00266         {
00267             if (pmState == Process::Manager::STATE_TERM_SUCCESS ||
00268                 pmState == Process::Manager::STATE_TERM_ERROR)
00269             {
00270                 ILOG_INFO("Job " << jobId <<
00271                           " terminated normally and did not require killing");
00272                 jobState = Job::State::TERMINATED_NRM;
00273             }
00274             else
00275             {
00276                 ILOG_INFO("Job " << jobId << " has been killed");
00277                 jobState = Job::State::TERMINATED_ABNRM;
00278             }
00279         }
00280         else
00281         {
00282             if (pmState == Process::Manager::STATE_TERM_SUCCESS)
00283             {
00284                 ILOG_INFO("Job " << jobId << " is done");
00285                 jobState = Job::State::TERMINATED_NRM;
00286             }
00287             else if (pmState == Process::Manager::STATE_TERM_ERROR)
00288             {
00289                 ILOG_ERROR("Job " << jobId << " exited with a value of "
00290                            << exitCode << "; check the job's log for errors");
00291                 jobState = Job::State::TERMINATED_NRM;
00292             }
00293             else if (pmState == Process::Manager::STATE_TERM_ABNORMAL)
00294             {
00295                 ILOG_ERROR("Job " << jobId << " exited with a value of " <<
00296                            exitCode << "; check the job's log for errors");
00297                 jobState = Job::State::TERMINATED_ABNRM;
00298             }
00299             else
00300             {
00301                 ILOG_ERROR("Job " << jobId <<
00302                            " was rendered in an unknown state" <<
00303                            "; check the job's log for errors");
00304                 jobState = Job::State::TERMINATED_ABNRM;
00305             }
00306         }
00307 
00308         std::ostringstream oss;
00309         oss << "AcceptStatus:jobId=" << jobId << ";state=" << jobState
00310             << ";exitCode=" << exitCode << ";errorLog=" << GetJobErrorLog(jobId);
00311 
00312         ILOG_DEBUG("Reporting job termination to server: " << oss.str());
00313         String response = mJobServerProxy->Send(oss.str());
00314         if (response != "OK")
00315         {
00316             ILOG_ERROR("Reporting termination of job " << jobId << " failed: "
00317                        << response);
00318             return false;
00319         }
00320         return true;
00321     }
00322 
00323     void
00324     Sleep(int sec)
00325     {
00326         mProcessManager->Sleep(sec);
00327     }
00328 
00329     String
00330     GetJobErrorLog(int jobId)
00331     {
00332         String res("no job errors found");
00333         std::vector<String> errs;
00334         String fName = "job_" + MakeString(jobId) + "_errors.log";
00335 
00336         FileReadString(std::back_inserter(errs), fName, false, false);
00337         if (errs.size() == 0)
00338             return res;
00339 
00340         res = errs[0];
00341         for (int i=1 ; i<errs.size() ; i++)
00342             res += errs[i];
00343         unlink(fName.c_str());
00344         return res;
00345     }
00346 
00347     // sleep interval for receiving and monitoring jobs
00348     //static const int SLEEP_INTERVAL_SEC = 10; // oversleeping 5 sec on average!
00349 
00350     ILOG_VAR_DECL;
00351 
00352     Process::Manager* mProcessManager;
00353     Util::ChannelProxy* mJobServerProxy;
00354     int mMaxJobs;
00355     int mMaxIdleTimeSec;
00356     int mMaxJobTimeMinutes;
00357     int mSleepIntervalSec;
00358     std::list<int> mJobIdList;
00359     std::list<String> mJobCmdLineList;
00360 
00361 }; // class
00362 
00363 ILOG_VAR_INIT(Runner, Impala.Job);
00364 
00365 } // namespace 
00366 } // namespace 
00367 
00368 #endif

Generated on Fri Mar 19 09:31:33 2010 for ImpalaSrc by  doxygen 1.5.1