00001 #ifndef Impala_Util_ChannelServer_h
00002 #define Impala_Util_ChannelServer_h
00003
00004 #include <strstream>
00005 #include <map>
00006
00007 #include "Basis/ILog.h"
00008 #include "Basis/Timer.h"
00009 #include "Util/Channel.h"
00010 #include "Util/ChannelAuthorization.h"
00011
00012 namespace Impala
00013 {
00014 namespace Util
00015 {
00016
00017
00018
00019
00020
00021
00022 class ChannelServer : public ChannelListener
00023 {
00024
00025 public:
00026
00027 ChannelServer(int port, int nrPorts, bool doDynaPorts, CString passwordFile)
00028 : mFirstPort(port), mNrOfPorts(nrPorts), mDoDynaPorts(doDynaPorts),
00029 mPasswordFile(passwordFile)
00030 {
00031 Init();
00032 }
00033
00034 virtual
00035 ~ChannelServer()
00036 {
00037 if (mChannel)
00038 delete mChannel;
00039 }
00040
00041 virtual void
00042 HandleInfo(CString msg)
00043 {
00044 ILOG_INFO(msg);
00045 }
00046
00047 virtual void
00048 HandleError(CString msg)
00049 {
00050 ILOG_ERROR(msg);
00051 }
00052
00053 virtual void
00054 HandleIdle()
00055 {
00056 UpdateStatusBuf();
00057 }
00058
00059 virtual int
00060 HandleRequest(char* buf, int len, int bufSize)
00061 {
00062 buf[len] = '\0';
00063 String bufStr(buf, len - 1);
00064 String bufStrPart = (len > 20) ? bufStr.substr(0, 20) : bufStr;
00065 ILOG_DEBUG("Request from " << ConnectionDescr()
00066 << " (" << len << " bytes) : [" << bufStrPart << "]");
00067
00068 CString reqConn = mChannel->GetCurRequestConnection();
00069 int reqPort = mChannel->GetCurRequestPort();
00070 int responseLen = AcceptRequest(buf, len, bufSize, reqConn, reqPort);
00071 if (responseLen < 0)
00072 {
00073 String bufStr(buf, len - 1);
00074 String bufStrPart = (len > 20) ? bufStr.substr(0, 20) : bufStr;
00075 ILOG_WARN("Unknown request from " << ConnectionDescr() << " : [" <<
00076 bufStrPart << "]");
00077 sprintf(buf, "ERROR : Unknown request\0");
00078 responseLen = strlen(buf) + 1;
00079 }
00080
00081
00082 return Min(responseLen, Channel::DATA_BUFFER_SIZE);
00083 }
00084
00085
00086
00087
00088
00089
00090
00091 virtual void
00092 HandleDisconnect(int port)
00093 {
00094 if (mDoDynaPorts && port >= mFirstDynaPort)
00095 {
00096 if (mPortAssignment[port] != "")
00097 {
00098 AcceptDisconnect(port);
00099 mPortAssignment[port] = "";
00100 mPortAllowWrite[port] = false;
00101 ILOG_INFO("Port " << port << " is free and available again");
00102 }
00103 }
00104 }
00105
00106
00107 virtual void
00108 Start(bool doIdle)
00109 {
00110 mChannel = new Channel(mFirstPort, mNrOfPorts, this);
00111 mRunning = true;
00112 mTimer.Start();
00113 mChannel->Serve(doIdle);
00114
00115 }
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126 int
00127 NrOfPorts()
00128 {
00129 return mNrOfPorts;
00130 }
00131
00132 int
00133 FirstPort()
00134 {
00135 return mFirstPort;
00136 }
00137
00138 bool
00139 UsesDynamicPortAssignment()
00140 {
00141 return mDoDynaPorts;
00142 }
00143
00144 protected:
00145
00146 virtual int
00147 AcceptRequest(char* buf, int len, int bufSize, CString conn, int port)
00148 {
00149 String curCon = ConnectionDescr();
00150
00151 if (strncmp(buf, "ShutDown", 8) == 0)
00152 {
00153 ILOG_WARN("Received shut down request from " << curCon <<
00154 "; shutting down now");
00155 exit(-99);
00156 }
00157
00158 else if (strncmp(buf, "ping", 4) == 0)
00159 {
00160 ILOG_DEBUG("Received ping request from " << curCon);
00161 sprintf(buf, "OK\0");
00162 }
00163
00164 else if ((port > mFirstPort) && mDoDynaPorts &&
00165 (conn != mPortAssignment[port]))
00166 {
00167 ILOG_WARN("Illegal request from " << conn << " at port " << port <<
00168 "; port assigned to " << mPortAssignment[port]);
00169 sprintf(buf, "ERROR\0");
00170 }
00171
00172 else if (strncmp(buf, "PortMode", 8) == 0)
00173 AcceptRequestPortMode(buf, len, bufSize, conn, port);
00174
00175 else if (strncmp(buf, "PortAssignment", 14) == 0)
00176 AcceptRequestPortAssignment(buf, len, bufSize, conn, port);
00177
00178 else
00179 return -1;
00180
00181 return strlen(buf) + 1;
00182 }
00183
00184 virtual void
00185 AcceptRequestPortMode(char* buf, int len, int bufSize, CString conn,
00186 int port)
00187 {
00188 String curCon = ConnectionDescr();
00189 ILOG_DEBUG("Receiving port mode request from " << curCon);
00190 int index = 14;
00191 String readPass = ReadQuotedString(buf, index, len);
00192 if (! (mAuthorization.Valid() && mAuthorization.AllowRead(readPass)))
00193 {
00194 ILOG_WARN("Authorization failed");
00195 sprintf(buf, "Permission denied\0");
00196 return;
00197 }
00198
00199 if (mDoDynaPorts)
00200 {
00201 sprintf(buf, "Dynamic\0");
00202 if (port != mFirstPort)
00203 {
00204 ILOG_WARN("Port mode request from " << curCon <<
00205 " should have been targeted at port " << mFirstPort);
00206 }
00207 }
00208 else
00209 {
00210 sprintf(buf, "Static\0", port);
00211 index += 7;
00212 String writePass = ReadQuotedString(buf, index, len);
00213 mStaticPortAllowWrite = mAuthorization.AllowWrite(writePass);
00214 }
00215 }
00216
00217 virtual void
00218 AcceptRequestPortAssignment(char* buf, int len, int bufSize, CString conn,
00219 int port)
00220 {
00221 String curCon = ConnectionDescr();
00222 ILOG_DEBUG("Receiving port assignment request from " << curCon);
00223 if (mDoDynaPorts)
00224 {
00225 if (port == mFirstPort)
00226 {
00227 int index = 21;
00228 String writePass = ReadQuotedString(buf, index, len);
00229
00230 int dynaPort = 0;
00231 for (dynaPort = mFirstDynaPort; dynaPort <= mLastDynaPort;
00232 dynaPort++)
00233 {
00234 if (mPortAssignment[dynaPort].empty())
00235 {
00236 ILOG_INFO("Assigning port " << dynaPort << " to " <<
00237 curCon);
00238 mPortAssignment[dynaPort] = conn;
00239 mPortAllowWrite[dynaPort] =
00240 mAuthorization.AllowWrite(writePass);
00241 sprintf(buf, "PortAssigned=%i\0", dynaPort);
00242 break;
00243 }
00244 }
00245 if (dynaPort > mLastDynaPort)
00246 {
00247 ILOG_WARN("Cannot assign port: all ports are in use");
00248 sprintf(buf, "NoPortAvailable\0");
00249 }
00250 }
00251 else
00252 {
00253 ILOG_WARN("Invalid port assignment request from " << curCon <<
00254 "; should request at port " << mFirstPort);
00255 sprintf(buf, "ERROR\0");
00256 }
00257 }
00258 else
00259 {
00260 ILOG_WARN("No dynamic port assignment; no need for " << curCon <<
00261 " to request port assignment");
00262 sprintf(buf, "PortAssigned=%i\0", port);
00263 }
00264 }
00265
00266 virtual void
00267 AcceptDisconnect(int port)
00268 {
00269 }
00270
00271 virtual String
00272 ConnectionDescr()
00273 {
00274 CString reqConn = mChannel->GetCurRequestConnection();
00275 int reqPort = mChannel->GetCurRequestPort();
00276 return reqConn + " at port " + MakeString(reqPort);
00277 }
00278
00279 virtual void
00280 UpdateStatusBuf()
00281 {
00282 double timeVal = SplitTime();
00283 std::ostrstream statusStream;
00284 statusStream << "timeVal = " << timeVal << std::ends;
00285 strcpy(mStatusBuf, statusStream.str());
00286
00287 statusStream.freeze(false);
00288 }
00289
00290 double
00291 SplitTime()
00292 {
00293 return mTimer.SplitTime();
00294 }
00295
00296
00297
00298
00299
00300
00301 char*
00302 GetStatusBuffer()
00303 {
00304 return mStatusBuf;
00305 }
00306
00307 String
00308 ReadQuotedString(const char* buf, int& index, int len)
00309 {
00310 while (index < len && buf[index] != '"')
00311 index++;
00312 if (index < len)
00313 {
00314 int startIndex = ++index;
00315 while (index < len && buf[index] != '"')
00316 index++;
00317 const int quotedStringLen = index - startIndex;
00318 if (index < len)
00319 index++;
00320 return String(buf + startIndex, quotedStringLen);
00321 }
00322 return "";
00323 }
00324
00325 bool
00326 AllowWrite(int port)
00327 {
00328 if (mDoDynaPorts)
00329 return mPortAllowWrite[port];
00330 return mStaticPortAllowWrite;
00331 }
00332
00333 private:
00334
00335 void
00336 Init()
00337 {
00338 mChannel = 0;
00339 mRunning = false;
00340 mTimer = 1;
00341
00342
00343 mDoDynaPorts = mDoDynaPorts && (mNrOfPorts > 1);
00344 if (mDoDynaPorts)
00345 {
00346 mFirstDynaPort = mFirstPort + 1;
00347 int nrOfDynaPorts = mNrOfPorts - 1;
00348 mLastDynaPort = mFirstPort + nrOfDynaPorts;
00349 ILOG_INFO("Using dynamic port assignment");
00350 }
00351 else
00352 {
00353 mFirstDynaPort = -1;
00354 mLastDynaPort = -1;
00355 ILOG_INFO("Not using dynamic port assignment");
00356 }
00357
00358 mAuthorization = ChannelAuthorization(mPasswordFile);
00359 }
00360
00361 int mFirstPort;
00362 int mNrOfPorts;
00363 bool mDoDynaPorts;
00364 String mPasswordFile;
00365 int mFirstDynaPort;
00366 int mLastDynaPort;
00367
00368 Channel* mChannel;
00369 bool mRunning;
00370 Timer mTimer;
00371 char mStatusBuf[256];
00372 std::map<int, String> mPortAssignment;
00373 std::map<int, bool> mPortAllowWrite;
00374 bool mStaticPortAllowWrite;
00375 ChannelAuthorization mAuthorization;
00376
00377 ILOG_VAR_DEC;
00378
00379 };
00380
00381 ILOG_VAR_INIT(ChannelServer, Impala.Util);
00382
00383 }
00384 }
00385
00386 #endif