Start acting as a server and forward requests to either mFpRequest or to the listener. In case callIdle is true, the server is non-blocking and will call either mFpIdle or the listener's idle method in case there is no communication going on. Definition at line 212 of file Channel.h. References Buffer(), CloseSocket(), CTR_HEADER_SIZE, DATA_BUFFER_SIZE, GetCurRequestConnection(), GetCurRequestPort(), HandleDisconnect(), HandleError(), HandleInfo(), HandleRequest(), ILOG_DEBUG, ILOG_ERROR, Impala::Util::MakeString(), mCallbackOnIdle, mConnection, mCurRequestSock, mFpRequest, mListener, mMsgSock, mMsgSockEvent, mNrPorts, mPort, mSock, mSockConnected, mSockEvent, RecvLarge(), SendLarge(), and WaitForSocketEvent(). 00213 { 00214 if (!mListener && !mFpRequest) 00215 { 00216 String msg = "Either listener or function pointer(s) must be specified"; 00217 ILOG_ERROR(msg); 00218 return; 00219 } 00220 00221 mCallbackOnIdle = callbackOnIdle; 00222 00223 // Start accepting connections. 00224 for (int i=0 ; i<mSock.size() ; i++) 00225 listen(mSock[i], 100); 00226 String msg = "Listening at port(s) " + MakeString(mPort); 00227 if (mNrPorts > 1) 00228 msg += "-" + MakeString(mPort + mNrPorts - 1); 00229 HandleInfo(msg); 00230 do 00231 { 00232 int res = WaitForSocketEvent(); 00233 if (res == -1) 00234 continue; 00235 for (int s=0 ; s<mSockEvent.size() ; s++) 00236 { 00237 if (mSockEvent[s]) 00238 { 00239 ILOG_DEBUG("Accepting connection at port " << mPort+s); 00240 // org, no address info 00241 //mMsgSock[s] = accept(mSock[s], 0, 0); 00242 00243 // new, obtain address 00244 struct sockaddr_in address; 00245 int addressLen = sizeof(address); 00246 mMsgSock[s] = accept(mSock[s], (sockaddr *) &address, 00247 (socklen_t*) &addressLen); 00248 //ILOG_DEBUG("accept on port " << ntohs(address.sin_port)); 00249 00250 if (mMsgSock[s] == INVALID_SOCKET) 00251 { 00252 HandleError("Serve: accept", true); 00253 } 00254 else 00255 { 00256 mSockConnected[s] = true; 00257 in_addr_t a = address.sin_addr.s_addr; 00258 int a1 = (a & 0xff000000) >> 24; 00259 int a2 = (a & 0x00ff0000) >> 16; 00260 int a3 = (a & 0x0000ff00) >> 8; 00261 int a4 = (a & 0x000000ff); 00262 std::ostringstream oss; 00263 oss << a4 << "." << a3 << "." << a2 << "." << a1; 00264 mConnection[s] = oss.str(); 00265 HandleInfo("Established connection with " + 00266 mConnection[s] + " at port " + 00267 MakeString(mPort + s)); 00268 } 00269 } 00270 if (mMsgSockEvent[s]) 00271 { 00272 ILOG_DEBUG("Receiving at port " << mPort+s); 00273 mCurRequestSock = s; 00274 int rval = RecvLarge(mMsgSock[s]); 00275 if (rval > 0) 00276 { 00277 int nrToSend = HandleRequest(Buffer(), 00278 rval - CTR_HEADER_SIZE, 00279 DATA_BUFFER_SIZE); 00280 rval = SendLarge(mMsgSock[s], nrToSend); 00281 } 00282 if (rval <= 0) 00283 { 00284 HandleInfo("Terminating connection with " + 00285 GetCurRequestConnection() + " at port " + 00286 MakeString(GetCurRequestPort())); 00287 CloseSocket(mMsgSock[s]); 00288 HandleDisconnect(GetCurRequestPort()); 00289 mSockConnected[s] = false; 00290 mConnection[s] = "not connected"; 00291 } 00292 } 00293 } 00294 } 00295 while(1); 00296 }
Here is the call graph for this function:
|