Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

Server.h

Go to the documentation of this file.
00001 #ifndef Impala_Job_Server_h
00002 #define Impala_Job_Server_h
00003 
#include <climits>
#include <cstdio>
#include <cstring>
#include <deque>
#include <map>
#include <sstream>
#include <strstream>

#include "Basis/ILog.h"
#include "Util/StringParser.h"
#include "Util/ChannelServer.h"
#include "Job/State.h"
00013 
00014 namespace Impala
00015 {
00016 namespace Job
00017 {
00018 
00019 /******************************************************************
00020  Instances of this class can serve jobs to job processing clients.
00021 *******************************************************************/
00022 
00023 class Server : public Util::ChannelServer
00024 {
00025     typedef Job::State JobState;
00026     typedef Job::State::StateType JobStateType;
00027 
00028 public: 
00029 
00030     typedef struct Job
00031     {
00032         // 'public':
00033         int id; 
00034         String jobSpec; 
00035         JobStateType state;
00036         int exitCode;
00037         String errorLog;
00038 
00039         // 'private':
00040         //String srcAddress;
00041         int srcPort;
00042         int portUsed;
00043         String agent;
00044     } Job;
00045 
00046     Server(int port, int nrPorts, CString passwordFile) 
00047         : Util::ChannelServer(port, nrPorts, true, passwordFile)
00048     {
00049         Init();
00050     }
00051 
00052     virtual
00053     ~Server()
00054     {
00055         int len = mJobsDone.size();
00056         for (int i = 0; i < len; i++)
00057             delete mJobsDone[i];
00058 
00059         len = mJobsWaiting.size();
00060         for (int i = 0; i < len; i++)
00061             delete mJobsWaiting[i];
00062 
00063         int maxPort = FirstPort() + NrOfPorts() - 1;
00064         for (int port = (FirstPort() + 1); port <= maxPort; port++)
00065         {
00066             if (mJobByPort[port] != 0)
00067             {
00068                 Job* job = mJobByPort[port];
00069                 delete job;
00070                 mJobByPort[port] = 0;
00071             }
00072         }
00073     }
00074 
00075     virtual void 
00076     HandleIdle()
00077     {
00078         ChannelServer::HandleIdle();
00079 
00080         int jobsWaiting = mJobsWaiting.size();
00081         int jobsDone = mJobsDone.size();
00082         int runningJobs = 0;
00083         int maxPort = FirstPort() + NrOfPorts() - 1;
00084         for (int port = (FirstPort() + 1); port <= maxPort; port++)
00085             if (mJobByPort[port] != 0)
00086                 runningJobs++;
00087         ILOG_DEBUG("Jobs: waiting=" << jobsWaiting << ", running=" <<
00088                    runningJobs << ", done=" << jobsDone);
00089     }
00090 
00091     virtual void 
00092     HandleDisconnect(int port)
00093     {
00094         ChannelServer::HandleDisconnect(port);
00095         RescheduleJob(port);
00096         RemoveJobsAccepted(port);
00097     }
00098 
00099     bool 
00100     JobQueueFull()
00101     {
00102         return (mJobsWaiting.size() > 1000000);
00103     }
00104 
00105     // Adds a new job to the job queue
00106     int 
00107     AddToJobQueue(Job* job)
00108     {
00109         if (JobQueueFull())
00110         {
00111             ILOG_ERROR("Job queue full; cannot add job");
00112             return 0;
00113         }
00114         else
00115         {
00116             job->id = mNextJobId++; // doesnt account for overflow
00117             ILOG_INFO("Assigning job ID: " << job->id);
00118             job->state = State::SCHEDULED;
00119             job->portUsed = -1; // undefined
00120             job->exitCode = -99; // undefined
00121             mJobsWaiting.push_back(job);
00122             return job->id;
00123         }
00124     }
00125 
00126     Job* 
00127     FindJob(int jobId)
00128     {
00129         // check running jobs...
00130         int maxPort = FirstPort() + NrOfPorts() - 1;
00131         for (int port = (FirstPort() + 1); port <= maxPort; port++)
00132             if (mJobByPort[port] != 0 && mJobByPort[port]->id == jobId)
00133                 return mJobByPort[port];
00134 
00135         // ...and new jobs...
00136         int len = mJobsWaiting.size();
00137         for (int i = 0; i < len; i++)
00138             if (mJobsWaiting[i]->id == jobId)
00139                 return mJobsWaiting[i];
00140 
00141         // ...and terminated job
00142         len = mJobsDone.size();
00143         for (int i = 0; i < len; i++)
00144             if (mJobsDone[i]->id == jobId)
00145                 return mJobsDone[i];
00146 
00147         //ILOG_ERROR("Job ID " << jobId << " unknown");
00148         return 0;
00149     }
00150 
00151     // Pops the next job to run from the queue 
00152     Job* 
00153     PopFirstFromJobQueue()
00154     {
00155         Job* job = 0;
00156         if (mJobsWaiting.empty())
00157         {
00158             ILOG_ERROR("Job queue empty; cannot pop job");
00159         }
00160         else
00161         {
00162             job = mJobsWaiting[0];
00163             mJobsWaiting.pop_front();
00164         }
00165         return job;
00166     }
00167 
00168 protected:
00169 
00170     virtual int
00171     AcceptRequest(char* buf, int len, int bufSize, CString conn, int port)
00172     {
00173         int rcBase = ChannelServer::AcceptRequest(buf, len, bufSize, conn, port);
00174         if (rcBase >= 0)
00175             return rcBase;
00176 
00177         ILogErrors& errors = ILogErrors::GetInstance();
00178         errors.Mark();
00179 
00180         if (strncmp(buf, "AcceptJob", 9) == 0)
00181             AcceptNewJob(buf, len, bufSize, conn, port);
00182         else if (strncmp(buf, "ProvideStatus", 13) == 0)
00183             AcceptRequestForStatus(buf, len, bufSize, conn, port);
00184         else if (strncmp(buf, "ProvideJob", 10) == 0)
00185             AcceptRequestForJob(buf, len, bufSize, conn, port);
00186         else if (strncmp(buf, "AcceptStatus", 12) == 0)
00187             AcceptJobStatus(buf, len, bufSize, conn, port);
00188         else if (strncmp(buf, "Poison", 6) == 0)
00189             mPoisoned = !mPoisoned;
00190         else
00191             return -1;
00192 
00193         if (errors.GetNrErrorsSinceMark() > 0)
00194         {
00195             String last = errors.GetLastErrorSinceMark();
00196             sprintf(buf, "ERROR : last error at server : %s\0", last.c_str());
00197         }
00198 
00199         return strlen(buf) + 1;
00200     }
00201 
00202     virtual void
00203     AcceptNewJob(char* buf, int len, int bufSize, CString conn, int port)
00204     {
00205         String curCon = ConnectionDescr();
00206         ILOG_INFO("Receiving new job from " << curCon);
00207         if (JobQueueFull())
00208         {
00209             ILOG_WARN("Job queue full, cannot accept new job");
00210             sprintf(buf, "MaxJobs\0");
00211         }
00212         else
00213         {
00214             Util::StringParser parser(String(buf, len));
00215             parser.GetString(':');
00216             if (parser.GetString('=') == "cmdLine")
00217             {
00218                 String cmdLine = parser.GetString('\0');
00219                 Job* job = new Job();
00220                 job->srcPort = port;
00221                 job->jobSpec = cmdLine;
00222                 int jobId = AddToJobQueue(job);
00223                 sprintf(buf, "JobAccepted:jobId=%i\0", jobId);
00224             }
00225             else
00226             {
00227                 ILOG_ERROR("Invalid job specification: " << String(buf, len));
00228                 sprintf(buf, "MaxJobs\0");
00229             }
00230         }
00231     }
00232 
00233     // SK: currently only job termination code; should perhaps be extended
00234     virtual void
00235     AcceptRequestForStatus(char* buf, int len, int bufSize, CString conn,
00236                            int port)
00237     {
00238         String curCon = ConnectionDescr();
00239         Util::StringParser parser(buf);
00240         parser.GetString(':');
00241         String requestKey = parser.GetString('=');
00242         if (requestKey == "jobId")
00243         {
00244             int jobId = parser.GetInt();
00245             if (jobId > 0)
00246             {
00247                 ILOG_DEBUG("Receiving request for report on job " << jobId <<
00248                            " from " << curCon);
00249                 Job* job = FindJob(jobId);
00250                 if (job)
00251                 {
00252                     sprintf(buf,
00253                             "JobStatus:jobId=%d;state=%d;exitCode=%d;errorLog=%s\0",
00254                             jobId, job->state, job->exitCode,
00255                             job->errorLog.c_str());
00256                     return;
00257                 }
00258                 else
00259                 {
00260                     ILOG_ERROR("Job ID unknown: " << jobId);
00261                 }
00262             }
00263             else
00264             {
00265                 ILOG_ERROR("No job ID specified: " << String(buf, len));
00266             }
00267         }
00268         else if (requestKey == "reference")
00269         {
00270             ILOG_DEBUG("Receiving request for job state changes from " <<
00271                        curCon);
00272             std::map<int, int> refList;
00273             int jobId = parser.GetInt(':', false, true);
00274             while (jobId > 0)
00275             {
00276                 JobStateType state = (JobStateType) parser.GetInt(' ', true,
00277                                                                   true);
00278                 refList[jobId] = state;
00279                 jobId = parser.GetInt(':', false, true);
00280             }
00281 
00282             int nrOfChanges = 0;
00283             std::ostringstream oss;
00284             std::map<int, int>::iterator iter = refList.begin();
00285             for ( ; iter != refList.end(); iter++)
00286             {
00287                 int jobId = iter->first;
00288                 Job* job = FindJob(jobId);
00289                 if (job)
00290                 {
00291                     if (job->state != iter->second)
00292                     {
00293                         oss << jobId << ':' << job->state << " ";
00294                         nrOfChanges++;
00295                     }
00296                 }
00297                 else
00298                 {
00299                     ILOG_ERROR("Job ID unknown: " << jobId);
00300                 }
00301             }
00302             if (nrOfChanges > 0)
00303                 sprintf(buf, "JobStatus:changed=%s\0", oss.str().c_str());
00304             else
00305                 sprintf(buf, "NoChange\0");
00306             return;
00307         }
00308         else
00309         {
00310            ILOG_ERROR("Invalid job report request: " << String(buf, len));
00311         }
00312 
00313         sprintf(buf, "ERROR\0");
00314     }
00315 
00316     virtual void
00317     AcceptRequestForJob(char* buf, int len, int bufSize, CString conn, int port)
00318     {
00319         String curCon = ConnectionDescr();
00320         ILOG_DEBUG("Receiving job request from " << curCon);
00321         if (mJobByPort[port] != 0)
00322         {
00323             ILOG_WARN("Unexpected job request from " << curCon);
00324             sprintf(buf, "ERROR\0");
00325             return;
00326         }
00327 
00328         if (mPoisoned)
00329         {
00330             ILOG_DEBUG("Poisoning " << curCon);
00331             sprintf(buf, "Poison\0");
00332             return;
00333         }
00334 
00335         if (mJobsWaiting.empty())
00336         {
00337             ILOG_DEBUG("No job for " << curCon);
00338             sprintf(buf, "NoJob\0");
00339             return;
00340         }
00341 
00342         Job* job = PopFirstFromJobQueue();
00343         job->portUsed = port;
00344         job->agent = conn;
00345         job->state = State::RUNNING;
00346 
00347         mJobByPort[port] = job;
00348         ILOG_INFO("Serving job " << job->id << " to " << curCon);
00349         sprintf(buf, "RunJob:jobId=%d;cmdLine=%s\0", job->id,
00350                 job->jobSpec.c_str());
00351     }
00352 
00353     virtual void
00354     AcceptJobStatus(char* buf, int len, int bufSize, CString conn, int port)
00355     {
00356         String curCon = ConnectionDescr();
00357         ILOG_DEBUG("Receiving job report from " << curCon);
00358         Job* job = mJobByPort[port];
00359         if (job == 0)
00360         {
00361             ILOG_WARN("Unexpected job report from " << curCon);
00362             sprintf(buf, "ERROR\0");
00363             return;
00364         }
00365 
00366         String bufStr(buf);
00367         Util::StringParser parser(bufStr);
00368         parser.GetString(':');
00369         if (parser.GetString('=') == "jobId")
00370         {
00371             int jobId = parser.GetInt(';', true, true);
00372             if (jobId == job->id)
00373             {
00374                 if (parser.GetString('=') == "state")
00375                 {
00376                     JobStateType reportedState =
00377                         (JobStateType) parser.GetInt(';', false, true);
00378                     if (reportedState == State::RUNNING)
00379                     {
00380                         if (job->state == State::KILL_ATTEMPT)
00381                         {
00382                             ILOG_WARN("Requesting " << curCon << " to kill job "
00383                                       << jobId);
00384                             sprintf(buf, "Kill\0");
00385                         }
00386                         else
00387                         {
00388                             sprintf(buf, "OK\0");
00389                         }
00390                         return;
00391                     }
00392                     else if (reportedState == State::TERMINATED_NRM ||
00393                              reportedState == State::TERMINATED_ABNRM)
00394                     {
00395                         if (job->state == State::KILL_ATTEMPT)
00396                             ILOG_WARN("Successfully completed kill for job "
00397                                       << job->id);
00398                         job->state = reportedState;
00399                         if (parser.GetString('=') == "exitCode")
00400                             job->exitCode = parser.GetInt(';', true, true);
00401                         if (parser.GetString('=') == "errorLog")
00402                             job->errorLog = bufStr.substr(parser.Position());
00403                         mJobsDone.push_back(job);
00404                         mJobByPort[port] = 0;
00405                         sprintf(buf, "OK\0");
00406                         return;
00407                     }
00408                 }
00409             }
00410         }
00411         ILOG_WARN("Invalid job report (from " << curCon << ") : " <<
00412                   String(buf, len));
00413         sprintf(buf, "ERROR\0");
00414     }
00415 
00416 
00417 private:
00418 
00419     void 
00420     Init()
00421     {
00422         mNextJobId = 1;
00423         mPoisoned = false;
00424     }
00425 
00426     void
00427     RemoveJobsAccepted(int srcPort)
00428     {
00429         CString logInformation = "Job source at port " + MakeString(srcPort) +
00430             " disconnected; removing its jobs";
00431         bool loggedInformational = false;
00432 
00433         std::deque<Job*>::iterator iter = mJobsWaiting.begin();
00434         while (iter != mJobsWaiting.end())
00435         {
00436             if ((*iter)->srcPort == srcPort)
00437             {
00438                 if (!loggedInformational)
00439                     ILOG_WARN(logInformation);
00440                 loggedInformational = true;
00441 
00442                 ILOG_INFO("Removed pending job " << (*iter)->id <<
00443                           " accepted at port " << srcPort);
00444                 delete *iter;
00445                 iter = mJobsWaiting.erase(iter);
00446             }
00447             else
00448             {
00449                 iter++;
00450             }
00451         }
00452 
00453         int maxPort = FirstPort() + NrOfPorts() - 1;
00454         for (int port = (FirstPort() + 1); port <= maxPort; port++)
00455         {
00456             Job* job = mJobByPort[port];
00457             if (job && job->srcPort == srcPort)
00458             {
00459                 if (!loggedInformational)
00460                     ILOG_WARN(logInformation);
00461                 loggedInformational = true;
00462 
00463                 ILOG_INFO("Requesting to kill job " << job->id << " run by "
00464                           << job->agent << " at port " << port);
00465                 job->state = State::KILL_ATTEMPT;
00466             }
00467         }
00468     }
00469 
00470     // If the specified port was assigned a job, the job will be rescheduled.
00471     void
00472     RescheduleJob(int port)
00473     {
00474         Job* job = mJobByPort[port];
00475         if (job)
00476         {
00477             if (job->state != State::RUNNING)
00478                 ILOG_ERROR("Unexpected state of job at port " << port << ": "
00479                            << job->state);
00480             mJobByPort[port] = 0;
00481             mJobsWaiting.push_front(job);
00482             job->state = State::RESCHEDULED;
00483             job->portUsed = -99; // undefined
00484             ILOG_INFO("Job " << job->id << " rescheduled for execution");
00485         }
00486     }
00487 
00488     bool mPoisoned;
00489     int mNextJobId;
00490     std::deque<Job*> mJobsWaiting;
00491     std::deque<Job*> mJobsDone;
00492     std::map<int, Job*> mJobByPort;
00493 
00494     ILOG_VAR_DECL;
00495 
00496 }; // class
00497 
00498 ILOG_VAR_INIT(Server, Impala.Job);
00499 
00500 } // namespace 
00501 } // namespace 
00502 
00503 #endif

Generated on Fri Mar 19 09:31:11 2010 for ImpalaSrc by  doxygen 1.5.1