00001 #ifndef Impala_Util_Channel_h
00002 #define Impala_Util_Channel_h
00003
00004 #ifdef unix
00005 #include <sys/types.h>
00006 #include <sys/socket.h>
00007 #include <netinet/in.h>
00008 #include <netdb.h>
00009 #include <stdio.h>
00010 #include <unistd.h>
00011 #include <sys/ioctl.h>
00012 #include <sys/signal.h>
00013 #include <errno.h>
00014 #ifdef sun
00015 #include <signal.h>
00016 #include <sys/filio.h>
00017 #endif
00018 #else
00019 #include <winsock2.h>
00020 #endif
00021
00022 #include "Basis/String.h"
00023 #include <iostream>
00024 #include <vector>
00025 #include <cstring>
00026
00027 #include "Basis/ILog.h"
00028
00029 namespace Impala
00030 {
00031 namespace Util
00032 {
00033
00035 class ChannelListener
00036 {
00037 public:
00038
00039 virtual void HandleInfo(CString msg) = 0;
00040 virtual void HandleError(CString msg) = 0;
00041 virtual void HandleIdle() = 0;
00042 virtual int HandleRequest(char* buf, int len, int bufSize) = 0;
00043
00044 virtual void HandleDisconnect(int port) = 0;
00045 };
00046
00047 class Channel
00048 {
00049 public:
00050
00052 Channel(int port, int nrPorts, ChannelListener* listener = 0)
00053 {
00054 mNrError = 0;
00055 mPort = port;
00056 mNrPorts = nrPorts;
00057 mIsServer = true;
00058 mFpInfo = 0;
00059 mFpError = 0;
00060 mListener = listener;
00061 mCallbackOnIdle = false;
00062
00063 InitSocket();
00064
00065 mBuffer = new char[DATA_BUFFER_SIZE + CTR_HEADER_SIZE];
00066 if (mBuffer == 0)
00067 HandleError("buffer allocation failed", false);
00068 }
00069
00071 Channel(String serverName, int port)
00072 {
00073 mNrError = 0;
00074 mServerName = serverName;
00075 mPort = port;
00076 mNrPorts = 1;
00077 mIsServer = false;
00078 mFpInfo = 0;
00079 mFpError = 0;
00080 mListener = 0;
00081 mCallbackOnIdle = false;
00082
00083 InitSocket();
00084
00085 mBuffer = new char[DATA_BUFFER_SIZE + CTR_HEADER_SIZE];
00086 if (mBuffer == 0)
00087 HandleError("buffer allocation failed", false);
00088 }
00089
00091 ~Channel()
00092 {
00093 Close();
00094 if (mBuffer)
00095 delete mBuffer;
00096 }
00097
00099 int
00100 GetNrPorts() const
00101 {
00102 return mNrPorts;
00103 }
00104
00106 int
00107 GetPortNumber(int idx) const
00108 {
00109 return mPort + idx;
00110 }
00111
00113 String
00114 GetCurRequestConnection() const
00115 {
00116 return mConnection[mCurRequestSock];
00117 }
00118
00120 int
00121 GetCurRequestPort() const
00122 {
00123 return mPort + mCurRequestSock;
00124 }
00125
00127 String
00128 GetServerInfo()
00129 {
00130 return mServerName + ":" + MakeString(mPort);
00131 }
00132
00134 bool
00135 Valid()
00136 {
00137 return mNrError == 0;
00138 }
00139
00141 void
00142 SetListener(ChannelListener* listener)
00143 {
00144 mListener = listener;
00145 }
00146
00150 typedef void (*FP_INFO)(CString msg);
00151
00155 typedef void (*FP_ERROR)(CString msg);
00156
00160 void
00161 SetMessageFunc(FP_INFO fpInfo, FP_ERROR fpError)
00162 {
00163 mFpInfo = fpInfo;
00164 mFpError = fpError;
00165 }
00166
00168 static const int DATA_BUFFER_SIZE = 10485760;
00169
00173 char*
00174 Buffer()
00175 {
00176 return mBuffer + CTR_HEADER_SIZE;
00177 }
00178
00185 typedef int (*FP_REQUEST)(char* buf, int len, int bufSize);
00186
00190 typedef void (*FP_IDLE)();
00191
00197 void
00198 Serve(FP_REQUEST fpRequest, FP_IDLE fpIdle = 0)
00199 {
00200 mFpRequest = fpRequest;
00201 mFpIdle = fpIdle;
00202 Serve(fpIdle != 0);
00203 }
00204
00211 void
00212 Serve(bool callbackOnIdle = false)
00213 {
00214 if (!mListener && !mFpRequest)
00215 {
00216 String msg = "Either listener or function pointer(s) must be specified";
00217 ILOG_ERROR(msg);
00218 return;
00219 }
00220
00221 mCallbackOnIdle = callbackOnIdle;
00222
00223
00224 for (int i=0 ; i<mSock.size() ; i++)
00225 listen(mSock[i], 100);
00226 String msg = "Listening at port(s) " + MakeString(mPort);
00227 if (mNrPorts > 1)
00228 msg += "-" + MakeString(mPort + mNrPorts - 1);
00229 HandleInfo(msg);
00230 do
00231 {
00232 int res = WaitForSocketEvent();
00233 if (res == -1)
00234 continue;
00235 for (int s=0 ; s<mSockEvent.size() ; s++)
00236 {
00237 if (mSockEvent[s])
00238 {
00239 ILOG_DEBUG("Accepting connection at port " << mPort+s);
00240
00241
00242
00243
00244 struct sockaddr_in address;
00245 int addressLen = sizeof(address);
00246 mMsgSock[s] = accept(mSock[s], (sockaddr *) &address,
00247 (socklen_t*) &addressLen);
00248
00249
00250 if (mMsgSock[s] == INVALID_SOCKET)
00251 {
00252 HandleError("Serve: accept", true);
00253 }
00254 else
00255 {
00256 mSockConnected[s] = true;
00257 in_addr_t a = address.sin_addr.s_addr;
00258 int a1 = (a & 0xff000000) >> 24;
00259 int a2 = (a & 0x00ff0000) >> 16;
00260 int a3 = (a & 0x0000ff00) >> 8;
00261 int a4 = (a & 0x000000ff);
00262 std::ostringstream oss;
00263 oss << a4 << "." << a3 << "." << a2 << "." << a1;
00264 mConnection[s] = oss.str();
00265 HandleInfo("Established connection with " +
00266 mConnection[s] + " at port " +
00267 MakeString(mPort + s));
00268 }
00269 }
00270 if (mMsgSockEvent[s])
00271 {
00272 ILOG_DEBUG("Receiving at port " << mPort+s);
00273 mCurRequestSock = s;
00274 int rval = RecvLarge(mMsgSock[s]);
00275 if (rval > 0)
00276 {
00277 int nrToSend = HandleRequest(Buffer(),
00278 rval - CTR_HEADER_SIZE,
00279 DATA_BUFFER_SIZE);
00280 rval = SendLarge(mMsgSock[s], nrToSend);
00281 }
00282 if (rval <= 0)
00283 {
00284 HandleInfo("Terminating connection with " +
00285 GetCurRequestConnection() + " at port " +
00286 MakeString(GetCurRequestPort()));
00287 CloseSocket(mMsgSock[s]);
00288 HandleDisconnect(GetCurRequestPort());
00289 mSockConnected[s] = false;
00290 mConnection[s] = "not connected";
00291 }
00292 }
00293 }
00294 }
00295 while(1);
00296 }
00297
00301 int
00302 SendRequest(int len)
00303 {
00304 int rval;
00305 rval = SendLarge(mSock[0], len);
00306 rval = RecvLarge(mSock[0]);
00307 if ((rval > 5) && (strncmp(Buffer(), "ERROR", 5) == 0))
00308 {
00309 HandleError("SendRequest to " + mServerName + " got " +
00310 String(Buffer()), false);
00311 mNrError--;
00312 }
00313 return rval - CTR_HEADER_SIZE;
00314 }
00315
00317 bool
00318 LastSendHadError()
00319 {
00320 return strncmp(Buffer(), "ERROR", 5) == 0;
00321 }
00322
00326 void
00327 MaximizeSendRecvBuffer()
00328 {
00329 int nOptVal;
00330 socklen_t nOptLen = sizeof(int);
00331 for (int i=0 ; i<mSock.size() ; i++)
00332 {
00333 int nRet;
00334 nRet = getsockopt(mSock[i], SOL_SOCKET, SO_SNDBUF, (char*)&nOptVal,
00335 &nOptLen);
00336 if (nRet == SOCKET_ERROR)
00337 HandleError("MaximizeSendRecvBuffer", true);
00338 else
00339 HandleInfo("SNDBUF : " + MakeString(nOptVal));
00340 nRet = getsockopt(mSock[i], SOL_SOCKET, SO_RCVBUF, (char*)&nOptVal,
00341 &nOptLen);
00342 if (nRet == SOCKET_ERROR)
00343 HandleError("MaximizeSendRecvBuffer", true);
00344 else
00345 HandleInfo("RCVBUF : " + MakeString(nOptVal));
00346 }
00347 }
00348
00350 void
00351 Close()
00352 {
00353 for (int i=0 ; i<mSock.size() ; i++)
00354 {
00355 String conn = mServerName;
00356 int port = mPort + i;
00357 HandleInfo("Closing connection to " + conn + ":" + MakeString(port));
00358 CloseSocket(mSock[i]);
00359 HandleDisconnect(port);
00360 }
00361
00362 #ifndef unix
00363 if (mIsServer)
00364 WSACleanup();
00365 #endif
00366 }
00367
00368 private:
00369
00370 static const int CTR_HEADER_SIZE = 20;
00371
00372 #ifdef unix
00373 static const int INVALID_SOCKET = -1;
00374 static const int SOCKET_ERROR = -1;
00375 typedef int SOCKET;
00376 #else
00377 typedef int socklen_t;
00378 typedef u_long in_addr_t;
00379 #endif
00380
00381 void
00382 InitSocket()
00383 {
00384 #ifdef unix
00385 signal(SIGPIPE, SIG_IGN);
00386 #else
00387 WORD wVersionRequested = MAKEWORD(1,1);
00388 WSADATA wsaData;
00389
00390
00391 if (WSAStartup(wVersionRequested, &wsaData) != 0)
00392 HandleError("WSAStartup", true);
00393
00394
00395 double socklib_ver = HIBYTE(wsaData.wVersion) / 10.0;
00396 socklib_ver += LOBYTE(wsaData.wVersion);
00397 if (socklib_ver < 1.1)
00398 HandleError("socket library must support 1.1 or greater", true);
00399 #endif
00400
00401 for (int i=0 ; i<mNrPorts ; i++)
00402 {
00403 struct sockaddr_in address;
00404 address.sin_family = AF_INET;
00405 address.sin_port = htons(mPort + i);
00406 struct hostent* hp;
00407 if (mIsServer)
00408 {
00409 address.sin_addr.s_addr = htonl(INADDR_ANY);
00410 }
00411 else
00412 {
00413
00414
00415 if ((hp = gethostbyname(mServerName.c_str())) == NULL)
00416 HandleError("InitSocket: gethostbyname", true);
00417
00418 memcpy((char *) &address.sin_addr, (char *) hp->h_addr,
00419 hp->h_length);
00420 }
00421
00422 SOCKET sock;
00423
00424 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
00425 HandleError("InitSocket: open stream socket", true);
00426 if (mIsServer)
00427 {
00428 if (bind(sock, (struct sockaddr *) &address,
00429 sizeof(address)) == SOCKET_ERROR)
00430 {
00431 HandleError("InitSocket: bind socket", true);
00432 }
00433 }
00434 else
00435 {
00436 if (connect(sock, (struct sockaddr *) &address,
00437 sizeof(address)) == SOCKET_ERROR)
00438 {
00439 HandleError("InitSocket: connect socket", true);
00440 }
00441 }
00442 mSock.push_back(sock);
00443 mSockConnected.push_back(false);
00444 mConnection.push_back("not connected");
00445 }
00446 mSockEvent.resize(mNrPorts);
00447 mMsgSock.resize(mNrPorts);
00448 mMsgSockEvent.resize(mNrPorts);
00449 }
00450
00451 void
00452 CloseSocket(SOCKET sock)
00453 {
00454 #ifdef unix
00455 close(sock);
00456 #else
00457 closesocket(sock);
00458 #endif
00459 }
00460
00463 int
00464 WaitForSocketEvent()
00465 {
00466 ILOG_DEBUG("Waiting for socket event");
00467 int nRet;
00468 int i = 0;
00469 do
00470 {
00471
00472
00473 fd_set readSet;
00474 FD_ZERO(&readSet);
00475 for (int s=0 ; s<mSock.size() ; s++)
00476 {
00477 if (mSockConnected[s])
00478 {
00479 FD_SET(mMsgSock[s], &readSet);
00480 ILOG_DEBUG(" checking on " << mPort+s);
00481 }
00482 }
00483
00484
00485
00486
00487
00488 for (int s=0 ; s<mSock.size() ; s++)
00489 {
00490 if (!mSockConnected[s])
00491 {
00492
00493 FD_SET(mSock[s], &readSet);
00494 ILOG_DEBUG(" checking on " << mPort+s);
00495 }
00496 }
00497 if (mCallbackOnIdle)
00498 {
00499 struct timeval timeOut;
00500 timeOut.tv_sec = 0;
00501 timeOut.tv_usec = 100000 * i;
00502 if (i < 9)
00503 i++;
00504
00505 nRet = select(FD_SETSIZE, &readSet, 0, 0, &timeOut);
00506 }
00507 else
00508 {
00509
00510 nRet = select(FD_SETSIZE, &readSet, 0, 0, 0);
00511 }
00512 if (nRet == SOCKET_ERROR)
00513 {
00514 HandleError("WaitForSocketEvent: select", true);
00515 return -1;
00516 }
00517 if (nRet == 0)
00518 {
00519 if (mCallbackOnIdle)
00520 HandleIdle();
00521 }
00522 else
00523 {
00524 ILOG_DEBUG(nRet << " socket event(s)");
00525
00526 for (int s=0 ; s<mSock.size() ; s++)
00527 {
00528 mSockEvent[s] = false;
00529 mMsgSockEvent[s] = false;
00530 if (!mSockConnected[s])
00531 {
00532
00533 if (FD_ISSET(mSock[s], &readSet))
00534 mSockEvent[s] = true;
00535 }
00536 else
00537 {
00538 if (FD_ISSET(mMsgSock[s], &readSet))
00539 mMsgSockEvent[s] = true;
00540 }
00541 }
00542 }
00543 }
00544 while (nRet == 0);
00545 return nRet;
00546 }
00547
00548 int
00549 SendLarge(SOCKET sock, int nrData)
00550 {
00551 int rval;
00552 ILOG_DEBUG("sending " << nrData << " data bytes");
00553
00554
00555 int nrToSend = nrData + CTR_HEADER_SIZE;
00556 sprintf(mBuffer, "CTR %d\0", nrToSend);
00557 ILOG_DEBUG("send : CTR buf [" << mBuffer << "]");
00558
00559
00560 int nrSend = 0;
00561 char* bufPtr = mBuffer;
00562 do
00563 {
00564 rval = send(sock, bufPtr, nrToSend - nrSend, 0);
00565 if (rval == SOCKET_ERROR)
00566 {
00567 HandleError("SendLarge", true);
00568 return -1;
00569 }
00570 ILOG_DEBUG("sent " << rval << " bytes");
00571 bufPtr += rval;
00572 nrSend += rval;
00573 }
00574 while (nrSend < nrToSend);
00575 if (nrSend != nrToSend)
00576 {
00577 HandleError("sent incorrect number of bytes!", true);
00578 return -1;
00579 }
00580 return nrSend;
00581 }
00582
00583 int
00584 RecvLarge(SOCKET sock)
00585 {
00586 int rval;
00587
00588
00589 ILOG_DEBUG("receiving...");
00590 int nrToRecv = 0;
00591 int nrRecv = 0;
00592 char* bufPtr = mBuffer;
00593 int bufSize = DATA_BUFFER_SIZE + CTR_HEADER_SIZE;
00594 do
00595 {
00596 rval = recv(sock, bufPtr, bufSize-nrRecv, 0);
00597 if (rval == SOCKET_ERROR)
00598 {
00599 HandleError("RecvLarge", true);
00600 return -1;
00601 }
00602 if (rval == 0)
00603 {
00604 ILOG_DEBUG("recv got a 0 bytes message");
00605 return 0;
00606 }
00607 if (nrToRecv == 0)
00608 {
00609 sscanf(bufPtr, "CTR %d", &nrToRecv);
00610 ILOG_DEBUG("recv expects " << nrToRecv << " bytes"
00611 );
00612 }
00613 ILOG_DEBUG(" recv " << rval << " bytes");
00614 bufPtr += rval;
00615 nrRecv += rval;
00616 }
00617 while (nrRecv < nrToRecv);
00618
00619 ILOG_DEBUG("received " << nrRecv << " bytes");
00620 if (nrRecv > nrToRecv)
00621 {
00622 HandleError("received MORE bytes than expected!", true);
00623 return -1;
00624 }
00625 return nrRecv;
00626 }
00627
00628 int
00629 HandleRequest(char* buf, int len, int bufSize)
00630 {
00631 if (mListener)
00632 return mListener->HandleRequest(buf, len, bufSize);
00633 else
00634 return (mFpRequest)(buf, len, bufSize);
00635 }
00636
00637 void
00638 HandleIdle()
00639 {
00640 ILOG_DEBUG("Calling idle function");
00641
00642 if (mListener)
00643 mListener->HandleIdle();
00644 else
00645 (mFpIdle)();
00646 }
00647
00648 void
00649 HandleInfo(CString msg)
00650 {
00651 if (mListener)
00652 mListener->HandleInfo(msg);
00653 else if (mFpInfo)
00654 (mFpInfo)(msg);
00655 else
00656 ILOG_DEBUG(msg);
00657 }
00658
00659 void
00660 HandleError(CString msg, bool isSockError)
00661 {
00662 mNrError++;
00663 String err;
00664
00665 #ifdef unix
00666
00667 err = String(strerror(errno));
00668
00669 #else
00670
00671
00672
00673 switch (WSAGetLastError())
00674 {
00675 case WSANOTINITIALISED :
00676 err = "Unable to initialise socket.";
00677 break;
00678 case WSAEAFNOSUPPORT :
00679 err = "The specified address family is not supported.";
00680 break;
00681 case WSAEADDRNOTAVAIL :
00682 err = "Specified address is not available from the local machine.";
00683 break;
00684 case WSAECONNREFUSED :
00685 err = "The attempt to connect was forcefully rejected.";
00686 break;
00687 case WSAEDESTADDRREQ :
00688 err = "Address destination address is required.";
00689 break;
00690 case WSAEFAULT :
00691 err = "The namelen argument is incorrect.";
00692 break;
00693 case WSAEINVAL :
00694 err = "The socket is not already bound to an address.";
00695 break;
00696 case WSAEISCONN :
00697 err = "The socket is already connected.";
00698 break;
00699 case WSAEADDRINUSE :
00700 err = "The specified address is already in use.";
00701 break;
00702 case WSAEMFILE :
00703 err = "No more file descriptors are available.";
00704 break;
00705 case WSAENOBUFS :
00706 err = "No buffer space available. The socket cannot be created.";
00707 break;
00708 case WSAEPROTONOSUPPORT :
00709 err = "The specified protocol is not supported.";
00710 break;
00711 case WSAEPROTOTYPE :
00712 err = "The specified protocol is the wrong type for this socket.";
00713 break;
00714 case WSAENETUNREACH :
00715 err = "The network can't be reached from this host at this time.";
00716 break;
00717 case WSAENOTSOCK :
00718 err = "The descriptor is not a socket.";
00719 break;
00720 case WSAETIMEDOUT :
00721 err = "Attempt timed out without establishing a connection.";
00722 break;
00723 case WSAESOCKTNOSUPPORT :
00724 err = "Socket type is not supported in this address family.";
00725 break;
00726 case WSAENETDOWN :
00727 err = "Network subsystem failure.";
00728 break;
00729 case WSAHOST_NOT_FOUND :
00730 err = "Authoritative Answer Host not found.";
00731 break;
00732 case WSATRY_AGAIN :
00733 err = "Non-Authoritative Host not found or SERVERFAIL.";
00734 break;
00735 case WSANO_RECOVERY :
00736 err = "Non recoverable errors, FORMERR, REFUSED, NOTIMP.";
00737 break;
00738 case WSANO_DATA :
00739 err = "Valid name, no data record of requested type.";
00740 break;
00741 case WSAEINPROGRESS :
00742 err = "Address blocking Windows Sockets operation is in progress.";
00743 break;
00744 case WSAEINTR :
00745 err = "The (blocking) call was canceled via WSACancelBlockingCall.";
00746 break;
00747 default :
00748 err = "Unknown error.";
00749 break;
00750 }
00751
00752 #endif
00753
00754 String finalMsg = msg;
00755 if (isSockError)
00756 finalMsg += ": " + err;
00757
00758 if (mListener)
00759 mListener->HandleError(finalMsg);
00760 else if (mFpError)
00761 (mFpError)(finalMsg);
00762 else
00763 ILOG_ERROR(finalMsg);
00764 }
00765
00766
00767
00768
00769
00770
00771
00772
00773 void
00774 HandleDisconnect(int port)
00775 {
00776 if (mListener)
00777 mListener->HandleDisconnect(port);
00778 }
00779
00780 bool mIsServer;
00781 String mServerName;
00782 int mPort;
00783 int mNrPorts;
00784 std::vector<SOCKET> mSock;
00785 std::vector<bool> mSockConnected;
00786 std::vector<String> mConnection;
00787 std::vector<bool> mSockEvent;
00788 std::vector<SOCKET> mMsgSock;
00789 std::vector<bool> mMsgSockEvent;
00790 char* mBuffer;
00791 int mNrError;
00792 int mCurRequestSock;
00793 FP_INFO mFpInfo;
00794 FP_ERROR mFpError;
00795 FP_REQUEST mFpRequest;
00796 FP_IDLE mFpIdle;
00797 bool mCallbackOnIdle;
00798 ChannelListener* mListener;
00799
00800 ILOG_VAR_DECL;
00801
00802 };
00803
00804 ILOG_VAR_INIT(Channel, Impala.Util);
00805
00806 }
00807 }
00808
00809 #endif