Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

net.C

Go to the documentation of this file.
00001 /* Copyright 1995, Brown Computer Graphics Group.  All Rights Reserved. */
00002 
00003 /* -------------------------------------------------------------------------
00004  *
00005  *                <     File description here    >
00006  *
00007  * ------------------------------------------------------------------------- */
00008 
00009 #include "std/config.H"
00010 
00011 /* ANSI includes */
00012 #ifdef macosx
00013 #include <sys/ioctl.h>
00014 #endif
00015 #include <cstdlib>
00016 #include <cstdio>
00017 #include <cstring>
00018 #include <cassert>
00019 #include <cctype>
00020 #include <cerrno>
00021 
00022 #include "std/fstream.H"
00023 
00024 #ifdef WIN32
00025 #define signal(x,y)
00026 #else
00027 #include <csignal>
00028 #endif
00029 
00030 #include "std/support.H"
00031 #include "std/time.H"
00032 #include "net.H"
00033 #include "pack.H"
00034 
00035 /* Includes for open()*/
00036 #include <sys/stat.h>
00037 #include <fcntl.h>
00038 
00039 
00040 /* Includes for ioctl (for num_bytes_to_read()) */
00041 #if defined(linux) || defined(_AIX)
00042 #include <sys/ioctl.h>
00043 #elif !defined(WIN32)
00044 #include <sys/filio.h>
00045 #else
00046 /* #include "net/net.H" */
00047 #endif
00048 
00049 
00050 /* include for TCP_NODELAY*/
00051 #ifndef WIN32
00052 #include <netinet/tcp.h>
00053 #endif
00054 
00055 struct sockaddr_in;
00056 struct hostent;
00057 
00058 #ifdef sun
00059 extern "C" int gethostname(char *, int);
00060 #endif
00061 
00062 // AIX, Linux, and SunOS 5.7 have socklen_t, others don't
00063 #if !defined(_AIX) && !defined(_SOCKLEN_T) && !defined(linux)
00064 typedef int socklen_t;
00065 #endif
00066 
00067 #ifdef WIN32
00068 
00069 //XXX - This stomped the ability to
00070 //reference StreamFlags::write, read, etc.
00071 //#define write write_win32
00072 //#define read read_win32
00073 
00074 ssize_t
00075 write_win32(int fildes, const void *buf, size_t nbyte)
00076 {
00077    DWORD val=0;
00078    if (GetFileType((HANDLE)fildes) == FILE_TYPE_DISK) 
00079    {
00080       if (!WriteFile((HANDLE)fildes, buf, nbyte, &val, NULL))
00081       {
00082          //cerr << "write_win32: error " << GetLastError() << endl;
00083 
00084          LPVOID lpMsgBuf;
00085          FormatMessage( 
00086              FORMAT_MESSAGE_ALLOCATE_BUFFER | 
00087              FORMAT_MESSAGE_FROM_SYSTEM | 
00088              FORMAT_MESSAGE_IGNORE_INSERTS,
00089              NULL,
00090              GetLastError(),
00091              MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
00092              (LPTSTR) &lpMsgBuf,
00093              0,
00094              NULL 
00095          );
00096 
00097          cerr << "write_win32() - Error! Message: " << (LPCTSTR)lpMsgBuf << "\n";
00098          // Free the buffer.
00099          LocalFree( lpMsgBuf );
00100       }
00101    } 
00102    else 
00103    {
00104       OVERLAPPED overlap;
00105       overlap.hEvent = (HANDLE)NULL;
00106       if (!WriteFile((HANDLE)fildes, buf, nbyte, &val, &overlap))
00107       {
00108          if (!GetOverlappedResult((HANDLE)fildes, &overlap, &val, TRUE))
00109          {
00110             LPVOID lpMsgBuf;
00111             FormatMessage( 
00112                 FORMAT_MESSAGE_ALLOCATE_BUFFER | 
00113                 FORMAT_MESSAGE_FROM_SYSTEM | 
00114                 FORMAT_MESSAGE_IGNORE_INSERTS,
00115                 NULL,
00116                 GetLastError(),
00117                 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
00118                 (LPTSTR) &lpMsgBuf,
00119                 0,
00120                 NULL 
00121             );
00122 
00123             cerr << "write_win32() - Error! Message: " << (LPCTSTR)lpMsgBuf << "\n";
00124             // Free the buffer.
00125             LocalFree( lpMsgBuf );
00126 
00127          }
00128       }
00129    }
00130    return val;
00131 }
00132 
00133 ssize_t
00134 read_win32(int fildes, void *buf, size_t nbyte)
00135 {
00136    DWORD val=0;
00137 
00138    DWORD filetype = GetFileType((HANDLE) fildes);
00139 
00140    if (fildes == fileno(stdin))
00141    {
00142       ReadConsole(GetStdHandle(STD_INPUT_HANDLE), buf, nbyte, &val, NULL);
00143    }
00144    else if (filetype == FILE_TYPE_DISK) 
00145    {
00146       ReadFile((HANDLE)fildes, buf, nbyte, &val, NULL);
00147    } 
00148    else if (filetype == FILE_TYPE_CHAR) 
00149    {
00150       ReadConsole(GetStdHandle(STD_INPUT_HANDLE), buf, nbyte, &val, NULL);
00151    } 
00152    else 
00153    {
00154       OVERLAPPED overlap;
00155       overlap.hEvent = (HANDLE)NULL;
00156       if (ReadFile((HANDLE) fildes, buf, nbyte, &val, &overlap)==FALSE) 
00157       {
00158          if (!GetOverlappedResult((HANDLE) fildes, &overlap, &val, TRUE)) 
00159          {
00160             const DWORD error = GetLastError();
00161             cerr << "read_win32, error is: " << error << " - "
00162                << " - " << fildes << endl;
00163          }
00164       }
00165    }
00166    return val;
00167 }
00168 #endif
00169 
#ifdef WIN32
// Dumps one console input record to cerr.  Only used for the PRINT_ERRS
// diagnostics inside num_bytes_to_read().
static void
print_console_event(const INPUT_RECORD &rec)
{
   static const char *const pad = "                       ";

   if (rec.EventType == KEY_EVENT)
   {
      cerr << "num_bytes_to_read - KEY_EVENT\n";
      cerr << pad << "bKeyDown="; 
      if (rec.Event.KeyEvent.bKeyDown) cerr << "DOWN\n";
      else cerr << "UP\n";
      cerr << pad << "wRepeatCount=" 
           << (rec.Event.KeyEvent.wRepeatCount) << "\n";
      cerr << pad << "wVirtualKeyCode=" 
           << (rec.Event.KeyEvent.wVirtualKeyCode) << "\n";
      cerr << pad << "wVirtualScanCode=" 
           << (rec.Event.KeyEvent.wVirtualScanCode) << "\n";
      cerr << pad << "uChar=" 
           << (rec.Event.KeyEvent.uChar.AsciiChar) << "\n";
      cerr << pad << "(int)uChar=" 
           << int((rec.Event.KeyEvent.uChar.AsciiChar)) << "\n";
      cerr << pad << "dwControlKeyState=" 
           << (rec.Event.KeyEvent.dwControlKeyState) << "\n";
   }
   else if (rec.EventType == MOUSE_EVENT)
      cerr << "num_bytes_to_read - MOUSE_EVENT\n";
   else if (rec.EventType == WINDOW_BUFFER_SIZE_EVENT)
      cerr << "num_bytes_to_read - WINDOW_BUFFER_SIZE_EVENT\n";
   else if (rec.EventType == MENU_EVENT)
      cerr << "num_bytes_to_read - MENU_EVENT\n";
   else if (rec.EventType == FOCUS_EVENT)
      cerr << "num_bytes_to_read - FOCUS_EVENT\n";
   else
      cerr << "num_bytes_to_read - Unknown event!!!!\n";
}
#endif

