Reactor.cpp

Go to the documentation of this file.
00001 // -*- c++ -*-
00002 //------------------------------------------------------------------------------
00003 //                          Reactor.cpp
00004 //------------------------------------------------------------------------------
00005 //  Copyright (C) 1997-2002,2005  Vladislav Grinchenko
00006 //
00007 //  This library is free software; you can redistribute it and/or
00008 //  modify it under the terms of the GNU Library General Public
00009 //  License as published by the Free Software Foundation; either
00010 //  version 2 of the License, or (at your option) any later version.
00011 //----------------------------------------------------------------------------- 
00012 //  Created: 05/25/1999
00013 //----------------------------------------------------------------------------- 
00014 #include <iostream>
00015 #include <sstream>
00016 #include <string>
00017 
00018 #include "assa/Reactor.h"
00019 
00020 using namespace ASSA;
00021 
00022 Reactor::
00023 Reactor () : 
00024     m_noFiles (1024), m_maxfd (0), m_active (true),
00025     m_readSet ((EventHandler**) NULL), 
00026     m_writeSet ((EventHandler**) NULL), 
00027     m_exceptSet ((EventHandler**) NULL)
00028 {
00029     trace_with_mask("Reactor::Reactor",REACTTRACE);
00030     
00031     struct rlimit rlim;
00032     rlim.rlim_max = 0;
00033 
00034     if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00035         m_noFiles = rlim.rlim_cur;
00036     }
00037 
00038     m_readSet = new EventHandler* [m_noFiles];
00039     m_writeSet = new EventHandler* [m_noFiles];
00040     m_exceptSet = new EventHandler* [m_noFiles];
00041 
00042     for (int i = 0; i < m_noFiles; i++) {
00043         m_readSet[i] = NULL;
00044         m_writeSet[i] = NULL;
00045         m_exceptSet[i] = NULL;
00046     }
00047 }
00048 
00049 Reactor::
00050 ~Reactor()
00051 {   
00052     trace_with_mask("Reactor::~Reactor",REACTTRACE);
00053 
00054     delete [] m_readSet;
00055     delete [] m_writeSet;
00056     delete [] m_exceptSet;
00057 }
00058 
00059 TimerId
00060 Reactor::
00061 registerTimerHandler (EventHandler* eh_, 
00062                       const TimeVal& timeout_,
00063                       const std::string& name_)
00064 {
00065     trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00066     Assure_return (eh_);
00067 
00068     TimeVal now (TimeVal::gettimeofday());
00069     TimeVal t (now + timeout_);
00070 
00071     DL((REACT,"TIMEOUT_EVENT: (%d,%d)\n",  timeout_.sec(),timeout_.msec()));
00072     DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00073     DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00074 
00075     TimerId tid =  m_tqueue.insert (eh_, t, timeout_, name_);
00076 
00077     DL((REACT,"---Modified Timer Queue----\n"));
00078     m_tqueue.dump();
00079     DL((REACT,"---------------------------\n"));
00080 
00081     return (tid);
00082 }
00083 
00084 bool 
00085 Reactor::
00086 registerIOHandler (EventHandler* eh_, int fd_, EventType et_)
00087 {
00088     trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00089 
00090     std::ostringstream msg;
00091     Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00092 
00093     if ( isReadEvent (et_) ) {
00094 
00095         if ( !m_waitSet.m_rset.setFd (fd_) ) {
00096             DL((ERROR,"readset: fd %d out of range\n", fd_));
00097             return (false);
00098         }
00099         m_readSet[fd_] = eh_;
00100         msg << "READ_EVENT";
00101     }
00102 
00103     if ( isWriteEvent (et_) ) {
00104 
00105         if ( !m_waitSet.m_wset.setFd (fd_) ) {
00106             DL((ERROR,"writeset: fd %d out of range\n", fd_));
00107             return (false);
00108         }
00109         m_writeSet[fd_] = eh_;
00110         msg << " WRITE_EVENT";
00111     }
00112     if ( isExceptEvent (et_) ) {
00113         if ( !m_waitSet.m_eset.setFd (fd_) ) {
00114             DL((ERROR,"exceptset: fd %d out of range\n", fd_));
00115             return (false);
00116         }
00117         m_exceptSet[fd_] = eh_;
00118         msg << " EXCEPT_EVENT";
00119     }
00120     msg << std::ends;
00121     DL((REACT,"Registered EventHandler 0x%x FD (%d) for event(s) %s\n", 
00122         (u_long)eh_, fd_, msg.str ().c_str () ));
00123 
00124     if ( m_maxfd < fd_+1 ) {
00125         m_maxfd = fd_+1;
00126         DL((REACT,"maxfd+1 adjusted to %d\n",m_maxfd));
00127     }
00128     DL((REACT,"Modified waitSet:\n"));
00129     m_waitSet.dump ();
00130 
00131     return (true);
00132 }
00133 
00134 bool 
00135 Reactor::
00136 removeTimerHandler (TimerId tid_)
00137 {
00138     trace_with_mask("Reactor::removeTimer",REACTTRACE);
00139     bool ret;
00140 
00141     if ((ret = m_tqueue.remove (tid_))) {
00142         DL((REACT,"---Modified Timer Queue----\n"));
00143         m_tqueue.dump();
00144         DL((REACT,"---------------------------\n"));
00145     }
00146     else {
00147         EL((ERROR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00148     }
00149     return (ret);
00150 }
00151 
00152 bool 
00153 Reactor::
00154 removeHandler (EventHandler* eh_, EventType et_)
00155 {
00156     trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00157 
00158     if (eh_ == NULL) {
00159         return false;
00160     }
00161     bool ret = false;
00162     register int fd;
00163 
00164     if (isTimeoutEvent (et_)) {
00165         ret = m_tqueue.remove (eh_);
00166     }
00167 
00168     if (isReadEvent (et_) || isWriteEvent (et_) || isExceptEvent (et_)) {
00169         for (fd = 0; fd < m_maxfd; fd++) {
00170             if (m_readSet[fd] == eh_ ||  m_writeSet[fd] == eh_ ||
00171                 m_exceptSet[fd] == eh_) {
00172                 ret = removeIOHandler (fd);
00173             }
00174         }
00175     }
00176     return (ret);
00177 }
00178 
00179 bool
00180 Reactor::
00181 removeIOHandler (int fd_)
00182 {
00183     trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00184 
00185     Assure_return (fd_ >= 0 && fd_ < m_noFiles);
00186 
00187     DL((REACT,"Removing Handler fd = %d\n",fd_));
00188 
00189     EventHandler* eh = NULL;
00190 
00191     if      ( m_readSet[fd_] )   eh = m_readSet[fd_];
00192     else if ( m_writeSet[fd_] )  eh = m_writeSet[fd_];
00193     else if ( m_exceptSet[fd_] ) eh = m_exceptSet[fd_];
00194 
00195     if (eh) {
00196         DL((REACT,"Found EvtHandler 0x%x\n",(u_long)eh));
00197         eh->handle_close (fd_);
00198     }
00199 
00200     m_readSet[fd_] = NULL;
00201     m_writeSet[fd_] = NULL;
00202     m_exceptSet[fd_] = NULL;
00203 
00204     m_waitSet.m_rset.clear (fd_);
00205     m_waitSet.m_wset.clear (fd_);
00206     m_waitSet.m_eset.clear (fd_);
00207 
00208     m_readySet.m_rset.clear (fd_);
00209     m_readySet.m_wset.clear (fd_);
00210     m_readySet.m_eset.clear (fd_);
00211 
00212     if ( m_maxfd == fd_+1 ) {
00213         while ( m_maxfd > 0                    &&
00214                 m_readSet  [m_maxfd-1] == NULL &&
00215                 m_writeSet [m_maxfd-1] == NULL &&
00216                 m_exceptSet[m_maxfd-1] == NULL )
00217             {
00218                 m_maxfd--;
00219             }
00220     }
00221     DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd));
00222     DL((REACT,"Modifies waitSet:\n"));
00223     m_waitSet.dump ();
00224 
00225     return (true);
00226 }
00227 
00228 bool
00229 Reactor::
00230 checkFDs (void)
00231 {
00232     trace_with_mask("Reactor::checkFDs",REACTTRACE);
00233     
00234     bool num_removed = false;
00235     FdSet mask;
00236     timeval poll = { 0, 0 };
00237 
00238     for (int fd = 0; fd < m_noFiles; fd++) {
00239         if ( m_readSet[fd] != NULL ) {
00240             mask.setFd (fd);
00241             if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00242                 removeIOHandler (fd);
00243                 num_removed = true;
00244                 DL((REACT,"Detected BAD FD: %d\n", fd ));
00245             }
00246             mask.clear (fd);
00247         }
00248     }
00249     return (num_removed);
00250 }
00251 
00252 bool
00253 Reactor::
00254 handleError (void)
00255 {
00256     trace_with_mask("Reactor::handleError",REACTTRACE);
00257 
00258     /*---  If commanded to stop, do so --*/
00259     if ( !m_active ) {
00260         DL((REACT,"Received cmd to stop Reactor\n"));
00261         return (false);
00262     }
00263 
00264     /*---
00265       TODO: If select(2) returns before time expires, with
00266       a descriptor ready or with EINTR, timeval is not
00267       going to be updated with number of seconds remaining.
00268       This is true for all systems except Linux, which will
00269       do so. Therefore, to restart correctly in case of
00270       EINTR, we ought to take time measurement before and
00271       after select, and try to select() for remaining time.
00272     
00273       For now, we restart with the initial timing value.
00274       ---*/
00275     /*---
00276       BSD kernel never restarts select(2). SVR4 will restart if
00277       the SA_RESTART flag is specified when the signal handler
00278       for the signal delivered is installed. This means taht for
00279       portability, we must handle signal interrupts.
00280       ---*/
00281 
00282     if ( errno == EINTR ) {
00283         EL((REACT,"EINTR: interrupted select(2)\n"));
00284         /*
00285           If I was sitting in select(2) and received SIGTERM,
00286           the signal handler would have set m_active to 'false',
00287           and this function would have returned 'false' as above.
00288           For any other non-critical signals (USR1,...),
00289           we retry select.
00290         */
00291         return (true);
00292     }
00293     /*
00294       EBADF - bad file number. One of the file descriptors does
00295       not reference an open file to open(), close(), ioctl().
00296       This can happen if user closed fd and forgot to remove
00297       handler from Reactor.
00298     */
00299     if ( errno == EBADF ) {
00300         DL((REACT,"EBADF: bad file descriptor\n"));
00301         return (checkFDs ());
00302     }
00303     /*
00304       Any other error from select
00305     */
00306     EL((ERROR,"select(3) error\n"));
00307     return (false);
00308 }
00309 
00310 int
00311 Reactor::
00312 isAnyReady (void)
00313 {
00314     trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00315 
00316     int n = m_readySet.m_rset.numSet () +
00317         m_readySet.m_wset.numSet () +
00318         m_readySet.m_eset.numSet ();
00319 
00320     if ( n > 0 ) {
00321         DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00322         m_readySet.dump ();
00323     }
00324     return (n);
00325 }
00326 
00327 void 
00328 Reactor::
00329 calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_)
00330 {
00331     trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00332 
00333     TimeVal now;
00334     TimeVal tv;
00335 
00336     if (m_tqueue.isEmpty () ) {
00337         howlong_ = maxwait_;
00338         goto done;
00339     }
00340     now = TimeVal::gettimeofday ();
00341     tv = m_tqueue.top ();
00342     
00343     if (tv < now) {
00344         /*--- 
00345           It took too long to get here (fraction of a millisecond), 
00346           and top timer had already expired. In this case,
00347           perform non-blocking select in order to drain the timer queue.
00348           ---*/
00349         *howlong_ = 0;
00350     }
00351     else {  
00352         DL((REACT,"--------- Timer Queue ----------\n"));
00353         m_tqueue.dump();
00354         DL((REACT,"--------------------------------\n"));
00355 
00356         if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00357             *howlong_ = tv - now;
00358         }
00359         else {
00360             *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00361         }
00362     }
00363 
00364  done:
00365     if (howlong_ != NULL) {
00366         DL((REACT,"delay (%f)\n", double (*howlong_) ));
00367     }
00368     else {
00369         DL((REACT,"delay (forever)\n"));
00370     }
00371 }
00372 
00373 void
00374 Reactor::
00375 waitForEvents (void)
00376 {
00377     while ( m_active ) {
00378         waitForEvents ((TimeVal*) NULL);
00379     }
00380 }
00381 
00382 /*******************************************************************************
00383    =====================================================================
00384    | select() | errno |      Events         | Behavior                 |
00385    |===================================================================|
00386    |    < 0   | EINTR | Interrup by signal  | Retry                    |
00387    +----------+-------+---------------------+--------------------------+
00388    |    < 0   | EBADF | Bad file descriptor | Remove bad fds and retry |
00389    |          |       |                     | and retry                |
00390    +----------+-------+---------------------+--------------------------+
00391    |    < 0   | others| Some other error    | Fall through             |
00392    +----------+-------+---------------------+--------------------------+
00393    |   == 0   |   0   | Timed out           | Fall through             |
00394    +----------+-------+---------------------+--------------------------+
00395    |    > 0   |   0   | Got some work to do | Fall through             |
00396    +-------------------------------------------------------------------+ 
00397 *******************************************************************************/
00398 void
00399 Reactor::
00400 waitForEvents (TimeVal* tv_)
00401 {
00402     trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00403 
00404     TimerCountdown traceTime (tv_);
00405     DL((REACT,"======================================\n"));
00406 
00407     /*--- Expire all stale Timers ---*/
00408     m_tqueue.expire (TimeVal::gettimeofday ());
00409 
00410     /* Test to see if Reactor has been deactivated as a result
00411      * of processing done by any TimerHandlers.
00412      */
00413     if (!m_active) {
00414         return;
00415     }
00416 
00417     int nReady;
00418     TimeVal  delay;
00419     TimeVal* dlp = &delay;
00420 
00421     /*---
00422       In case if not all data are processed by the EventHandler,
00423       and EventHandler stated so in its callback's return value
00424       to dispatcher (), it will be called again. This way 
00425       underlying file/socket stream can efficiently utilize its
00426       buffering mechaninsm.
00427       ---*/
00428     if ((nReady = isAnyReady ())) {
00429         DL((REACT,"isAnyReady returned: %d\n",nReady));
00430         dispatch (nReady);
00431         return;
00432     }
00433 
00434     DL((REACT,"=== m_waitSet ===\n"));
00435     m_waitSet.dump ();
00436 
00437     do {
00438         m_readySet.reset ();
00439         m_readySet = m_waitSet;
00440         calculateTimeout (dlp, tv_);
00441 
00442         nReady = ::select (m_maxfd, 
00443                            &m_readySet.m_rset,
00444                            &m_readySet.m_wset, 
00445                            &m_readySet.m_eset, 
00446                            dlp);
00447         DL((REACT,"::select() returned: %d\n",nReady));
00448     } 
00449     while (nReady < 0 && handleError ());
00450 
00451     dispatch (nReady);
00452 }
00453 
00454 void 
00455 Reactor::
00456 dispatchHandler (FdSet& mask_, EventHandler** fdSet_, EH_IO_Callback callback_)
00457 {
00458     trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00459 
00460     register int fd;
00461     register int ret;
00462 
00463     /*---
00464       This spot needs re-thinking. When you have several high data-rate
00465       connections sending data at the same time, the one that had
00466       connected first would get lower FD number and would get data
00467       transfer preference over everybody else who has connected later on.
00468       ---*/
00469 
00470     for (fd = 0; m_active && fd < m_maxfd; fd++) {
00471         if (mask_.isSet (fd) && fdSet_[fd] != NULL) {
00472 
00473             DL((REACT,"Data detected on connection %s (FD=%d)\n",
00474                 fdSet_[fd]->get_id ().c_str (), fd));
00475 
00476             if ((ret = (fdSet_[fd]->*callback_) (fd)) == -1) {
00477                 removeIOHandler (fd);
00478             }
00479             else if (ret > 0) {
00480                 DL((REACT,"More data (%d bytes) are pending on FD=%d\n",
00481                     ret,fd));
00482                 //return;   <-- would starve other connections
00483             }
00484             else {
00485                 DL((REACT,"All data are consumed from FD=%d\n", fd));
00486                 mask_.clear (fd);
00487             }
00488         }
00489     }
00490 }
00491 
00492 bool
00493 Reactor::
00494 dispatch (int ready_)
00495 {
00496     /*---
00497       Many UNIX systems will count a particular file descriptor in the 
00498       ready_ only ONCE, even if it was flagged by ::select(2) in, say, 
00499       both read and write masks.
00500       ---*/
00501     trace_with_mask("Reactor::dispatch", REACTTRACE);
00502 
00503     m_tqueue.expire (TimeVal::gettimeofday ());
00504 
00505     if ( ready_ < 0 ) {
00506         EL((ERROR,"::select(3) error\n"));
00507         return (false);
00508     }
00509     if ( ready_ == 0 ) {
00510         return (true);
00511     }
00512     DL((REACT,"Dispatching %d FDs\n",ready_));
00513 
00514     /*--- Writes first ---*/
00515     dispatchHandler (m_readySet.m_wset, 
00516                      m_writeSet, 
00517                      &EventHandler::handle_write);
00518 
00519     /*--- Exceptions next ---*/
00520     dispatchHandler (m_readySet.m_eset, 
00521                      m_exceptSet, 
00522                      &EventHandler::handle_except);
00523 
00524     /*--- Finally, the Reads ---*/
00525     dispatchHandler (m_readySet.m_rset, 
00526                      m_readSet, 
00527                      &EventHandler::handle_read);
00528     return (true);
00529 }
00530 
00531 void 
00532 Reactor::
00533 stopReactor (void) 
00534 { 
00535     trace_with_mask("Reactor::stopReactor", REACTTRACE);
00536 
00537     m_active = false; 
00538     register int i;
00539     
00540     for (i = 0; i < m_maxfd; i++) {
00541         removeIOHandler (i);
00542     }
00543 }
00544 
00545 

Generated on Mon Dec 19 15:55:15 2005 for libassa by  doxygen 1.4.5