Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

Channel* Impala::Util::ChannelPool::Get(CString serverName,
                                        int port,
                                        CString socketName,
                                        CString passwordFile) [inline, private]

Definition at line 85 of file ChannelPool.h.

References Impala::Util::Cache< IdxT, ElemT >::AddElement(), Impala::atoi(), Impala::Util::Channel::Buffer(), Impala::Util::Cache< IdxT, ElemT >::GetElement(), Impala::Util::ChannelAuthorization::GetReadPassword(), Impala::Util::ChannelAuthorization::GetWritePassword(), ILOG_DEBUG, ILOG_ERROR, ILOG_WARN, Impala::Util::MakeString(), mCache, Impala::Util::Channel::SendRequest(), Impala::Util::ChannelAuthorization::Valid(), and Impala::Util::Channel::Valid().

00086     {
00087         Channel* chan = 0;
00088         ILOG_DEBUG("Get for " << socketName);
00089         if (! mCache->GetElement(socketName, chan))
00090         {
00091             ILOG_DEBUG("Get: trying new Channel");
00092             chan = new Util::Channel(serverName, port);
00093             if (! chan->Valid())
00094             {
00095                 ILOG_ERROR("Failed to open channel to " << socketName);
00096                 delete chan;
00097                 return 0;
00098             }
00099             ILOG_DEBUG("Get: Opened channel to " << socketName);
00100 
00101             ChannelAuthorization auth(passwordFile);
00102             if (!auth.Valid())
00103             {
00104                 ILOG_ERROR("Cannot open Channel without valid authorization");
00105                 delete chan;
00106                 return 0;
00107             }
00108             char* buf = chan->Buffer();
00109             sprintf(buf, "PortMode read=\"%s\" write=\"%s\"\0",
00110                     auth.GetReadPassword().c_str(),
00111                     auth.GetWritePassword().c_str());
00112             int len = chan->SendRequest(strlen(buf) + 1);
00113             if (len < 0)
00114             {
00115                 ILOG_ERROR("Port mode request failed (" << len <<
00116                            "); closing channel");
00117                 delete chan;
00118                 return 0;
00119             }
00120 
00121             buf[len] = 0;
00122             ILOG_DEBUG("Port mode response from server: " << buf);
00123             if (String(buf) == "Permission denied")
00124             {
00125                 ILOG_ERROR("Permission denied; closing channel");
00126                 delete chan;
00127                 return 0;
00128             }
00129             else if (String(buf) == "Static")
00130             {
00131                 // do nothing; the current channel/port is used for serving
00132                 // further requests
00133             }
00134             else if (String(buf) == "Dynamic")
00135             {
00136                 ILOG_DEBUG("Requesting port assignment");
00137                 sprintf(buf, "PortAssignment write=\"%s\"\0",
00138                         auth.GetWritePassword().c_str());
00139                 len = chan->SendRequest(strlen(buf) + 1);
00140                 if (len < 0)
00141                 {
00142                     ILOG_ERROR("Port assignment request failed (" << len <<
00143                                "); closing channel");
00144                     delete chan;
00145                     return 0;
00146                 }
00147                 String response(buf);
00148                 if (response.substr(0, 15) == "NoPortAvailable")
00149                 {
00150                     ILOG_ERROR("No free port available; closing channel");
00151                     delete chan;
00152                     return 0;
00153                 }
00154                 if (response.substr(0, 13) != "PortAssigned=")
00155                 {
00156                     ILOG_ERROR("Unexpected response to port assignment request ("
00157                                << buf << "); closing channel");
00158                     delete chan;
00159                     return 0;
00160                 }
00161                 int portAssigned = atoi(response.substr(13));
00162                 ILOG_DEBUG("Port assigned is " << portAssigned);
00163                 delete chan;
00164                 ILOG_DEBUG("Closed channel to " << socketName);
00165 
00166                 chan = new Util::Channel(serverName, portAssigned);
00167                 if (! chan->Valid())
00168                 {
00169                     ILOG_ERROR("Failed to open channel to " << serverName << ":"
00170                                << MakeString(portAssigned));
00171                     delete chan;
00172                     return 0;
00173                 }
00174                 ILOG_DEBUG("Opened channel to " << serverName <<
00175                            " at assigned port " << portAssigned);
00176             }
00177             else
00178             {
00179                 ILOG_WARN("Unexpected response to port mode request (" << buf <<
00180                           "); server may not support dynamic port assignment");
00181                 ILOG_DEBUG("Opened channel to " << serverName << " at port " <<
00182                            port);
00183             }
00184 
00185             // Note that for dynamically assigned ports, the actual port number
00186             // is not the same as the port number used for the cache.
00187             ILOG_DEBUG("Adding chan to " << socketName << " : " << chan);
00188             mCache->AddElement(socketName, chan);
00189         }
00190         else
00191         {
00192             ILOG_DEBUG("Get: Returning channel from pool: " << chan);
00193         }
00194 
00195         return chan;
00196     }

Here is the call graph for this function:


Generated on Fri Mar 19 11:39:35 2010 for ImpalaSrc by  doxygen 1.5.1