Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

Server.h

Go to the documentation of this file.
00001 #ifndef Impala_IDash_Server_h
00002 #define Impala_IDash_Server_h
00003 
00004 #include "Basis/ILog.h"
00005 #include "Util/SimpleMap.h"
00006 #include "Core/VideoJob/ServerProxy.h"
00007 #include "Core/IDash/XmlJob.h"
00008 #include "Core/IDash/XmlJobReference.h"
00009 #include "Core/IDash/Href.h"
00010 
00011 namespace Impala
00012 {
00013 namespace Core
00014 {
00015 namespace IDash
00016 {
00017 
00018 
00019 class Server : public Util::ChannelServer
00020 {
00021     typedef Job::State JobState;
00022     typedef Job::State::StateType JobStateType;
00023 
00024 public:
00025 
00026     Server(int port, int nrPorts, CString passwordFile, CString jobServerAddr) 
00027         : Util::ChannelServer(port, nrPorts, true, passwordFile)
00028     {
00029         mJobServer = new VideoJob::ServerProxy(jobServerAddr, passwordFile, 10);
00030     }
00031 
00032     virtual
00033     ~Server()
00034     {
00035     }
00036 
00037     virtual void 
00038     HandleIdle()
00039     {
00040         ChannelServer::HandleIdle();
00041 
00042         // Could show some statistics here
00043     }
00044 
00045 protected:
00046 
00047     virtual int
00048     AcceptRequest(char* buf, int len, int bufSize, CString conn, int port)
00049     {
00050         int rcBase = ChannelServer::AcceptRequest(buf, len, bufSize, conn, port);
00051         if (rcBase >= 0)
00052             return rcBase;
00053 
00054         ILogErrors& errors = ILogErrors::GetInstance();
00055         errors.Mark();
00056 
00057         if (strncmp(buf, "ScheduleJob", 11) == 0)
00058             ScheduleJob(buf, len, bufSize, conn, port);
00059         else if (strncmp(buf, "GetJobs", 7) == 0)
00060             GetJobs(buf, len, bufSize, conn, port);
00061         else if (strncmp(buf, "StatusJob", 9) == 0)
00062             StatusJob(buf, len, bufSize, conn, port);
00063         else if (strncmp(buf, "DeleteCases", 11) == 0)
00064             DeleteCases(buf, len, bufSize, conn, port);
00065         else
00066             return -1;
00067 
00068         if (errors.GetNrErrorsSinceMark() > 0)
00069         {
00070             String last = errors.GetLastErrorSinceMark();
00071             sprintf(buf, "ERROR: 500: Last error at server : %s\0", last.c_str());
00072         }
00073 
00074         return strlen(buf) + 1;
00075     }
00076 
00077     virtual void
00078     ScheduleJob(char* buf, int len, int bufSize, CString conn, int port)
00079     {
00080         String curCon = ConnectionDescr();
00081         ILOG_INFO("ScheduleJob from " << curCon);
00082 
00083         if (!mJobServer->IsConnected())
00084         {
00085             ILOG_ERROR("Cannot do schedule: No connection to jobserver");
00086             return;
00087         }
00088 
00089         if (strncmp(buf, "ScheduleJob:", 12) == 0)
00090         {
00091             Util::StringParser parser(String(buf, len));
00092             parser.Eat("baseUri=");
00093             String baseUri = parser.GetString(';', true);
00094             parser.Eat("job=");
00095             int jobLen = len - parser.Position();
00096             Util::IOBuffer ioBuf(jobLen);
00097             memcpy(ioBuf.GetBuffer(), buf + parser.Position(), jobLen);
00098             XmlJob job("IDashJob", &ioBuf);
00099             if (!job.Valid())
00100             {
00101                 ILOG_ERROR("Invalid job request");
00102                 return;
00103             }
00104 
00105             int dummy;
00106             if (mJobMap.Get(job.GetId(), dummy))
00107             {
00108                 ILOG_ERROR("ScheduleJob: jobId already exists");
00109                 return;
00110             }
00111 
00112             String cmdLine = "do_process ";
00113             if (job.GetAnnotationSet().empty())
00114             {
00115                 if (job.GetVideo().empty())
00116                 {
00117                     cmdLine += "queryset " + job.GetQuerySet();
00118                 }
00119                 else
00120                 {
00121                     cmdLine += "video " + job.GetVideo() + " " + job.GetQuerySet();
00122                 }
00123             }
00124             else
00125             {
00126                 cmdLine = "do_annotator " + job.GetAnnotationSet();
00127             }
00128             int serverJobId = mJobServer->ScheduleJob(cmdLine, 0);
00129             ILOG_INFO("Scheduled job: xmlJobId=" << job.GetId() <<
00130                       ", serverJobId=" << serverJobId << ", cmdLine=" <<
00131                       cmdLine);
00132             mJobMap.Add(job.GetId(), serverJobId);
00133 
00134             String hRef = baseUri + "jobs/" + job.GetId();
00135             XmlJobReference jobRef(hRef);
00136             String jobRefString = jobRef.Export();
00137             sprintf(buf, "JobAccepted:jobRef=%s\0", jobRefString.c_str());
00138         }
00139         else
00140         {
00141             ILOG_ERROR("Invalid schedule request: " << String(buf, len));
00142         }
00143     }
00144 
00145     virtual void
00146     GetJobs(char* buf, int len, int bufSize, CString conn, int port)
00147     {
00148         String curCon = ConnectionDescr();
00149         ILOG_INFO("GetJobs from " << curCon);
00150 
00151         if (!mJobServer->IsConnected())
00152         {
00153             ILOG_ERROR("Cannot do jobs: No connection to jobserver");
00154             return;
00155         }
00156 
00157         if (strncmp(buf, "GetJobs:", 7) == 0)
00158         {
00159             Util::StringParser parser(String(buf, len));
00160             parser.Eat("baseUri=");
00161             String baseUri = parser.GetString(';', true);
00162 
00163             std::vector<String> jobs = mJobMap.GetAllIdx();
00164             String s = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>";
00165             s += "<jobs xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" xmlns=\"http://www.i-dash.eu/MetaData\" >";
00166             for (int i=0 ; i<jobs.size() ; i++)
00167             {
00168                 String url = baseUri + "jobs/" + jobs[i];
00169                 s += "<job xlink:type=\"simple\" xlink:href=\"";
00170                 s += url;
00171                  s += "\" entityType=\"Job\" />";
00172             }
00173             s += "</jobs>";
00174             sprintf(buf, "Jobs:%s\0", s.c_str());
00175         }
00176         else
00177         {
00178             ILOG_ERROR("Invalid jobs request: " << String(buf, len));
00179         }
00180     }
00181 
00182     virtual void
00183     StatusJob(char* buf, int len, int bufSize, CString conn, int port)
00184     {
00185         String curCon = ConnectionDescr();
00186         ILOG_INFO("StatusJob from " << curCon);
00187 
00188         if (!mJobServer->IsConnected())
00189         {
00190             ILOG_ERROR("Cannot do status: No connection to jobserver");
00191             return;
00192         }
00193 
00194         if (strncmp(buf, "StatusJob:job=", 14) == 0)
00195         {
00196             Util::StringParser parser(String(buf, len));
00197             parser.Eat('=');
00198             String jobId = parser.GetString(';', false);
00199 
00200             int serverJobId;
00201             if (!mJobMap.Get(jobId, serverJobId))
00202             {
00203                 ILOG_ERROR("StatusJob: no jobId " << jobId);
00204                 return;
00205             }
00206 
00207             Job::State::StateType state;
00208             int exitCode;
00209             String errorLog;
00210             mJobServer->GetJobState(serverJobId, state, exitCode, errorLog);
00211             ILOG_INFO("state = " << Job::State::ToString(state) <<
00212                       ", exitCode = " << exitCode <<
00213                       ", errorLog = " << errorLog);
00214 
00215             String status = "initial";
00216             String progress = "0";
00217             if (state == Job::State::RUNNING)
00218             {
00219                 status = "running";
00220                 progress = "50";
00221             }
00222             if (state == Job::State::TERMINATED_NRM)
00223             {
00224                 if (exitCode == 0)
00225                     status = "completed";
00226                 else
00227                     status = "error";
00228                 progress = "100";
00229             }
00230             if (state == Job::State::TERMINATED_ABNRM)
00231             {
00232                 status = "error";
00233                 progress = "100";
00234             }
00235 
00236             XmlJob job(jobId, "", "", status, errorLog, progress);
00237             String jobString = job.Export();
00238             sprintf(buf, "JobStatus:job=%s\0", jobString.c_str());
00239         }
00240         else
00241         {
00242             ILOG_ERROR("Invalid job specification: " << String(buf, len));
00243         }
00244     }
00245 
00246     virtual void
00247     DeleteCases(char* buf, int len, int bufSize, CString conn, int port)
00248     {
00249         String curCon = ConnectionDescr();
00250         ILOG_INFO("DeleteCases from " << curCon);
00251 
00252         if (!mJobServer->IsConnected())
00253         {
00254             ILOG_ERROR("Cannot do delete: No connection to jobserver");
00255             return;
00256         }
00257 
00258         String cmdLine = "do_clean_vds; do_clean_annotator";
00259         int serverJobId = mJobServer->ScheduleJob(cmdLine, 0);
00260         ILOG_INFO("Scheduled job: serverJobId=" << serverJobId <<
00261                   ", cmdLine=" << cmdLine);
00262         mJobMap.Clear();
00263     }
00264 
00265 private:
00266 
00267     VideoJob::ServerProxy*      mJobServer;
00268     Util::SimpleMap<String,int> mJobMap;
00269 
00270     ILOG_VAR_DECL;
00271 
00272 };
00273 
00274 ILOG_VAR_INIT(Server, Impala.Core.IDash);
00275 
00276 } // namespace IDash
00277 } // namespace Core
00278 } // namespace Impala
00279 
00280 #endif

Generated on Fri Mar 19 09:31:11 2010 for ImpalaSrc by  doxygen 1.5.1