Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

ChannelServer.h

Go to the documentation of this file.
00001 #ifndef Impala_Util_ChannelServer_h
00002 #define Impala_Util_ChannelServer_h
00003 
00004 #include <strstream>
00005 #include <map>
00006 
00007 #include "Basis/ILog.h"
00008 #include "Basis/Timer.h"
00009 #include "Util/Channel.h"
00010 #include "Util/ChannelAuthorization.h"
00011 
00012 namespace Impala
00013 {
00014 namespace Util
00015 {
00016 
00017 /**********************************************************************
00018  Instances of this class support communication with clients, based on 
00019  channels that are based on sockets.
00020 ***********************************************************************/
00021 
00022 class ChannelServer : public ChannelListener
00023 {
00024 
00025 public: 
00026 
00027     ChannelServer(int port, int nrPorts, bool doDynaPorts, CString passwordFile)
00028         : mFirstPort(port), mNrOfPorts(nrPorts), mDoDynaPorts(doDynaPorts),
00029           mPasswordFile(passwordFile)
00030     {
00031         Init();
00032     }
00033 
00034     virtual
00035     ~ChannelServer()
00036     {
00037         if (mChannel)
00038             delete mChannel;
00039     }
00040 
00041     virtual void 
00042     HandleInfo(CString msg)
00043     {
00044         ILOG_INFO(msg);
00045     }
00046 
00047     virtual void 
00048     HandleError(CString msg)
00049     {
00050         ILOG_ERROR(msg);
00051     }
00052 
00053     virtual void 
00054     HandleIdle()
00055     {
00056         UpdateStatusBuf();
00057     }
00058 
00059     virtual int 
00060     HandleRequest(char* buf, int len, int bufSize)
00061     {
00062         buf[len] = '\0';
00063         String bufStr(buf, len - 1); // exclude '\0'
00064         String bufStrPart = (len > 20) ? bufStr.substr(0, 20) : bufStr;
00065         ILOG_DEBUG("Request from " << ConnectionDescr()
00066                    << " (" << len << " bytes) : [" << bufStrPart << "]");
00067 
00068         CString reqConn = mChannel->GetCurRequestConnection();
00069         int reqPort = mChannel->GetCurRequestPort();
00070         int responseLen = AcceptRequest(buf, len, bufSize, reqConn, reqPort);
00071         if (responseLen < 0)
00072         {
00073             String bufStr(buf, len - 1); // exclude '\0'
00074             String bufStrPart = (len > 20) ? bufStr.substr(0, 20) : bufStr;
00075             ILOG_WARN("Unknown request from " << ConnectionDescr() << " : [" <<
00076                       bufStrPart << "]");
00077             sprintf(buf, "ERROR : Unknown request\0");
00078             responseLen = strlen(buf) + 1;
00079         }
00080 
00081         // SK: guard against buffer overflow
00082         return Min(responseLen, Channel::DATA_BUFFER_SIZE);
00083     }
00084 
00085     //virtual void 
00086     //HandleConnect(CString conn, int port)
00087     //{
00088     //    ...
00089     //}
00090 
00091     virtual void 
00092     HandleDisconnect(int port)
00093     {
00094         if (mDoDynaPorts && port >= mFirstDynaPort)
00095         {
00096             if (mPortAssignment[port] != "")
00097             {
00098                 AcceptDisconnect(port);
00099                 mPortAssignment[port] = "";
00100                 mPortAllowWrite[port] = false;
00101                 ILOG_INFO("Port " << port << " is free and available again");
00102             }
00103         }
00104     }
00105 
00106 
00107     virtual void
00108     Start(bool doIdle)
00109     {
00110         mChannel = new Channel(mFirstPort, mNrOfPorts, this);
00111         mRunning = true;
00112         mTimer.Start();
00113         mChannel->Serve(doIdle);
00114         //Stop(); // SK: yet to be elaborated
00115     }
00116 
00117     // SK: yet to be elaborated
00118     //virtual void
00119     //Stop()
00120     //{
00121     //    mTimer.Stop();
00122     //    //mChannel...Stop() ??
00123     //    mRunning = false;
00124     //}
00125 
00126     int 
00127     NrOfPorts()
00128     {
00129         return mNrOfPorts;
00130     }
00131 
00132     int 
00133     FirstPort()
00134     {
00135         return mFirstPort;
00136     }
00137 
00138     bool 
00139     UsesDynamicPortAssignment()
00140     {
00141         return mDoDynaPorts;
00142     }
00143 
00144 protected:
00145 
00146     virtual int
00147     AcceptRequest(char* buf, int len, int bufSize, CString conn, int port)
00148     {
00149         String curCon = ConnectionDescr();
00150 
00151         if (strncmp(buf, "ShutDown", 8) == 0)
00152         {
00153             ILOG_WARN("Received shut down request from " << curCon <<
00154                       "; shutting down now");
00155             exit(-99);
00156         }
00157 
00158         else if (strncmp(buf, "ping", 4) == 0)
00159         {
00160             ILOG_DEBUG("Received ping request from " << curCon);
00161             sprintf(buf, "OK\0");
00162         }
00163 
00164         else if ((port > mFirstPort) && mDoDynaPorts &&
00165                  (conn != mPortAssignment[port]))
00166         {
00167             ILOG_WARN("Illegal request from " << conn << " at port " << port <<
00168                       "; port assigned to " << mPortAssignment[port]);
00169             sprintf(buf, "ERROR\0");
00170         }
00171 
00172         else if (strncmp(buf, "PortMode", 8) == 0)
00173             AcceptRequestPortMode(buf, len, bufSize, conn, port);
00174 
00175         else if (strncmp(buf, "PortAssignment", 14) == 0)
00176             AcceptRequestPortAssignment(buf, len, bufSize, conn, port);
00177 
00178         else
00179             return -1;
00180 
00181         return strlen(buf) + 1;
00182     }
00183 
00184     virtual void
00185     AcceptRequestPortMode(char* buf, int len, int bufSize, CString conn,
00186                           int port)
00187     {
00188         String curCon = ConnectionDescr();
00189         ILOG_DEBUG("Receiving port mode request from " << curCon);
00190         int index = 14;
00191         String readPass = ReadQuotedString(buf, index, len);
00192         if (! (mAuthorization.Valid() && mAuthorization.AllowRead(readPass)))
00193         {
00194             ILOG_WARN("Authorization failed");
00195             sprintf(buf, "Permission denied\0");
00196             return;
00197         }
00198 
00199         if (mDoDynaPorts)
00200         {
00201             sprintf(buf, "Dynamic\0");
00202             if (port != mFirstPort)
00203             {
00204                 ILOG_WARN("Port mode request from " << curCon <<
00205                           " should have been targeted at port " << mFirstPort);
00206             }
00207         }
00208         else
00209         {
00210             sprintf(buf, "Static\0", port);
00211             index += 7;
00212             String writePass = ReadQuotedString(buf, index, len);
00213             mStaticPortAllowWrite = mAuthorization.AllowWrite(writePass);
00214         }
00215     }
00216 
00217     virtual void
00218     AcceptRequestPortAssignment(char* buf, int len, int bufSize, CString conn,
00219                                 int port)
00220     {
00221         String curCon = ConnectionDescr();
00222         ILOG_DEBUG("Receiving port assignment request from " << curCon);
00223         if (mDoDynaPorts)
00224         {
00225             if (port == mFirstPort)
00226             {
00227                 int index = 21;
00228                 String writePass = ReadQuotedString(buf, index, len);
00229 
00230                 int dynaPort = 0;
00231                 for (dynaPort = mFirstDynaPort; dynaPort <= mLastDynaPort;
00232                      dynaPort++)
00233                 {
00234                     if (mPortAssignment[dynaPort].empty())
00235                     {
00236                         ILOG_INFO("Assigning port " << dynaPort << " to " <<
00237                                   curCon);
00238                         mPortAssignment[dynaPort] = conn;
00239                         mPortAllowWrite[dynaPort] =
00240                             mAuthorization.AllowWrite(writePass);
00241                         sprintf(buf, "PortAssigned=%i\0", dynaPort);
00242                         break;
00243                     }
00244                 }
00245                 if (dynaPort > mLastDynaPort)
00246                 {
00247                     ILOG_WARN("Cannot assign port: all ports are in use");
00248                     sprintf(buf, "NoPortAvailable\0");
00249                 }
00250             }
00251             else
00252             {
00253                 ILOG_WARN("Invalid port assignment request from " << curCon <<
00254                           "; should request at port " << mFirstPort);
00255                 sprintf(buf, "ERROR\0");
00256             }
00257         }
00258         else
00259         {
00260             ILOG_WARN("No dynamic port assignment; no need for " << curCon <<
00261                       " to request port assignment");
00262             sprintf(buf, "PortAssigned=%i\0", port);
00263         }
00264     }
00265 
00266     virtual void 
00267     AcceptDisconnect(int port)
00268     {
00269     }
00270 
00271     virtual String 
00272     ConnectionDescr()
00273     {
00274         CString reqConn = mChannel->GetCurRequestConnection();
00275         int reqPort = mChannel->GetCurRequestPort();
00276         return reqConn + " at port " + MakeString(reqPort);
00277     }
00278 
00279     virtual void 
00280     UpdateStatusBuf()
00281     {
00282         double timeVal = SplitTime();
00283         std::ostrstream statusStream;
00284         statusStream << "timeVal = " << timeVal << std::ends;
00285         strcpy(mStatusBuf, statusStream.str());
00286         //SetStatusBuffer(statusStream.str());
00287         statusStream.freeze(false);
00288     }
00289 
00290     double 
00291     SplitTime()
00292     {
00293         return mTimer.SplitTime();
00294     }
00295 
00296     //void SetStatusBuffer(const char* msg)
00297     //{
00298     //    strcpy(mStatusBuf, msg);
00299     //}
00300 
00301     char* 
00302     GetStatusBuffer()
00303     {
00304         return mStatusBuf;
00305     }
00306 
00307     String
00308     ReadQuotedString(const char* buf, int& index, int len)
00309     {
00310         while (index < len && buf[index] != '"')
00311             index++;
00312         if (index < len)
00313         {
00314             int startIndex = ++index; // read past start quote
00315             while (index < len && buf[index] != '"')
00316                 index++;
00317             const int quotedStringLen = index - startIndex;
00318             if (index < len)
00319                 index++; // read past end quote
00320             return String(buf + startIndex, quotedStringLen);
00321         }
00322         return "";
00323     }
00324 
00325     bool
00326     AllowWrite(int port)
00327     {
00328         if (mDoDynaPorts)
00329             return mPortAllowWrite[port];
00330         return mStaticPortAllowWrite;
00331     }
00332 
00333 private:
00334 
00335     void 
00336     Init()
00337     {
00338         mChannel = 0;
00339         mRunning = false;
00340         mTimer = 1;
00341 
00342         // need at least 2 ports for dynamic port assignment
00343         mDoDynaPorts = mDoDynaPorts && (mNrOfPorts > 1);
00344         if (mDoDynaPorts)
00345         {
00346             mFirstDynaPort = mFirstPort + 1;
00347             int nrOfDynaPorts = mNrOfPorts - 1;
00348             mLastDynaPort = mFirstPort + nrOfDynaPorts;
00349             ILOG_INFO("Using dynamic port assignment");
00350         }
00351         else
00352         {
00353             mFirstDynaPort = -1;
00354             mLastDynaPort = -1;
00355             ILOG_INFO("Not using dynamic port assignment");
00356         }
00357 
00358         mAuthorization = ChannelAuthorization(mPasswordFile);
00359     }
00360 
00361     int      mFirstPort;
00362     int      mNrOfPorts;
00363     bool     mDoDynaPorts;
00364     String   mPasswordFile;
00365     int      mFirstDynaPort;
00366     int      mLastDynaPort;
00367     //bool     mDynaPorts;
00368     Channel* mChannel;
00369     bool     mRunning;
00370     Timer    mTimer;
00371     char     mStatusBuf[256];
00372     std::map<int, String> mPortAssignment;
00373     std::map<int, bool>   mPortAllowWrite;
00374     bool                  mStaticPortAllowWrite;
00375     ChannelAuthorization  mAuthorization;
00376 
00377     ILOG_VAR_DEC;
00378 
00379 }; // class
00380 
00381 ILOG_VAR_INIT(ChannelServer, Impala.Util);
00382 
00383 } // namespace 
00384 } // namespace 
00385 
00386 #endif

Generated on Fri Mar 19 09:31:46 2010 for ImpalaSrc by  doxygen 1.5.1