00001 #ifndef Impala_IDash_Server_h
00002 #define Impala_IDash_Server_h
00003
00004 #include "Basis/ILog.h"
00005 #include "Util/SimpleMap.h"
00006 #include "Core/VideoJob/ServerProxy.h"
00007 #include "Core/IDash/XmlJob.h"
00008 #include "Core/IDash/XmlJobReference.h"
00009 #include "Core/IDash/Href.h"
00010
00011 namespace Impala
00012 {
00013 namespace Core
00014 {
00015 namespace IDash
00016 {
00017
00018
00019 class Server : public Util::ChannelServer
00020 {
00021 typedef Job::State JobState;
00022 typedef Job::State::StateType JobStateType;
00023
00024 public:
00025
00026 Server(int port, int nrPorts, CString passwordFile, CString jobServerAddr)
00027 : Util::ChannelServer(port, nrPorts, true, passwordFile)
00028 {
00029 mJobServer = new VideoJob::ServerProxy(jobServerAddr, passwordFile, 10);
00030 }
00031
00032 virtual
00033 ~Server()
00034 {
00035 }
00036
00037 virtual void
00038 HandleIdle()
00039 {
00040 ChannelServer::HandleIdle();
00041
00042
00043 }
00044
00045 protected:
00046
00047 virtual int
00048 AcceptRequest(char* buf, int len, int bufSize, CString conn, int port)
00049 {
00050 int rcBase = ChannelServer::AcceptRequest(buf, len, bufSize, conn, port);
00051 if (rcBase >= 0)
00052 return rcBase;
00053
00054 ILogErrors& errors = ILogErrors::GetInstance();
00055 errors.Mark();
00056
00057 if (strncmp(buf, "ScheduleJob", 11) == 0)
00058 ScheduleJob(buf, len, bufSize, conn, port);
00059 else if (strncmp(buf, "GetJobs", 7) == 0)
00060 GetJobs(buf, len, bufSize, conn, port);
00061 else if (strncmp(buf, "StatusJob", 9) == 0)
00062 StatusJob(buf, len, bufSize, conn, port);
00063 else if (strncmp(buf, "DeleteCases", 11) == 0)
00064 DeleteCases(buf, len, bufSize, conn, port);
00065 else
00066 return -1;
00067
00068 if (errors.GetNrErrorsSinceMark() > 0)
00069 {
00070 String last = errors.GetLastErrorSinceMark();
00071 sprintf(buf, "ERROR: 500: Last error at server : %s\0", last.c_str());
00072 }
00073
00074 return strlen(buf) + 1;
00075 }
00076
00077 virtual void
00078 ScheduleJob(char* buf, int len, int bufSize, CString conn, int port)
00079 {
00080 String curCon = ConnectionDescr();
00081 ILOG_INFO("ScheduleJob from " << curCon);
00082
00083 if (!mJobServer->IsConnected())
00084 {
00085 ILOG_ERROR("Cannot do schedule: No connection to jobserver");
00086 return;
00087 }
00088
00089 if (strncmp(buf, "ScheduleJob:", 12) == 0)
00090 {
00091 Util::StringParser parser(String(buf, len));
00092 parser.Eat("baseUri=");
00093 String baseUri = parser.GetString(';', true);
00094 parser.Eat("job=");
00095 int jobLen = len - parser.Position();
00096 Util::IOBuffer ioBuf(jobLen);
00097 memcpy(ioBuf.GetBuffer(), buf + parser.Position(), jobLen);
00098 XmlJob job("IDashJob", &ioBuf);
00099 if (!job.Valid())
00100 {
00101 ILOG_ERROR("Invalid job request");
00102 return;
00103 }
00104
00105 int dummy;
00106 if (mJobMap.Get(job.GetId(), dummy))
00107 {
00108 ILOG_ERROR("ScheduleJob: jobId already exists");
00109 return;
00110 }
00111
00112 String cmdLine = "do_process ";
00113 if (job.GetAnnotationSet().empty())
00114 {
00115 if (job.GetVideo().empty())
00116 {
00117 cmdLine += "queryset " + job.GetQuerySet();
00118 }
00119 else
00120 {
00121 cmdLine += "video " + job.GetVideo() + " " + job.GetQuerySet();
00122 }
00123 }
00124 else
00125 {
00126 cmdLine = "do_annotator " + job.GetAnnotationSet();
00127 }
00128 int serverJobId = mJobServer->ScheduleJob(cmdLine, 0);
00129 ILOG_INFO("Scheduled job: xmlJobId=" << job.GetId() <<
00130 ", serverJobId=" << serverJobId << ", cmdLine=" <<
00131 cmdLine);
00132 mJobMap.Add(job.GetId(), serverJobId);
00133
00134 String hRef = baseUri + "jobs/" + job.GetId();
00135 XmlJobReference jobRef(hRef);
00136 String jobRefString = jobRef.Export();
00137 sprintf(buf, "JobAccepted:jobRef=%s\0", jobRefString.c_str());
00138 }
00139 else
00140 {
00141 ILOG_ERROR("Invalid schedule request: " << String(buf, len));
00142 }
00143 }
00144
00145 virtual void
00146 GetJobs(char* buf, int len, int bufSize, CString conn, int port)
00147 {
00148 String curCon = ConnectionDescr();
00149 ILOG_INFO("GetJobs from " << curCon);
00150
00151 if (!mJobServer->IsConnected())
00152 {
00153 ILOG_ERROR("Cannot do jobs: No connection to jobserver");
00154 return;
00155 }
00156
00157 if (strncmp(buf, "GetJobs:", 7) == 0)
00158 {
00159 Util::StringParser parser(String(buf, len));
00160 parser.Eat("baseUri=");
00161 String baseUri = parser.GetString(';', true);
00162
00163 std::vector<String> jobs = mJobMap.GetAllIdx();
00164 String s = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>";
00165 s += "<jobs xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" xmlns=\"http://www.i-dash.eu/MetaData\" >";
00166 for (int i=0 ; i<jobs.size() ; i++)
00167 {
00168 String url = baseUri + "jobs/" + jobs[i];
00169 s += "<job xlink:type=\"simple\" xlink:href=\"";
00170 s += url;
00171 s += "\" entityType=\"Job\" />";
00172 }
00173 s += "</jobs>";
00174 sprintf(buf, "Jobs:%s\0", s.c_str());
00175 }
00176 else
00177 {
00178 ILOG_ERROR("Invalid jobs request: " << String(buf, len));
00179 }
00180 }
00181
00182 virtual void
00183 StatusJob(char* buf, int len, int bufSize, CString conn, int port)
00184 {
00185 String curCon = ConnectionDescr();
00186 ILOG_INFO("StatusJob from " << curCon);
00187
00188 if (!mJobServer->IsConnected())
00189 {
00190 ILOG_ERROR("Cannot do status: No connection to jobserver");
00191 return;
00192 }
00193
00194 if (strncmp(buf, "StatusJob:job=", 14) == 0)
00195 {
00196 Util::StringParser parser(String(buf, len));
00197 parser.Eat('=');
00198 String jobId = parser.GetString(';', false);
00199
00200 int serverJobId;
00201 if (!mJobMap.Get(jobId, serverJobId))
00202 {
00203 ILOG_ERROR("StatusJob: no jobId " << jobId);
00204 return;
00205 }
00206
00207 Job::State::StateType state;
00208 int exitCode;
00209 String errorLog;
00210 mJobServer->GetJobState(serverJobId, state, exitCode, errorLog);
00211 ILOG_INFO("state = " << Job::State::ToString(state) <<
00212 ", exitCode = " << exitCode <<
00213 ", errorLog = " << errorLog);
00214
00215 String status = "initial";
00216 String progress = "0";
00217 if (state == Job::State::RUNNING)
00218 {
00219 status = "running";
00220 progress = "50";
00221 }
00222 if (state == Job::State::TERMINATED_NRM)
00223 {
00224 if (exitCode == 0)
00225 status = "completed";
00226 else
00227 status = "error";
00228 progress = "100";
00229 }
00230 if (state == Job::State::TERMINATED_ABNRM)
00231 {
00232 status = "error";
00233 progress = "100";
00234 }
00235
00236 XmlJob job(jobId, "", "", status, errorLog, progress);
00237 String jobString = job.Export();
00238 sprintf(buf, "JobStatus:job=%s\0", jobString.c_str());
00239 }
00240 else
00241 {
00242 ILOG_ERROR("Invalid job specification: " << String(buf, len));
00243 }
00244 }
00245
00246 virtual void
00247 DeleteCases(char* buf, int len, int bufSize, CString conn, int port)
00248 {
00249 String curCon = ConnectionDescr();
00250 ILOG_INFO("DeleteCases from " << curCon);
00251
00252 if (!mJobServer->IsConnected())
00253 {
00254 ILOG_ERROR("Cannot do delete: No connection to jobserver");
00255 return;
00256 }
00257
00258 String cmdLine = "do_clean_vds; do_clean_annotator";
00259 int serverJobId = mJobServer->ScheduleJob(cmdLine, 0);
00260 ILOG_INFO("Scheduled job: serverJobId=" << serverJobId <<
00261 ", cmdLine=" << cmdLine);
00262 mJobMap.Clear();
00263 }
00264
00265 private:
00266
00267 VideoJob::ServerProxy* mJobServer;
00268 Util::SimpleMap<String,int> mJobMap;
00269
00270 ILOG_VAR_DECL;
00271
00272 };
00273
00274 ILOG_VAR_INIT(Server, Impala.Core.IDash);
00275
00276 }
00277 }
00278 }
00279
00280 #endif