00001 #ifndef Impala_Job_Server_h
00002 #define Impala_Job_Server_h
00003
#include <cstring>
#include <deque>
#include <map>
#include <sstream>
#include <strstream>

#include "Basis/ILog.h"
#include "Util/StringParser.h"
#include "Util/ChannelServer.h"
#include "Job/State.h"
00013
00014 namespace Impala
00015 {
00016 namespace Job
00017 {
00018
00019
00020
00021
00022
00023 class Server : public Util::ChannelServer
00024 {
00025 typedef Job::State JobState;
00026 typedef Job::State::StateType JobStateType;
00027
00028 public:
00029
00030 typedef struct Job
00031 {
00032
00033 int id;
00034 String jobSpec;
00035 JobStateType state;
00036 int exitCode;
00037 String errorLog;
00038
00039
00040
00041 int srcPort;
00042 int portUsed;
00043 String agent;
00044 } Job;
00045
00046 Server(int port, int nrPorts, CString passwordFile)
00047 : Util::ChannelServer(port, nrPorts, true, passwordFile)
00048 {
00049 Init();
00050 }
00051
00052 virtual
00053 ~Server()
00054 {
00055 int len = mJobsDone.size();
00056 for (int i = 0; i < len; i++)
00057 delete mJobsDone[i];
00058
00059 len = mJobsWaiting.size();
00060 for (int i = 0; i < len; i++)
00061 delete mJobsWaiting[i];
00062
00063 int maxPort = FirstPort() + NrOfPorts() - 1;
00064 for (int port = (FirstPort() + 1); port <= maxPort; port++)
00065 {
00066 if (mJobByPort[port] != 0)
00067 {
00068 Job* job = mJobByPort[port];
00069 delete job;
00070 mJobByPort[port] = 0;
00071 }
00072 }
00073 }
00074
00075 virtual void
00076 HandleIdle()
00077 {
00078 ChannelServer::HandleIdle();
00079
00080 int jobsWaiting = mJobsWaiting.size();
00081 int jobsDone = mJobsDone.size();
00082 int runningJobs = 0;
00083 int maxPort = FirstPort() + NrOfPorts() - 1;
00084 for (int port = (FirstPort() + 1); port <= maxPort; port++)
00085 if (mJobByPort[port] != 0)
00086 runningJobs++;
00087 ILOG_DEBUG("Jobs: waiting=" << jobsWaiting << ", running=" <<
00088 runningJobs << ", done=" << jobsDone);
00089 }
00090
00091 virtual void
00092 HandleDisconnect(int port)
00093 {
00094 ChannelServer::HandleDisconnect(port);
00095 RescheduleJob(port);
00096 RemoveJobsAccepted(port);
00097 }
00098
00099 bool
00100 JobQueueFull()
00101 {
00102 return (mJobsWaiting.size() > 1000000);
00103 }
00104
00105
00106 int
00107 AddToJobQueue(Job* job)
00108 {
00109 if (JobQueueFull())
00110 {
00111 ILOG_ERROR("Job queue full; cannot add job");
00112 return 0;
00113 }
00114 else
00115 {
00116 job->id = mNextJobId++;
00117 ILOG_INFO("Assigning job ID: " << job->id);
00118 job->state = State::SCHEDULED;
00119 job->portUsed = -1;
00120 job->exitCode = -99;
00121 mJobsWaiting.push_back(job);
00122 return job->id;
00123 }
00124 }
00125
00126 Job*
00127 FindJob(int jobId)
00128 {
00129
00130 int maxPort = FirstPort() + NrOfPorts() - 1;
00131 for (int port = (FirstPort() + 1); port <= maxPort; port++)
00132 if (mJobByPort[port] != 0 && mJobByPort[port]->id == jobId)
00133 return mJobByPort[port];
00134
00135
00136 int len = mJobsWaiting.size();
00137 for (int i = 0; i < len; i++)
00138 if (mJobsWaiting[i]->id == jobId)
00139 return mJobsWaiting[i];
00140
00141
00142 len = mJobsDone.size();
00143 for (int i = 0; i < len; i++)
00144 if (mJobsDone[i]->id == jobId)
00145 return mJobsDone[i];
00146
00147
00148 return 0;
00149 }
00150
00151
00152 Job*
00153 PopFirstFromJobQueue()
00154 {
00155 Job* job = 0;
00156 if (mJobsWaiting.empty())
00157 {
00158 ILOG_ERROR("Job queue empty; cannot pop job");
00159 }
00160 else
00161 {
00162 job = mJobsWaiting[0];
00163 mJobsWaiting.pop_front();
00164 }
00165 return job;
00166 }
00167
00168 protected:
00169
00170 virtual int
00171 AcceptRequest(char* buf, int len, int bufSize, CString conn, int port)
00172 {
00173 int rcBase = ChannelServer::AcceptRequest(buf, len, bufSize, conn, port);
00174 if (rcBase >= 0)
00175 return rcBase;
00176
00177 ILogErrors& errors = ILogErrors::GetInstance();
00178 errors.Mark();
00179
00180 if (strncmp(buf, "AcceptJob", 9) == 0)
00181 AcceptNewJob(buf, len, bufSize, conn, port);
00182 else if (strncmp(buf, "ProvideStatus", 13) == 0)
00183 AcceptRequestForStatus(buf, len, bufSize, conn, port);
00184 else if (strncmp(buf, "ProvideJob", 10) == 0)
00185 AcceptRequestForJob(buf, len, bufSize, conn, port);
00186 else if (strncmp(buf, "AcceptStatus", 12) == 0)
00187 AcceptJobStatus(buf, len, bufSize, conn, port);
00188 else if (strncmp(buf, "Poison", 6) == 0)
00189 mPoisoned = !mPoisoned;
00190 else
00191 return -1;
00192
00193 if (errors.GetNrErrorsSinceMark() > 0)
00194 {
00195 String last = errors.GetLastErrorSinceMark();
00196 sprintf(buf, "ERROR : last error at server : %s\0", last.c_str());
00197 }
00198
00199 return strlen(buf) + 1;
00200 }
00201
00202 virtual void
00203 AcceptNewJob(char* buf, int len, int bufSize, CString conn, int port)
00204 {
00205 String curCon = ConnectionDescr();
00206 ILOG_INFO("Receiving new job from " << curCon);
00207 if (JobQueueFull())
00208 {
00209 ILOG_WARN("Job queue full, cannot accept new job");
00210 sprintf(buf, "MaxJobs\0");
00211 }
00212 else
00213 {
00214 Util::StringParser parser(String(buf, len));
00215 parser.GetString(':');
00216 if (parser.GetString('=') == "cmdLine")
00217 {
00218 String cmdLine = parser.GetString('\0');
00219 Job* job = new Job();
00220 job->srcPort = port;
00221 job->jobSpec = cmdLine;
00222 int jobId = AddToJobQueue(job);
00223 sprintf(buf, "JobAccepted:jobId=%i\0", jobId);
00224 }
00225 else
00226 {
00227 ILOG_ERROR("Invalid job specification: " << String(buf, len));
00228 sprintf(buf, "MaxJobs\0");
00229 }
00230 }
00231 }
00232
00233
00234 virtual void
00235 AcceptRequestForStatus(char* buf, int len, int bufSize, CString conn,
00236 int port)
00237 {
00238 String curCon = ConnectionDescr();
00239 Util::StringParser parser(buf);
00240 parser.GetString(':');
00241 String requestKey = parser.GetString('=');
00242 if (requestKey == "jobId")
00243 {
00244 int jobId = parser.GetInt();
00245 if (jobId > 0)
00246 {
00247 ILOG_DEBUG("Receiving request for report on job " << jobId <<
00248 " from " << curCon);
00249 Job* job = FindJob(jobId);
00250 if (job)
00251 {
00252 sprintf(buf,
00253 "JobStatus:jobId=%d;state=%d;exitCode=%d;errorLog=%s\0",
00254 jobId, job->state, job->exitCode,
00255 job->errorLog.c_str());
00256 return;
00257 }
00258 else
00259 {
00260 ILOG_ERROR("Job ID unknown: " << jobId);
00261 }
00262 }
00263 else
00264 {
00265 ILOG_ERROR("No job ID specified: " << String(buf, len));
00266 }
00267 }
00268 else if (requestKey == "reference")
00269 {
00270 ILOG_DEBUG("Receiving request for job state changes from " <<
00271 curCon);
00272 std::map<int, int> refList;
00273 int jobId = parser.GetInt(':', false, true);
00274 while (jobId > 0)
00275 {
00276 JobStateType state = (JobStateType) parser.GetInt(' ', true,
00277 true);
00278 refList[jobId] = state;
00279 jobId = parser.GetInt(':', false, true);
00280 }
00281
00282 int nrOfChanges = 0;
00283 std::ostringstream oss;
00284 std::map<int, int>::iterator iter = refList.begin();
00285 for ( ; iter != refList.end(); iter++)
00286 {
00287 int jobId = iter->first;
00288 Job* job = FindJob(jobId);
00289 if (job)
00290 {
00291 if (job->state != iter->second)
00292 {
00293 oss << jobId << ':' << job->state << " ";
00294 nrOfChanges++;
00295 }
00296 }
00297 else
00298 {
00299 ILOG_ERROR("Job ID unknown: " << jobId);
00300 }
00301 }
00302 if (nrOfChanges > 0)
00303 sprintf(buf, "JobStatus:changed=%s\0", oss.str().c_str());
00304 else
00305 sprintf(buf, "NoChange\0");
00306 return;
00307 }
00308 else
00309 {
00310 ILOG_ERROR("Invalid job report request: " << String(buf, len));
00311 }
00312
00313 sprintf(buf, "ERROR\0");
00314 }
00315
00316 virtual void
00317 AcceptRequestForJob(char* buf, int len, int bufSize, CString conn, int port)
00318 {
00319 String curCon = ConnectionDescr();
00320 ILOG_DEBUG("Receiving job request from " << curCon);
00321 if (mJobByPort[port] != 0)
00322 {
00323 ILOG_WARN("Unexpected job request from " << curCon);
00324 sprintf(buf, "ERROR\0");
00325 return;
00326 }
00327
00328 if (mPoisoned)
00329 {
00330 ILOG_DEBUG("Poisoning " << curCon);
00331 sprintf(buf, "Poison\0");
00332 return;
00333 }
00334
00335 if (mJobsWaiting.empty())
00336 {
00337 ILOG_DEBUG("No job for " << curCon);
00338 sprintf(buf, "NoJob\0");
00339 return;
00340 }
00341
00342 Job* job = PopFirstFromJobQueue();
00343 job->portUsed = port;
00344 job->agent = conn;
00345 job->state = State::RUNNING;
00346
00347 mJobByPort[port] = job;
00348 ILOG_INFO("Serving job " << job->id << " to " << curCon);
00349 sprintf(buf, "RunJob:jobId=%d;cmdLine=%s\0", job->id,
00350 job->jobSpec.c_str());
00351 }
00352
00353 virtual void
00354 AcceptJobStatus(char* buf, int len, int bufSize, CString conn, int port)
00355 {
00356 String curCon = ConnectionDescr();
00357 ILOG_DEBUG("Receiving job report from " << curCon);
00358 Job* job = mJobByPort[port];
00359 if (job == 0)
00360 {
00361 ILOG_WARN("Unexpected job report from " << curCon);
00362 sprintf(buf, "ERROR\0");
00363 return;
00364 }
00365
00366 String bufStr(buf);
00367 Util::StringParser parser(bufStr);
00368 parser.GetString(':');
00369 if (parser.GetString('=') == "jobId")
00370 {
00371 int jobId = parser.GetInt(';', true, true);
00372 if (jobId == job->id)
00373 {
00374 if (parser.GetString('=') == "state")
00375 {
00376 JobStateType reportedState =
00377 (JobStateType) parser.GetInt(';', false, true);
00378 if (reportedState == State::RUNNING)
00379 {
00380 if (job->state == State::KILL_ATTEMPT)
00381 {
00382 ILOG_WARN("Requesting " << curCon << " to kill job "
00383 << jobId);
00384 sprintf(buf, "Kill\0");
00385 }
00386 else
00387 {
00388 sprintf(buf, "OK\0");
00389 }
00390 return;
00391 }
00392 else if (reportedState == State::TERMINATED_NRM ||
00393 reportedState == State::TERMINATED_ABNRM)
00394 {
00395 if (job->state == State::KILL_ATTEMPT)
00396 ILOG_WARN("Successfully completed kill for job "
00397 << job->id);
00398 job->state = reportedState;
00399 if (parser.GetString('=') == "exitCode")
00400 job->exitCode = parser.GetInt(';', true, true);
00401 if (parser.GetString('=') == "errorLog")
00402 job->errorLog = bufStr.substr(parser.Position());
00403 mJobsDone.push_back(job);
00404 mJobByPort[port] = 0;
00405 sprintf(buf, "OK\0");
00406 return;
00407 }
00408 }
00409 }
00410 }
00411 ILOG_WARN("Invalid job report (from " << curCon << ") : " <<
00412 String(buf, len));
00413 sprintf(buf, "ERROR\0");
00414 }
00415
00416
00417 private:
00418
00419 void
00420 Init()
00421 {
00422 mNextJobId = 1;
00423 mPoisoned = false;
00424 }
00425
00426 void
00427 RemoveJobsAccepted(int srcPort)
00428 {
00429 CString logInformation = "Job source at port " + MakeString(srcPort) +
00430 " disconnected; removing its jobs";
00431 bool loggedInformational = false;
00432
00433 std::deque<Job*>::iterator iter = mJobsWaiting.begin();
00434 while (iter != mJobsWaiting.end())
00435 {
00436 if ((*iter)->srcPort == srcPort)
00437 {
00438 if (!loggedInformational)
00439 ILOG_WARN(logInformation);
00440 loggedInformational = true;
00441
00442 ILOG_INFO("Removed pending job " << (*iter)->id <<
00443 " accepted at port " << srcPort);
00444 delete *iter;
00445 iter = mJobsWaiting.erase(iter);
00446 }
00447 else
00448 {
00449 iter++;
00450 }
00451 }
00452
00453 int maxPort = FirstPort() + NrOfPorts() - 1;
00454 for (int port = (FirstPort() + 1); port <= maxPort; port++)
00455 {
00456 Job* job = mJobByPort[port];
00457 if (job && job->srcPort == srcPort)
00458 {
00459 if (!loggedInformational)
00460 ILOG_WARN(logInformation);
00461 loggedInformational = true;
00462
00463 ILOG_INFO("Requesting to kill job " << job->id << " run by "
00464 << job->agent << " at port " << port);
00465 job->state = State::KILL_ATTEMPT;
00466 }
00467 }
00468 }
00469
00470
00471 void
00472 RescheduleJob(int port)
00473 {
00474 Job* job = mJobByPort[port];
00475 if (job)
00476 {
00477 if (job->state != State::RUNNING)
00478 ILOG_ERROR("Unexpected state of job at port " << port << ": "
00479 << job->state);
00480 mJobByPort[port] = 0;
00481 mJobsWaiting.push_front(job);
00482 job->state = State::RESCHEDULED;
00483 job->portUsed = -99;
00484 ILOG_INFO("Job " << job->id << " rescheduled for execution");
00485 }
00486 }
00487
00488 bool mPoisoned;
00489 int mNextJobId;
00490 std::deque<Job*> mJobsWaiting;
00491 std::deque<Job*> mJobsDone;
00492 std::map<int, Job*> mJobByPort;
00493
00494 ILOG_VAR_DECL;
00495
00496 };
00497
// Initialises the logger that Server declares via ILOG_VAR_DECL
// (presumably registered under the name "Impala.Job" — confirm in
// Basis/ILog.h).
// NOTE(review): this is in a header. If the macro expands to a non-inline
// definition, including this header in more than one translation unit would
// break the one-definition rule. Verify against the macro definition.
ILOG_VAR_INIT(Server, Impala.Job);
00499
00500 }
00501 }
00502
00503 #endif