00001 #ifndef Impala_Util_ChannelPool_h
00002 #define Impala_Util_ChannelPool_h
00003
00004 #include "Basis/ILog.h"
00005 #include "Basis/String.h"
00006 #include "Util/Channel.h"
00007 #include "Util/ChannelAuthorization.h"
00008 #include "Util/Cache.h"
00009
00010 #ifdef MPI_USED
00011 #include "Link/Mpi/MpiFuncs.h"
00012 #endif
00013
00014 namespace Impala
00015 {
00016 namespace Util
00017 {
00018
00019
00020 class ChannelPool
00021 {
00022 public:
00023
00024 ~ChannelPool()
00025 {
00026 delete mCache;
00027 }
00028
00029 static ChannelPool&
00030 Instance()
00031 {
00032 static ChannelPool theSystem;
00033 return theSystem;
00034 }
00035
00036
00037 Channel*
00038 Get(CString socketName, CString passwordFile)
00039 {
00040 String::size_type p = socketName.find(":");
00041 String serverName = socketName.substr(0, p);
00042 int port = atoi(socketName.substr(p+1, String::npos));
00043 return Get(serverName, port, socketName, passwordFile);
00044 }
00045
00046
00047 Channel*
00048 Get(CString serverName, int port, CString passwordFile)
00049 {
00050 return Get(serverName, port, serverName + ":" + MakeString(port),
00051 passwordFile);
00052 }
00053
00054 void
00055 Remove(CString serverName, int port)
00056 {
00057 String socketName = serverName + ":" + MakeString(port);
00058 mCache->RemoveElement(socketName);
00059 }
00060
00061 void
00062 Dump() const
00063 {
00064 mCache->Dump("ChannelPool - ");
00065 }
00066
00067 private:
00068
00069 ChannelPool()
00070 {
00071 mCache = new ChannelCache(CACHE_SIZE, true);
00072 }
00073
00074 ChannelPool(const ChannelPool&)
00075 {
00076 }
00077
00078 ChannelPool&
00079 operator=(const ChannelPool&);
00080
00081
00082
00083
00084 Channel*
00085 Get(CString serverName, int port, CString socketName, CString passwordFile)
00086 {
00087 Channel* chan = 0;
00088 ILOG_DEBUG("Get for " << socketName);
00089 if (! mCache->GetElement(socketName, chan))
00090 {
00091 ILOG_DEBUG("Get: trying new Channel");
00092 chan = new Util::Channel(serverName, port);
00093 if (! chan->Valid())
00094 {
00095 ILOG_ERROR("Failed to open channel to " << socketName);
00096 delete chan;
00097 return 0;
00098 }
00099 ILOG_DEBUG("Get: Opened channel to " << socketName);
00100
00101 ChannelAuthorization auth(passwordFile);
00102 if (!auth.Valid())
00103 {
00104 ILOG_ERROR("Cannot open Channel without valid authorization");
00105 delete chan;
00106 return 0;
00107 }
00108 char* buf = chan->Buffer();
00109 sprintf(buf, "PortMode read=\"%s\" write=\"%s\"\0",
00110 auth.GetReadPassword().c_str(),
00111 auth.GetWritePassword().c_str());
00112 int len = chan->SendRequest(strlen(buf) + 1);
00113 if (len < 0)
00114 {
00115 ILOG_ERROR("Port mode request failed (" << len <<
00116 "); closing channel");
00117 delete chan;
00118 return 0;
00119 }
00120
00121 buf[len] = 0;
00122 ILOG_DEBUG("Port mode response from server: " << buf);
00123 if (String(buf) == "Permission denied")
00124 {
00125 ILOG_ERROR("Permission denied; closing channel");
00126 delete chan;
00127 return 0;
00128 }
00129 else if (String(buf) == "Static")
00130 {
00131
00132
00133 }
00134 else if (String(buf) == "Dynamic")
00135 {
00136 ILOG_DEBUG("Requesting port assignment");
00137 sprintf(buf, "PortAssignment write=\"%s\"\0",
00138 auth.GetWritePassword().c_str());
00139 len = chan->SendRequest(strlen(buf) + 1);
00140 if (len < 0)
00141 {
00142 ILOG_ERROR("Port assignment request failed (" << len <<
00143 "); closing channel");
00144 delete chan;
00145 return 0;
00146 }
00147 String response(buf);
00148 if (response.substr(0, 15) == "NoPortAvailable")
00149 {
00150 ILOG_ERROR("No free port available; closing channel");
00151 delete chan;
00152 return 0;
00153 }
00154 if (response.substr(0, 13) != "PortAssigned=")
00155 {
00156 ILOG_ERROR("Unexpected response to port assignment request ("
00157 << buf << "); closing channel");
00158 delete chan;
00159 return 0;
00160 }
00161 int portAssigned = atoi(response.substr(13));
00162 ILOG_DEBUG("Port assigned is " << portAssigned);
00163 delete chan;
00164 ILOG_DEBUG("Closed channel to " << socketName);
00165
00166 chan = new Util::Channel(serverName, portAssigned);
00167 if (! chan->Valid())
00168 {
00169 ILOG_ERROR("Failed to open channel to " << serverName << ":"
00170 << MakeString(portAssigned));
00171 delete chan;
00172 return 0;
00173 }
00174 ILOG_DEBUG("Opened channel to " << serverName <<
00175 " at assigned port " << portAssigned);
00176 }
00177 else
00178 {
00179 ILOG_WARN("Unexpected response to port mode request (" << buf <<
00180 "); server may not support dynamic port assignment");
00181 ILOG_DEBUG("Opened channel to " << serverName << " at port " <<
00182 port);
00183 }
00184
00185
00186
00187 ILOG_DEBUG("Adding chan to " << socketName << " : " << chan);
00188 mCache->AddElement(socketName, chan);
00189 }
00190 else
00191 {
00192 ILOG_DEBUG("Get: Returning channel from pool: " << chan);
00193 }
00194
00195 return chan;
00196 }
00197
00198 static const int CACHE_SIZE = 100;
00199
00200 typedef Cache<String, Channel*> ChannelCache;
00201 ChannelCache* mCache;
00202
00203 ILOG_VAR_DEC;
00204
00205 };
00206
00207 ILOG_VAR_INIT(ChannelPool, Impala.Util);
00208
00209 }
00210 }
00211
00212 #endif