Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

Manager.h

Go to the documentation of this file.
00001 #ifndef Impala_Core_VideoJob_Manager_h
00002 #define Impala_Core_VideoJob_Manager_h
00003 
00004 #include <map>
00005 #include <sstream>
00006 #include <fstream>
00007 
00008 #include "Basis/ILog.h"
00009 #include "Util/Database.h"
00010 #include "Util/IOBuffer.h"
00011 #include "Job/State.h"
00012 #include "Core/VideoJob/ProcessDefinition.h"
00013 #include "Core/VideoJob/Data.h"
00014 #include "Core/VideoJob/ServerProxy.h"
00015 #include "Core/VideoJob/Reporter.h"
00016 
00017 #include "Core/VideoSet/VideoSet.h"
00018 #include "Core/VideoSet/MakeVideoSet.h"
00019 
00020 namespace Impala
00021 {
00022 namespace Core
00023 {
00024 namespace VideoJob
00025 {
00026 
00027 /***********************************************************************
00028 An instance of this class manages the processing of a video set according
00029 to a process definition, by decomposing work into packages (jobs) 
00030 following from dependencies within the process definition. 
00031 Work packages will be issued for execution to a job server 
00032 and on completion will have their status inspected by the manager. 
00033 This may result in more work packages being issued. 
00034 The cycle continues until eventually the overall process is complete.
00035 ************************************************************************/
00036 
00037 class Manager
00038 {
00039     typedef Job::State::StateType JobStateType;
00040 
00041     typedef Data::VideoMeta VideoMeta;
00042     typedef Data::VideoJob VideoJob;
00043 
00044 public:
00045 
00046     Manager(CString videoSetName, CString processDefinitionFile, 
00047             CString serverName, int port, CString passwordFile, CmdOptions& options)
00048     {
00049         mData.mVideoSetName = videoSetName;
00050         mData.mServerName = serverName;
00051         mData.mPort = port;
00052         mData.mPasswordFile = passwordFile;
00053 
00054         mData.mProcessDefFile = processDefinitionFile;
00055 
00056         mJobServer = 0;
00057         mCurrentMask = "";
00058         mProcessDone = false;
00059 
00060         flagVideoUpdated = options.GetString("videoUpdate", "0");
00061 
00062         mReporter = 0;
00063     }
00064 
00065     virtual 
00066     ~Manager()
00067     {
00068         if (mJobServer)
00069             delete mJobServer;
00070 
00071         if (mReporter)
00072             delete mReporter;
00073 
00074         if (mVideoSet)
00075             delete mVideoSet;
00076     }
00077 
00078     void 
00079     SetMaskFileName(CString processStateFile)
00080     {
00081         mMaskFileName = processStateFile;
00082     }
00083 
00084     int 
00085     Start(int serverRequestIntervalSec, CmdOptions& options)
00086     {
00087         ILOG_INFO("Start managing video processing");
00088 
00089         if (!Prepare(options))
00090             return 0;
00091 
00092         mCurrentStage = mData.mProcessDef->FirstStageNr();
00093         ILOG_INFO("Entering stage " << mCurrentStage + 1);
00094         CreateInitialStageJobs();
00095 
00096         mJobServer = new ServerProxy(mData.mServerName, mData.mPort,
00097                                      mData.mPasswordFile,
00098                                      serverRequestIntervalSec);
00099         mProcessDone = false;
00100         while (!mProcessDone)
00101         {
00102             while (HasMoreJobs())
00103             {
00104                 VideoJob* job = GetNextJob();
00105                 String cmdLine = mData.mProcessDef->GetCmdLine
00106                     (job->stepNr, mData.mVideoSetName, job->videoNrCompact);
00107                 int attempts = 0;
00108                 int jobId = mJobServer->ScheduleJob(cmdLine, job->videoNr);
00109                 while (jobId <= 0)
00110                 {
00111                     if (attempts++ < 3)
00112                     {
00113                         ILOG_WARN("Job dispatch failed; retrying...");
00114                         jobId = mJobServer->ScheduleJob(cmdLine, job->videoNr,
00115                                                         true);
00116                     }
00117                     else
00118                     {
00119                         ILOG_ERROR("Job dispatch failed");
00120                         return -3;
00121                     }
00122                 }
00123                 job->id = jobId;
00124                 job->state = Job::State::SUBMITTED;
00125                 mData.mJobsScheduled[jobId] = job;
00126             }
00127 
00128             int nrOfJobsCompleted = 0;
00129             do
00130             {
00131                 nrOfJobsCompleted = MonitorJobs();
00132                 ReportState();
00133                 if (nrOfJobsCompleted < 0) // SK: lousy trick
00134                     mProcessDone = true;
00135             }
00136             while (nrOfJobsCompleted == 0);
00137         }
00138 
00139         ReportState();
00140         ILOG_INFO("Finished managing video processing");
00141         return 0;
00142     }
00143 
00144 
00145 private:
00146 
00147     bool
00148     Prepare(CmdOptions& options)
00149     {
00150         mVideoSet = Core::VideoSet::MakeVideoSet(mData.mVideoSetName);
00151         mData.mVideoSetSize = mVideoSet->NrFiles();
00152         ILOG_INFO("Video set contains " << mData.mVideoSetSize <<
00153                   " video file(s)");
00154         if (mData.mVideoSetSize <= 0)
00155         {
00156             ILOG_WARN("No video files to process");
00157             return false;
00158         }
00159 
00160         mData.mProcessDef = new ProcessDefinition(mData.mProcessDefFile);
00161         if (mData.mProcessDef->StageCount() < 1)
00162         {
00163             ILOG_WARN("Empty process definition");
00164             return false;
00165         }
00166 
00167         mMaskPath = mVideoSet->GetFilePathVideoData(mMaskFileName, false, true);
00168         if (mMaskPath.empty())
00169         {
00170             ILOG_INFO("Mask file '" << mMaskFileName << "' not found");
00171             // since the file was not found, set path for the file for writing
00172             mMaskPath = mVideoSet->GetFilePathVideoData(mMaskFileName, true,
00173                                                         false);
00174         }
00175         else
00176         {
00177             Util::Database* db = mVideoSet->GetDatabase();
00178             Util::IOBuffer* buffer = db->GetIOBuffer(mMaskPath, true, false, "");
00179             if (buffer)
00180             {
00181                 mCurrentMask = buffer->ReadLine();
00182                 delete buffer;
00183                 int maskLength = Min<int>(mCurrentMask.size(),
00184                                           mData.VideoSetSize());
00185                 mCurrentMask = mCurrentMask.substr(0, maskLength);
00186                 if (mCurrentMask.empty())
00187                     ILOG_INFO("Mask file '" << mMaskFileName << "' is empty");
00188             }
00189             else
00190             {
00191                 ILOG_ERROR("Failed to read mask file '" << mMaskFileName << "'");
00192                 return false;
00193             }
00194         }
00195         mData.mInitialMask = mCurrentMask;
00196 
00197         const int setSize = mData.VideoSetSize();
00198         for (int v = 0; v < setSize; v++)
00199         {
00200             if (! mData.WasInitiallyMaskedOut(v))
00201             {
00202                 VideoMeta video;
00203                 video.videoNr = v;
00204                 video.discarded = false;
00205                 mData.mVideos[v] = video;
00206             }
00207         }
00208 
00209         ILOG_INFO("Ignoring " << mData.mVideoSetSize - mData.mVideos.size() <<
00210                   " video file(s)");
00211 
00212         if (mData.mVideos.empty())
00213         {
00214             ILOG_WARN("No video files to process");
00215             return false;
00216         }
00217 
00218         if (!UpdateMaskFile(false))
00219         {
00220             ILOG_ERROR("Mask file update test failed");
00221             return false;
00222         }
00223 
00224         mReporter = new Reporter(mData, options);
00225         ReportVideoSet(mVideoSet);
00226         ReportProcessDefinition();
00227 
00228         WriteVideoSet(FileNameBase(mData.mVideoSetName) + "_bak." + 
00229                       FileNameExt(mData.mVideoSetName));
00230 
00231         mCompactedVideoSetSize = mData.mVideoSetSize; // must initially have
00232                                                       // this value
00233         return true;
00234     }
00235 
00236     // Returns true if at least one more job is waiting to be run;
00237     // false otherwise.
00238     bool 
00239     HasMoreJobs()
00240     {
00241         return (mData.mCountOfWaitingJobs > 0);
00242     }
00243 
00244     // Returns one job that is waiting to be run;
00245     // assumes that HasMoreJobs() would return true if called at this point
00246     VideoJob* 
00247     GetNextJob()
00248     {
00249         int indexOfFirstWaitingJob = mData.mAllJobs.size() -
00250                                      mData.mCountOfWaitingJobs--;
00251         return mData.mAllJobs[indexOfFirstWaitingJob];
00252     }
00253 
00254     // Waits for job state changes
00255     int
00256     MonitorJobs()
00257     {
00258         int nrOfJobsCompleted = 0;
00259         std::map<int, JobStateType>& updatedJobs =
00260             mJobServer->MonitorJobState(mData);
00261         if (updatedJobs.empty()) // SK: lousy trick
00262             return -1;
00263         std::map<int, JobStateType>::iterator iter = updatedJobs.begin();
00264         for ( ; iter != updatedJobs.end(); iter++ )
00265         {
00266             int jobId = iter->first;
00267             JobStateType newState = iter->second;
00268             VideoJob* job = mData.mJobsScheduled[jobId];
00269             if (newState == Job::State::TERMINATED_NRM ||
00270                 newState == Job::State::TERMINATED_ABNRM)
00271             {
00272                 JobStateType dummy = Job::State::UNDEFINED;
00273                 int exitCode = -99;
00274                 String errorLog;
00275                 int rc = mJobServer->GetJobState(jobId, dummy, exitCode, errorLog);
00276                 if (rc != 0)
00277                     return -2;
00278                 JobDone(mData.mJobsScheduled[jobId], newState, exitCode);
00279                 nrOfJobsCompleted++;
00280             }
00281             else
00282             {
00283                 job->state = newState;
00284             }
00285         }
00286         return nrOfJobsCompleted;
00287     }
00288 
00289     void 
00290     JobDone(VideoJob* job, JobStateType state, int exitCode)
00291     {
00292         ILOG_DEBUG("Job " << job->id << " is done (state=" << state <<
00293                    ", exitCode=" << exitCode << ")");
00294 
00295         job->state = state;
00296         job->exitCode = exitCode;
00297         mData.mJobsScheduled.erase(mData.mJobsScheduled.find(job->id));
00298 
00299         if (state == Job::State::TERMINATED_ABNRM || exitCode != 0)
00300         {
00301             if (job->isIndiv)
00302             {
00303                 mData.mVideos[job->videoNr].discarded = true;
00304             }
00305             else
00306             {
00307                 for (int v = 0; v < mData.VideoSetSize(); v++)
00308                     mData.mVideos[v].discarded = true;
00309             }
00310         }
00311         else
00312         {
00313             bool atEndOfStage =
00314                 mData.mProcessDef->IsLastStepOfStage(job->stepNr, mCurrentStage);
00315             if (! atEndOfStage)
00316             {
00317                 CreateMoreStageJobs(job);
00318                 return;
00319             }
00320         }
00321 
00322         EvaluateStage();
00323     }
00324 
00325     void
00326     EvaluateStage()
00327     {
00328         int nrOfInvalid = 0;
00329         for (int v = 0; v < mData.mVideoSetSize; v++)
00330         {
00331             if (mData.IsValid(v))
00332             {
00333                 VideoMeta& video = mData.mVideos[v];
00334                 VideoJob* mostRecentJob = video.jobs.back();
00335                 if (mostRecentJob->state != Job::State::TERMINATED_NRM &&
00336                     mostRecentJob->state != Job::State::TERMINATED_ABNRM)
00337                     return; // current stage not complete yet
00338             }
00339             else
00340             {
00341                 nrOfInvalid++;
00342             }
00343 
00344             // added by Jun Wu, 2009/09/15    
00345             if (flagVideoUpdated == "1")
00346             {
00347                 // update once each video is processed
00348                 UpdateMaskFile();
00349             }
00350         }
00351 
00352         // ok, we're at end of stage 
00353 
00354         UpdateMaskFile();
00355 
00356         ILOG_INFO("Stage " << mCurrentStage + 1 << " completed");
00357         if (mCurrentStage + 1 < mData.mProcessDef->StageCount())
00358         {
00359             if (nrOfInvalid < mData.VideoSetSize())
00360             {
00361                 mCurrentStage++;
00362                 ILOG_INFO("Advancing to stage " << mCurrentStage + 1);
00363                 CreateInitialStageJobs();
00364                 return;
00365             }
00366             else
00367             {
00368                 ILOG_DEBUG("Will not advance stage since no valid videos left");
00369             }
00370         }
00371 
00372         mProcessDone = true;
00373     }
00374 
00375     bool
00376     UpdateMaskFile(bool onlyIfChanged = true)
00377     {
00378         std::ostringstream oss;
00379         for (int v = 0; v < mData.VideoSetSize(); v++)
00380             oss << (mData.IsValid(v) ? '1' : '0');
00381         String mask = oss.str();
00382         if (! (mask == mCurrentMask && onlyIfChanged) )
00383         {
00384             Util::Database* db = mVideoSet->GetDatabase();
00385             Util::IOBuffer* buffer = db->GetIOBuffer(mMaskPath, false, false,
00386                                                      "dummy_value");
00387             if (!buffer)
00388             {
00389                 ILOG_ERROR("Cannot write mask to '" << mMaskPath << "'");
00390                 return false;
00391             }
00392             buffer->Write(mask.c_str(), mask.size());
00393             buffer->Write("\n", 1);
00394             delete buffer;
00395             ILOG_INFO("Mask file updated");
00396             mCurrentMask = mask;
00397         }
00398         return true;
00399     }
00400 
00401     void 
00402     CreateInitialStageJobs()
00403     {
00404         int firstStepOfStage =
00405             mData.mProcessDef->FirstStepOfStage(mCurrentStage);
00406         std::vector<String> validVideos;
00407         if (mData.mProcessDef->IsIndividualStage(mCurrentStage))
00408         {
00409             for (int v = 0; v < mData.VideoSetSize(); v++)
00410             {
00411                 if (mData.IsValid(v))
00412                 {
00413                     VideoJob* newJob = new VideoJob;
00414                     newJob->stepNr = firstStepOfStage;
00415                     newJob->videoNr = v;
00416                     const int compactedVideoNr = validVideos.size();
00417                     newJob->videoNrCompact = compactedVideoNr;
00418                     newJob->isIndiv = true;
00419                     newJob->state = Job::State::CREATED;
00420                     mData.mVideos[v].jobs.push_back(newJob);
00421 
00422                     mData.mAllJobs.push_back(newJob);
00423                     mData.mCountOfWaitingJobs++;
00424 
00425                     validVideos.push_back(mVideoSet->GetAsPath(v));
00426                 }
00427             }
00428         }
00429         else
00430         {
00431             VideoJob* newJob = new VideoJob;
00432             newJob->stepNr = firstStepOfStage;
00433             newJob->videoNr = -1;
00434             newJob->videoNrCompact = -1;
00435             newJob->isIndiv = false;
00436             newJob->state = Job::State::CREATED;
00437             for (int v = 0; v < mData.VideoSetSize(); v++)
00438             {
00439                 if (mData.IsValid(v))
00440                 {
00441                     mData.mVideos[v].jobs.push_back(newJob);
00442 
00443                     validVideos.push_back(mVideoSet->GetAsPath(v));
00444                 }
00445             }
00446 
00447             mData.mAllJobs.push_back(newJob);
00448             mData.mCountOfWaitingJobs++;
00449         }
00450 
00451         if (validVideos.size() < mCompactedVideoSetSize)
00452         {
00453             UpdateVideoSet(validVideos);
00454             mCompactedVideoSetSize = validVideos.size();
00455         }
00456     }
00457 
00458     bool
00459     WriteVideoSet(CString setName)
00460     {
00461         String pathToVideoSet = mVideoSet->GetFilePathVideoData(setName, true,
00462                                                                 false);
00463         Util::Database* db = mVideoSet->GetDatabase();
00464         Util::IOBuffer* buffer = db->GetIOBuffer(pathToVideoSet, false, false,
00465                                                  "temp_videoset.txt");
00466         if (!buffer)
00467         {
00468             ILOG_ERROR("Cannot write video set backup file: " << pathToVideoSet);
00469             return false;
00470         }
00471 
00472         for (int v = 0; v < mVideoSet->NrFiles(); v++)
00473         {
00474             String line = mVideoSet->GetAsPath(v);
00475             buffer->Write("\"", 1);
00476             buffer->Write(line.c_str(), line.size());
00477             buffer->Write("\"\n", 2);
00478         }
00479 
00480         delete buffer;
00481         ILOG_INFO("Video set file backed up: " << pathToVideoSet);
00482 
00483         return true;
00484     }
00485 
00486     bool
00487     UpdateVideoSet(std::vector<String> videoSpecs)
00488     {
00489         String pathToVideoSet =
00490             mVideoSet->GetFilePathVideoData(mVideoSet->GetSetName(), true,
00491                                             false);
00492         Util::Database* db = mVideoSet->GetDatabase();
00493         Util::IOBuffer* buffer = db->GetIOBuffer(pathToVideoSet, false, false,
00494                                                  "temp_videoset.txt");
00495         if (!buffer)
00496         {
00497             ILOG_ERROR("Cannot update video set file: " << pathToVideoSet);
00498             return false;
00499         }
00500 
00501         for (int v = 0; v < videoSpecs.size(); v++)
00502         {
00503             CString line = videoSpecs[v];
00504             buffer->Write("\"", 1);
00505             buffer->Write(line.c_str(), line.size());
00506             buffer->Write("\"\n", 2);
00507         }
00508 
00509         delete buffer;
00510         ILOG_INFO("Video set file updated");
00511 
00512         return true;
00513     }
00514 
00515     void 
00516     CreateMoreStageJobs(VideoJob* job)
00517     {
00518         if (!job->isIndiv)
00519             ILOG_ERROR("Stage " << mCurrentStage <<
00520                        " is not individual and can contain only one step");
00521 
00522         VideoMeta& video = mData.mVideos[job->videoNr];
00523         VideoJob* newJob = new VideoJob;
00524         newJob->stepNr = job->stepNr + 1;
00525         newJob->videoNr = job->videoNr;
00526         newJob->videoNrCompact = job->videoNrCompact;
00527         newJob->isIndiv = true;
00528         newJob->state = Job::State::CREATED;
00529         video.jobs.push_back(newJob);
00530 
00531         mData.mAllJobs.push_back(newJob);
00532         mData.mCountOfWaitingJobs++;
00533     }
00534 
00535     void 
00536     ReportState()
00537     {
00538         mReporter->WriteReportStatus();
00539     }
00540 
00541     void 
00542     ReportVideoSet(Core::VideoSet::VideoSet* const videoSet)
00543     {
00544         mReporter->WriteVideoSet(videoSet);
00545     }
00546     
00547     void 
00548     ReportProcessDefinition()
00549     {
00550         mReporter->WriteProcessDefinition();
00551     }
00552     
00553     ILOG_VAR_DECL;
00554 
00555     Data mData;
00556 
00557     ServerProxy* mJobServer;
00558     int mCurrentStage;
00559     bool mProcessDone;
00560 
00561     String mMaskFileName;
00562     String mMaskPath;
00563     String mCurrentMask;
00564 
00565     Reporter* mReporter;
00566 
00567     String flagVideoUpdated;
00568 
00569     Core::VideoSet::VideoSet* mVideoSet;
00570     int mCompactedVideoSetSize;
00571 
00572 }; // class
00573 
00574 ILOG_VAR_INIT(Manager, Impala.Core.VideoJob);
00575 
00576 } // namespace 
00577 } // namespace 
00578 } // namespace 
00579 
00580 #endif

Generated on Fri Mar 19 09:31:29 2010 for ImpalaSrc by  doxygen 1.5.1