00001 #ifndef Impala_Core_VideoJob_Manager_h
00002 #define Impala_Core_VideoJob_Manager_h
00003
00004 #include <map>
00005 #include <sstream>
00006 #include <fstream>
00007
00008 #include "Basis/ILog.h"
00009 #include "Util/Database.h"
00010 #include "Util/IOBuffer.h"
00011 #include "Job/State.h"
00012 #include "Core/VideoJob/ProcessDefinition.h"
00013 #include "Core/VideoJob/Data.h"
00014 #include "Core/VideoJob/ServerProxy.h"
00015 #include "Core/VideoJob/Reporter.h"
00016
00017 #include "Core/VideoSet/VideoSet.h"
00018 #include "Core/VideoSet/MakeVideoSet.h"
00019
00020 namespace Impala
00021 {
00022 namespace Core
00023 {
00024 namespace VideoJob
00025 {
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 class Manager
00038 {
00039 typedef Job::State::StateType JobStateType;
00040
00041 typedef Data::VideoMeta VideoMeta;
00042 typedef Data::VideoJob VideoJob;
00043
00044 public:
00045
00046 Manager(CString videoSetName, CString processDefinitionFile,
00047 CString serverName, int port, CString passwordFile, CmdOptions& options)
00048 {
00049 mData.mVideoSetName = videoSetName;
00050 mData.mServerName = serverName;
00051 mData.mPort = port;
00052 mData.mPasswordFile = passwordFile;
00053
00054 mData.mProcessDefFile = processDefinitionFile;
00055
00056 mJobServer = 0;
00057 mCurrentMask = "";
00058 mProcessDone = false;
00059
00060 flagVideoUpdated = options.GetString("videoUpdate", "0");
00061
00062 mReporter = 0;
00063 }
00064
00065 virtual
00066 ~Manager()
00067 {
00068 if (mJobServer)
00069 delete mJobServer;
00070
00071 if (mReporter)
00072 delete mReporter;
00073
00074 if (mVideoSet)
00075 delete mVideoSet;
00076 }
00077
00078 void
00079 SetMaskFileName(CString processStateFile)
00080 {
00081 mMaskFileName = processStateFile;
00082 }
00083
00084 int
00085 Start(int serverRequestIntervalSec, CmdOptions& options)
00086 {
00087 ILOG_INFO("Start managing video processing");
00088
00089 if (!Prepare(options))
00090 return 0;
00091
00092 mCurrentStage = mData.mProcessDef->FirstStageNr();
00093 ILOG_INFO("Entering stage " << mCurrentStage + 1);
00094 CreateInitialStageJobs();
00095
00096 mJobServer = new ServerProxy(mData.mServerName, mData.mPort,
00097 mData.mPasswordFile,
00098 serverRequestIntervalSec);
00099 mProcessDone = false;
00100 while (!mProcessDone)
00101 {
00102 while (HasMoreJobs())
00103 {
00104 VideoJob* job = GetNextJob();
00105 String cmdLine = mData.mProcessDef->GetCmdLine
00106 (job->stepNr, mData.mVideoSetName, job->videoNrCompact);
00107 int attempts = 0;
00108 int jobId = mJobServer->ScheduleJob(cmdLine, job->videoNr);
00109 while (jobId <= 0)
00110 {
00111 if (attempts++ < 3)
00112 {
00113 ILOG_WARN("Job dispatch failed; retrying...");
00114 jobId = mJobServer->ScheduleJob(cmdLine, job->videoNr,
00115 true);
00116 }
00117 else
00118 {
00119 ILOG_ERROR("Job dispatch failed");
00120 return -3;
00121 }
00122 }
00123 job->id = jobId;
00124 job->state = Job::State::SUBMITTED;
00125 mData.mJobsScheduled[jobId] = job;
00126 }
00127
00128 int nrOfJobsCompleted = 0;
00129 do
00130 {
00131 nrOfJobsCompleted = MonitorJobs();
00132 ReportState();
00133 if (nrOfJobsCompleted < 0)
00134 mProcessDone = true;
00135 }
00136 while (nrOfJobsCompleted == 0);
00137 }
00138
00139 ReportState();
00140 ILOG_INFO("Finished managing video processing");
00141 return 0;
00142 }
00143
00144
00145 private:
00146
00147 bool
00148 Prepare(CmdOptions& options)
00149 {
00150 mVideoSet = Core::VideoSet::MakeVideoSet(mData.mVideoSetName);
00151 mData.mVideoSetSize = mVideoSet->NrFiles();
00152 ILOG_INFO("Video set contains " << mData.mVideoSetSize <<
00153 " video file(s)");
00154 if (mData.mVideoSetSize <= 0)
00155 {
00156 ILOG_WARN("No video files to process");
00157 return false;
00158 }
00159
00160 mData.mProcessDef = new ProcessDefinition(mData.mProcessDefFile);
00161 if (mData.mProcessDef->StageCount() < 1)
00162 {
00163 ILOG_WARN("Empty process definition");
00164 return false;
00165 }
00166
00167 mMaskPath = mVideoSet->GetFilePathVideoData(mMaskFileName, false, true);
00168 if (mMaskPath.empty())
00169 {
00170 ILOG_INFO("Mask file '" << mMaskFileName << "' not found");
00171
00172 mMaskPath = mVideoSet->GetFilePathVideoData(mMaskFileName, true,
00173 false);
00174 }
00175 else
00176 {
00177 Util::Database* db = mVideoSet->GetDatabase();
00178 Util::IOBuffer* buffer = db->GetIOBuffer(mMaskPath, true, false, "");
00179 if (buffer)
00180 {
00181 mCurrentMask = buffer->ReadLine();
00182 delete buffer;
00183 int maskLength = Min<int>(mCurrentMask.size(),
00184 mData.VideoSetSize());
00185 mCurrentMask = mCurrentMask.substr(0, maskLength);
00186 if (mCurrentMask.empty())
00187 ILOG_INFO("Mask file '" << mMaskFileName << "' is empty");
00188 }
00189 else
00190 {
00191 ILOG_ERROR("Failed to read mask file '" << mMaskFileName << "'");
00192 return false;
00193 }
00194 }
00195 mData.mInitialMask = mCurrentMask;
00196
00197 const int setSize = mData.VideoSetSize();
00198 for (int v = 0; v < setSize; v++)
00199 {
00200 if (! mData.WasInitiallyMaskedOut(v))
00201 {
00202 VideoMeta video;
00203 video.videoNr = v;
00204 video.discarded = false;
00205 mData.mVideos[v] = video;
00206 }
00207 }
00208
00209 ILOG_INFO("Ignoring " << mData.mVideoSetSize - mData.mVideos.size() <<
00210 " video file(s)");
00211
00212 if (mData.mVideos.empty())
00213 {
00214 ILOG_WARN("No video files to process");
00215 return false;
00216 }
00217
00218 if (!UpdateMaskFile(false))
00219 {
00220 ILOG_ERROR("Mask file update test failed");
00221 return false;
00222 }
00223
00224 mReporter = new Reporter(mData, options);
00225 ReportVideoSet(mVideoSet);
00226 ReportProcessDefinition();
00227
00228 WriteVideoSet(FileNameBase(mData.mVideoSetName) + "_bak." +
00229 FileNameExt(mData.mVideoSetName));
00230
00231 mCompactedVideoSetSize = mData.mVideoSetSize;
00232
00233 return true;
00234 }
00235
00236
00237
00238 bool
00239 HasMoreJobs()
00240 {
00241 return (mData.mCountOfWaitingJobs > 0);
00242 }
00243
00244
00245
00246 VideoJob*
00247 GetNextJob()
00248 {
00249 int indexOfFirstWaitingJob = mData.mAllJobs.size() -
00250 mData.mCountOfWaitingJobs--;
00251 return mData.mAllJobs[indexOfFirstWaitingJob];
00252 }
00253
00254
00255 int
00256 MonitorJobs()
00257 {
00258 int nrOfJobsCompleted = 0;
00259 std::map<int, JobStateType>& updatedJobs =
00260 mJobServer->MonitorJobState(mData);
00261 if (updatedJobs.empty())
00262 return -1;
00263 std::map<int, JobStateType>::iterator iter = updatedJobs.begin();
00264 for ( ; iter != updatedJobs.end(); iter++ )
00265 {
00266 int jobId = iter->first;
00267 JobStateType newState = iter->second;
00268 VideoJob* job = mData.mJobsScheduled[jobId];
00269 if (newState == Job::State::TERMINATED_NRM ||
00270 newState == Job::State::TERMINATED_ABNRM)
00271 {
00272 JobStateType dummy = Job::State::UNDEFINED;
00273 int exitCode = -99;
00274 String errorLog;
00275 int rc = mJobServer->GetJobState(jobId, dummy, exitCode, errorLog);
00276 if (rc != 0)
00277 return -2;
00278 JobDone(mData.mJobsScheduled[jobId], newState, exitCode);
00279 nrOfJobsCompleted++;
00280 }
00281 else
00282 {
00283 job->state = newState;
00284 }
00285 }
00286 return nrOfJobsCompleted;
00287 }
00288
00289 void
00290 JobDone(VideoJob* job, JobStateType state, int exitCode)
00291 {
00292 ILOG_DEBUG("Job " << job->id << " is done (state=" << state <<
00293 ", exitCode=" << exitCode << ")");
00294
00295 job->state = state;
00296 job->exitCode = exitCode;
00297 mData.mJobsScheduled.erase(mData.mJobsScheduled.find(job->id));
00298
00299 if (state == Job::State::TERMINATED_ABNRM || exitCode != 0)
00300 {
00301 if (job->isIndiv)
00302 {
00303 mData.mVideos[job->videoNr].discarded = true;
00304 }
00305 else
00306 {
00307 for (int v = 0; v < mData.VideoSetSize(); v++)
00308 mData.mVideos[v].discarded = true;
00309 }
00310 }
00311 else
00312 {
00313 bool atEndOfStage =
00314 mData.mProcessDef->IsLastStepOfStage(job->stepNr, mCurrentStage);
00315 if (! atEndOfStage)
00316 {
00317 CreateMoreStageJobs(job);
00318 return;
00319 }
00320 }
00321
00322 EvaluateStage();
00323 }
00324
00325 void
00326 EvaluateStage()
00327 {
00328 int nrOfInvalid = 0;
00329 for (int v = 0; v < mData.mVideoSetSize; v++)
00330 {
00331 if (mData.IsValid(v))
00332 {
00333 VideoMeta& video = mData.mVideos[v];
00334 VideoJob* mostRecentJob = video.jobs.back();
00335 if (mostRecentJob->state != Job::State::TERMINATED_NRM &&
00336 mostRecentJob->state != Job::State::TERMINATED_ABNRM)
00337 return;
00338 }
00339 else
00340 {
00341 nrOfInvalid++;
00342 }
00343
00344
00345 if (flagVideoUpdated == "1")
00346 {
00347
00348 UpdateMaskFile();
00349 }
00350 }
00351
00352
00353
00354 UpdateMaskFile();
00355
00356 ILOG_INFO("Stage " << mCurrentStage + 1 << " completed");
00357 if (mCurrentStage + 1 < mData.mProcessDef->StageCount())
00358 {
00359 if (nrOfInvalid < mData.VideoSetSize())
00360 {
00361 mCurrentStage++;
00362 ILOG_INFO("Advancing to stage " << mCurrentStage + 1);
00363 CreateInitialStageJobs();
00364 return;
00365 }
00366 else
00367 {
00368 ILOG_DEBUG("Will not advance stage since no valid videos left");
00369 }
00370 }
00371
00372 mProcessDone = true;
00373 }
00374
00375 bool
00376 UpdateMaskFile(bool onlyIfChanged = true)
00377 {
00378 std::ostringstream oss;
00379 for (int v = 0; v < mData.VideoSetSize(); v++)
00380 oss << (mData.IsValid(v) ? '1' : '0');
00381 String mask = oss.str();
00382 if (! (mask == mCurrentMask && onlyIfChanged) )
00383 {
00384 Util::Database* db = mVideoSet->GetDatabase();
00385 Util::IOBuffer* buffer = db->GetIOBuffer(mMaskPath, false, false,
00386 "dummy_value");
00387 if (!buffer)
00388 {
00389 ILOG_ERROR("Cannot write mask to '" << mMaskPath << "'");
00390 return false;
00391 }
00392 buffer->Write(mask.c_str(), mask.size());
00393 buffer->Write("\n", 1);
00394 delete buffer;
00395 ILOG_INFO("Mask file updated");
00396 mCurrentMask = mask;
00397 }
00398 return true;
00399 }
00400
00401 void
00402 CreateInitialStageJobs()
00403 {
00404 int firstStepOfStage =
00405 mData.mProcessDef->FirstStepOfStage(mCurrentStage);
00406 std::vector<String> validVideos;
00407 if (mData.mProcessDef->IsIndividualStage(mCurrentStage))
00408 {
00409 for (int v = 0; v < mData.VideoSetSize(); v++)
00410 {
00411 if (mData.IsValid(v))
00412 {
00413 VideoJob* newJob = new VideoJob;
00414 newJob->stepNr = firstStepOfStage;
00415 newJob->videoNr = v;
00416 const int compactedVideoNr = validVideos.size();
00417 newJob->videoNrCompact = compactedVideoNr;
00418 newJob->isIndiv = true;
00419 newJob->state = Job::State::CREATED;
00420 mData.mVideos[v].jobs.push_back(newJob);
00421
00422 mData.mAllJobs.push_back(newJob);
00423 mData.mCountOfWaitingJobs++;
00424
00425 validVideos.push_back(mVideoSet->GetAsPath(v));
00426 }
00427 }
00428 }
00429 else
00430 {
00431 VideoJob* newJob = new VideoJob;
00432 newJob->stepNr = firstStepOfStage;
00433 newJob->videoNr = -1;
00434 newJob->videoNrCompact = -1;
00435 newJob->isIndiv = false;
00436 newJob->state = Job::State::CREATED;
00437 for (int v = 0; v < mData.VideoSetSize(); v++)
00438 {
00439 if (mData.IsValid(v))
00440 {
00441 mData.mVideos[v].jobs.push_back(newJob);
00442
00443 validVideos.push_back(mVideoSet->GetAsPath(v));
00444 }
00445 }
00446
00447 mData.mAllJobs.push_back(newJob);
00448 mData.mCountOfWaitingJobs++;
00449 }
00450
00451 if (validVideos.size() < mCompactedVideoSetSize)
00452 {
00453 UpdateVideoSet(validVideos);
00454 mCompactedVideoSetSize = validVideos.size();
00455 }
00456 }
00457
00458 bool
00459 WriteVideoSet(CString setName)
00460 {
00461 String pathToVideoSet = mVideoSet->GetFilePathVideoData(setName, true,
00462 false);
00463 Util::Database* db = mVideoSet->GetDatabase();
00464 Util::IOBuffer* buffer = db->GetIOBuffer(pathToVideoSet, false, false,
00465 "temp_videoset.txt");
00466 if (!buffer)
00467 {
00468 ILOG_ERROR("Cannot write video set backup file: " << pathToVideoSet);
00469 return false;
00470 }
00471
00472 for (int v = 0; v < mVideoSet->NrFiles(); v++)
00473 {
00474 String line = mVideoSet->GetAsPath(v);
00475 buffer->Write("\"", 1);
00476 buffer->Write(line.c_str(), line.size());
00477 buffer->Write("\"\n", 2);
00478 }
00479
00480 delete buffer;
00481 ILOG_INFO("Video set file backed up: " << pathToVideoSet);
00482
00483 return true;
00484 }
00485
00486 bool
00487 UpdateVideoSet(std::vector<String> videoSpecs)
00488 {
00489 String pathToVideoSet =
00490 mVideoSet->GetFilePathVideoData(mVideoSet->GetSetName(), true,
00491 false);
00492 Util::Database* db = mVideoSet->GetDatabase();
00493 Util::IOBuffer* buffer = db->GetIOBuffer(pathToVideoSet, false, false,
00494 "temp_videoset.txt");
00495 if (!buffer)
00496 {
00497 ILOG_ERROR("Cannot update video set file: " << pathToVideoSet);
00498 return false;
00499 }
00500
00501 for (int v = 0; v < videoSpecs.size(); v++)
00502 {
00503 CString line = videoSpecs[v];
00504 buffer->Write("\"", 1);
00505 buffer->Write(line.c_str(), line.size());
00506 buffer->Write("\"\n", 2);
00507 }
00508
00509 delete buffer;
00510 ILOG_INFO("Video set file updated");
00511
00512 return true;
00513 }
00514
00515 void
00516 CreateMoreStageJobs(VideoJob* job)
00517 {
00518 if (!job->isIndiv)
00519 ILOG_ERROR("Stage " << mCurrentStage <<
00520 " is not individual and can contain only one step");
00521
00522 VideoMeta& video = mData.mVideos[job->videoNr];
00523 VideoJob* newJob = new VideoJob;
00524 newJob->stepNr = job->stepNr + 1;
00525 newJob->videoNr = job->videoNr;
00526 newJob->videoNrCompact = job->videoNrCompact;
00527 newJob->isIndiv = true;
00528 newJob->state = Job::State::CREATED;
00529 video.jobs.push_back(newJob);
00530
00531 mData.mAllJobs.push_back(newJob);
00532 mData.mCountOfWaitingJobs++;
00533 }
00534
00535 void
00536 ReportState()
00537 {
00538 mReporter->WriteReportStatus();
00539 }
00540
00541 void
00542 ReportVideoSet(Core::VideoSet::VideoSet* const videoSet)
00543 {
00544 mReporter->WriteVideoSet(videoSet);
00545 }
00546
00547 void
00548 ReportProcessDefinition()
00549 {
00550 mReporter->WriteProcessDefinition();
00551 }
00552
00553 ILOG_VAR_DECL;
00554
00555 Data mData;
00556
00557 ServerProxy* mJobServer;
00558 int mCurrentStage;
00559 bool mProcessDone;
00560
00561 String mMaskFileName;
00562 String mMaskPath;
00563 String mCurrentMask;
00564
00565 Reporter* mReporter;
00566
00567 String flagVideoUpdated;
00568
00569 Core::VideoSet::VideoSet* mVideoSet;
00570 int mCompactedVideoSetSize;
00571
00572 };
00573
00574 ILOG_VAR_INIT(Manager, Impala.Core.VideoJob);
00575
00576 }
00577 }
00578 }
00579
00580 #endif