// Reports how many bytes can currently be read from fildes.
//
// Non-Windows: asks ioctl(FIONREAD); returns the count, or -1 if the ioctl
// fails.
//
// Windows: asks ioctlsocket(FIONREAD).  If fildes is not a socket but a
// console (FILE_TYPE_CHAR), the pending console input events are peeked and
// the repeat counts of key-down events carrying a character other than 0 or
// 27 (ESC) are summed.  Any other non-socket handle yields 1 ("assume
// something is readable"); any other ioctlsocket failure yields -1.
int
num_bytes_to_read(int fildes)
{
#ifdef WIN32
   // ioctlsocket() is a Win32 ioctl() replacement that only works for
   // sockets
   unsigned long winnum = 0;
   int retval = ioctlsocket(fildes, FIONREAD, &winnum);
   if (retval) 
   {
      const int error = WSAGetLastError();
      if (error == WSAENOTSOCK) 
      {
         HANDLE hndl = (HANDLE) _get_osfhandle(fildes);
         DWORD filetype = GetFileType(hndl);
         if (filetype == FILE_TYPE_CHAR) 
         {
            DWORD numevents;
            if (GetNumberOfConsoleInputEvents(hndl, &numevents)) 
            {
               INPUT_RECORD *irec = new INPUT_RECORD[numevents];
               // If the peek fails nothing was stored in irec, so make sure
               // the loop below does not walk uninitialised records.
               DWORD numread = 0;
               if (!PeekConsoleInput(hndl, irec, numevents, &numread))
                  numread = 0;
               winnum = 0;
               static bool PRINT_ERRS = Config::get_var_bool("PRINT_ERRS",false,true);
               if (PRINT_ERRS) cerr << "num_bytes_to_read - # Events=" << numevents << "\n";
               for (int i = 0; i < (int)numread; i++) 
               {
                  if (PRINT_ERRS)
                     print_console_event(irec[i]);

                  if (irec[i].EventType == KEY_EVENT &&
                      //Catch the down events
                      irec[i].Event.KeyEvent.bKeyDown &&
                      //Ignore keys that don't make 
                      //chars on the console stream
                      //So far, we trap 0 and 27 which
                      //traps modifiers (shft, ctrl, etc)
                      //and esc, though more may exist...
                      //There ought to be a better way!
                      int((irec[i].Event.KeyEvent.uChar.AsciiChar)) &&
                      (int((irec[i].Event.KeyEvent.uChar.AsciiChar)) != 27))
                  {
                     winnum += irec[i].Event.KeyEvent.wRepeatCount;
                  }
               }
               delete [] irec;
               if (PRINT_ERRS&&(winnum))  
                  cerr << "num_bytes_to_read - Num=" << winnum << endl;
               return winnum;
            }
            return 0;
         }

         // This isn't a socket - assume at least 1 byte to read
         cerr << "Returning 1" << endl;
         return 1;
      } 
      else 
      {
         cerr <<"::num_bytes_to_read() - ioctlsocket() returned " 
              << retval << ", error:" << error << endl;
         WSASetLastError(0);
         return -1;
      }
   }
   return (int) winnum;
#else
   int num = 0;
   int retval = ioctl(fildes, FIONREAD, &num);
   if (retval < 0) {
      return -1;
   }
   return num;
#endif
}
00288 
00289 
// Builds a printable "host(port)" label.
//
//   port  - port number placed in the parentheses
//   hname - host to describe; when null the local host name is used
//
// The host is looked up with gethostbyname() and its official name is used
// when the lookup succeeds, otherwise hname is printed as given.
//
// Returns a pointer to a function-static buffer: the result is overwritten
// by the next call and the function is not reentrant.
static char *
get_host_print_name(
   int         port,
   const char *hname = 0
   )
{
   static char nbuff[255];
   static char buff[255];
   if (!hname) {
      // gethostname() may fail, and may leave a truncated name without a
      // terminating NUL; make sure buff is always a valid string.
      if (gethostname(buff, sizeof(buff)) != 0)
         buff[0] = '\0';
      buff[sizeof(buff) - 1] = '\0';
      hname = buff;
   }

   struct hostent *entry = gethostbyname(hname);
   const char     *shown = entry ? entry->h_name : hname;

   // Bound the name so the label always fits in nbuff: '(' + at most 11
   // characters for a 32-bit %d + ')' + NUL need 14 bytes.  A precision is
   // used instead of snprintf() because this file is also built with
   // compilers that predate it.
   const int max_name = (int)sizeof(nbuff) - 14;
   sprintf(nbuff, "%.*s(%d)", max_name, shown, port);

   return nbuff;
}
00308 
// Set to 1 by net_exception_handler() and cleared by the code that notices
// it.  It is written from a signal handler, so it must be volatile or the
// compiler may keep a stale copy in the code that polls it.  (A plain int
// is kept rather than sig_atomic_t because <csignal> is not included in
// WIN32 builds.)
static volatile int NET_exception = 0;


