00001 #ifndef Impala_Core_VideoJob_ServerProxy_h
00002 #define Impala_Core_VideoJob_ServerProxy_h
00003
00004 #include <map>
00005 #include <sstream>
00006
00007 #include "Basis/ILog.h"
00008 #include "Util/StringParser.h"
00009 #include "Util/ChannelProxy.h"
00010 #include "Util/Sleep.h"
00011 #include "Job/State.h"
00012 #include "Core/VideoJob/Data.h"
00013
00014 namespace Impala
00015 {
00016 namespace Core
00017 {
00018 namespace VideoJob
00019 {
00020
00021
00022
00023
00024
00025
00026 class ServerProxy
00027 {
00028 typedef Job::State::StateType StateType;
00029
00030 public:
00031
00032 ServerProxy(CString socketName, CString passwordFile, int requestIntervalSec)
00033 {
00034 int colonPos = socketName.find(":");
00035 if (colonPos <= 0)
00036 {
00037 ILOG_ERROR("Not a valid server address (port number is missing): "
00038 << socketName);
00039 return;
00040 }
00041 Init(socketName.substr(0, colonPos),
00042 atol(socketName.substr(colonPos + 1)),
00043 passwordFile, requestIntervalSec);
00044 }
00045
00046 ServerProxy(CString serverAddress, int port, CString passwordFile,
00047 int requestIntervalSec)
00048 {
00049 Init(serverAddress, port, passwordFile, requestIntervalSec);
00050 }
00051
00052 virtual
00053 ~ServerProxy()
00054 {
00055 Disconnect();
00056 }
00057
00058 bool
00059 IsConnected()
00060 {
00061 return (mChannelProxy && mChannelProxy->ChannelIsValid());
00062 }
00063
00064
00065 int
00066 ScheduleJob(CString cmdLine, int realVideoNr, bool delayFirst = false)
00067 {
00068 if (!IsConnected())
00069 return 0;
00070
00071 if (delayFirst)
00072 {
00073 ILOG_DEBUG("Delaying job dispatch");
00074 Util::Sleep(mRequestIntervalSec);
00075 }
00076
00077 ILOG_INFO("Dispatching job to server (actual video nr = " <<
00078 realVideoNr << ") : " << cmdLine );
00079 String request = "AcceptJob:cmdLine=" + cmdLine;
00080 String response = mChannelProxy->Send(request);
00081 int jobId = 0;
00082 if (response.substr(0, 18) == "JobAccepted:jobId=")
00083 {
00084 jobId = atoi(response.substr(18));
00085 if (jobId > 0)
00086 {
00087 ILOG_DEBUG("Received assigned job ID: " << jobId);
00088 mJobIds.push_back(jobId);
00089 }
00090 else
00091 {
00092 ILOG_ERROR("Received invalid job ID: " << jobId);
00093 }
00094 }
00095 else if (response == "MaxJobs")
00096 {
00097 ILOG_WARN("Job dispatch rejected: job max reached");
00098 }
00099 else
00100 {
00101 ILOG_ERROR("Job dispatch failed: " << response);
00102 }
00103
00104 return jobId;
00105 }
00106
00107
00108
00109 std::map<int, StateType>&
00110 MonitorJobState(Data& vpData)
00111 {
00112 static std::map<int, StateType> jobsWithStateChanged;
00113 jobsWithStateChanged.clear();
00114
00115 std::ostringstream oss;
00116 std::map<int, Data::VideoJob*>::iterator jobIter =
00117 vpData.mJobsScheduled.begin();
00118 for ( ; jobIter != vpData.mJobsScheduled.end(); jobIter++ )
00119 {
00120 Data::VideoJob& job = *(jobIter->second);
00121 oss << job.id << ':' << job.state << ' ';
00122 }
00123 String request = "ProvideStatus:reference=" + oss.str();
00124
00125 do
00126 {
00127 String response = mChannelProxy->Send(request);
00128 Util::StringParser parser(response);
00129 String responseType = parser.GetString(':', false);
00130 if (responseType == "NoChange")
00131 {
00132 Util::Sleep(mRequestIntervalSec);
00133 }
00134 else if (responseType == "JobStatus" &&
00135 parser.GetString('=') == "changed")
00136 {
00137 int jobId = parser.GetInt(':', false, true);
00138 while (jobId > 0)
00139 {
00140 StateType jobState = (StateType) parser.GetInt(' ', true,
00141 true);
00142 jobsWithStateChanged[jobId] = jobState;
00143 jobId = parser.GetInt(':', false, true);
00144 }
00145 }
00146 else
00147 {
00148 ILOG_ERROR("Job report request failed: " << response);
00149 jobsWithStateChanged.clear();
00150 break;
00151 }
00152 }
00153 while (jobsWithStateChanged.empty());
00154
00155 return jobsWithStateChanged;
00156 }
00157
00158 int
00159 GetJobState(int jobId, StateType& state, int& exitCode, String& errorLog)
00160 {
00161 String request = "ProvideStatus:jobId=" + MakeString(jobId);
00162 String response = mChannelProxy->Send(request);
00163 Util::StringParser parser(response);
00164 if (parser.GetString(':') == "JobStatus" &&
00165 parser.GetString('=') == "jobId" &&
00166 parser.GetInt(';', true, true) == jobId &&
00167 parser.GetString('=') == "state")
00168 {
00169 state = (StateType) parser.GetInt(';', true, true);
00170 if (parser.GetString('=') == "exitCode")
00171 {
00172 exitCode = parser.GetInt(';', true, true);
00173 if (parser.GetString('=') == "errorLog")
00174 errorLog = response.substr(parser.Position());
00175 return 0;
00176 }
00177 }
00178 ILOG_ERROR("Job report request failed: " << response);
00179 return -1;
00180 }
00181
00182 private:
00183
00184 void
00185 Init(CString serverAddress, int port, CString passwordFile,
00186 int requestIntervalSec)
00187 {
00188 mServerAddress = serverAddress;
00189 mPort = port;
00190 mPasswordFile = passwordFile;
00191 mRequestIntervalSec = requestIntervalSec;
00192
00193 mChannelProxy = 0;
00194
00195 if (! Connect(mServerAddress, mPort, mPasswordFile))
00196 {
00197 ILOG_ERROR("Cannot connect to job server at " << mServerAddress <<
00198 ':' << mPort);
00199 }
00200 }
00201
00202 bool
00203 Connect(CString serverAddress, int port, CString passwordFile,
00204 int timeOutSec = -1)
00205 {
00206 mChannelProxy = new Util::ChannelProxy(serverAddress, port,
00207 passwordFile);
00208 if (!mChannelProxy->GetChannel())
00209 {
00210 ILOG_ERROR("Failed to connect to job server");
00211 Disconnect();
00212 return false;
00213 }
00214
00215 return true;
00216 }
00217
00218 void
00219 Disconnect()
00220 {
00221 if (mChannelProxy)
00222 {
00223
00224 delete mChannelProxy;
00225 mChannelProxy = 0;
00226 }
00227 }
00228
00229 ILOG_VAR_DECL;
00230
00231 String mServerAddress;
00232 int mPort;
00233 String mPasswordFile;
00234 int mRequestIntervalSec;
00235
00236 Util::ChannelProxy* mChannelProxy;
00237
00238 std::vector<int> mJobIds;
00239
00240 };
00241
// Counterpart of the ILOG_VAR_DECL inside ServerProxy. Presumably defines the
// class's logger under the name Impala.Core.VideoJob -- confirm against
// Basis/ILog.h.
00242 ILOG_VAR_INIT(ServerProxy, Impala.Core.VideoJob);
00243
00244 }
00245 }
00246 }
00247
00248 #endif