Definition at line 193 of file Runner.h. References Impala::Process::Manager::Create(), Impala::Process::Manager::GetExitCode(), GetJobErrorLog(), Impala::Process::Manager::GetState(), ILOG_DEBUG, ILOG_ERROR, ILOG_INFO, ILOG_WARN, Impala::Process::Manager::IsRunning(), Impala::Process::Manager::Kill(), Impala::MakeString(), mJobCmdLineList, mJobIdList, mJobServerProxy, mMaxJobTimeMinutes, mProcessManager, mSleepIntervalSec, Impala::Job::State::RUNNING, Impala::Util::ChannelProxy::Send(), Sleep(), Impala::Timer::SplitTime(), Impala::Process::Manager::STATE_TERM_ABNORMAL, Impala::Process::Manager::STATE_TERM_ERROR, Impala::Process::Manager::STATE_TERM_SUCCESS, Impala::Job::State::TERMINATED_ABNRM, Impala::Job::State::TERMINATED_NRM, and Impala::Job::State::UNDEFINED. Referenced by RunJobs(). 00194 { 00195 ILOG_INFO("Received job with ID: " << jobId); 00196 mJobIdList.push_back(jobId); 00197 cmdLine += " " + cmdLineExtension; 00198 cmdLine += " --jobErrorLog job_" + MakeString(jobId) + "_errors.log"; 00199 mJobCmdLineList.push_back(cmdLine); 00200 int procHandle = mProcessManager->Create(cmdLine); 00201 if (!procHandle) 00202 return false; // an error was already reported 00203 00204 Timer jobTimer(0); // starts the timer implicitly 00205 bool licensedToKill = false; 00206 while (mProcessManager->IsRunning(procHandle)) 00207 { 00208 // report to job server 00209 std::ostringstream oss; 00210 oss << "AcceptStatus:jobId=" << jobId << ";state=" 00211 << Job::State::RUNNING; 00212 String response = mJobServerProxy->Send(oss.str()); 00213 00214 if (response == "OK") 00215 { 00216 if (licensedToKill) 00217 ILOG_INFO("Waiting for job " << jobId << " to get killed"); 00218 00219 double elapsedSec = jobTimer.SplitTime(); 00220 int elapsedMinutes = (int) (elapsedSec / 60.0); 00221 00222 if (mMaxJobTimeMinutes > 0 && 00223 elapsedMinutes >= mMaxJobTimeMinutes && !licensedToKill) 00224 { 00225 ILOG_ERROR("Job " << jobId << " runs too long (over " << 00226 mMaxJobTimeMinutes << 
00227 " minutes); requesting to kill it"); 00228 licensedToKill = true; 00229 mProcessManager->Kill(procHandle); 00230 } 00231 00232 Sleep(mSleepIntervalSec); 00233 } 00234 00235 else if (response == "Kill") 00236 { 00237 if (licensedToKill) 00238 { 00239 ILOG_WARN("Job " << jobId << 00240 " was already requested to be killed"); 00241 } 00242 else 00243 { 00244 ILOG_WARN("Received kill request for job " << jobId << 00245 "; forwarding request to process manager"); 00246 licensedToKill = true; 00247 mProcessManager->Kill(procHandle); 00248 } 00249 } 00250 00251 else 00252 { 00253 ILOG_ERROR("Reporting on job " << jobId << " failed: " << 00254 response); 00255 return false; 00256 } 00257 } 00258 00259 // report job termination to job server 00260 int pmState = mProcessManager->GetState(procHandle); 00261 ILOG_DEBUG("pmState = " << pmState); 00262 int jobState = Job::State::UNDEFINED; 00263 int exitCode = mProcessManager->GetExitCode(procHandle); 00264 00265 if (licensedToKill) 00266 { 00267 if (pmState == Process::Manager::STATE_TERM_SUCCESS || 00268 pmState == Process::Manager::STATE_TERM_ERROR) 00269 { 00270 ILOG_INFO("Job " << jobId << 00271 " terminated normally and did not require killing"); 00272 jobState = Job::State::TERMINATED_NRM; 00273 } 00274 else 00275 { 00276 ILOG_INFO("Job " << jobId << " has been killed"); 00277 jobState = Job::State::TERMINATED_ABNRM; 00278 } 00279 } 00280 else 00281 { 00282 if (pmState == Process::Manager::STATE_TERM_SUCCESS) 00283 { 00284 ILOG_INFO("Job " << jobId << " is done"); 00285 jobState = Job::State::TERMINATED_NRM; 00286 } 00287 else if (pmState == Process::Manager::STATE_TERM_ERROR) 00288 { 00289 ILOG_ERROR("Job " << jobId << " exited with a value of " 00290 << exitCode << "; check the job's log for errors"); 00291 jobState = Job::State::TERMINATED_NRM; 00292 } 00293 else if (pmState == Process::Manager::STATE_TERM_ABNORMAL) 00294 { 00295 ILOG_ERROR("Job " << jobId << " exited with a value of " << 00296 exitCode << "; 
check the job's log for errors"); 00297 jobState = Job::State::TERMINATED_ABNRM; 00298 } 00299 else 00300 { 00301 ILOG_ERROR("Job " << jobId << 00302 " was rendered in an unknown state" << 00303 "; check the job's log for errors"); 00304 jobState = Job::State::TERMINATED_ABNRM; 00305 } 00306 } 00307 00308 std::ostringstream oss; 00309 oss << "AcceptStatus:jobId=" << jobId << ";state=" << jobState 00310 << ";exitCode=" << exitCode << ";errorLog=" << GetJobErrorLog(jobId); 00311 00312 ILOG_DEBUG("Reporting job termination to server: " << oss.str()); 00313 String response = mJobServerProxy->Send(oss.str()); 00314 if (response != "OK") 00315 { 00316 ILOG_ERROR("Reporting termination of job " << jobId << " failed: " 00317 << response); 00318 return false; 00319 } 00320 return true; 00321 }
Here is the call graph for this function (the graph image is not included in this text export).