extern "C" {
// Signal handler: records that a network exception happened and re-arms
// itself for SIGPIPE.  In WIN32 builds signal() is defined away above, so
// the re-arm line expands to an empty statement.
static 
void
net_exception_handler(int)
{
   NET_exception = 1;
   signal(SIGPIPE, net_exception_handler);
}
}
00321 
00322 /* -----------------------  NetHost Class   ------------------------------- */
00323 //**********************************************************************
00324 //
00325 //  CLASS:  NetHost
00326 //  DESCR:  Provides information about a particular machine
00327 //          on the network.
00328 //
00329 //  USAGE:
00330 // 
00331 //     NetHost aHost("markov");
00332 //     NetHost sameHost("128.148.31.79");
00333 //     
00334 #define CNetHost const NetHost
00335 class NetHost {
00336  protected:
00337    unsigned long addr_;
00338    str_ptr       name_;
00339    int           port_;
00340 
00341  public:
00342 
00343             NetHost   (const  char    *hostname);
00344             NetHost   (struct sockaddr_in *addr);
00345             NetHost   (CNetHost &rhs) : addr_(rhs.addr_),name_(rhs.name_),port_(rhs.port_) { }
00346    NetHost& operator= (CNetHost &rhs) { addr_ = rhs.addr_; name_ = rhs.name_;
00347                                         port_ = rhs.port_; return *this; }
00348 
00349    int     port(void)       const     { return port_; }
00350    str_ptr name(void)       const     { return name_; }
00351    void    get_address(struct sockaddr_in *addr) const {
00352       // DON'T free this memory; copy it
00353       // returns architecture-dependent address information
00354       memset(addr, 0, sizeof(sockaddr_in));
00355       addr->sin_family = AF_INET;
00356       addr->sin_addr.s_addr = addr_;
00357    }
00358 };
00359 
00360 NetHost::NetHost(
00361    const char *hostname
00362    )
00363 {
00364    struct hostent *entry;
00365 
00366    assert(hostname != NULL);
00367 
00368    if (isdigit(hostname[0])) {
00369       unsigned long netAddr = inet_addr(hostname);
00370       entry = gethostbyaddr((const char*) &netAddr, sizeof(netAddr), AF_INET);
00371       if (entry) {
00372          name_ = str_ptr(entry->h_name);
00373       } else name_ = hostname;
00374       addr_ = netAddr;
00375       port_ = -1;
00376    } else {
00377       entry = gethostbyname(hostname);
00378 
00379       if (entry == NULL) {
00380          cerr << "NetHost: Could not resolve hostname!" << endl;
00381          exit(1);
00382       }
00383 
00384       name_ = str_ptr(entry->h_name);
00385       addr_ = *(unsigned long*)(entry->h_addr_list[0]);
00386       port_ = -1;
00387    }
00388 
00389 }
00390 
00391 NetHost::NetHost(
00392    struct sockaddr_in *addr
00393    )
00394 {
00395    assert(addr != NULL);
00396 
00397    struct hostent *entry;
00398 
00399    entry = gethostbyaddr((const char*) &addr->sin_addr,
00400                           sizeof(addr->sin_addr),
00401                           addr->sin_family);
00402 
00403    if (entry == NULL) {
00404       perror("NetHost(sockaddr): gethostbyaddr");
00405       exit(1);
00406    }
00407 
00408    port_ = ntohs(addr->sin_port);
00409    name_ = str_ptr(entry->h_name);
00410    addr_ = *(unsigned long*)(entry->h_addr_list[0]);
00411 }
00412 
00413 
00414 /* -----------------------  NetStream Class ------------------------------- */
00415 
00416 void
00417 NetStream::set_port(
00418    int p
00419    )
00420 {
00421    port_ = p;
00422    print_name_ = get_host_print_name(port(), **name());
00423 }
00424 
00425 NetStream::NetStream(
00426    int            port, 
00427    const char    *name
00428    ) : name_(name), port_(port), msgSize_(-1), 
00429        processing_(0), print_name_(get_host_print_name(port,name))
00430 {
00431    NetHost            host(name);
00432    struct sockaddr_in serv_addr;
00433 
00434    host.get_address(&serv_addr);
00435    serv_addr.sin_port = htons((short) port);
00436 
00437    if ((_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
00438       _die("socket");
00439 
00440    else if (connect(_fd, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
00441 
00442       _die("connect");
00443 
00444    else
00445       no_linger(_fd);
00446 
00447    block(STD_FALSE);
00448 
00449    if (_fd != -1) {
00450       set_blocking(false);
00451       no_tcp_delay(); // don't wait for large packets before sending a message
00452    }
00453 
00454 }
00455 
00456 NetStream::NetStream(
00457    int                 fd, 
00458    struct sockaddr_in *client,
00459    bool                should_block
00460    ): name_(""), port_(-1), msgSize_(-1), processing_(0)
00461 {
00462    _fd = fd;
00463    if (!should_block) set_blocking(false);
00464 
00465    if (client == 0) 
00466    {
00467       name_ = str_ptr(fd);
00468       port_ = 0;
00469       print_name_ = str_ptr("file descriptor ") + name_;
00470       // XXX - assumes fd is not a socket and doesn't need no_tcp_delay()
00471    } 
00472    else 
00473    {
00474       NetHost host(client);
00475       name_       = host.name();
00476       port_       = -1;  // host.port();
00477       print_name_ = str_ptr(get_host_print_name(port_, **name_));
00478 
00479       no_linger(_fd);
00480       if (_fd != -1) 
00481       {
00482          // don't wait for large packets before sending a message
00483          no_tcp_delay();
00484       }
00485    }
00486 
00487    block(STD_FALSE);
00488 }
00489 
00490 NetStream::NetStream(
00491    Cstr_ptr     &name, 
00492    NetStream::StreamFlags   flags) :
00493       name_(name), 
00494       port_(0), 
00495       msgSize_(-1), 
00496       processing_(0), 
00497       print_name_(name)
00498 {
00499 #ifdef WIN32
00500    int readable  = flags & StreamFlags::read;
00501    int writeable = flags & StreamFlags::write;
00502    int do_ascii  = flags & StreamFlags::ascii;
00503 #else
00504    int readable  = flags & read;
00505    int writeable = flags & write;
00506    int do_ascii  = flags & ascii;
00507 #endif
00508 
00509 #if (defined(__GNUC__) && (__GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ >= 97)))
00510    if (do_ascii) {
00511       fstream* fs = 0;
00512       if (readable && writeable) 
00513       {
00514          // We don't expect this to happen...
00515          // Because it's writeable, we'll truncate the file.
00516          // But it's also readable; is truncating the desired behavior?
00517          cerr << "NetStream::NetStream() -  Stream is readable AND writeable." << endl
00518               << "Warning: truncating stream for writing." << endl;
00519          fs = new fstream(**name, fstream::out | fstream::trunc | fstream::in);
00520       } 
00521       else if (writeable)
00522          fs = new fstream(**name, fstream::out | fstream::trunc);
00523       else if (readable)
00524          fs = new fstream(**name, fstream::in);
00525       
00526       if(fs && !fs->is_open())
00527       {
00528          delete fs;
00529          fs = 0;
00530       }
00531       _iostream = fs;
00532       // XXX - Can't get the file descriptor from the stream.
00533       // Hopefully we don't need it:
00534       _fd = -1; 
00535    } 
00536    else
00537 #endif
00538    {
00539 #ifndef WIN32
00540       _fd = open(**name,
00541                  writeable ? O_WRONLY | O_CREAT | O_TRUNC : O_RDONLY, 0666);
00542       set_blocking(false);
00543       if (_fd == -1) 
00544       {
00545          err_ret("NetStream::NetStream() - File: '%s'", **name);
00546       }
00547 #else
00548       if (writeable)
00549       {
00550          _fd = (int)CreateFile(**name, GENERIC_WRITE, FILE_SHARE_READ, NULL,
00551                                CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
00552       }
00553       else
00554       {
00555          //XXX - Bug, this will create even though OPEN_EXISTING is
00556          //being specified... Looking into a good workaround...
00557          //This is the chief reason that loading a non-existing .jot
00558          //file hangs in win32
00559          _fd = (int)CreateFile(**name, GENERIC_READ, FILE_SHARE_READ, NULL, 
00560                                OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
00561       }
00562       if (_fd == (int)INVALID_HANDLE_VALUE)
00563       {
00564          err_mesg(ERR_LEV_WARN | ERR_INCL_ERRNO, "NetStream::NetStream() - Failed opening: '%s'", **name);
00565       }
00566       else
00567       {
00568          if (do_ascii) 
00569          {
00570             int old_fd = _fd;
00571             _fd = _open_osfhandle (_fd,           (_O_APPEND)        | // <--- need?
00572                                     ((!writeable)?(_O_RDONLY): (0))  | 
00573                                       ((do_ascii)?(_O_TEXT  ): (0)));
00574 
00575             err_adv(Config::get_var_bool("PRINT_ERRS",false,true), 
00576                "NetStream::NetStream() - Was OS Handle: %d  --> Now using C fd: %d", old_fd, _fd);
00577 
00578             if (_fd == -1)
00579                err_msg("NetStream::NetStream() - Failed _open_osfhandle: '%s'", **name);            
00580          }
00581       }
00582 #endif
00583    }
00584 
00585 #if (defined(__GNUC__) && (__GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ >= 97)))
00586 #elif (defined(_MSC_VER) && (_MSC_VER >= 1300))
00587 #else 
00588    if (do_ascii) 
00589    {
00590      fstream *fs = new fstream;
00591      fs->attach(_fd); 
00592      if (fs->is_open()) _iostream = fs;
00593    }
00594 #endif
00595 
00596    block(STD_FALSE);
00597 }
00598 
00599 
00600 NetStream::~NetStream()
00601 { 
00602 
00603    if (_fd >= 0) 
00604    {
00605       if (port_ > 0) 
00606       {
00607          // Only print out the following when reading from the net
00608          cerr <<  "NetStream : returning file descriptor :" << _fd << endl;
00609       }
00610       // Without the following line, jot kills certain shells on exit, since
00611       // stdin is left in non-blocking mode
00612       set_blocking(true);
00613 #ifndef WIN32
00614       close(_fd);
00615 #else
00616       //XXX - Pretty sure this is right... That is, 
00617       //though we use CreateFile to get a HANDLE,
00618       //_open_osfhandle returns a handle we close
00619       //with _close, which also closes the underlying
00620       //HANDLE (or so it seems)
00621       int ret;
00622       if ((_fd == fileno(stdin)) || (_iostream)) //ascii
00623       {
00624          ret = _close(_fd);
00625          assert(ret==0);
00626       }
00627       else
00628       {
00629          ret = CloseHandle((HANDLE)_fd);
00630          assert(ret!=0);
00631       }
00632 #endif
00633    }
00634 }
00635 
00636 void
00637 NetStream::remove_me()
00638 {
00639    network_->remove_stream(this);
00640 }
00641 
00642 void
00643 NetStream::no_tcp_delay(
00644    )
00645 {
00646    // DON'T "BLOCK" WHEN USING TCP
00647    //"on the sender's side, to insure that TCP doesn't send too many small
00648    //packets, it refuses to send a packet if there is an ack outstanding
00649    //unless it is has good-sized (i.e., big) packet to send. This feature
00650    //can be turned off (e.g., X turns it off) by specifying the TCP_NODELAY
00651    //option to setsockopt -- see the man pages for setsockopt and tcp." -twd
00652    if (!Config::get_var_bool("DO_TCP_DELAY",false,true)) {
00653       int on=1;
00654       if (setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, (char *)&on, sizeof(on))) {
00655          cerr << "NetStream::no_tcp_delay-  setsockopt(TCP_NODELAY) on " <<
00656             print_name() << " (" << _fd<< ")";
00657          perror("");
00658       }
00659    }
00660 }
00661 
00662 void 
00663 NetStream::_die(
00664    const char *msg
00665    )
00666 {
00667    if (!Config::get_var_bool("NO_CONNECT_ERRS",false,true)) {
00668       cerr << "NetStream(" << name_ << ":" << port_ << "): " << msg << ": ";
00669       perror(NULL);
00670    }
00671    _fd = -1;
00672 }
00673 
00674 ssize_t 
00675 NetStream::read_from_net(
00676    void   *buf, 
00677    size_t  nbytes
00678    ) const
00679 {
00680    char  *tmpbuf  = (char*) buf;
00681    int    numread = 0;
00682    double stime   = 0;
00683 
00684    while (nbytes) {
00685 #ifdef WIN32
00686       int readb = read_win32(_fd, tmpbuf, nbytes);
00687 #else
00688       int readb = ::read(_fd, tmpbuf, nbytes);
00689 #endif
00690 
00691       if (errno == EAGAIN) { // if nothing's left to read, then
00692          if (Config::get_var_bool("PRINT_ERRS",false,true)) 
00693             cerr << "  bytes read from network (EAGAIN) = " << numread << endl;
00694          return numread + (readb == -1 ? 0:readb);  //just return what we have
00695      }
00696 
00697       if (readb < 0) {
00698          perror("NetStream::read_from_net : Warning - ");
00699          return -1;
00700       }
00701 
00702       if (readb == 0 && nbytes > 0) 
00703       {
00704 #ifdef WIN32
00705          //XXX - errno not set on WIN32 when there's not enough
00706          //to read on a non-blocking fd... should prolly
00707          //set some state in read_win32 to reflect this, but
00708          //for now just assume this is the reason and
00709          //return without error...
00710          return numread + (readb == -1 ? 0:readb);
00711 #else
00712          if (stime == 0)
00713             stime = the_time();
00714 
00715          if (the_time() - stime > 1 || errno != EAGAIN) {
00716             if (port_ > 0) {
00717                // Only print out a message if reading from the net
00718                cerr << "NetStream::read_from_net - read error: peer reset"
00719                     << endl;
00720             }
00721             return -1;
00722          }
00723 #endif
00724       }
00725 
00726       nbytes -= readb;
00727       tmpbuf += readb;
00728       numread+= readb;
00729    }
00730    if (Config::get_var_bool("PRINT_ERRS",false,true)) 
00731       cerr << "  bytes read from network = " << numread << endl;
00732    return numread;
00733 }
00734 
00735 void
00736 NetStream::set_blocking(bool val) const
00737 {
00738 #ifdef WIN32
00739    // XXX - add support for non-blocking i/o
00740    if (Config::get_var_bool("PRINT_ERRS",false,true)) 
00741       cerr << "NetStream::set_blocking - not supported" << endl;
00742 #else
00743    int flags;
00744    if((flags = fcntl(_fd, F_GETFL, 0))<0) {
00745       err_ret("NetStream::set_blocking: fcntl(..,F_GETFL)");
00746       return;
00747    }
00748    if (val) {
00749       flags &= ~O_NDELAY;
00750    } else {
00751       flags |= O_NDELAY;
00752    }
00753    if (fcntl(_fd, F_SETFL, flags)<0) {
00754       err_ret("NetStream::set_blocking: fcntl(..,F_GETFL)");
00755       return;
00756    }
00757 #endif
00758 }
00759 
00760 ssize_t 
00761 NetStream::write_to_net(
00762    const void *buf, 
00763    size_t      nbytes
00764    ) const
00765 {
00766    set_blocking(true);
00767 #ifdef WIN32
00768    ssize_t   bytes_written = write_win32(_fd, buf, nbytes);
00769 #else
00770    ssize_t   bytes_written = ::write(_fd, buf, nbytes);
00771 #endif
00772    set_blocking(false);
00773 
00774    if (bytes_written < 0 || NET_exception) {
00775       perror("NetStream::write_to_net: Warning: ");
00776       NET_exception = 0;
00777    } else if (bytes_written < (ssize_t)nbytes)
00778       cerr << "Couldn't flush the buffer.  Some data wasn't written. (nbytes="
00779            << nbytes << " written=" << bytes_written << ")\n";
00780    return bytes_written;
00781 }
00782 
00783 void
00784 NetStream::no_linger(int fd)
00785 {
00786    int           reuse = 1;
00787    struct linger ling;
00788 
00789    ling.l_onoff  = 0;
00790    ling.l_linger = 0;
00791 
00792    if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&ling, sizeof(ling))) {
00793       perror("NetStream::no_linger.  setsockopt - SO_LINGER :");
00794    }
00795    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse))) {
00796       perror("NetStream::no_linger.  setsockopt - SO_REUSEADDR:");
00797    }
00798 }
00799 
00800 int
00801 NetStream::interpret()
00802 {
00803    char    buff_[256], *buff = buff_;
00804    NETenum code;
00805    int     port;
00806    processing_ = 1;
00807    int     ret = 0;
00808 
00809    while (_in_queue.count()) {
00810       *this >> code;
00811       switch (code) {
00812             case NETadd_connection: {
00813                *this >> buff >> port;  
00814                NetStream *s = new NetStream(port, buff);
00815                if (s->fd() != -1)
00816                   network_->add_stream(s);
00817                network_->interpret(code, this);
00818             }
00819             brcase NETquit: {
00820                network_->interpret(code, this);
00821                network_->remove_stream(this);
00822                ret = 1;  // this return value should terminate this NetStream
00823             }
00824             brcase NETidentify :
00825                *this >> port;  
00826                set_port(port);
00827                if (network_->first_) {
00828                   cerr << "NetStream accepts server -->" << print_name()<<endl;
00829                   for (int i=0; i<network_->nStreams_; i++)  {
00830                      NetStream *s = network_->streams_[i];
00831                      if (s->port() != -1 && s != this) {
00832                         *this << NETadd_connection 
00833                               << s->name()
00834                               << s->port()
00835                               << NETflush;
00836                      }
00837                   }
00838                }
00839             brcase NETtext:
00840                *this >> buff;
00841                cerr << "(* " << print_name() << " *) " << buff << endl;
00842             brcase NETbroadcast: { 
00843                str_ptr flag;
00844                *this >> flag;
00845                int i;
00846                for (i = 0; i < tags_.num(); i++)
00847                   if (flag == tags_[i])
00848                      break; 
00849                if (i == tags_.num()) {
00850                   _in_queue.remove_all();
00851                   if (Config::get_var_bool("PRINT_ERRS",false,true))
00852                      cerr << "Ignoring broadcast " << flag << endl;
00853                }
00854             }
00855             brcase NETbarrier: network_->_at_barrier++;
00856             brdefault : 
00857                //XXX - Added a way to bail out of bad stream...
00858                if (network_->interpret(code, this)) return 1;
00859          }
00860    }
00861    processing_ = 0;
00862 
00863    return ret;
00864 }
00865 
00866 
00867 void NetStream::flush_data()
00868 {
00869    int count = _out_queue.count();
00870    if (!count) return;
00871    if (Config::get_var_bool("PRINT_ERRS",false,true)) 
00872       cerr << "NetStream: sending message to net of length " << count << endl;
00873    
00874    // Pack up 
00875    int packcount = 0;
00876    char packbuf_space[sizeof(int)];
00877    char *packbuf = packbuf_space;
00878    UGA_PACK_WORD(count, packbuf, packcount);
00879 
00880    write_to_net(packbuf_space, packcount);
00881    flush();
00882 }
00883 
00884 
00885 int
00886 NetStream::read_stuff()
00887 {
00888 
00889    const unsigned int BUFSIZE= 666666;
00890    char buff[BUFSIZE];
00891    int  num_read = 0;
00892 
00893    if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: ReadStuff called\n";
00894 
00895    // If we do not have a message size (msgSize_), read it from the network
00896    if (msgSize_ == -1) {
00897 
00898       char packbuf_space[sizeof(int) + 1];
00899       char *packbuf = packbuf_space;
00900       int nread = read_from_net(packbuf, sizeof(int));
00901       if (nread < 0)
00902          return nread;
00903       int count = 0;
00904       UGA_UNPACK_INTEGER(msgSize_, packbuf, count);
00905       if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: msgSize is " << msgSize_ << endl;
00906    }
00907 
00908    // After we know the message size, we read everything that is available on
00909    // the network, in blocks of BUFSIZE bytes
00910    do {
00911       int nread = read_from_net(buff, BUFSIZE);
00912       if (nread <= 0) 
00913            return nread;
00914       else num_read = nread;
00915       if (msgSize_ > (int)BUFSIZE) {
00916          //XXX - Better error checking... (I hope)
00917          if (num_read != (int)BUFSIZE) return num_read;
00918          _in_queue.put((UGAptr)buff, BUFSIZE);
00919          msgSize_ -= BUFSIZE;
00920          if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: Big message, storing first BUFSIZE bytes (msgSize = " << msgSize_ << endl;
00921          num_read  = 0;
00922       }
00923    } while (num_read == 0);
00924 
00925    // If we have read at least one full message...
00926    char *tbuf = buff; 
00927    if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: processing num_read " << num_read << endl;
00928    if (num_read >= msgSize_) {
00929       // For each full message...
00930       while (num_read && num_read >= msgSize_) {
00931         _in_queue.put((UGAptr)tbuf, msgSize_); // Stuff the message onto queue
00932         num_read -= msgSize_;                  // skip to end of this message
00933         tbuf     += msgSize_;
00934 
00935         if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: processing full_message " << msgSize_ << " (num_read = " << num_read << endl;
00936 
00937         if (interpret() != 0)                  // Let app decode message
00938            return 1;  // this flag terminates this NetStream connection
00939 
00940         // If we still have more data to process read the next message size
00941         if (num_read > 0) {
00942            int count = 0;
00943            UGA_UNPACK_INTEGER(msgSize_, tbuf, count); // tbuf is updated
00944            num_read -= count;
00945            if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: next message" << msgSize_ << " (num_read = " << num_read << endl;
00946         } else
00947            msgSize_ = -1; // Otherwise, clear the message size
00948       }
00949    }
00950    // Anything left over is less than a complete message, so we store it
00951    // away in _in_queue and decrease our msgSize_ request accordingly
00952    _in_queue.put(tbuf, num_read);
00953    msgSize_ -= num_read;
00954    if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: saved for next time " << num_read << " (msgSize = " << msgSize_ << endl;
00955    return 0;
00956 }
00957 
00958 void
00959 NetStream::sample()
00960 {
00961    if (read_stuff() != 0)
00962       network()->remove_stream(this);
00963 }
00964 
00965 /* -----------------------  Network Class   ------------------------------- */
00966 
00967 // Default number of connections the operating system should queue.
00968 int Network::NETWORK_SERVER_BACKLOG = 5;
00969 
00970 void
00971 Network::connect_to(
00972    NetStream *s
00973    )
00974 {
00975    if (s && s->fd() != -1) {
00976       add_stream(s); 
00977       if (Config::get_var_bool("PRINT_ERRS",false,true))
00978           cerr << "Network::connect_to - sending identity to server" << endl;
00979       *s << NETidentify << port_ << NETflush;
00980   }
00981 }
00982 
00983 void
00984 Network::barrier()
00985 {
00986    for (int i=0; i<nStreams_; i++)
00987       *streams_[i] << NETbarrier << NETflush;
00988 
00989    while (_at_barrier < nStreams_)
00990       _manager->loop(0);
00991    _at_barrier-=nStreams_;
00992 }
00993 
00994 int
00995 Network::processing(void) const
00996 {
00997    int yes = 0; 
00998    for (int i=0; !yes && i < nStreams_; i++) 
00999       yes = streams_[i]->processing(); 
01000    return yes;
01001 }
01002 
01003 NetStream *
01004 Network::wait_for_connect()
01005 {
01006    struct sockaddr_in  cli_addr;
01007    socklen_t           clilen = sizeof(cli_addr);
01008    int                 newFd;
01009    NetStream          *newStream;
01010 
01011    if ((newFd = accept(_fd, (struct sockaddr*) &cli_addr, &clilen)) < 0)
01012       _die("accept");
01013 
01014    // Should really handle EWOULDBLOCK...
01015 
01016    if (!(newStream = new NetStream(newFd, &cli_addr)))
01017       _die("out of memory");
01018 
01019    return newStream;
01020 }
01021 
01022 void 
01023 Network::remove_stream(
01024    NetStream *s
01025    ) 
01026 {
01027    // notify observers.  (Do this before deleting the stream)
01028    notify_net(Network_obs::remove_str, s);
01029 
01030    int i=0;
01031    while (i<nStreams_ && streams_[i] != s) ++i;
01032    if (i < nStreams_) {
01033       Unregister(s);
01034       streams_[i] = streams_[--nStreams_];
01035       delete s;
01036    }
01037 }
01038 
01039 void 
01040 Network::accept_stream(void) 
01041 {
01042    NetStream *s = wait_for_connect();
01043    cerr << "Network   accept_stream from ---->" << s->name() << endl;
01044    add_stream(s);
01045    add_client(s);
01046 
01047    // notify observers
01048    notify_net(Network_obs::accept_str, s);
01049 }
01050 
01051 void
01052 Network::add_client(
01053    NetStream *cli
01054    )
01055 { 
01056    *cli << NETtext << "Initialize World" << NETflush; 
01057 }
01058 
01059 
01060 void 
01061 Network::_die(
01062    const char *msg
01063    )
01064 {
01065    cerr << "Network(" << name_ << "): " << msg << ": ";
01066    perror(NULL);
01067    exit(1);
01068 }
01069 
01070 
01071 char *
01072 Network::configure(
01073    int port, 
01074    int backlog
01075    )
01076 {
01077    struct sockaddr_in serv_addr;
01078    char   buff[255];
01079 
01080    port_ = port;
01081    gethostname(buff, 255);
01082    name_ = str_ptr(buff);
01083 
01084    // Make the socket
01085    if ((_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
01086       return "socket";
01087 
01088    // Set up our address
01089    memset(&serv_addr, 0, sizeof(serv_addr));
01090    serv_addr.sin_family = AF_INET;
01091    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
01092    serv_addr.sin_port = htons((short) port);
01093 
01094    // Tell the socket to not linger if the process is killed.
01095    NetStream::no_linger(_fd);
01096 
01097    if (bind(_fd, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
01098       return "bind";
01099 
01100    if (port == 0) {
01101       socklen_t foo(sizeof(struct sockaddr_in));
01102       getsockname(_fd,  (struct sockaddr *)&serv_addr, &foo);
01103       port_ = int(ntohs(serv_addr.sin_port));
01104    }
01105 
01106    // set port up to queue up to 'backlog' connection requests
01107    if (listen(_fd, backlog) < 0)
01108       return "listen";
01109 
01110    // now that our file descriptor, _fd, has been setup, we need
01111    // to register ourself (really our _fd) with the file descriptor
01112    // manager.
01113    Register(); // register the master socket that waits for new connections
01114 
01115    cerr << "Network: server "<<name_<<" on port "<< port_<< endl;
01116 
01117    signal(SIGPIPE, net_exception_handler);
01118 
01119    return 0;
01120 }
01121 
01122 
01123 void
01124 Network::start(
01125    int myPort
01126    )
01127 { 
01128    char *msg;
01129 
01130    // if myPort is 0, then we arbitrarily determine that this Network
01131    // will be responsible for configuring all new clients when they
01132    // arrive -- see accept_stream()
01133    first_ = (myPort == 0 ? 0 : 1);
01134    // setup master socket to wait for connections:
01135    if ((msg = configure(myPort))) 
01136       _die(msg);
01137 }
01138 
01139 void  
01140 Network::flush_data()
01141 { 
01142    for (int i=0; i<nStreams_; i++) 
01143       streams_[i]->flush_data(); 
01144 }
01145 
01146 
01147 STDdstream &
01148 operator >> (
01149    STDdstream &ds,  
01150    NETenum    &m
01151    )
01152 {
01153    int x;
01154    ds >> x;
01155    m = NETenum(x);
01156    return ds;
01157 }
01158 
01159 
01160 STDdstream &
01161 operator << (
01162    STDdstream &ds,  
01163    NETenum     m
01164    ) 
01165 {
01166    switch (m) {
01167        case NETflush  : if (ds.ascii()) {
01168                             *ds.ostr() << endl;
01169                             ds.ostr()->flush();
01170                         }
01171                         else ((NetStream &)ds).flush_data();
01172      brdefault        : { int x(m);
01173                           ds << x;
01174                         }
01175    }
01176    return ds;
01177 }
01178 
01179 

Generated on Mon Sep 18 11:39:32 2006 for jot by  doxygen 1.4.4