00001 #ifndef Impala_Util_Channel_h
00002 #define Impala_Util_Channel_h
00003
00004 #ifdef unix
00005 #include <sys/types.h>
00006 #include <sys/socket.h>
00007 #include <netinet/in.h>
00008 #include <netdb.h>
00009 #include <stdio.h>
00010 #include <unistd.h>
00011 #include <sys/ioctl.h>
00012 #include <sys/signal.h>
00013 #include <errno.h>
00014 #ifdef sun
00015 #include <signal.h>
00016 #include <sys/filio.h>
00017 #endif
00018 #else
00019 #include <winsock2.h>
00020 #endif
00021
00022 #include "Basis/String.h"
00023 #include <iostream>
00024 #include <vector>
00025 #include <cstring>
00026
00027 #include "Basis/ILog.h"
00028
00029 namespace Impala
00030 {
00031 namespace Util
00032 {
00033
00035 class ChannelListener
00036 {
00037 public:
00038
00039 virtual void HandleInfo(CString msg) = 0;
00040 virtual void HandleError(CString msg) = 0;
00041 virtual void HandleIdle() = 0;
00042 virtual int HandleRequest(char* buf, int len, int bufSize) = 0;
00043
00044 virtual void HandleDisconnect(int port) = 0;
00045 };
00046
00047 class Channel
00048 {
00049 public:
00050
00052 Channel(int port, int nrPorts, ChannelListener* listener = 0)
00053 {
00054 mNrError = 0;
00055 mPort = port;
00056 mNrPorts = nrPorts;
00057 mIsServer = true;
00058 mFpInfo = 0;
00059 mFpError = 0;
00060 mListener = listener;
00061 mCallbackOnIdle = false;
00062
00063 InitSocket();
00064
00065 mBuffer = new char[DATA_BUFFER_SIZE + CTR_HEADER_SIZE];
00066 if (mBuffer == 0)
00067 HandleError("buffer allocation failed", false);
00068 }
00069
00071 Channel(String serverName, int port)
00072 {
00073 mNrError = 0;
00074 mServerName = serverName;
00075 mPort = port;
00076 mNrPorts = 1;
00077 mIsServer = false;
00078 mFpInfo = 0;
00079 mFpError = 0;
00080 mListener = 0;
00081 mCallbackOnIdle = false;
00082
00083 InitSocket();
00084
00085 mBuffer = new char[DATA_BUFFER_SIZE + CTR_HEADER_SIZE];
00086 if (mBuffer == 0)
00087 HandleError("buffer allocation failed", false);
00088 }
00089
00091 ~Channel()
00092 {
00093 Close();
00094 if (mBuffer)
00095 delete mBuffer;
00096 }
00097
00099 int
00100 GetNrPorts() const
00101 {
00102 return mNrPorts;
00103 }
00104
00106 int
00107 GetPortNumber(int idx) const
00108 {
00109 return mPort + idx;
00110 }
00111
00113 String
00114 GetCurRequestConnection() const
00115 {
00116 return mConnection[mCurRequestSock];
00117 }
00118
00120 int
00121 GetCurRequestPort() const
00122 {
00123 return mPort + mCurRequestSock;
00124 }
00125
00127 String
00128 GetServerInfo()
00129 {
00130 return mServerName + ":" + MakeString(mPort);
00131 }
00132
00134 bool
00135 Valid()
00136 {
00137 return mNrError == 0;
00138 }
00139
00141 void
00142 SetListener(ChannelListener* listener)
00143 {
00144 mListener = listener;
00145 }
00146
00150 typedef void (*FP_INFO)(CString msg);
00151
00155 typedef void (*FP_ERROR)(CString msg);
00156
00160 void
00161 SetMessageFunc(FP_INFO fpInfo, FP_ERROR fpError)
00162 {
00163 mFpInfo = fpInfo;
00164 mFpError = fpError;
00165 }
00166
00168 static const int DATA_BUFFER_SIZE = 10485760;
00169
00173 char*
00174 Buffer()
00175 {
00176 return mBuffer + CTR_HEADER_SIZE;
00177 }
00178
00185 typedef int (*FP_REQUEST)(char* buf, int len, int bufSize);
00186
00190 typedef void (*FP_IDLE)();
00191
00197 void
00198 Serve(FP_REQUEST fpRequest, FP_IDLE fpIdle = 0)
00199 {
00200 mFpRequest = fpRequest;
00201 mFpIdle = fpIdle;
00202 Serve(fpIdle != 0);
00203 }
00204
00211 void
00212 Serve(bool callbackOnIdle = false)
00213 {
00214 if (!mListener && !mFpRequest)
00215 {
00216 String msg = "Either listener or function pointer(s) must be specified";
00217 ILOG_ERROR(msg);
00218 return;
00219 }
00220
00221 mCallbackOnIdle = callbackOnIdle;
00222
00223
00224 for (int i=0 ; i<mSock.size() ; i++)
00225 listen(mSock[i], 100);
00226 String msg = "Listening at port(s) " + MakeString(mPort);
00227 if (mNrPorts > 1)
00228 msg += "-" + MakeString(mPort + mNrPorts - 1);
00229 HandleInfo(msg);
00230 do
00231 {
00232 int res = WaitForSocketEvent();
00233 if (res == -1)
00234 continue;
00235 for (int s=0 ; s<mSockEvent.size() ; s++)
00236 {
00237 if (mSockEvent[s])
00238 {
00239 ILOG_DEBUG("Accepting connection at port " << mPort+s);
00240
00241
00242
00243
00244 struct sockaddr_in address;
00245 int addressLen = sizeof(address);
00246 mMsgSock[s] = accept(mSock[s], (sockaddr *) &address,
00247 (socklen_t*) &addressLen);
00248
00249
00250 if (mMsgSock[s] == INVALID_SOCKET)
00251 {
00252 HandleError("Serve: accept", true);
00253 }
00254 else
00255 {
00256 mSockConnected[s] = true;
00257 in_addr_t a = address.sin_addr.s_addr;
00258 int a1 = (a & 0xff000000) >> 24;
00259 int a2 = (a & 0x00ff0000) >> 16;
00260 int a3 = (a & 0x0000ff00) >> 8;
00261 int a4 = (a & 0x000000ff);
00262 std::ostringstream oss;
00263 oss << a4 << "." << a3 << "." << a2 << "." << a1;
00264 mConnection[s] = oss.str();
00265 HandleInfo("Established connection with " +
00266 mConnection[s] + " at port " +
00267 MakeString(mPort + s));
00268 }
00269 }
00270 if (mMsgSockEvent[s])
00271 {
00272 ILOG_DEBUG("Receiving at port " << mPort+s);
00273 mCurRequestSock = s;
00274 int rval = RecvLarge(mMsgSock[s]);
00275 if (rval > 0)
00276 {
00277 int nrToSend = HandleRequest(Buffer(),
00278 rval - CTR_HEADER_SIZE,
00279 DATA_BUFFER_SIZE);
00280 rval = SendLarge(mMsgSock[s], nrToSend);
00281 }
00282 if (rval <= 0)
00283 {
00284 HandleInfo("Terminating connection with " +
00285 GetCurRequestConnection() + " at port " +
00286 MakeString(GetCurRequestPort()));
00287 CloseSocket(mMsgSock[s]);
00288 HandleDisconnect(GetCurRequestPort());
00289 mSockConnected[s] = false;
00290 mConnection[s] = "not connected";
00291 }
00292 }
00293 }
00294 }
00295 while(1);
00296 }
00297
00301 int
00302 SendRequest(int len)
00303 {
00304 int rval;
00305 rval = SendLarge(mSock[0], len);
00306 rval = RecvLarge(mSock[0]);
00307 if ((rval > 5) && (strncmp(Buffer(), "ERROR", 5) == 0))
00308 {
00309 HandleError("SendRequest to " + mServerName + " got " +
00310 String(Buffer()), false);
00311 mNrError--;
00312 }
00313 return rval - CTR_HEADER_SIZE;
00314 }
00315
00317 bool
00318 LastSendHadError()
00319 {
00320 return strncmp(Buffer(), "ERROR", 5) == 0;
00321 }
00322
00326 void
00327 MaximizeSendRecvBuffer()
00328 {
00329 int nOptVal;
00330 socklen_t nOptLen = sizeof(int);
00331 for (int i=0 ; i<mSock.size() ; i++)
00332 {
00333 int nRet;
00334 nRet = getsockopt(mSock[i], SOL_SOCKET, SO_SNDBUF, (char*)&nOptVal,
00335 &nOptLen);
00336 if (nRet == SOCKET_ERROR)
00337 HandleError("MaximizeSendRecvBuffer", true);
00338 else
00339 HandleInfo("SNDBUF : " + MakeString(nOptVal));
00340 nRet = getsockopt(mSock[i], SOL_SOCKET, SO_RCVBUF, (char*)&nOptVal,
00341 &nOptLen);
00342 if (nRet == SOCKET_ERROR)
00343 HandleError("MaximizeSendRecvBuffer", true);
00344 else
00345 HandleInfo("RCVBUF : " + MakeString(nOptVal));
00346 }
00347 }
00348
00350 void
00351 Close()
00352 {
00353 for (int i=0 ; i<mSock.size() ; i++)
00354 {
00355 String conn = mServerName;
00356 int port = mPort + i;
00357 HandleInfo("Closing connection to " + conn + ":" + MakeString(port));
00358 CloseSocket(mSock[i]);
00359 HandleDisconnect(port);
00360 }
00361
00362 #ifndef unix
00363 if (mIsServer)
00364 WSACleanup();
00365 #endif
00366 }
00367
00368 private:
00369
// Number of bytes reserved at the start of mBuffer for the textual
// "CTR <total size>" control header that precedes the data of every message.
00370 static const int CTR_HEADER_SIZE = 20;
00371
// On unix the Winsock names used throughout this class are defined here.
// On other platforms they are expected to come from <winsock2.h>, and the
// POSIX type names used by the code are defined instead.
00372 #ifdef unix
00373 static const int INVALID_SOCKET = -1;
00374 static const int SOCKET_ERROR = -1;
00375 typedef int SOCKET;
00376 #else
00377 typedef int socklen_t;
00378 typedef u_long in_addr_t;
00379 #endif
00380
00381 void
00382 InitSocket()
00383 {
00384 #ifdef unix
00385 signal(SIGPIPE, SIG_IGN);
00386 #else
00387 WORD wVersionRequested = MAKEWORD(1,1);
00388 WSADATA wsaData;
00389
00390
00391 if (WSAStartup(wVersionRequested, &wsaData) != 0)
00392 HandleError("WSAStartup", true);
00393
00394
00395 double socklib_ver = HIBYTE(wsaData.wVersion) / 10.0;
00396 socklib_ver += LOBYTE(wsaData.wVersion);
00397 if (socklib_ver < 1.1)
00398 HandleError("socket library must support 1.1 or greater", true);
00399 #endif
00400
00401 for (int i=0 ; i<mNrPorts ; i++)
00402 {
00403 struct sockaddr_in address;
00404 address.sin_family = AF_INET;
00405 address.sin_port = htons(mPort + i);
00406 struct hostent* hp;
00407 if (mIsServer)
00408 {
00409 address.sin_addr.s_addr = htonl(INADDR_ANY);
00410 }
00411 else
00412 {
00413
00414
00415 if ((hp = gethostbyname(mServerName.c_str())) == NULL)
00416 HandleError("InitSocket: gethostbyname", true);
00417
00418 memcpy((char *) &address.sin_addr, (char *) hp->h_addr,
00419 hp->h_length);
00420 }
00421
00422 SOCKET sock;
00423
00424 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
00425 HandleError("InitSocket: open stream socket", true);
00426 if (mIsServer)
00427 {
00428 if (bind(sock, (struct sockaddr *) &address,
00429 sizeof(address)) == SOCKET_ERROR)
00430 {
00431 HandleError("InitSocket: bind socket", true);
00432 }
00433 }
00434 else
00435 {
00436 int nrTries = 5;
00437 bool done = false;
00438 while (!done)
00439 {
00440 int res = connect(sock, (struct sockaddr *) &address,
00441 sizeof(address));
00442 if (res == SOCKET_ERROR)
00443 {
00444 nrTries--;
00445 if (nrTries < 0)
00446 {
00447 HandleError("InitSocket: connect socket", true);
00448 done = true;
00449 }
00450 else
00451 {
00452 ILOG_INFO("Connect failed, re-trying " << nrTries);
00453 }
00454 }
00455 else
00456 {
00457 done = true;
00458 }
00459 }
00460 }
00461 mSock.push_back(sock);
00462 mSockConnected.push_back(false);
00463 mConnection.push_back("not connected");
00464 }
00465 mSockEvent.resize(mNrPorts);
00466 mMsgSock.resize(mNrPorts);
00467 mMsgSockEvent.resize(mNrPorts);
00468 }
00469
00470 void
00471 CloseSocket(SOCKET sock)
00472 {
00473 #ifdef unix
00474 close(sock);
00475 #else
00476 closesocket(sock);
00477 #endif
00478 }
00479
00482 int
00483 WaitForSocketEvent()
00484 {
00485 ILOG_DEBUG("Waiting for socket event");
00486 int nRet;
00487 int i = 0;
00488 do
00489 {
00490
00491
00492 fd_set readSet;
00493 FD_ZERO(&readSet);
00494 for (int s=0 ; s<mSock.size() ; s++)
00495 {
00496 if (mSockConnected[s])
00497 {
00498 FD_SET(mMsgSock[s], &readSet);
00499 ILOG_DEBUG(" checking on " << mPort+s);
00500 }
00501 }
00502
00503
00504
00505
00506
00507 for (int s=0 ; s<mSock.size() ; s++)
00508 {
00509 if (!mSockConnected[s])
00510 {
00511
00512 FD_SET(mSock[s], &readSet);
00513 ILOG_DEBUG(" checking on " << mPort+s);
00514 }
00515 }
00516 if (mCallbackOnIdle)
00517 {
00518 struct timeval timeOut;
00519 timeOut.tv_sec = 0;
00520 timeOut.tv_usec = 100000 * i;
00521 if (i < 9)
00522 i++;
00523
00524 nRet = select(FD_SETSIZE, &readSet, 0, 0, &timeOut);
00525 }
00526 else
00527 {
00528
00529 nRet = select(FD_SETSIZE, &readSet, 0, 0, 0);
00530 }
00531 if (nRet == SOCKET_ERROR)
00532 {
00533 HandleError("WaitForSocketEvent: select", true);
00534 return -1;
00535 }
00536 if (nRet == 0)
00537 {
00538 if (mCallbackOnIdle)
00539 HandleIdle();
00540 }
00541 else
00542 {
00543 ILOG_DEBUG(nRet << " socket event(s)");
00544
00545 for (int s=0 ; s<mSock.size() ; s++)
00546 {
00547 mSockEvent[s] = false;
00548 mMsgSockEvent[s] = false;
00549 if (!mSockConnected[s])
00550 {
00551
00552 if (FD_ISSET(mSock[s], &readSet))
00553 mSockEvent[s] = true;
00554 }
00555 else
00556 {
00557 if (FD_ISSET(mMsgSock[s], &readSet))
00558 mMsgSockEvent[s] = true;
00559 }
00560 }
00561 }
00562 }
00563 while (nRet == 0);
00564 return nRet;
00565 }
00566
00567 int
00568 SendLarge(SOCKET sock, int nrData)
00569 {
00570 int rval;
00571 ILOG_DEBUG("sending " << nrData << " data bytes");
00572
00573
00574 int nrToSend = nrData + CTR_HEADER_SIZE;
00575 sprintf(mBuffer, "CTR %d\0", nrToSend);
00576 ILOG_DEBUG("send : CTR buf [" << mBuffer << "]");
00577
00578
00579 int nrSend = 0;
00580 char* bufPtr = mBuffer;
00581 do
00582 {
00583 rval = send(sock, bufPtr, nrToSend - nrSend, 0);
00584 if (rval == SOCKET_ERROR)
00585 {
00586 HandleError("SendLarge", true);
00587 return -1;
00588 }
00589 ILOG_DEBUG("sent " << rval << " bytes");
00590 bufPtr += rval;
00591 nrSend += rval;
00592 }
00593 while (nrSend < nrToSend);
00594 if (nrSend != nrToSend)
00595 {
00596 HandleError("sent incorrect number of bytes!", true);
00597 return -1;
00598 }
00599 return nrSend;
00600 }
00601
00602 int
00603 RecvLarge(SOCKET sock)
00604 {
00605 int rval;
00606
00607
00608 ILOG_DEBUG("receiving...");
00609 int nrToRecv = 0;
00610 int nrRecv = 0;
00611 char* bufPtr = mBuffer;
00612 int bufSize = DATA_BUFFER_SIZE + CTR_HEADER_SIZE;
00613 do
00614 {
00615 rval = recv(sock, bufPtr, bufSize-nrRecv, 0);
00616 if (rval == SOCKET_ERROR)
00617 {
00618 HandleError("RecvLarge", true);
00619 return -1;
00620 }
00621 if (rval == 0)
00622 {
00623 ILOG_DEBUG("recv got a 0 bytes message");
00624 return 0;
00625 }
00626 if (nrToRecv == 0)
00627 {
00628 sscanf(bufPtr, "CTR %d", &nrToRecv);
00629 ILOG_DEBUG("recv expects " << nrToRecv << " bytes"
00630 );
00631 }
00632 ILOG_DEBUG(" recv " << rval << " bytes");
00633 bufPtr += rval;
00634 nrRecv += rval;
00635 }
00636 while (nrRecv < nrToRecv);
00637
00638 ILOG_DEBUG("received " << nrRecv << " bytes");
00639 if (nrRecv > nrToRecv)
00640 {
00641 HandleError("received MORE bytes than expected!", true);
00642 return -1;
00643 }
00644 return nrRecv;
00645 }
00646
00647 int
00648 HandleRequest(char* buf, int len, int bufSize)
00649 {
00650 if (mListener)
00651 return mListener->HandleRequest(buf, len, bufSize);
00652 else
00653 return (mFpRequest)(buf, len, bufSize);
00654 }
00655
00656 void
00657 HandleIdle()
00658 {
00659 ILOG_DEBUG("Calling idle function");
00660
00661 if (mListener)
00662 mListener->HandleIdle();
00663 else
00664 (mFpIdle)();
00665 }
00666
00667 void
00668 HandleInfo(CString msg)
00669 {
00670 if (mListener)
00671 mListener->HandleInfo(msg);
00672 else if (mFpInfo)
00673 (mFpInfo)(msg);
00674 else
00675 ILOG_DEBUG(msg);
00676 }
00677
00678 void
00679 HandleError(CString msg, bool isSockError)
00680 {
00681 mNrError++;
00682 String err;
00683
00684 #ifdef unix
00685
00686 err = String(strerror(errno));
00687
00688 #else
00689
00690
00691
00692 switch (WSAGetLastError())
00693 {
00694 case WSANOTINITIALISED :
00695 err = "Unable to initialise socket.";
00696 break;
00697 case WSAEAFNOSUPPORT :
00698 err = "The specified address family is not supported.";
00699 break;
00700 case WSAEADDRNOTAVAIL :
00701 err = "Specified address is not available from the local machine.";
00702 break;
00703 case WSAECONNREFUSED :
00704 err = "The attempt to connect was forcefully rejected.";
00705 break;
00706 case WSAEDESTADDRREQ :
00707 err = "Address destination address is required.";
00708 break;
00709 case WSAEFAULT :
00710 err = "The namelen argument is incorrect.";
00711 break;
00712 case WSAEINVAL :
00713 err = "The socket is not already bound to an address.";
00714 break;
00715 case WSAEISCONN :
00716 err = "The socket is already connected.";
00717 break;
00718 case WSAEADDRINUSE :
00719 err = "The specified address is already in use.";
00720 break;
00721 case WSAEMFILE :
00722 err = "No more file descriptors are available.";
00723 break;
00724 case WSAENOBUFS :
00725 err = "No buffer space available. The socket cannot be created.";
00726 break;
00727 case WSAEPROTONOSUPPORT :
00728 err = "The specified protocol is not supported.";
00729 break;
00730 case WSAEPROTOTYPE :
00731 err = "The specified protocol is the wrong type for this socket.";
00732 break;
00733 case WSAENETUNREACH :
00734 err = "The network can't be reached from this host at this time.";
00735 break;
00736 case WSAENOTSOCK :
00737 err = "The descriptor is not a socket.";
00738 break;
00739 case WSAETIMEDOUT :
00740 err = "Attempt timed out without establishing a connection.";
00741 break;
00742 case WSAESOCKTNOSUPPORT :
00743 err = "Socket type is not supported in this address family.";
00744 break;
00745 case WSAENETDOWN :
00746 err = "Network subsystem failure.";
00747 break;
00748 case WSAHOST_NOT_FOUND :
00749 err = "Authoritative Answer Host not found.";
00750 break;
00751 case WSATRY_AGAIN :
00752 err = "Non-Authoritative Host not found or SERVERFAIL.";
00753 break;
00754 case WSANO_RECOVERY :
00755 err = "Non recoverable errors, FORMERR, REFUSED, NOTIMP.";
00756 break;
00757 case WSANO_DATA :
00758 err = "Valid name, no data record of requested type.";
00759 break;
00760 case WSAEINPROGRESS :
00761 err = "Address blocking Windows Sockets operation is in progress.";
00762 break;
00763 case WSAEINTR :
00764 err = "The (blocking) call was canceled via WSACancelBlockingCall.";
00765 break;
00766 default :
00767 err = "Unknown error.";
00768 break;
00769 }
00770
00771 #endif
00772
00773 String finalMsg = msg;
00774 if (isSockError)
00775 finalMsg += ": " + err;
00776
00777 if (mListener)
00778 mListener->HandleError(finalMsg);
00779 else if (mFpError)
00780 (mFpError)(finalMsg);
00781 else
00782 ILOG_ERROR(finalMsg);
00783 }
00784
00785
00786
00787
00788
00789
00790
00791
00792 void
00793 HandleDisconnect(int port)
00794 {
00795 if (mListener)
00796 mListener->HandleDisconnect(port);
00797 }
00798
// True for a channel built with the (port, nrPorts) constructor, false for
// one built with the (serverName, port) constructor.
00799 bool mIsServer;
// Host name given to the client constructor; left empty by the server one.
00800 String mServerName;
// First port number; slot i uses port mPort + i.
00801 int mPort;
// Number of consecutive ports (always 1 for a client).
00802 int mNrPorts;
// One socket per port, created by InitSocket: bound for a server,
// connected for a client.
00803 std::vector<SOCKET> mSock;
// Per slot: true while mMsgSock holds an accepted connection.
00804 std::vector<bool> mSockConnected;
// Per slot: dotted address of the connected peer, or "not connected".
00805 std::vector<String> mConnection;
// Per slot: set by WaitForSocketEvent when the listening socket is readable.
00806 std::vector<bool> mSockEvent;
// Per slot: socket of the connection accepted in Serve.
00807 std::vector<SOCKET> mMsgSock;
// Per slot: set by WaitForSocketEvent when the accepted connection is
// readable.
00808 std::vector<bool> mMsgSockEvent;
// Message buffer: CTR_HEADER_SIZE header bytes followed by
// DATA_BUFFER_SIZE data bytes.
00809 char* mBuffer;
// Number of errors recorded by HandleError; Valid() is true while it is 0.
00810 int mNrError;
// Slot index of the request being handled in Serve.
00811 int mCurRequestSock;
// Optional function callbacks, used when no listener is installed.
00812 FP_INFO mFpInfo;
00813 FP_ERROR mFpError;
00814 FP_REQUEST mFpRequest;
00815 FP_IDLE mFpIdle;
// When true, WaitForSocketEvent waits with a timeout and calls HandleIdle
// each time it expires.
00816 bool mCallbackOnIdle;
// Receiver of all callbacks; takes precedence over the function pointers.
// The class never deletes it.
00817 ChannelListener* mListener;
00818
// NOTE(review): presumably declares the logger used by the ILOG_* macros
// (see Basis/ILog.h) - confirm.
00819 ILOG_VAR_DECL;
00820
00820
00821 };
00822
// NOTE(review): presumably defines the logger declared by ILOG_VAR_DECL
// inside Channel, under the name Impala.Util (see Basis/ILog.h) - confirm.
00823 ILOG_VAR_INIT(Channel, Impala.Util);
00824
00825 }
00826 }
00827
00828 #endif