ASSA::Reactor Class Reference

#include <Reactor.h>

Public Member Functions

 Reactor ()
 Constructor.
 ~Reactor ()
 Destructor.
TimerId registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
 Register Timer Event handler with Reactor.
bool registerIOHandler (EventHandler *eh_, int fd_, EventType et_=RWE_EVENTS)
 Register I/O Event handler with Reactor.
bool removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS)
 Remove Event handler from reactor for either all I/O events or timeout event or both.
bool removeTimerHandler (TimerId id_)
 Remove Timer event from the queue.
bool removeIOHandler (int fd_)
 Remove IO Event handler from reactor.
void waitForEvents (void)
 Main waiting loop that blocks indefinitely processing events.
void waitForEvents (TimeVal *tv_)
 Wait for events for time specified.
void stopReactor (void)
 Stop Reactor's activity.
void deactivate (void)
 Deactivate Reactor.

Private Member Functions

 Reactor (const Reactor &)
Reactor & operator= (const Reactor &)
 no cloning
bool handleError (void)
 Handle error in select(2) loop appropriately.
bool dispatch (int minimum_)
 Notify all EventHandlers registered for the respective events that occurred.
int isAnyReady (void)
 Return number of file descriptors ready across all sets.
bool checkFDs (void)
 Check mask for bad file descriptors.
void dispatchHandler (FdSet &mask_, EventHandler **fdSet_, EH_IO_Callback callback_)
 Call handler's callback and, if callback returns negative value, remove it from the Reactor.
void calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_)
 Calculate closest timeout.

Private Attributes

int m_noFiles
 Max number of open files per process.
int m_maxfd
 Max file descriptor number plus 1.
bool m_active
 Flag that indicates whether Reactor is active or has been stopped.
EventHandler ** m_readSet
 Event handlers waiting for READ_EVENT.
EventHandler ** m_writeSet
 Event handlers waiting for WRITE_EVENT.
EventHandler ** m_exceptSet
 Event handlers waiting for EXCEPT_EVENT.
MaskSet m_waitSet
 Handlers to wait for event on.
MaskSet m_readySet
 Handlers that are ready for processing.
TimerQueue m_tqueue
 The queue of Timers.


Detailed Description

Definition at line 53 of file Reactor.h.


Constructor & Destructor Documentation

Reactor::Reactor ()
 

Constructor.

Definition at line 23 of file Reactor.cpp.

References m_exceptSet, m_noFiles, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.

00023            : 
00024     m_noFiles (1024), m_maxfd (0), m_active (true),
00025     m_readSet ((EventHandler**) NULL), 
00026     m_writeSet ((EventHandler**) NULL), 
00027     m_exceptSet ((EventHandler**) NULL)
00028 {
00029     trace_with_mask("Reactor::Reactor",REACTTRACE);
00030     
00031     struct rlimit rlim;
00032     rlim.rlim_max = 0;
00033 
00034     if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00035         m_noFiles = rlim.rlim_cur;
00036     }
00037 
00038     m_readSet = new EventHandler* [m_noFiles];
00039     m_writeSet = new EventHandler* [m_noFiles];
00040     m_exceptSet = new EventHandler* [m_noFiles];
00041 
00042     for (int i = 0; i < m_noFiles; i++) {
00043         m_readSet[i] = NULL;
00044         m_writeSet[i] = NULL;
00045         m_exceptSet[i] = NULL;
00046     }
00047 }
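
The constructor above sizes its handler tables from the soft RLIMIT_NOFILE limit and keeps 1024 if the query fails. A minimal standalone sketch of that lookup follows, using only POSIX getrlimit(2). The helper name maxOpenFiles and its fallback parameter are hypothetical and not part of libassa, and the RLIM_INFINITY check is an added assumption that the listing does not make.

```cpp
#include <sys/resource.h>

// Hypothetical helper (not part of libassa): return the soft limit on open
// file descriptors, or `fallback` when the limit cannot be queried or is
// reported as unlimited.
long maxOpenFiles(long fallback = 1024)
{
    struct rlimit rlim;
    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
        return fallback;              // query failed: keep the default
    }
    if (rlim.rlim_cur == RLIM_INFINITY) {
        return fallback;              // assumption: treat "unlimited" as unknown
    }
    return static_cast<long>(rlim.rlim_cur);
}
```

A value obtained this way could size per-descriptor tables such as m_readSet. Note that select(2) itself only handles descriptors below FD_SETSIZE, regardless of the process limit.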

Reactor::~Reactor ()
 

Destructor.

Definition at line 50 of file Reactor.cpp.

References m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.

00051 {   
00052     trace_with_mask("Reactor::~Reactor",REACTTRACE);
00053 
00054     delete [] m_readSet;
00055     delete [] m_writeSet;
00056     delete [] m_exceptSet;
00057 }

ASSA::Reactor::Reactor (const Reactor &) [private]
 


Member Function Documentation

void Reactor::calculateTimeout (TimeVal *& howlong_, TimeVal * maxwait_) [private]
 

Calculate closest timeout.

If the TimerQueue is not empty, the result is the smaller of maxwait_ and the time remaining until the first timer in the queue expires. Otherwise, the result is maxwait_.

Parameters:
maxwait_ (in) how long we are expected to wait for event(s).
howlong_ (out) how long we are going to wait.

Definition at line 329 of file Reactor.cpp.

References ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00330 {
00331     trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00332 
00333     TimeVal now;
00334     TimeVal tv;
00335 
00336     if (m_tqueue.isEmpty () ) {
00337         howlong_ = maxwait_;
00338         goto done;
00339     }
00340     now = TimeVal::gettimeofday ();
00341     tv = m_tqueue.top ();
00342     
00343     if (tv < now) {
00344         /*--- 
00345           It took too long to get here (fraction of a millisecond), 
00346           and top timer had already expired. In this case,
00347           perform non-blocking select in order to drain the timer queue.
00348           ---*/
00349         *howlong_ = 0;
00350     }
00351     else {  
00352         DL((REACT,"--------- Timer Queue ----------\n"));
00353         m_tqueue.dump();
00354         DL((REACT,"--------------------------------\n"));
00355 
00356         if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00357             *howlong_ = tv - now;
00358         }
00359         else {
00360             *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00361         }
00362     }
00363 
00364  done:
00365     if (howlong_ != NULL) {
00366         DL((REACT,"delay (%f)\n", double (*howlong_) ));
00367     }
00368     else {
00369         DL((REACT,"delay (forever)\n"));
00370     }
00371 }
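
The decision made by calculateTimeout() can be restated as a small pure function. The sketch below is an illustration under stated assumptions, not the library's code: it uses std::chrono instead of TimeVal, the name closestTimeout is hypothetical, and a negative duration stands in for the NULL pointer that means "block forever". It mirrors the listing in treating a zero maxwait_ like NULL when a timer is pending.

```cpp
#include <algorithm>
#include <chrono>

typedef std::chrono::milliseconds Ms;

// Hypothetical restatement of the timeout choice.
//   has_timer   : whether the timer queue is non-empty
//   until_timer : time until the earliest timer fires (may be negative)
//   max_wait    : caller's limit; negative means "no limit" (NULL in the listing)
// Returns the delay to pass to select(); negative means "block forever".
Ms closestTimeout(bool has_timer, Ms until_timer, Ms max_wait)
{
    const Ms forever(-1);

    if (!has_timer) {
        // No timers: the caller's limit is used as is.
        return max_wait < Ms(0) ? forever : max_wait;
    }
    if (until_timer <= Ms(0)) {
        // Earliest timer already expired: poll without blocking.
        return Ms(0);
    }
    if (max_wait <= Ms(0)) {
        // No limit (or a zero limit, as in the listing): wait for the timer.
        return until_timer;
    }
    return std::min(max_wait, until_timer);
}
```

Keeping the choice in a side-effect-free function like this makes each branch easy to check in isolation.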

bool Reactor::checkFDs (void) [private]
 

Check mask for bad file descriptors.

Returns:
true if any bad fd(s) were found and removed; false otherwise

Definition at line 230 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, m_noFiles, m_readSet, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by handleError().

00231 {
00232     trace_with_mask("Reactor::checkFDs",REACTTRACE);
00233     
00234     bool num_removed = false;
00235     FdSet mask;
00236     timeval poll = { 0, 0 };
00237 
00238     for (int fd = 0; fd < m_noFiles; fd++) {
00239         if ( m_readSet[fd] != NULL ) {
00240             mask.setFd (fd);
00241             if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00242                 removeIOHandler (fd);
00243                 num_removed = true;
00244                 DL((REACT,"Detected BAD FD: %d\n", fd ));
00245             }
00246             mask.clear (fd);
00247         }
00248     }
00249     return (num_removed);
00250 }

void ASSA::Reactor::deactivate (void) [inline]
 

Deactivate Reactor.

This function sets an internal flag that tells the Reactor's event-handling loop to stop. It is mostly used when a *slow* system call is interrupted by a signal handler: the OS restarts the system call after control returns from the handler, so the signal handler (GenServer::handle_signal()) should call this method to defer the Reactor's deactivation until the loop next checks the flag.

Definition at line 219 of file Reactor.h.

References m_active.

Referenced by ASSA::GenServer::handle_signal(), and ASSA::GenServer::stop_service().

00219 {  m_active = false; }
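
The flag-based shutdown described for deactivate() can be illustrated without libassa. In the sketch below the flag g_active stands in for Reactor::m_active, and the names onStopSignal and installStopHandler are hypothetical. The handler only writes a flag, which is the kind of work that is safe inside a signal handler.

```cpp
#include <csignal>

// Hypothetical stand-in for Reactor::m_active. sig_atomic_t is the type the
// C and C++ standards allow a signal handler to write safely.
volatile std::sig_atomic_t g_active = 1;

// Signal handler: record the request and return. The event loop is expected
// to notice the flag on its next check.
extern "C" void onStopSignal(int)
{
    g_active = 0;
}

// Install the handler for `signum`; returns false if installation failed.
bool installStopHandler(int signum)
{
    return std::signal(signum, onStopSignal) != SIG_ERR;
}
```

An event loop written as `while (g_active) { /* wait and dispatch */ }` would then leave after the current iteration, much as waitForEvents(void) leaves once m_active is false.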

bool Reactor::dispatch (int minimum_) [private]
 

Notify all EventHandlers registered for the respective events that occurred.

Parameters:
minimum_ number of file descriptors ready (named ready_ in the definition below).

Definition at line 494 of file Reactor.cpp.

References dispatchHandler(), DL, EL, ASSA::ERROR, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00495 {
00496     /*---
00497       Many UNIX systems will count a particular file descriptor in the 
00498       ready_ only ONCE, even if it was flagged by ::select(2) in, say, 
00499       both read and write masks.
00500       ---*/
00501     trace_with_mask("Reactor::dispatch", REACTTRACE);
00502 
00503     m_tqueue.expire (TimeVal::gettimeofday ());
00504 
00505     if ( ready_ < 0 ) {
00506         EL((ERROR,"::select(3) error\n"));
00507         return (false);
00508     }
00509     if ( ready_ == 0 ) {
00510         return (true);
00511     }
00512     DL((REACT,"Dispatching %d FDs\n",ready_));
00513 
00514     /*--- Writes first ---*/
00515     dispatchHandler (m_readySet.m_wset, 
00516                      m_writeSet, 
00517                      &EventHandler::handle_write);
00518 
00519     /*--- Exceptions next ---*/
00520     dispatchHandler (m_readySet.m_eset, 
00521                      m_exceptSet, 
00522                      &EventHandler::handle_except);
00523 
00524     /*--- Finally, the Reads ---*/
00525     dispatchHandler (m_readySet.m_rset, 
00526                      m_readSet, 
00527                      &EventHandler::handle_read);
00528     return (true);
00529 }

void Reactor::dispatchHandler (FdSet & mask_, EventHandler ** fdSet_, EH_IO_Callback callback_) [private]
 

Call handler's callback and, if callback returns negative value, remove it from the Reactor.

Definition at line 456 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, ASSA::FdSet::isSet(), m_active, m_maxfd, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.

Referenced by dispatch().

00457 {
00458     trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00459 
00460     register int fd;
00461     register int ret;
00462 
00463     /*---
00464       This spot needs re-thinking. When you have several high data-rate
00465       connections sending data at the same time, the one that had
00466       connected first would get lower FD number and would get data
00467       transfer preference over everybody else who has connected later on.
00468       ---*/
00469 
00470     for (fd = 0; m_active && fd < m_maxfd; fd++) {
00471         if (mask_.isSet (fd) && fdSet_[fd] != NULL) {
00472 
00473             DL((REACT,"Data detected on connection %s (FD=%d)\n",
00474                 fdSet_[fd]->get_id ().c_str (), fd));
00475 
00476             if ((ret = (fdSet_[fd]->*callback_) (fd)) == -1) {
00477                 removeIOHandler (fd);
00478             }
00479             else if (ret > 0) {
00480                 DL((REACT,"More data (%d bytes) are pending on FD=%d\n",
00481                     ret,fd));
00482                 //return;   <-- would starve other connections
00483             }
00484             else {
00485                 DL((REACT,"All data are consumed from FD=%d\n", fd));
00486                 mask_.clear (fd);
00487             }
00488         }
00489     }
00490 }
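
dispatchHandler() invokes handlers through a pointer to a member function and drops a handler whose callback returns -1. The sketch below shows that mechanism on its own. Handler, IoCallback, dispatchAll and FailOn are hypothetical stand-ins rather than libassa types, and the ready mask is left out so that every non-null slot is called.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical, simplified stand-in for ASSA::EventHandler.
class Handler {
public:
    virtual ~Handler() {}
    virtual int handle_read(int fd) = 0;
    virtual int handle_write(int /*fd*/) { return 0; }
};

// Pointer to a member function taking the descriptor and returning a status.
typedef int (Handler::*IoCallback)(int);

// Call `cb` on every registered slot (slot index == descriptor). A slot whose
// callback returns -1 is cleared. Returns the number of slots cleared.
int dispatchAll(std::vector<Handler*>& table, IoCallback cb)
{
    int dropped = 0;
    for (std::size_t fd = 0; fd < table.size(); ++fd) {
        Handler* h = table[fd];
        if (h == NULL) {
            continue;
        }
        if ((h->*cb)(static_cast<int>(fd)) == -1) {
            table[fd] = NULL;
            ++dropped;
        }
    }
    return dropped;
}

// Example handler that reports failure for one chosen descriptor.
class FailOn : public Handler {
public:
    explicit FailOn(int bad) : m_bad(bad), m_calls(0) {}
    int handle_read(int fd) { ++m_calls; return fd == m_bad ? -1 : 0; }
    int m_bad;
    int m_calls;
};
```

Passing `&Handler::handle_read` or `&Handler::handle_write` selects which callback the same loop runs, which is how one routine can serve the read, write and exception sets.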

bool Reactor::handleError (void) [private]
 

Handle error in select(2) loop appropriately.

Definition at line 254 of file Reactor.cpp.

References checkFDs(), DL, EL, ASSA::ERROR, m_active, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00255 {
00256     trace_with_mask("Reactor::handleError",REACTTRACE);
00257 
00258     /*---  If commanded to stop, do so --*/
00259     if ( !m_active ) {
00260         DL((REACT,"Received cmd to stop Reactor\n"));
00261         return (false);
00262     }
00263 
00264     /*---
00265       TODO: If select(2) returns before time expires, with
00266       a descriptor ready or with EINTR, timeval is not
00267       going to be updated with number of seconds remaining.
00268       This is true for all systems except Linux, which will
00269       do so. Therefore, to restart correctly in case of
00270       EINTR, we ought to take time measurement before and
00271       after select, and try to select() for remaining time.
00272     
00273       For now, we restart with the initial timing value.
00274       ---*/
00275     /*---
00276       BSD kernel never restarts select(2). SVR4 will restart if
00277       the SA_RESTART flag is specified when the signal handler
00278       for the signal delivered is installed. This means that for
00279       portability, we must handle signal interrupts.
00280       ---*/
00281 
00282     if ( errno == EINTR ) {
00283         EL((REACT,"EINTR: interrupted select(2)\n"));
00284         /*
00285           If I was sitting in select(2) and received SIGTERM,
00286           the signal handler would have set m_active to 'false',
00287           and this function would have returned 'false' as above.
00288           For any other non-critical signals (USR1,...),
00289           we retry select.
00290         */
00291         return (true);
00292     }
00293     /*
00294       EBADF - bad file number. One of the file descriptors does
00295       not reference an open file to open(), close(), ioctl().
00296       This can happen if user closed fd and forgot to remove
00297       handler from Reactor.
00298     */
00299     if ( errno == EBADF ) {
00300         DL((REACT,"EBADF: bad file descriptor\n"));
00301         return (checkFDs ());
00302     }
00303     /*
00304       Any other error from select
00305     */
00306     EL((ERROR,"select(3) error\n"));
00307     return (false);
00308 }

int Reactor::isAnyReady (void) [private]
 

Return number of file descriptors ready across all sets.

Definition at line 312 of file Reactor.cpp.

References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00313 {
00314     trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00315 
00316     int n = m_readySet.m_rset.numSet () +
00317         m_readySet.m_wset.numSet () +
00318         m_readySet.m_eset.numSet ();
00319 
00320     if ( n > 0 ) {
00321         DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00322         m_readySet.dump ();
00323     }
00324     return (n);
00325 }

Reactor& ASSA::Reactor::operator= (const Reactor &) [private]
 

no cloning

bool Reactor::registerIOHandler (EventHandler * eh_, int fd_, EventType et_ = RWE_EVENTS)
 

Register I/O Event handler with Reactor.

Reactor will dispatch the appropriate callback when an event of the given EventType is received.

Parameters:
eh_ Pointer to the EventHandler
fd_ File descriptor
et_ Event Type
Returns:
true if success, false if error

Definition at line 86 of file Reactor.cpp.

References Assure_return, DL, ASSA::ERROR, ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), m_readSet, ASSA::MaskSet::m_rset, m_waitSet, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), and ASSA::RemoteLogger::log_open().

00087 {
00088     trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00089 
00090     std::ostringstream msg;
00091     Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00092 
00093     if ( isReadEvent (et_) ) {
00094 
00095         if ( !m_waitSet.m_rset.setFd (fd_) ) {
00096             DL((ERROR,"readset: fd %d out of range\n", fd_));
00097             return (false);
00098         }
00099         m_readSet[fd_] = eh_;
00100         msg << "READ_EVENT";
00101     }
00102 
00103     if ( isWriteEvent (et_) ) {
00104 
00105         if ( !m_waitSet.m_wset.setFd (fd_) ) {
00106             DL((ERROR,"writeset: fd %d out of range\n", fd_));
00107             return (false);
00108         }
00109         m_writeSet[fd_] = eh_;
00110         msg << " WRITE_EVENT";
00111     }
00112     if ( isExceptEvent (et_) ) {
00113         if ( !m_waitSet.m_eset.setFd (fd_) ) {
00114             DL((ERROR,"exceptset: fd %d out of range\n", fd_));
00115             return (false);
00116         }
00117         m_exceptSet[fd_] = eh_;
00118         msg << " EXCEPT_EVENT";
00119     }
00120     msg << std::ends;
00121     DL((REACT,"Registered EventHandler 0x%x FD (%d) for event(s) %s\n", 
00122         (u_long)eh_, fd_, msg.str ().c_str () ));
00123 
00124     if ( m_maxfd < fd_+1 ) {
00125         m_maxfd = fd_+1;
00126         DL((REACT,"maxfd+1 adjusted to %d\n",m_maxfd));
00127     }
00128     DL((REACT,"Modified waitSet:\n"));
00129     m_waitSet.dump ();
00130 
00131     return (true);
00132 }
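
The bookkeeping done by registerIOHandler(), setting the descriptor in each requested wait set and raising the "max fd plus 1" value, can be sketched with plain fd_set objects. Everything named here (EventBits, WaitSets, add) is hypothetical. The bit values are assumptions and do not reflect the real EventType constants.

```cpp
#include <sys/select.h>

// Hypothetical bit values; the real EventType constants live in libassa.
enum EventBits { READ_BIT = 1, WRITE_BIT = 2, EXCEPT_BIT = 4 };

// Hypothetical stand-in for the wait set plus m_maxfd.
struct WaitSets {
    fd_set rset;
    fd_set wset;
    fd_set eset;
    int    maxfd;     // highest registered descriptor plus 1

    WaitSets() : maxfd(0)
    {
        FD_ZERO(&rset);
        FD_ZERO(&wset);
        FD_ZERO(&eset);
    }

    // Register `fd` for the events in `events` (a combination of EventBits).
    // Returns false if fd is out of range or no known event was requested.
    bool add(int fd, int events)
    {
        if (fd < 0 || fd >= FD_SETSIZE) {
            return false;
        }
        if ((events & (READ_BIT | WRITE_BIT | EXCEPT_BIT)) == 0) {
            return false;
        }
        if (events & READ_BIT)   { FD_SET(fd, &rset); }
        if (events & WRITE_BIT)  { FD_SET(fd, &wset); }
        if (events & EXCEPT_BIT) { FD_SET(fd, &eset); }

        if (maxfd < fd + 1) {
            maxfd = fd + 1;               // first argument for select()
        }
        return true;
    }
};
```

Validating the descriptor before touching any set keeps the three sets consistent when registration fails.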

TimerId Reactor::registerTimerHandler (EventHandler * eh_, const TimeVal & tv_, const std::string & name_ = "<unknown>")
 

Register Timer Event handler with Reactor.

Reactor will dispatch the appropriate callback when the timer expires.

Parameters:
eh_ Pointer to the EventHandler
tv_ Timeout value (named timeout_ in the definition below)
name_ Name of the timer
Returns:
Timer ID that can be used to cancel timer and find out its name.

Definition at line 61 of file Reactor.cpp.

References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), and ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().

00064 {
00065     trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00066     Assure_return (eh_);
00067 
00068     TimeVal now (TimeVal::gettimeofday());
00069     TimeVal t (now + timeout_);
00070 
00071     DL((REACT,"TIMEOUT_EVENT: (%d,%d)\n",  timeout_.sec(),timeout_.msec()));
00072     DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00073     DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00074 
00075     TimerId tid =  m_tqueue.insert (eh_, t, timeout_, name_);
00076 
00077     DL((REACT,"---Modified Timer Queue----\n"));
00078     m_tqueue.dump();
00079     DL((REACT,"---------------------------\n"));
00080 
00081     return (tid);
00082 }
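
The listing above converts a relative timeout into an absolute expiry time and stores it in a queue ordered by expiry. A minimal sketch of such a queue is shown below. TimerTable and its members are hypothetical and are not the TimerQueue API. The sketch also swaps in std::chrono::steady_clock where the listing uses TimeVal::gettimeofday(), so that wall-clock adjustments cannot reorder timers.

```cpp
#include <chrono>
#include <cstddef>
#include <map>
#include <string>
#include <utility>

typedef std::chrono::steady_clock Clock;

// Hypothetical timer table keyed by absolute expiry time.
class TimerTable {
    struct Entry {
        unsigned long id;
        std::string   name;
        Entry(unsigned long i, const std::string& n) : id(i), name(n) {}
    };
    typedef std::multimap<Clock::time_point, Entry> Queue;

public:
    typedef unsigned long Id;

    TimerTable() : m_next(1) {}

    // Schedule a timer `delay` after `now`; returns an id for later removal.
    Id add(Clock::time_point now, Clock::duration delay,
           const std::string& name = "<unknown>")
    {
        Id id = m_next++;
        m_queue.insert(std::make_pair(now + delay, Entry(id, name)));
        return id;
    }

    // Cancel one timer by id; returns false if it is not in the table.
    bool remove(Id id)
    {
        for (Queue::iterator it = m_queue.begin(); it != m_queue.end(); ++it) {
            if (it->second.id == id) {
                m_queue.erase(it);
                return true;
            }
        }
        return false;
    }

    // Drop every timer due at or before `now`; returns how many were due.
    std::size_t expire(Clock::time_point now)
    {
        std::size_t n = 0;
        while (!m_queue.empty() && m_queue.begin()->first <= now) {
            m_queue.erase(m_queue.begin());
            ++n;
        }
        return n;
    }

    std::size_t size() const { return m_queue.size(); }

private:
    Queue m_queue;
    Id    m_next;
};
```

Because the map is ordered by expiry, the earliest timer is always at begin(), which is the value a reactor needs when it computes how long to block.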

bool Reactor::removeHandler (EventHandler * eh_, EventType et_ = ALL_EVENTS)
 

Remove Event handler from reactor for either all I/O events or timeout event or both.

If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.

Parameters:
eh_ Pointer to the EventHandler
et_ Event Type to remove. Default will remove Event Handler for all events.
Returns:
true on success; false if the handler was not registered for any events.

Definition at line 154 of file Reactor.cpp.

References ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), m_exceptSet, m_maxfd, m_readSet, m_tqueue, m_writeSet, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), removeIOHandler(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_timeout(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(), and ASSA::RemoteLogger::log_close().

00155 {
00156     trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00157 
00158     if (eh_ == NULL) {
00159         return false;
00160     }
00161     bool ret = false;
00162     register int fd;
00163 
00164     if (isTimeoutEvent (et_)) {
00165         ret = m_tqueue.remove (eh_);
00166     }
00167 
00168     if (isReadEvent (et_) || isWriteEvent (et_) || isExceptEvent (et_)) {
00169         for (fd = 0; fd < m_maxfd; fd++) {
00170             if (m_readSet[fd] == eh_ ||  m_writeSet[fd] == eh_ ||
00171                 m_exceptSet[fd] == eh_) {
00172                 ret = removeIOHandler (fd);
00173             }
00174         }
00175     }
00176     return (ret);
00177 }

bool Reactor::removeIOHandler (int fd_)
 

Remove IO Event handler from reactor.

This stops the handler from receiving any I/O events on fd_.

Parameters:
fd_ File descriptor
Returns:
true on success, false if fd_ is out of range

Definition at line 181 of file Reactor.cpp.

References Assure_return, ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_exceptSet, m_maxfd, m_noFiles, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by checkFDs(), dispatchHandler(), removeHandler(), and stopReactor().

00182 {
00183     trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00184 
00185     Assure_return (fd_ >= 0 && fd_ < m_noFiles);
00186 
00187     DL((REACT,"Removing Handler fd = %d\n",fd_));
00188 
00189     EventHandler* eh = NULL;
00190 
00191     if      ( m_readSet[fd_] )   eh = m_readSet[fd_];
00192     else if ( m_writeSet[fd_] )  eh = m_writeSet[fd_];
00193     else if ( m_exceptSet[fd_] ) eh = m_exceptSet[fd_];
00194 
00195     if (eh) {
00196         DL((REACT,"Found EvtHandler 0x%x\n",(u_long)eh));
00197         eh->handle_close (fd_);
00198     }
00199 
00200     m_readSet[fd_] = NULL;
00201     m_writeSet[fd_] = NULL;
00202     m_exceptSet[fd_] = NULL;
00203 
00204     m_waitSet.m_rset.clear (fd_);
00205     m_waitSet.m_wset.clear (fd_);
00206     m_waitSet.m_eset.clear (fd_);
00207 
00208     m_readySet.m_rset.clear (fd_);
00209     m_readySet.m_wset.clear (fd_);
00210     m_readySet.m_eset.clear (fd_);
00211 
00212     if ( m_maxfd == fd_+1 ) {
00213         while ( m_maxfd > 0                    &&
00214                 m_readSet  [m_maxfd-1] == NULL &&
00215                 m_writeSet [m_maxfd-1] == NULL &&
00216                 m_exceptSet[m_maxfd-1] == NULL )
00217             {
00218                 m_maxfd--;
00219             }
00220     }
00221     DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd));
00222     DL((REACT,"Modifies waitSet:\n"));
00223     m_waitSet.dump ();
00224 
00225     return (true);
00226 }
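
The loop at the end of removeIOHandler() lowers m_maxfd until it again points just past the highest descriptor still in use. The same idea is sketched below on a simple occupancy vector. The name shrinkMaxFd is hypothetical, and a single vector replaces the three handler tables.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper: given which descriptors are still in use, return the
// highest used descriptor plus 1, starting the search from `maxfd`.
int shrinkMaxFd(const std::vector<bool>& in_use, int maxfd)
{
    const int limit = static_cast<int>(in_use.size());
    if (maxfd > limit) {
        maxfd = limit;                    // never index past the table
    }
    while (maxfd > 0 && !in_use[static_cast<std::size_t>(maxfd - 1)]) {
        --maxfd;
    }
    return maxfd;
}
```

In the listing the shrink only runs when the removed descriptor was the highest one, since removing a lower descriptor cannot change the maximum.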

bool Reactor::removeTimerHandler (TimerId id_)
 

Remove Timer event from the queue.

This removes one particular timer.

Parameters:
id_ Timer ID returned by registerTimerHandler() (named tid_ in the definition below).
Returns:
true if timer found and removed; false otherwise

Definition at line 136 of file Reactor.cpp.

References DL, ASSA::TimerQueue::dump(), EL, ASSA::ERROR, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write().

00137 {
00138     trace_with_mask("Reactor::removeTimer",REACTTRACE);
00139     bool ret;
00140 
00141     if ((ret = m_tqueue.remove (tid_))) {
00142         DL((REACT,"---Modified Timer Queue----\n"));
00143         m_tqueue.dump();
00144         DL((REACT,"---------------------------\n"));
00145     }
00146     else {
00147         EL((ERROR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00148     }
00149     return (ret);
00150 }

void Reactor::stopReactor (void)
 

Stop Reactor's activity.

This effectively removes all handlers from the Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from a method other than EventHandler::signal_handler(); EventHandler::handle_read() is a good candidate. Calling it from EventHandler::handle_close() will most likely cause an infinite loop of recursive calls, because removeIOHandler() itself invokes handle_close().

Definition at line 533 of file Reactor.cpp.

References m_active, m_maxfd, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.

00534 { 
00535     trace_with_mask("Reactor::stopReactor", REACTTRACE);
00536 
00537     m_active = false; 
00538     register int i;
00539     
00540     for (i = 0; i < m_maxfd; i++) {
00541         removeIOHandler (i);
00542     }
00543 }

void Reactor::waitForEvents (TimeVal * tv_)
 

Wait for events for time specified.

Passing NULL blocks indefinitely until an event occurs, which is what each iteration of waitForEvents(void) does. Passing tv_ equal to {0, 0} causes non-blocking polling for all events. Otherwise, this method blocks for up to the tv_ time interval. If an event occurs, it processes the event(s) and returns. tv_ is adjusted by subtracting the time spent in event processing.

Parameters:
tv_ [RW] the time to wait for; updated on return.

Definition at line 400 of file Reactor.cpp.

References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::MaskSet::reset(), and trace_with_mask.

00401 {
00402     trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00403 
00404     TimerCountdown traceTime (tv_);
00405     DL((REACT,"======================================\n"));
00406 
00407     /*--- Expire all stale Timers ---*/
00408     m_tqueue.expire (TimeVal::gettimeofday ());
00409 
00410     /* Test to see if Reactor has been deactivated as a result
00411      * of processing done by any TimerHandlers.
00412      */
00413     if (!m_active) {
00414         return;
00415     }
00416 
00417     int nReady;
00418     TimeVal  delay;
00419     TimeVal* dlp = &delay;
00420 
00421     /*---
00422       In case if not all data are processed by the EventHandler,
00423       and EventHandler stated so in its callback's return value
00424       to dispatcher (), it will be called again. This way 
00425       underlying file/socket stream can efficiently utilize its
00426       buffering mechanism.
00427       ---*/
00428     if ((nReady = isAnyReady ())) {
00429         DL((REACT,"isAnyReady returned: %d\n",nReady));
00430         dispatch (nReady);
00431         return;
00432     }
00433 
00434     DL((REACT,"=== m_waitSet ===\n"));
00435     m_waitSet.dump ();
00436 
00437     do {
00438         m_readySet.reset ();
00439         m_readySet = m_waitSet;
00440         calculateTimeout (dlp, tv_);
00441 
00442         nReady = ::select (m_maxfd, 
00443                            &m_readySet.m_rset,
00444                            &m_readySet.m_wset, 
00445                            &m_readySet.m_eset, 
00446                            dlp);
00447         DL((REACT,"::select() returned: %d\n",nReady));
00448     } 
00449     while (nReady < 0 && handleError ());
00450 
00451     dispatch (nReady);
00452 }

void Reactor::waitForEvents (void)
 

Main waiting loop that blocks indefinitely processing events.

Definition at line 375 of file Reactor.cpp.

References m_active.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().

00376 {
00377     while ( m_active ) {
00378         waitForEvents ((TimeVal*) NULL);
00379     }
00380 }


Member Data Documentation

bool ASSA::Reactor::m_active [private]
 

Flag that indicates whether Reactor is active or has been stopped.

Definition at line 195 of file Reactor.h.

Referenced by deactivate(), dispatchHandler(), handleError(), stopReactor(), and waitForEvents().

EventHandler** ASSA::Reactor::m_exceptSet [private]
 

Event handlers waiting for EXCEPT_EVENT.

Definition at line 204 of file Reactor.h.

Referenced by dispatch(), Reactor(), removeHandler(), removeIOHandler(), and ~Reactor().

int ASSA::Reactor::m_maxfd [private]
 

Max file descriptor number plus 1.

Definition at line 192 of file Reactor.h.

Referenced by dispatchHandler(), removeHandler(), removeIOHandler(), stopReactor(), and waitForEvents().

int ASSA::Reactor::m_noFiles [private]
 

Max number of open files per process.

This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit.

Definition at line 189 of file Reactor.h.

Referenced by checkFDs(), Reactor(), and removeIOHandler().

EventHandler** ASSA::Reactor::m_readSet [private]
 

Event handlers waiting for READ_EVENT.

Definition at line 198 of file Reactor.h.

Referenced by checkFDs(), dispatch(), Reactor(), registerIOHandler(), removeHandler(), removeIOHandler(), and ~Reactor().

MaskSet ASSA::Reactor::m_readySet [private]
 

Handlers that are ready for processing.

Definition at line 210 of file Reactor.h.

Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents().

TimerQueue ASSA::Reactor::m_tqueue [private]
 

The queue of Timers.

Definition at line 213 of file Reactor.h.

Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents().

MaskSet ASSA::Reactor::m_waitSet [private]
 

Handlers to wait for event on.

Definition at line 207 of file Reactor.h.

Referenced by registerIOHandler(), removeIOHandler(), and waitForEvents().

EventHandler** ASSA::Reactor::m_writeSet [private]
 

Event handlers waiting for WRITE_EVENT.

Definition at line 201 of file Reactor.h.

Referenced by dispatch(), Reactor(), removeHandler(), removeIOHandler(), and ~Reactor().


The documentation for this class was generated from the following files:
Reactor.h
Reactor.cpp
Generated on Wed Jun 21 01:42:52 2006 for libassa by  doxygen 1.4.6