00001 #ifndef Impala_Job_Runner_h
00002 #define Impala_Job_Runner_h
00003
#include <cstdio>
#include <iterator>
#include <list>
#include <sstream>
#include <vector>

#include "Basis/ILog.h"
#include "Basis/Timer.h"
#include "Basis/FileReadString.h"

#include "Util/StringParser.h"
#include "Util/ChannelProxy.h"
#include "Job/State.h"
#include "Process/ManagerFactory.h"
#include "Process/Manager.h"
00015
00016 namespace Impala
00017 {
00018 namespace Job
00019 {
00020
00021
00022
00023
00024
00025 class Runner
00026 {
00027
00028 public:
00029
00030 Runner()
00031 {
00032 mProcessManager = Process::ManagerFactory::Create();
00033 mJobServerProxy = 0;
00034 }
00035
00036 virtual
00037 ~Runner()
00038 {
00039 Disconnect();
00040
00041 if (mProcessManager)
00042 delete mProcessManager;
00043 }
00044
00045 bool
00046 IsConnected()
00047 {
00048 return (mJobServerProxy && mJobServerProxy->ChannelIsValid());
00049 }
00050
00051 void
00052 Disconnect()
00053 {
00054 if (mJobServerProxy)
00055 {
00056 delete mJobServerProxy;
00057 mJobServerProxy = 0;
00058 }
00059 }
00060
00061 bool
00062 Connect(CString serverName, int port, CString passwordFile)
00063 {
00064 mJobServerProxy = new Util::ChannelProxy(serverName, port, passwordFile);
00065 if (!mJobServerProxy->GetChannel())
00066 {
00067 ILOG_ERROR("Failed to connect to job server");
00068 Disconnect();
00069 return false;
00070 }
00071
00072 return true;
00073 }
00074
00075 void
00076 PoisonServer()
00077 {
00078 ILOG_WARN("Requesting job server to shut down all runners " <<
00079 "after they finish their current job");
00080 mJobServerProxy->Send("Poison");
00081 }
00082
00083 void
00084 StopServer()
00085 {
00086 ILOG_WARN("Requesting job server to shut down");
00087 mJobServerProxy->Send("ShutDown");
00088 }
00089
00090 void
00091 RunJobs(int maxJobs, int maxIdleTimeSec, int maxJobTimeMinutes,
00092 int sleepIntervalSec, String cmdLineExtension)
00093 {
00094 if (!IsConnected())
00095 {
00096 ILOG_ERROR("Not connected to a job server");
00097 return;
00098 }
00099
00100 mMaxJobs = maxJobs;
00101 mMaxIdleTimeSec = maxIdleTimeSec;
00102 mMaxJobTimeMinutes = maxJobTimeMinutes;
00103 mSleepIntervalSec = sleepIntervalSec;
00104
00105 while (true)
00106 {
00107 if (mMaxJobs > 0 && mJobCmdLineList.size() >= mMaxJobs)
00108 {
00109 ILOG_INFO("Maximum of jobs to run was reached (" <<
00110 mJobCmdLineList.size() << ")");
00111 break;
00112 }
00113
00114 int jobId = 0;
00115 String cmdLine = "";
00116 if (!WaitForJob(jobId, cmdLine) ||
00117 !RunJob(jobId, cmdLine, cmdLineExtension))
00118 {
00119 break;
00120 }
00121 }
00122 }
00123
00124 const std::list<String>&
00125 GetJobList() const
00126 {
00127 return mJobCmdLineList;
00128 }
00129
00130 private:
00131
00132 bool
00133 WaitForJob(int& jobId, String& cmdLine)
00134 {
00135 jobId = 0;
00136 int secWaiting = 0;
00137 String response = "";
00138 bool reportedWaiting = false;
00139 while (true)
00140 {
00141 response = mJobServerProxy->Send("ProvideJob");
00142 if (response == "NoJob")
00143 {
00144 if (mMaxIdleTimeSec > 0 && secWaiting >= mMaxIdleTimeSec)
00145 {
00146 ILOG_INFO("Maximum idle time reached (" << secWaiting <<
00147 ")");
00148 return false;
00149 }
00150 if (!reportedWaiting)
00151 {
00152 ILOG_INFO(mJobCmdLineList.size() <<
00153 " job(s) done; waiting for new job");
00154 reportedWaiting = true;
00155 }
00156 Sleep(mSleepIntervalSec);
00157 secWaiting += mSleepIntervalSec;
00158 }
00159 else if (response.substr(0, 7) == "RunJob:")
00160 {
00161 if (response.substr(7, 6) == "jobId=")
00162 {
00163 Util::StringParser parser(response, 13);
00164 jobId = parser.GetInt(';');
00165 if (jobId > 0)
00166 {
00167 String rest = parser.GetString(';');
00168 if (rest.substr(0, 8) == "cmdLine=")
00169 {
00170 cmdLine = rest.substr(8);
00171 if (!cmdLine.empty())
00172 return true;
00173 }
00174 }
00175 }
00176 ILOG_WARN("Received invalid job specification: " << response);
00177 return false;
00178 }
00179 else if (response.substr(0, 6) == "Poison")
00180 {
00181
00182 return false;
00183 }
00184 else
00185 {
00186 ILOG_ERROR("Unexpected job request response: " << response);
00187 return false;
00188 }
00189 }
00190 }
00191
00192 bool
00193 RunJob(int jobId, String cmdLine, String cmdLineExtension)
00194 {
00195 ILOG_INFO("Received job with ID: " << jobId);
00196 mJobIdList.push_back(jobId);
00197 cmdLine += " " + cmdLineExtension;
00198 cmdLine += " --jobErrorLog job_" + MakeString(jobId) + "_errors.log";
00199 mJobCmdLineList.push_back(cmdLine);
00200 int procHandle = mProcessManager->Create(cmdLine);
00201 if (!procHandle)
00202 return false;
00203
00204 Timer jobTimer(0);
00205 bool licensedToKill = false;
00206 while (mProcessManager->IsRunning(procHandle))
00207 {
00208
00209 std::ostringstream oss;
00210 oss << "AcceptStatus:jobId=" << jobId << ";state="
00211 << Job::State::RUNNING;
00212 String response = mJobServerProxy->Send(oss.str());
00213
00214 if (response == "OK")
00215 {
00216 if (licensedToKill)
00217 ILOG_INFO("Waiting for job " << jobId << " to get killed");
00218
00219 double elapsedSec = jobTimer.SplitTime();
00220 int elapsedMinutes = (int) (elapsedSec / 60.0);
00221
00222 if (mMaxJobTimeMinutes > 0 &&
00223 elapsedMinutes >= mMaxJobTimeMinutes && !licensedToKill)
00224 {
00225 ILOG_ERROR("Job " << jobId << " runs too long (over " <<
00226 mMaxJobTimeMinutes <<
00227 " minutes); requesting to kill it");
00228 licensedToKill = true;
00229 mProcessManager->Kill(procHandle);
00230 }
00231
00232 Sleep(mSleepIntervalSec);
00233 }
00234
00235 else if (response == "Kill")
00236 {
00237 if (licensedToKill)
00238 {
00239 ILOG_WARN("Job " << jobId <<
00240 " was already requested to be killed");
00241 }
00242 else
00243 {
00244 ILOG_WARN("Received kill request for job " << jobId <<
00245 "; forwarding request to process manager");
00246 licensedToKill = true;
00247 mProcessManager->Kill(procHandle);
00248 }
00249 }
00250
00251 else
00252 {
00253 ILOG_ERROR("Reporting on job " << jobId << " failed: " <<
00254 response);
00255 return false;
00256 }
00257 }
00258
00259
00260 int pmState = mProcessManager->GetState(procHandle);
00261 ILOG_DEBUG("pmState = " << pmState);
00262 int jobState = Job::State::UNDEFINED;
00263 int exitCode = mProcessManager->GetExitCode(procHandle);
00264
00265 if (licensedToKill)
00266 {
00267 if (pmState == Process::Manager::STATE_TERM_SUCCESS ||
00268 pmState == Process::Manager::STATE_TERM_ERROR)
00269 {
00270 ILOG_INFO("Job " << jobId <<
00271 " terminated normally and did not require killing");
00272 jobState = Job::State::TERMINATED_NRM;
00273 }
00274 else
00275 {
00276 ILOG_INFO("Job " << jobId << " has been killed");
00277 jobState = Job::State::TERMINATED_ABNRM;
00278 }
00279 }
00280 else
00281 {
00282 if (pmState == Process::Manager::STATE_TERM_SUCCESS)
00283 {
00284 ILOG_INFO("Job " << jobId << " is done");
00285 jobState = Job::State::TERMINATED_NRM;
00286 }
00287 else if (pmState == Process::Manager::STATE_TERM_ERROR)
00288 {
00289 ILOG_ERROR("Job " << jobId << " exited with a value of "
00290 << exitCode << "; check the job's log for errors");
00291 jobState = Job::State::TERMINATED_NRM;
00292 }
00293 else if (pmState == Process::Manager::STATE_TERM_ABNORMAL)
00294 {
00295 ILOG_ERROR("Job " << jobId << " exited with a value of " <<
00296 exitCode << "; check the job's log for errors");
00297 jobState = Job::State::TERMINATED_ABNRM;
00298 }
00299 else
00300 {
00301 ILOG_ERROR("Job " << jobId <<
00302 " was rendered in an unknown state" <<
00303 "; check the job's log for errors");
00304 jobState = Job::State::TERMINATED_ABNRM;
00305 }
00306 }
00307
00308 std::ostringstream oss;
00309 oss << "AcceptStatus:jobId=" << jobId << ";state=" << jobState
00310 << ";exitCode=" << exitCode << ";errorLog=" << GetJobErrorLog(jobId);
00311
00312 ILOG_DEBUG("Reporting job termination to server: " << oss.str());
00313 String response = mJobServerProxy->Send(oss.str());
00314 if (response != "OK")
00315 {
00316 ILOG_ERROR("Reporting termination of job " << jobId << " failed: "
00317 << response);
00318 return false;
00319 }
00320 return true;
00321 }
00322
00323 void
00324 Sleep(int sec)
00325 {
00326 mProcessManager->Sleep(sec);
00327 }
00328
00329 String
00330 GetJobErrorLog(int jobId)
00331 {
00332 String res("no job errors found");
00333 std::vector<String> errs;
00334 String fName = "job_" + MakeString(jobId) + "_errors.log";
00335
00336 FileReadString(std::back_inserter(errs), fName, false, false);
00337 if (errs.size() == 0)
00338 return res;
00339
00340 res = errs[0];
00341 for (int i=1 ; i<errs.size() ; i++)
00342 res += errs[i];
00343 unlink(fName.c_str());
00344 return res;
00345 }
00346
00347
00348
00349
00350 ILOG_VAR_DECL;
00351
00352 Process::Manager* mProcessManager;
00353 Util::ChannelProxy* mJobServerProxy;
00354 int mMaxJobs;
00355 int mMaxIdleTimeSec;
00356 int mMaxJobTimeMinutes;
00357 int mSleepIntervalSec;
00358 std::list<int> mJobIdList;
00359 std::list<String> mJobCmdLineList;
00360
00361 };
00362
// NOTE(review): counterpart of the ILOG_VAR_DECL inside Runner; presumably
// defines that class's logger variable under the name "Impala.Job" - confirm
// against Basis/ILog.h.
00363 ILOG_VAR_INIT(Runner, Impala.Job);
00364
00365 }
00366 }
00367
00368 #endif