Definition at line 85 of file ChannelPool.h. References Impala::Util::Cache< IdxT, ElemT >::AddElement(), Impala::atoi(), Impala::Util::Channel::Buffer(), Impala::Util::Cache< IdxT, ElemT >::GetElement(), Impala::Util::ChannelAuthorization::GetReadPassword(), Impala::Util::ChannelAuthorization::GetWritePassword(), ILOG_DEBUG, ILOG_ERROR, ILOG_WARN, Impala::Util::MakeString(), mCache, Impala::Util::Channel::SendRequest(), Impala::Util::ChannelAuthorization::Valid(), and Impala::Util::Channel::Valid(). 00086 { 00087 Channel* chan = 0; 00088 ILOG_DEBUG("Get for " << socketName); 00089 if (! mCache->GetElement(socketName, chan)) 00090 { 00091 ILOG_DEBUG("Get: trying new Channel"); 00092 chan = new Util::Channel(serverName, port); 00093 if (! chan->Valid()) 00094 { 00095 ILOG_ERROR("Failed to open channel to " << socketName); 00096 delete chan; 00097 return 0; 00098 } 00099 ILOG_DEBUG("Get: Opened channel to " << socketName); 00100 00101 ChannelAuthorization auth(passwordFile); 00102 if (!auth.Valid()) 00103 { 00104 ILOG_ERROR("Cannot open Channel without valid authorization"); 00105 delete chan; 00106 return 0; 00107 } 00108 char* buf = chan->Buffer(); 00109 sprintf(buf, "PortMode read=\"%s\" write=\"%s\"\0", 00110 auth.GetReadPassword().c_str(), 00111 auth.GetWritePassword().c_str()); 00112 int len = chan->SendRequest(strlen(buf) + 1); 00113 if (len < 0) 00114 { 00115 ILOG_ERROR("Port mode request failed (" << len << 00116 "); closing channel"); 00117 delete chan; 00118 return 0; 00119 } 00120 00121 buf[len] = 0; 00122 ILOG_DEBUG("Port mode response from server: " << buf); 00123 if (String(buf) == "Permission denied") 00124 { 00125 ILOG_ERROR("Permission denied; closing channel"); 00126 delete chan; 00127 return 0; 00128 } 00129 else if (String(buf) == "Static") 00130 { 00131 // do nothing; the current channel/port is used for serving 00132 // further requests 00133 } 00134 else if (String(buf) == "Dynamic") 00135 { 00136 ILOG_DEBUG("Requesting port assignment"); 00137 sprintf(buf, "PortAssignment write=\"%s\"\0", 00138 auth.GetWritePassword().c_str()); 00139 len = chan->SendRequest(strlen(buf) + 1); 00140 if (len < 0) 00141 { 00142 ILOG_ERROR("Port assignment request failed (" << len << 00143 "); closing channel"); 00144 delete chan; 00145 return 0; 00146 } 00147 String response(buf); 00148 if (response.substr(0, 15) == "NoPortAvailable") 00149 { 00150 ILOG_ERROR("No free port available; closing channel"); 00151 delete chan; 00152 return 0; 00153 } 00154 if (response.substr(0, 13) != "PortAssigned=") 00155 { 00156 ILOG_ERROR("Unexpected response to port assignment request (" 00157 << buf << "); closing channel"); 00158 delete chan; 00159 return 0; 00160 } 00161 int portAssigned = atoi(response.substr(13)); 00162 ILOG_DEBUG("Port assigned is " << portAssigned); 00163 delete chan; 00164 ILOG_DEBUG("Closed channel to " << socketName); 00165 00166 chan = new Util::Channel(serverName, portAssigned); 00167 if (! chan->Valid()) 00168 { 00169 ILOG_ERROR("Failed to open channel to " << serverName << ":" 00170 << MakeString(portAssigned)); 00171 delete chan; 00172 return 0; 00173 } 00174 ILOG_DEBUG("Opened channel to " << serverName << 00175 " at assigned port " << portAssigned); 00176 } 00177 else 00178 { 00179 ILOG_WARN("Unexpected response to port mode request (" << buf << 00180 "); server may not support dynamic port assignment"); 00181 ILOG_DEBUG("Opened channel to " << serverName << " at port " << 00182 port); 00183 } 00184 00185 // Note that for dynamically assigned ports, the actual port number 00186 // is not the same as the port number used for the cache. 00187 ILOG_DEBUG("Adding chan to " << socketName << " : " << chan); 00188 mCache->AddElement(socketName, chan); 00189 } 00190 else 00191 { 00192 ILOG_DEBUG("Get: Returning channel from pool: " << chan); 00193 } 00194 00195 return chan; 00196 }
Here is the call graph for this function:
|