Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

ChannelPool.h

Go to the documentation of this file.
00001 #ifndef Impala_Util_ChannelPool_h
00002 #define Impala_Util_ChannelPool_h
00003 
00004 #include "Basis/ILog.h"
00005 #include "Basis/String.h"
00006 #include "Util/Channel.h"
00007 #include "Util/ChannelAuthorization.h"
00008 #include "Util/Cache.h"
00009 
00010 #ifdef MPI_USED
00011 #include "Link/Mpi/MpiFuncs.h"
00012 #endif
00013 
00014 namespace Impala
00015 {
00016 namespace Util
00017 {
00018 
00019 
00020 class ChannelPool
00021 {
00022 public:
00023 
00024     ~ChannelPool()
00025     {
00026         delete mCache;
00027     }
00028 
00029     static ChannelPool&
00030     Instance()
00031     {
00032         static ChannelPool theSystem;
00033         return theSystem;
00034     }
00035 
00036     // socketName == host.domain:port
00037     Channel*
00038     Get(CString socketName, CString passwordFile)
00039     {
00040         String::size_type p = socketName.find(":");
00041         String serverName = socketName.substr(0, p);
00042         int port = atoi(socketName.substr(p+1, String::npos));
00043         return Get(serverName, port, socketName, passwordFile);
00044     }
00045 
00046     // serverName == host.domain
00047     Channel*
00048     Get(CString serverName, int port, CString passwordFile)
00049     {
00050         return Get(serverName, port, serverName + ":" + MakeString(port),
00051                    passwordFile);
00052     }
00053 
00054     void
00055     Remove(CString serverName, int port)
00056     {
00057         String socketName = serverName + ":" + MakeString(port);
00058         mCache->RemoveElement(socketName);
00059     }
00060 
00061     void
00062     Dump() const
00063     {
00064         mCache->Dump("ChannelPool - ");
00065     }
00066 
00067 private:
00068 
00069     ChannelPool()
00070     {
00071         mCache = new ChannelCache(CACHE_SIZE, true);
00072     }
00073 
00074     ChannelPool(const ChannelPool&)
00075     {
00076     }
00077 
00078     ChannelPool&
00079     operator=(const ChannelPool&);
00080 
00081     // multiple calls to this method specifying the same server+port, will 
00082     // be returned one and the same open channel object (assuming the channel
00083     // can successfully be created).
00084     Channel*
00085     Get(CString serverName, int port, CString socketName, CString passwordFile)
00086     {
00087         Channel* chan = 0;
00088         ILOG_DEBUG("Get for " << socketName);
00089         if (! mCache->GetElement(socketName, chan))
00090         {
00091             ILOG_DEBUG("Get: trying new Channel");
00092             chan = new Util::Channel(serverName, port);
00093             if (! chan->Valid())
00094             {
00095                 ILOG_ERROR("Failed to open channel to " << socketName);
00096                 delete chan;
00097                 return 0;
00098             }
00099             ILOG_DEBUG("Get: Opened channel to " << socketName);
00100 
00101             ChannelAuthorization auth(passwordFile);
00102             if (!auth.Valid())
00103             {
00104                 ILOG_ERROR("Cannot open Channel without valid authorization");
00105                 delete chan;
00106                 return 0;
00107             }
00108             char* buf = chan->Buffer();
00109             sprintf(buf, "PortMode read=\"%s\" write=\"%s\"\0",
00110                     auth.GetReadPassword().c_str(),
00111                     auth.GetWritePassword().c_str());
00112             int len = chan->SendRequest(strlen(buf) + 1);
00113             if (len < 0)
00114             {
00115                 ILOG_ERROR("Port mode request failed (" << len <<
00116                            "); closing channel");
00117                 delete chan;
00118                 return 0;
00119             }
00120 
00121             buf[len] = 0;
00122             ILOG_DEBUG("Port mode response from server: " << buf);
00123             if (String(buf) == "Permission denied")
00124             {
00125                 ILOG_ERROR("Permission denied; closing channel");
00126                 delete chan;
00127                 return 0;
00128             }
00129             else if (String(buf) == "Static")
00130             {
00131                 // do nothing; the current channel/port is used for serving
00132                 // further requests
00133             }
00134             else if (String(buf) == "Dynamic")
00135             {
00136                 ILOG_DEBUG("Requesting port assignment");
00137                 sprintf(buf, "PortAssignment write=\"%s\"\0",
00138                         auth.GetWritePassword().c_str());
00139                 len = chan->SendRequest(strlen(buf) + 1);
00140                 if (len < 0)
00141                 {
00142                     ILOG_ERROR("Port assignment request failed (" << len <<
00143                                "); closing channel");
00144                     delete chan;
00145                     return 0;
00146                 }
00147                 String response(buf);
00148                 if (response.substr(0, 15) == "NoPortAvailable")
00149                 {
00150                     ILOG_ERROR("No free port available; closing channel");
00151                     delete chan;
00152                     return 0;
00153                 }
00154                 if (response.substr(0, 13) != "PortAssigned=")
00155                 {
00156                     ILOG_ERROR("Unexpected response to port assignment request ("
00157                                << buf << "); closing channel");
00158                     delete chan;
00159                     return 0;
00160                 }
00161                 int portAssigned = atoi(response.substr(13));
00162                 ILOG_DEBUG("Port assigned is " << portAssigned);
00163                 delete chan;
00164                 ILOG_DEBUG("Closed channel to " << socketName);
00165 
00166                 chan = new Util::Channel(serverName, portAssigned);
00167                 if (! chan->Valid())
00168                 {
00169                     ILOG_ERROR("Failed to open channel to " << serverName << ":"
00170                                << MakeString(portAssigned));
00171                     delete chan;
00172                     return 0;
00173                 }
00174                 ILOG_DEBUG("Opened channel to " << serverName <<
00175                            " at assigned port " << portAssigned);
00176             }
00177             else
00178             {
00179                 ILOG_WARN("Unexpected response to port mode request (" << buf <<
00180                           "); server may not support dynamic port assignment");
00181                 ILOG_DEBUG("Opened channel to " << serverName << " at port " <<
00182                            port);
00183             }
00184 
00185             // Note that for dynamically assigned ports, the actual port number
00186             // is not the same as the port number used for the cache.
00187             ILOG_DEBUG("Adding chan to " << socketName << " : " << chan);
00188             mCache->AddElement(socketName, chan);
00189         }
00190         else
00191         {
00192             ILOG_DEBUG("Get: Returning channel from pool: " << chan);
00193         }
00194 
00195         return chan;
00196     }
00197 
00198     static const int CACHE_SIZE = 100;
00199 
00200     typedef Cache<String, Channel*> ChannelCache;
00201     ChannelCache* mCache;
00202 
00203     ILOG_VAR_DEC;
00204 
00205 }; // class
00206 
00207 ILOG_VAR_INIT(ChannelPool, Impala.Util);
00208 
00209 } // namespace Util
00210 } // namespace Impala
00211 
00212 #endif

Generated on Fri Mar 19 09:31:46 2010 for ImpalaSrc by  doxygen 1.5.1