Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

bool Impala::Job::Runner::RunJob ( int  jobId,
String  cmdLine,
String  cmdLineExtension 
) [inline, private]

Definition at line 193 of file Runner.h.

References Impala::Process::Manager::Create(), Impala::Process::Manager::GetExitCode(), GetJobErrorLog(), Impala::Process::Manager::GetState(), ILOG_DEBUG, ILOG_ERROR, ILOG_INFO, ILOG_WARN, Impala::Process::Manager::IsRunning(), Impala::Process::Manager::Kill(), Impala::MakeString(), mJobCmdLineList, mJobIdList, mJobServerProxy, mMaxJobTimeMinutes, mProcessManager, mSleepIntervalSec, Impala::Job::State::RUNNING, Impala::Util::ChannelProxy::Send(), Sleep(), Impala::Timer::SplitTime(), Impala::Process::Manager::STATE_TERM_ABNORMAL, Impala::Process::Manager::STATE_TERM_ERROR, Impala::Process::Manager::STATE_TERM_SUCCESS, Impala::Job::State::TERMINATED_ABNRM, Impala::Job::State::TERMINATED_NRM, and Impala::Job::State::UNDEFINED.

Referenced by RunJobs().

00194     {
00195         ILOG_INFO("Received job with ID: " << jobId);
00196         mJobIdList.push_back(jobId);
00197         cmdLine += " " + cmdLineExtension;
00198         cmdLine += " --jobErrorLog job_" + MakeString(jobId) + "_errors.log";
00199         mJobCmdLineList.push_back(cmdLine);
00200         int procHandle = mProcessManager->Create(cmdLine);
00201         if (!procHandle)
00202             return false; // an error was already reported
00203 
00204         Timer jobTimer(0); // starts the timer implicitly
00205         bool licensedToKill = false;
00206         while (mProcessManager->IsRunning(procHandle))
00207         {
00208             // report to job server
00209             std::ostringstream oss;
00210             oss << "AcceptStatus:jobId=" << jobId << ";state="
00211                 << Job::State::RUNNING;
00212             String response = mJobServerProxy->Send(oss.str());
00213 
00214             if (response == "OK")
00215             {
00216                 if (licensedToKill)
00217                     ILOG_INFO("Waiting for job " << jobId << " to get killed");
00218 
00219                 double elapsedSec = jobTimer.SplitTime();
00220                 int elapsedMinutes = (int) (elapsedSec / 60.0);
00221 
00222                 if (mMaxJobTimeMinutes > 0 &&
00223                     elapsedMinutes >= mMaxJobTimeMinutes && !licensedToKill)
00224                 {
00225                     ILOG_ERROR("Job " << jobId << " runs too long (over " <<
00226                                mMaxJobTimeMinutes <<
00227                                " minutes); requesting to kill it");
00228                     licensedToKill = true;
00229                     mProcessManager->Kill(procHandle);
00230                 }
00231 
00232                 Sleep(mSleepIntervalSec);
00233             }
00234 
00235             else if (response == "Kill")
00236             {
00237                 if (licensedToKill)
00238                 {
00239                     ILOG_WARN("Job " << jobId <<
00240                               " was already requested to be killed");
00241                 }
00242                 else
00243                 {
00244                     ILOG_WARN("Received kill request for job " << jobId <<
00245                               "; forwarding request to process manager");
00246                     licensedToKill = true;
00247                     mProcessManager->Kill(procHandle);
00248                 }
00249             }
00250 
00251             else
00252             {
00253                 ILOG_ERROR("Reporting on job " << jobId << " failed: " <<
00254                            response);
00255                 return false;
00256             }
00257         }
00258 
00259         // report job termination to job server
00260         int pmState = mProcessManager->GetState(procHandle);
00261         ILOG_DEBUG("pmState = " << pmState);
00262         int jobState = Job::State::UNDEFINED;
00263         int exitCode = mProcessManager->GetExitCode(procHandle);
00264 
00265         if (licensedToKill)
00266         {
00267             if (pmState == Process::Manager::STATE_TERM_SUCCESS ||
00268                 pmState == Process::Manager::STATE_TERM_ERROR)
00269             {
00270                 ILOG_INFO("Job " << jobId <<
00271                           " terminated normally and did not require killing");
00272                 jobState = Job::State::TERMINATED_NRM;
00273             }
00274             else
00275             {
00276                 ILOG_INFO("Job " << jobId << " has been killed");
00277                 jobState = Job::State::TERMINATED_ABNRM;
00278             }
00279         }
00280         else
00281         {
00282             if (pmState == Process::Manager::STATE_TERM_SUCCESS)
00283             {
00284                 ILOG_INFO("Job " << jobId << " is done");
00285                 jobState = Job::State::TERMINATED_NRM;
00286             }
00287             else if (pmState == Process::Manager::STATE_TERM_ERROR)
00288             {
00289                 ILOG_ERROR("Job " << jobId << " exited with a value of "
00290                            << exitCode << "; check the job's log for errors");
00291                 jobState = Job::State::TERMINATED_NRM;
00292             }
00293             else if (pmState == Process::Manager::STATE_TERM_ABNORMAL)
00294             {
00295                 ILOG_ERROR("Job " << jobId << " exited with a value of " <<
00296                            exitCode << "; check the job's log for errors");
00297                 jobState = Job::State::TERMINATED_ABNRM;
00298             }
00299             else
00300             {
00301                 ILOG_ERROR("Job " << jobId <<
00302                            " was rendered in an unknown state" <<
00303                            "; check the job's log for errors");
00304                 jobState = Job::State::TERMINATED_ABNRM;
00305             }
00306         }
00307 
00308         std::ostringstream oss;
00309         oss << "AcceptStatus:jobId=" << jobId << ";state=" << jobState
00310             << ";exitCode=" << exitCode << ";errorLog=" << GetJobErrorLog(jobId);
00311 
00312         ILOG_DEBUG("Reporting job termination to server: " << oss.str());
00313         String response = mJobServerProxy->Send(oss.str());
00314         if (response != "OK")
00315         {
00316             ILOG_ERROR("Reporting termination of job " << jobId << " failed: "
00317                        << response);
00318             return false;
00319         }
00320         return true;
00321     }

Here is the call graph for this function:


Generated on Fri Mar 19 11:34:02 2010 for ImpalaSrc by  doxygen 1.5.1