Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

ServerProxy.h

Go to the documentation of this file.
00001 #ifndef Impala_Core_VideoJob_ServerProxy_h
00002 #define Impala_Core_VideoJob_ServerProxy_h
00003 
00004 #include <map>
00005 #include <sstream>
00006 
00007 #include "Basis/ILog.h"
00008 #include "Util/StringParser.h"
00009 #include "Util/ChannelProxy.h"
00010 #include "Util/Sleep.h"
00011 #include "Job/State.h"
00012 #include "Core/VideoJob/Data.h"
00013 
00014 namespace Impala
00015 {
00016 namespace Core
00017 {
00018 namespace VideoJob
00019 {
00020 
00021 /***********************************************************************
00022 Instances of this class are responsible for dispatching jobs to a job
00023 server and monitoring job status.
00024 ************************************************************************/
00025 
00026 class ServerProxy
00027 {
00028     typedef Job::State::StateType StateType;
00029 
00030 public:
00031 
00032     ServerProxy(CString socketName, CString passwordFile, int requestIntervalSec)
00033     {
00034         int colonPos = socketName.find(":");
00035         if (colonPos <= 0)
00036         {
00037             ILOG_ERROR("Not a valid server address (port number is missing): "
00038                        << socketName);
00039             return;
00040         }
00041         Init(socketName.substr(0, colonPos),
00042              atol(socketName.substr(colonPos + 1)),
00043              passwordFile, requestIntervalSec);
00044     }
00045 
00046     ServerProxy(CString serverAddress, int port, CString passwordFile,
00047                 int requestIntervalSec)
00048     {
00049         Init(serverAddress, port, passwordFile, requestIntervalSec);
00050     }
00051 
00052     virtual
00053     ~ServerProxy()
00054     {
00055         Disconnect();
00056     }
00057 
00058     bool
00059     IsConnected()
00060     {
00061         return (mChannelProxy && mChannelProxy->ChannelIsValid());
00062     }
00063 
00064     // returns server assigned job id if successful, zero otherwise
00065     int
00066     ScheduleJob(CString cmdLine, int realVideoNr, bool delayFirst = false)
00067     {
00068         if (!IsConnected())
00069             return 0;
00070 
00071         if (delayFirst)
00072         {
00073             ILOG_DEBUG("Delaying job dispatch");
00074             Util::Sleep(mRequestIntervalSec);
00075         }
00076 
00077         ILOG_INFO("Dispatching job to server (actual video nr = " <<
00078                   realVideoNr << ") : " << cmdLine );
00079         String request = "AcceptJob:cmdLine=" + cmdLine;
00080         String response = mChannelProxy->Send(request);
00081         int jobId = 0;
00082         if (response.substr(0, 18) == "JobAccepted:jobId=")
00083         {
00084             jobId = atoi(response.substr(18));
00085             if (jobId > 0)
00086             {
00087                 ILOG_DEBUG("Received assigned job ID: " << jobId);
00088                 mJobIds.push_back(jobId);
00089             }
00090             else
00091             {
00092                 ILOG_ERROR("Received invalid job ID: " << jobId);
00093             }
00094         }
00095         else if (response == "MaxJobs")
00096         {
00097             ILOG_WARN("Job dispatch rejected: job max reached");
00098         }
00099         else
00100         {
00101             ILOG_ERROR("Job dispatch failed: " << response);
00102         }
00103             
00104         return jobId;
00105     }
00106 
00107     // Polls the job server for state changes of dispatched jobs and
00108     // returns immediately on any reported state change.
00109     std::map<int, StateType>& 
00110     MonitorJobState(Data& vpData)
00111     {
00112         static std::map<int, StateType> jobsWithStateChanged;
00113         jobsWithStateChanged.clear();
00114 
00115         std::ostringstream oss;
00116         std::map<int, Data::VideoJob*>::iterator jobIter =
00117             vpData.mJobsScheduled.begin();
00118         for ( ; jobIter != vpData.mJobsScheduled.end(); jobIter++ )
00119         {
00120             Data::VideoJob& job = *(jobIter->second);
00121             oss << job.id << ':' << job.state << ' ';
00122         }
00123         String request = "ProvideStatus:reference=" + oss.str();
00124 
00125         do
00126         {
00127             String response = mChannelProxy->Send(request);
00128             Util::StringParser parser(response);
00129             String responseType = parser.GetString(':', false);
00130             if (responseType == "NoChange")
00131             {
00132                 Util::Sleep(mRequestIntervalSec);
00133             }
00134             else if (responseType == "JobStatus" &&
00135                      parser.GetString('=') == "changed")
00136             {
00137                 int jobId = parser.GetInt(':', false, true);
00138                 while (jobId > 0)
00139                 {
00140                     StateType jobState = (StateType) parser.GetInt(' ', true,
00141                                                                    true);
00142                     jobsWithStateChanged[jobId] = jobState;
00143                     jobId = parser.GetInt(':', false, true);
00144                 }
00145             }
00146             else
00147             {
00148                 ILOG_ERROR("Job report request failed: " << response);
00149                 jobsWithStateChanged.clear();
00150                 break;
00151             }
00152         }
00153         while (jobsWithStateChanged.empty());
00154 
00155         return jobsWithStateChanged;
00156     }
00157 
00158     int
00159     GetJobState(int jobId, StateType& state, int& exitCode, String& errorLog)
00160     {
00161         String request = "ProvideStatus:jobId=" + MakeString(jobId);
00162         String response = mChannelProxy->Send(request);
00163         Util::StringParser parser(response);
00164         if (parser.GetString(':') == "JobStatus" &&
00165             parser.GetString('=') == "jobId" &&
00166             parser.GetInt(';', true, true) == jobId &&
00167             parser.GetString('=') == "state")
00168         {
00169             state = (StateType) parser.GetInt(';', true, true);
00170             if (parser.GetString('=') == "exitCode")
00171             {
00172                 exitCode = parser.GetInt(';', true, true);
00173                 if (parser.GetString('=') == "errorLog")
00174                     errorLog = response.substr(parser.Position());
00175                 return 0;
00176             }
00177         }
00178         ILOG_ERROR("Job report request failed: " << response);
00179         return -1;
00180     }
00181 
00182 private:
00183 
00184     void
00185     Init(CString serverAddress, int port, CString passwordFile,
00186          int requestIntervalSec)
00187     {
00188         mServerAddress = serverAddress;
00189         mPort = port;
00190         mPasswordFile = passwordFile;
00191         mRequestIntervalSec = requestIntervalSec;
00192 
00193         mChannelProxy = 0;
00194 
00195         if (! Connect(mServerAddress, mPort, mPasswordFile))
00196         {
00197             ILOG_ERROR("Cannot connect to job server at " << mServerAddress <<
00198                        ':' << mPort);
00199         }
00200     }
00201 
00202     bool
00203     Connect(CString serverAddress, int port, CString passwordFile,
00204             int timeOutSec = -1)
00205     {
00206         mChannelProxy = new Util::ChannelProxy(serverAddress, port,
00207                                                passwordFile);
00208         if (!mChannelProxy->GetChannel())
00209         {
00210             ILOG_ERROR("Failed to connect to job server");
00211             Disconnect();
00212             return false;
00213         }
00214 
00215         return true;
00216     }
00217 
00218     void
00219     Disconnect()
00220     {
00221         if (mChannelProxy)
00222         {
00223             //mChannelProxy->CloseChannel();
00224             delete mChannelProxy;
00225             mChannelProxy = 0;
00226         }
00227     }
00228 
00229     ILOG_VAR_DECL;
00230 
00231     String mServerAddress;
00232     int    mPort;
00233     String mPasswordFile;
00234     int    mRequestIntervalSec;
00235 
00236     Util::ChannelProxy* mChannelProxy;
00237 
00238     std::vector<int> mJobIds;
00239 
00240 }; // class
00241 
00242 ILOG_VAR_INIT(ServerProxy, Impala.Core.VideoJob);
00243 
00244 } // namespace 
00245 } // namespace 
00246 } // namespace 
00247 
00248 #endif

Generated on Fri Mar 19 09:31:29 2010 for ImpalaSrc by  doxygen 1.5.1