Home || Architecture || Video Search || Visual Search || Scripts || Applications || Important Messages || OGL || Src

Channel.h

Go to the documentation of this file.
00001 #ifndef Impala_Util_Channel_h
00002 #define Impala_Util_Channel_h
00003 
00004 #ifdef unix
00005 #include <sys/types.h>
00006 #include <sys/socket.h>
00007 #include <netinet/in.h>
00008 #include <netdb.h>
00009 #include <stdio.h>
00010 #include <unistd.h>
00011 #include <sys/ioctl.h>
00012 #include <sys/signal.h>
00013 #include <errno.h>
00014 #ifdef sun
00015 #include <signal.h>
00016 #include <sys/filio.h>
00017 #endif
00018 #else
00019 #include <winsock2.h>
00020 #endif
00021 
00022 #include "Basis/String.h"
00023 #include <iostream>
00024 #include <vector>
00025 #include <cstring>
00026 
00027 #include "Basis/ILog.h"
00028 
00029 namespace Impala
00030 {
00031 namespace Util
00032 {
00033 
00035 class ChannelListener
00036 {
00037 public:
00038 
00039     virtual void HandleInfo(CString msg) = 0;
00040     virtual void HandleError(CString msg) = 0;
00041     virtual void HandleIdle() = 0;
00042     virtual int  HandleRequest(char* buf, int len, int bufSize) = 0;
00043     //virtual void HandleConnect(CString conn, int port) = 0;
00044     virtual void HandleDisconnect(int port) = 0;
00045 };
00046 
00047 class Channel
00048 {
00049 public: 
00050 
00052     Channel(int port, int nrPorts, ChannelListener* listener = 0)
00053     {
00054         mNrError = 0;
00055         mPort = port;
00056         mNrPorts = nrPorts;
00057         mIsServer = true;
00058         mFpInfo = 0;
00059         mFpError = 0;
00060         mListener = listener;
00061         mCallbackOnIdle = false;
00062 
00063         InitSocket();
00064 
00065         mBuffer = new char[DATA_BUFFER_SIZE + CTR_HEADER_SIZE];
00066         if (mBuffer == 0)
00067             HandleError("buffer allocation failed", false);
00068     }
00069 
00071     Channel(String serverName, int port)
00072     {
00073         mNrError = 0;
00074         mServerName = serverName;
00075         mPort = port;
00076         mNrPorts = 1;
00077         mIsServer = false;
00078         mFpInfo = 0;
00079         mFpError = 0;
00080         mListener = 0;
00081         mCallbackOnIdle = false;
00082 
00083         InitSocket();
00084 
00085         mBuffer = new char[DATA_BUFFER_SIZE + CTR_HEADER_SIZE];
00086         if (mBuffer == 0)
00087             HandleError("buffer allocation failed", false);
00088     }
00089 
00091     ~Channel()
00092     {
00093         Close();
00094         if (mBuffer)
00095             delete mBuffer;
00096     }
00097 
00099     int
00100     GetNrPorts() const
00101     {
00102         return mNrPorts;
00103     }
00104 
00106     int
00107     GetPortNumber(int idx) const
00108     {
00109         return mPort + idx;
00110     }
00111 
00113     String
00114     GetCurRequestConnection() const
00115     {
00116         return mConnection[mCurRequestSock];
00117     }
00118 
00120     int
00121     GetCurRequestPort() const
00122     {
00123         return mPort + mCurRequestSock;
00124     }
00125 
00127     String
00128     GetServerInfo()
00129     {
00130         return mServerName + ":" + MakeString(mPort);
00131     }
00132 
00134     bool
00135     Valid()
00136     {
00137         return mNrError == 0;
00138     }
00139 
00141     void
00142     SetListener(ChannelListener* listener)
00143     {
00144         mListener = listener;
00145     }
00146 
00150     typedef void (*FP_INFO)(CString msg);
00151 
00155     typedef void (*FP_ERROR)(CString msg);
00156 
00160     void
00161     SetMessageFunc(FP_INFO fpInfo, FP_ERROR fpError)
00162     {
00163         mFpInfo = fpInfo;
00164         mFpError = fpError;
00165     }
00166 
00168     static const int DATA_BUFFER_SIZE = 10485760; // 10 Mb
00169 
00173     char*
00174     Buffer()
00175     {
00176         return mBuffer + CTR_HEADER_SIZE;
00177     }
00178 
00185     typedef int (*FP_REQUEST)(char* buf, int len, int bufSize);
00186 
00190     typedef void (*FP_IDLE)();
00191 
00197     void
00198     Serve(FP_REQUEST fpRequest, FP_IDLE fpIdle = 0)
00199     {
00200         mFpRequest = fpRequest;
00201         mFpIdle = fpIdle;
00202         Serve(fpIdle != 0);
00203     }
00204 
00211     void
00212     Serve(bool callbackOnIdle = false)
00213     {
00214         if (!mListener && !mFpRequest)
00215         {
00216             String msg = "Either listener or function pointer(s) must be specified";
00217             ILOG_ERROR(msg);
00218             return;
00219         }
00220 
00221         mCallbackOnIdle = callbackOnIdle;
00222 
00223         // Start accepting connections.
00224         for (int i=0 ; i<mSock.size() ; i++)
00225             listen(mSock[i], 100);
00226         String msg = "Listening at port(s) " + MakeString(mPort);
00227         if (mNrPorts > 1)
00228             msg += "-" + MakeString(mPort + mNrPorts - 1);
00229         HandleInfo(msg);
00230         do
00231         {
00232             int res = WaitForSocketEvent();
00233             if (res == -1)
00234                 continue;
00235             for (int s=0 ; s<mSockEvent.size() ; s++)
00236             {
00237                 if (mSockEvent[s])
00238                 {
00239                     ILOG_DEBUG("Accepting connection at port " << mPort+s);
00240                     // org, no address info
00241                     //mMsgSock[s] = accept(mSock[s], 0, 0);
00242 
00243                     // new, obtain address
00244                     struct sockaddr_in address;
00245                     int addressLen = sizeof(address);
00246                     mMsgSock[s] = accept(mSock[s], (sockaddr *) &address,
00247                                          (socklen_t*) &addressLen);
00248                     //ILOG_DEBUG("accept on port " << ntohs(address.sin_port));
00249 
00250                     if (mMsgSock[s] == INVALID_SOCKET)
00251                     {
00252                         HandleError("Serve: accept", true);
00253                     }
00254                     else
00255                     {
00256                         mSockConnected[s] = true;
00257                         in_addr_t a = address.sin_addr.s_addr;
00258                         int a1 = (a & 0xff000000) >> 24;
00259                         int a2 = (a & 0x00ff0000) >> 16;
00260                         int a3 = (a & 0x0000ff00) >> 8;
00261                         int a4 = (a & 0x000000ff);
00262                         std::ostringstream oss;
00263                         oss << a4 << "." << a3 << "." << a2 << "." << a1;
00264                         mConnection[s] = oss.str();
00265                         HandleInfo("Established connection with " +
00266                                    mConnection[s] + " at port " +
00267                                    MakeString(mPort + s));
00268                     }
00269                 }
00270                 if (mMsgSockEvent[s])
00271                 {
00272                     ILOG_DEBUG("Receiving at port " << mPort+s);
00273                     mCurRequestSock = s;
00274                     int rval = RecvLarge(mMsgSock[s]);
00275                     if (rval > 0)
00276                     {
00277                         int nrToSend = HandleRequest(Buffer(),
00278                                                    rval - CTR_HEADER_SIZE,
00279                                                    DATA_BUFFER_SIZE);
00280                         rval = SendLarge(mMsgSock[s], nrToSend);
00281                     }
00282                     if (rval <= 0)
00283                     {
00284                         HandleInfo("Terminating connection with " +
00285                                    GetCurRequestConnection() + " at port " +
00286                                    MakeString(GetCurRequestPort()));
00287                         CloseSocket(mMsgSock[s]);
00288                         HandleDisconnect(GetCurRequestPort());
00289                         mSockConnected[s] = false;
00290                         mConnection[s] = "not connected";
00291                     }
00292                 }
00293             }
00294         }
00295         while(1);
00296     }
00297 
00301     int
00302     SendRequest(int len)
00303     {
00304         int rval;
00305         rval = SendLarge(mSock[0], len);
00306         rval = RecvLarge(mSock[0]);
00307         if ((rval > 5) && (strncmp(Buffer(), "ERROR", 5) == 0))
00308         {
00309             HandleError("SendRequest to " + mServerName + " got " +
00310                         String(Buffer()), false);
00311             mNrError--; // this was not a Channel error so stay Valid()
00312         }
00313         return rval - CTR_HEADER_SIZE;
00314     }
00315 
00317     bool
00318     LastSendHadError()
00319     {
00320         return strncmp(Buffer(), "ERROR", 5) == 0;
00321     }
00322 
00326     void
00327     MaximizeSendRecvBuffer()
00328     {
00329         int nOptVal;
00330         socklen_t nOptLen = sizeof(int);
00331         for (int i=0 ; i<mSock.size() ; i++)
00332         {
00333             int nRet;
00334             nRet = getsockopt(mSock[i], SOL_SOCKET, SO_SNDBUF, (char*)&nOptVal,
00335                               &nOptLen);
00336             if (nRet == SOCKET_ERROR)
00337                 HandleError("MaximizeSendRecvBuffer", true);
00338             else
00339                 HandleInfo("SNDBUF : " + MakeString(nOptVal));
00340             nRet = getsockopt(mSock[i], SOL_SOCKET, SO_RCVBUF, (char*)&nOptVal,
00341                               &nOptLen);
00342             if (nRet == SOCKET_ERROR)
00343                 HandleError("MaximizeSendRecvBuffer", true);
00344             else
00345                 HandleInfo("RCVBUF : " + MakeString(nOptVal));
00346         }
00347     }
00348 
00350     void
00351     Close()
00352     {
00353         for (int i=0 ; i<mSock.size() ; i++)
00354         {
00355             String conn = mServerName;
00356             int port = mPort + i;
00357             HandleInfo("Closing connection to " + conn + ":" + MakeString(port));
00358             CloseSocket(mSock[i]);
00359             HandleDisconnect(port);
00360         }
00361 
00362 #ifndef unix
00363         if (mIsServer)
00364             WSACleanup();
00365 #endif
00366     }
00367 
00368 private:
00369 
    /// Number of bytes reserved at the start of the transfer buffer for the
    /// "CTR <count>" control header that precedes the data of every message.
    static const int CTR_HEADER_SIZE = 20;

    // Names that exist in only one of the two socket APIs are defined here
    // for the other one, so the rest of the class can use them on both.
#ifdef unix
    static const int INVALID_SOCKET = -1;
    static const int SOCKET_ERROR = -1;
    typedef int SOCKET;
#else
    typedef int socklen_t;
    typedef u_long in_addr_t;
#endif
00380 
00381     void
00382     InitSocket()
00383     {
00384 #ifdef unix
00385         signal(SIGPIPE, SIG_IGN); // block broken pipe signal
00386 #else
00387         WORD wVersionRequested = MAKEWORD(1,1);
00388         WSADATA wsaData;
00389 
00390         // Initialize WinSock and check version
00391         if (WSAStartup(wVersionRequested, &wsaData) != 0)
00392             HandleError("WSAStartup", true);
00393 
00394         // Check socket DLL supports 1.1 or higher
00395         double socklib_ver = HIBYTE(wsaData.wVersion) / 10.0;
00396         socklib_ver += LOBYTE(wsaData.wVersion);
00397         if (socklib_ver < 1.1)
00398             HandleError("socket library must support 1.1 or greater", true);
00399 #endif
00400 
00401         for (int i=0 ; i<mNrPorts ; i++)
00402         {
00403             struct sockaddr_in address;
00404             address.sin_family = AF_INET;
00405             address.sin_port = htons(mPort + i);
00406             struct hostent* hp;
00407             if (mIsServer)
00408             {
00409                 address.sin_addr.s_addr = htonl(INADDR_ANY);
00410             }
00411             else
00412             {
00413                 // gethostbyname returns a structure including the network
00414                 // address of the specified host.
00415                 if ((hp = gethostbyname(mServerName.c_str())) == NULL)
00416                     HandleError("InitSocket: gethostbyname", true);
00417                 // initialise the address structure with the resolved IP address
00418                 memcpy((char *) &address.sin_addr, (char *) hp->h_addr,
00419                        hp->h_length);
00420             }
00421 
00422             SOCKET sock;
00423             // Create socket.
00424             if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
00425                 HandleError("InitSocket: open stream socket", true);
00426             if (mIsServer)
00427             {
00428                 if (bind(sock, (struct sockaddr *) &address,
00429                          sizeof(address)) == SOCKET_ERROR)
00430                 {
00431                     HandleError("InitSocket: bind socket", true);
00432                 }
00433             }
00434             else
00435             {
00436                 if (connect(sock, (struct sockaddr *) &address,
00437                             sizeof(address)) == SOCKET_ERROR)
00438                 {
00439                     HandleError("InitSocket: connect socket", true);
00440                 }
00441             }
00442             mSock.push_back(sock);
00443             mSockConnected.push_back(false);
00444             mConnection.push_back("not connected");
00445         }
00446         mSockEvent.resize(mNrPorts);
00447         mMsgSock.resize(mNrPorts);
00448         mMsgSockEvent.resize(mNrPorts);
00449     }
00450 
00451     void
00452     CloseSocket(SOCKET sock)
00453     {
00454 #ifdef unix
00455         close(sock);
00456 #else
00457         closesocket(sock);
00458 #endif
00459     }
00460 
00463     int
00464     WaitForSocketEvent()
00465     {
00466         ILOG_DEBUG("Waiting for socket event");
00467         int nRet;
00468         int i = 0;
00469         do
00470         {
00471             // Wait for readable state. After accept() this indicates that
00472             // data has been received, so recv() will succeed.
00473             fd_set readSet;
00474             FD_ZERO(&readSet);
00475             for (int s=0 ; s<mSock.size() ; s++)
00476             {
00477                 if (mSockConnected[s])
00478                 {
00479                     FD_SET(mMsgSock[s], &readSet);
00480                     ILOG_DEBUG("  checking on " << mPort+s);
00481                 }
00482             }
00483             // Wait for writable state. After listen() this indicates that
00484             // a connection is pending, so accept() will succeed.
00485             // Oops, on linux this doesn't work, so use readSet.
00486             //fd_set writeSet;
00487             //FD_ZERO(&writeSet);
00488             for (int s=0 ; s<mSock.size() ; s++)
00489             {
00490                 if (!mSockConnected[s])
00491                 {
00492                     //FD_SET(mSock[s], &writeSet);
00493                     FD_SET(mSock[s], &readSet);
00494                     ILOG_DEBUG("  checking on " << mPort+s);
00495                 }
00496             }
00497             if (mCallbackOnIdle)
00498             {
00499                 struct timeval timeOut;
00500                 timeOut.tv_sec = 0;
00501                 timeOut.tv_usec = 100000 * i;
00502                 if (i < 9)
00503                     i++;
00504                 //nRet = select(FD_SETSIZE, &readSet, &writeSet, 0, &timeOut);
00505                 nRet = select(FD_SETSIZE, &readSet, 0, 0, &timeOut);
00506             }
00507             else
00508             {
00509                 //nRet = select(FD_SETSIZE, &readSet, &writeSet, 0, 0);
00510                 nRet = select(FD_SETSIZE, &readSet, 0, 0, 0);
00511             }
00512             if (nRet == SOCKET_ERROR)
00513             {
00514                 HandleError("WaitForSocketEvent: select", true);
00515                 return -1;
00516             }
00517             if (nRet == 0)
00518             {
00519                 if (mCallbackOnIdle)
00520                     HandleIdle();
00521             }
00522             else
00523             {
00524                 ILOG_DEBUG(nRet << " socket event(s)");
00525                 // find readable/writable socket, there may be more than one
00526                 for (int s=0 ; s<mSock.size() ; s++)
00527                 {
00528                     mSockEvent[s] = false;
00529                     mMsgSockEvent[s] = false;
00530                     if (!mSockConnected[s])
00531                     {
00532                         //if (FD_ISSET(mSock[s], &writeSet))
00533                         if (FD_ISSET(mSock[s], &readSet))
00534                             mSockEvent[s] = true;
00535                     }
00536                     else
00537                     {
00538                         if (FD_ISSET(mMsgSock[s], &readSet))
00539                             mMsgSockEvent[s] = true;
00540                     }
00541                 }
00542             }
00543         }
00544         while (nRet == 0);
00545         return nRet;
00546     }
00547 
00548     int
00549     SendLarge(SOCKET sock, int nrData)
00550     {
00551         int rval;
00552         ILOG_DEBUG("sending " << nrData << " data bytes");
00553 
00554         // assemble CTR header to tell receiver the number of bytes to expect
00555         int nrToSend = nrData + CTR_HEADER_SIZE;
00556         sprintf(mBuffer, "CTR %d\0", nrToSend);
00557         ILOG_DEBUG("send : CTR buf [" << mBuffer << "]");
00558 
00559         // now send the buffer
00560         int nrSend = 0;
00561         char* bufPtr = mBuffer;
00562         do
00563         {
00564             rval = send(sock, bufPtr, nrToSend - nrSend, 0);
00565             if (rval == SOCKET_ERROR)
00566             {
00567                 HandleError("SendLarge", true);
00568                 return -1;
00569             }
00570             ILOG_DEBUG("sent " << rval << " bytes");
00571             bufPtr += rval;
00572             nrSend += rval;
00573         }
00574         while (nrSend < nrToSend);
00575         if (nrSend != nrToSend)
00576         {
00577             HandleError("sent incorrect number of bytes!", true);
00578             return -1;
00579         }
00580         return nrSend;
00581     }
00582 
00583     int
00584     RecvLarge(SOCKET sock)
00585     {
00586         int rval;
00587 
00588         // first recv CTR message to find out the number of bytes to expect
00589         ILOG_DEBUG("receiving...");
00590         int nrToRecv = 0;
00591         int nrRecv = 0;
00592         char* bufPtr = mBuffer;
00593         int bufSize = DATA_BUFFER_SIZE + CTR_HEADER_SIZE;
00594         do
00595         {
00596             rval = recv(sock, bufPtr, bufSize-nrRecv, 0);
00597             if (rval == SOCKET_ERROR)
00598             {
00599                 HandleError("RecvLarge", true);
00600                 return -1;
00601             }
00602             if (rval == 0)
00603             {
00604                 ILOG_DEBUG("recv got a 0 bytes message");
00605                 return 0;
00606             }
00607             if (nrToRecv == 0)
00608             { // read CTR header for number of bytes to receive
00609                 sscanf(bufPtr, "CTR %d", &nrToRecv);
00610                 ILOG_DEBUG("recv expects " << nrToRecv << " bytes"
00611                              );
00612             }
00613             ILOG_DEBUG("    recv " << rval << " bytes");
00614             bufPtr += rval;
00615             nrRecv += rval;
00616         }
00617         while (nrRecv < nrToRecv);
00618 
00619         ILOG_DEBUG("received " << nrRecv << " bytes");
00620         if (nrRecv > nrToRecv)
00621         {
00622             HandleError("received MORE bytes than expected!", true);
00623             return -1;
00624         }
00625         return nrRecv;
00626     }
00627 
00628     int 
00629     HandleRequest(char* buf, int len, int bufSize)
00630     {
00631         if (mListener)
00632             return mListener->HandleRequest(buf, len, bufSize);
00633         else
00634             return (mFpRequest)(buf, len, bufSize);
00635     }
00636 
00637     void
00638     HandleIdle()
00639     {
00640         ILOG_DEBUG("Calling idle function");
00641 
00642         if (mListener)
00643             mListener->HandleIdle();
00644         else
00645             (mFpIdle)();
00646     }
00647 
00648     void
00649     HandleInfo(CString msg)
00650     {
00651         if (mListener)
00652             mListener->HandleInfo(msg);
00653         else if (mFpInfo)
00654             (mFpInfo)(msg);
00655         else
00656             ILOG_DEBUG(msg);
00657     }
00658 
00659     void
00660     HandleError(CString msg, bool isSockError)
00661     {
00662         mNrError++;
00663         String err;
00664 
00665 #ifdef unix
00666 
00667         err = String(strerror(errno));
00668 
00669 #else
00670 
00671         // Errors are handled by calling the WSAGetLastError routine which
00672         // will return the last error as one of the following.
00673         switch (WSAGetLastError())
00674         {
00675         case WSANOTINITIALISED :
00676             err = "Unable to initialise socket.";
00677             break;
00678         case WSAEAFNOSUPPORT :
00679             err = "The specified address family is not supported.";
00680             break;
00681         case WSAEADDRNOTAVAIL :
00682             err = "Specified address is not available from the local machine.";
00683             break;
00684         case WSAECONNREFUSED :
00685             err = "The attempt to connect was forcefully rejected.";
00686             break;
00687         case WSAEDESTADDRREQ :
00688             err = "Address destination address is required.";
00689             break;
00690         case WSAEFAULT :
00691             err = "The namelen argument is incorrect.";
00692             break;
00693         case WSAEINVAL :
00694             err = "The socket is not already bound to an address.";
00695             break;
00696         case WSAEISCONN :
00697             err = "The socket is already connected.";
00698             break;
00699         case WSAEADDRINUSE :
00700             err = "The specified address is already in use.";
00701             break;
00702         case WSAEMFILE :
00703             err = "No more file descriptors are available.";
00704             break;
00705         case WSAENOBUFS :
00706             err = "No buffer space available. The socket cannot be created.";
00707             break;
00708         case WSAEPROTONOSUPPORT :
00709             err = "The specified protocol is not supported.";
00710             break;
00711         case WSAEPROTOTYPE :
00712             err = "The specified protocol is the wrong type for this socket.";
00713             break;
00714         case WSAENETUNREACH :
00715             err = "The network can't be reached from this host at this time.";
00716             break;
00717         case WSAENOTSOCK :
00718             err = "The descriptor is not a socket.";
00719             break;
00720         case WSAETIMEDOUT :
00721             err = "Attempt timed out without establishing a connection.";
00722             break;
00723         case WSAESOCKTNOSUPPORT :
00724             err = "Socket type is not supported in this address family.";
00725             break;
00726         case WSAENETDOWN :
00727             err = "Network subsystem failure.";
00728             break;
00729         case WSAHOST_NOT_FOUND :
00730             err = "Authoritative Answer Host not found.";
00731             break;
00732         case WSATRY_AGAIN :
00733             err = "Non-Authoritative Host not found or SERVERFAIL.";
00734             break;
00735         case WSANO_RECOVERY :
00736             err = "Non recoverable errors, FORMERR, REFUSED, NOTIMP.";
00737             break;
00738         case WSANO_DATA :
00739             err = "Valid name, no data record of requested type.";
00740             break;
00741         case WSAEINPROGRESS :
00742             err = "Address blocking Windows Sockets operation is in progress.";
00743             break;
00744         case WSAEINTR :
00745             err = "The (blocking) call was canceled via WSACancelBlockingCall.";
00746             break;
00747         default :
00748             err = "Unknown error.";
00749             break;
00750         }
00751 
00752 #endif
00753 
00754         String finalMsg = msg;
00755         if (isSockError)
00756             finalMsg += ": " + err;
00757 
00758         if (mListener)
00759             mListener->HandleError(finalMsg);
00760         else if (mFpError)
00761             (mFpError)(finalMsg);
00762         else
00763             ILOG_ERROR(finalMsg);
00764     }
00765 
00766     //void
00767     //HandleConnect(CString conn, int port)
00768     //{
00769     //    if (mListener)
00770     //        mListener->HandleConnect(conn, port);
00771     //}
00772 
00773     void
00774     HandleDisconnect(int port)
00775     {
00776         if (mListener)
00777             mListener->HandleDisconnect(port);
00778     }
00779 
    // Set by the constructors: true for a server channel, false for a client.
    bool                mIsServer;
    // Host name of the server (set for client channels only).
    String              mServerName;
    // First port number and the number of consecutive ports in use.
    int                 mPort;
    int                 mNrPorts;
    // The vectors below have one entry per port.
    std::vector<SOCKET> mSock; // connection sockets
    // True while mMsgSock holds an accepted connection for the port.
    std::vector<bool>   mSockConnected;
    // Address of the connected peer, or "not connected".
    std::vector<String> mConnection;
    // Set by WaitForSocketEvent when the entry in mSock is readable.
    std::vector<bool>   mSockEvent;
    std::vector<SOCKET> mMsgSock; // sockets for sending messages
    // Set by WaitForSocketEvent when the entry in mMsgSock is readable.
    std::vector<bool>   mMsgSockEvent;
    // Transfer buffer: CTR_HEADER_SIZE header bytes followed by
    // DATA_BUFFER_SIZE data bytes.
    char*               mBuffer;
    // Number of errors counted by HandleError; Valid() requires it to be 0.
    int                 mNrError;
    // Index of the port whose request is being handled by Serve.
    int                 mCurRequestSock;
    // Optional plain-function callbacks, used when no listener is set.
    FP_INFO             mFpInfo;
    FP_ERROR            mFpError;
    FP_REQUEST          mFpRequest;
    FP_IDLE             mFpIdle;
    // When true, waiting for socket events uses a timeout and the idle
    // handler is called after each timeout.
    bool                mCallbackOnIdle;
    // Callback object; takes precedence over the function pointers.
    ChannelListener*    mListener;

    ILOG_VAR_DECL;
00801 
00802 }; // class
00803 
00804 ILOG_VAR_INIT(Channel, Impala.Util);
00805 
00806 } // namespace Util
00807 } // namespace Impala
00808 
00809 #endif

Generated on Fri Mar 19 09:31:46 2010 for ImpalaSrc by  doxygen 1.5.1