00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include <iostream>
00015 #include <sstream>
00016 #include <string>
00017
00018 #include "assa/Reactor.h"
00019
00020 using namespace ASSA;
00021
00022 Reactor::
00023 Reactor () :
00024 m_noFiles (1024), m_maxfd (0), m_active (true),
00025 m_readSet ((EventHandler**) NULL),
00026 m_writeSet ((EventHandler**) NULL),
00027 m_exceptSet ((EventHandler**) NULL)
00028 {
00029 trace_with_mask("Reactor::Reactor",REACTTRACE);
00030
00031 struct rlimit rlim;
00032 rlim.rlim_max = 0;
00033
00034 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00035 m_noFiles = rlim.rlim_cur;
00036 }
00037
00038 m_readSet = new EventHandler* [m_noFiles];
00039 m_writeSet = new EventHandler* [m_noFiles];
00040 m_exceptSet = new EventHandler* [m_noFiles];
00041
00042 for (int i = 0; i < m_noFiles; i++) {
00043 m_readSet[i] = NULL;
00044 m_writeSet[i] = NULL;
00045 m_exceptSet[i] = NULL;
00046 }
00047 }
00048
00049 Reactor::
00050 ~Reactor()
00051 {
00052 trace_with_mask("Reactor::~Reactor",REACTTRACE);
00053
00054 delete [] m_readSet;
00055 delete [] m_writeSet;
00056 delete [] m_exceptSet;
00057 }
00058
00059 TimerId
00060 Reactor::
00061 registerTimerHandler (EventHandler* eh_,
00062 const TimeVal& timeout_,
00063 const std::string& name_)
00064 {
00065 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00066 Assure_return (eh_);
00067
00068 TimeVal now (TimeVal::gettimeofday());
00069 TimeVal t (now + timeout_);
00070
00071 DL((REACT,"TIMEOUT_EVENT: (%d,%d)\n", timeout_.sec(),timeout_.msec()));
00072 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00073 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00074
00075 TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_);
00076
00077 DL((REACT,"---Modified Timer Queue----\n"));
00078 m_tqueue.dump();
00079 DL((REACT,"---------------------------\n"));
00080
00081 return (tid);
00082 }
00083
00084 bool
00085 Reactor::
00086 registerIOHandler (EventHandler* eh_, int fd_, EventType et_)
00087 {
00088 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00089
00090 std::ostringstream msg;
00091 Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00092
00093 if ( isReadEvent (et_) ) {
00094
00095 if ( !m_waitSet.m_rset.setFd (fd_) ) {
00096 DL((ERROR,"readset: fd %d out of range\n", fd_));
00097 return (false);
00098 }
00099 m_readSet[fd_] = eh_;
00100 msg << "READ_EVENT";
00101 }
00102
00103 if ( isWriteEvent (et_) ) {
00104
00105 if ( !m_waitSet.m_wset.setFd (fd_) ) {
00106 DL((ERROR,"writeset: fd %d out of range\n", fd_));
00107 return (false);
00108 }
00109 m_writeSet[fd_] = eh_;
00110 msg << " WRITE_EVENT";
00111 }
00112 if ( isExceptEvent (et_) ) {
00113 if ( !m_waitSet.m_eset.setFd (fd_) ) {
00114 DL((ERROR,"exceptset: fd %d out of range\n", fd_));
00115 return (false);
00116 }
00117 m_exceptSet[fd_] = eh_;
00118 msg << " EXCEPT_EVENT";
00119 }
00120 msg << std::ends;
00121 DL((REACT,"Registered EventHandler 0x%x FD (%d) for event(s) %s\n",
00122 (u_long)eh_, fd_, msg.str ().c_str () ));
00123
00124 if ( m_maxfd < fd_+1 ) {
00125 m_maxfd = fd_+1;
00126 DL((REACT,"maxfd+1 adjusted to %d\n",m_maxfd));
00127 }
00128 DL((REACT,"Modified waitSet:\n"));
00129 m_waitSet.dump ();
00130
00131 return (true);
00132 }
00133
00134 bool
00135 Reactor::
00136 removeTimerHandler (TimerId tid_)
00137 {
00138 trace_with_mask("Reactor::removeTimer",REACTTRACE);
00139 bool ret;
00140
00141 if ((ret = m_tqueue.remove (tid_))) {
00142 DL((REACT,"---Modified Timer Queue----\n"));
00143 m_tqueue.dump();
00144 DL((REACT,"---------------------------\n"));
00145 }
00146 else {
00147 EL((ERROR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00148 }
00149 return (ret);
00150 }
00151
00152 bool
00153 Reactor::
00154 removeHandler (EventHandler* eh_, EventType et_)
00155 {
00156 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00157
00158 if (eh_ == NULL) {
00159 return false;
00160 }
00161 bool ret = false;
00162 register int fd;
00163
00164 if (isTimeoutEvent (et_)) {
00165 ret = m_tqueue.remove (eh_);
00166 }
00167
00168 if (isReadEvent (et_) || isWriteEvent (et_) || isExceptEvent (et_)) {
00169 for (fd = 0; fd < m_maxfd; fd++) {
00170 if (m_readSet[fd] == eh_ || m_writeSet[fd] == eh_ ||
00171 m_exceptSet[fd] == eh_) {
00172 ret = removeIOHandler (fd);
00173 }
00174 }
00175 }
00176 return (ret);
00177 }
00178
00179 bool
00180 Reactor::
00181 removeIOHandler (int fd_)
00182 {
00183 trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00184
00185 Assure_return (fd_ >= 0 && fd_ < m_noFiles);
00186
00187 DL((REACT,"Removing Handler fd = %d\n",fd_));
00188
00189 EventHandler* eh = NULL;
00190
00191 if ( m_readSet[fd_] ) eh = m_readSet[fd_];
00192 else if ( m_writeSet[fd_] ) eh = m_writeSet[fd_];
00193 else if ( m_exceptSet[fd_] ) eh = m_exceptSet[fd_];
00194
00195 if (eh) {
00196 DL((REACT,"Found EvtHandler 0x%x\n",(u_long)eh));
00197 eh->handle_close (fd_);
00198 }
00199
00200 m_readSet[fd_] = NULL;
00201 m_writeSet[fd_] = NULL;
00202 m_exceptSet[fd_] = NULL;
00203
00204 m_waitSet.m_rset.clear (fd_);
00205 m_waitSet.m_wset.clear (fd_);
00206 m_waitSet.m_eset.clear (fd_);
00207
00208 m_readySet.m_rset.clear (fd_);
00209 m_readySet.m_wset.clear (fd_);
00210 m_readySet.m_eset.clear (fd_);
00211
00212 if ( m_maxfd == fd_+1 ) {
00213 while ( m_maxfd > 0 &&
00214 m_readSet [m_maxfd-1] == NULL &&
00215 m_writeSet [m_maxfd-1] == NULL &&
00216 m_exceptSet[m_maxfd-1] == NULL )
00217 {
00218 m_maxfd--;
00219 }
00220 }
00221 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd));
00222 DL((REACT,"Modifies waitSet:\n"));
00223 m_waitSet.dump ();
00224
00225 return (true);
00226 }
00227
00228 bool
00229 Reactor::
00230 checkFDs (void)
00231 {
00232 trace_with_mask("Reactor::checkFDs",REACTTRACE);
00233
00234 bool num_removed = false;
00235 FdSet mask;
00236 timeval poll = { 0, 0 };
00237
00238 for (int fd = 0; fd < m_noFiles; fd++) {
00239 if ( m_readSet[fd] != NULL ) {
00240 mask.setFd (fd);
00241 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00242 removeIOHandler (fd);
00243 num_removed = true;
00244 DL((REACT,"Detected BAD FD: %d\n", fd ));
00245 }
00246 mask.clear (fd);
00247 }
00248 }
00249 return (num_removed);
00250 }
00251
00252 bool
00253 Reactor::
00254 handleError (void)
00255 {
00256 trace_with_mask("Reactor::handleError",REACTTRACE);
00257
00258
00259 if ( !m_active ) {
00260 DL((REACT,"Received cmd to stop Reactor\n"));
00261 return (false);
00262 }
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282 if ( errno == EINTR ) {
00283 EL((REACT,"EINTR: interrupted select(2)\n"));
00284
00285
00286
00287
00288
00289
00290
00291 return (true);
00292 }
00293
00294
00295
00296
00297
00298
00299 if ( errno == EBADF ) {
00300 DL((REACT,"EBADF: bad file descriptor\n"));
00301 return (checkFDs ());
00302 }
00303
00304
00305
00306 EL((ERROR,"select(3) error\n"));
00307 return (false);
00308 }
00309
00310 int
00311 Reactor::
00312 isAnyReady (void)
00313 {
00314 trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00315
00316 int n = m_readySet.m_rset.numSet () +
00317 m_readySet.m_wset.numSet () +
00318 m_readySet.m_eset.numSet ();
00319
00320 if ( n > 0 ) {
00321 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00322 m_readySet.dump ();
00323 }
00324 return (n);
00325 }
00326
00327 void
00328 Reactor::
00329 calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_)
00330 {
00331 trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00332
00333 TimeVal now;
00334 TimeVal tv;
00335
00336 if (m_tqueue.isEmpty () ) {
00337 howlong_ = maxwait_;
00338 goto done;
00339 }
00340 now = TimeVal::gettimeofday ();
00341 tv = m_tqueue.top ();
00342
00343 if (tv < now) {
00344
00345
00346
00347
00348
00349 *howlong_ = 0;
00350 }
00351 else {
00352 DL((REACT,"--------- Timer Queue ----------\n"));
00353 m_tqueue.dump();
00354 DL((REACT,"--------------------------------\n"));
00355
00356 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00357 *howlong_ = tv - now;
00358 }
00359 else {
00360 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00361 }
00362 }
00363
00364 done:
00365 if (howlong_ != NULL) {
00366 DL((REACT,"delay (%f)\n", double (*howlong_) ));
00367 }
00368 else {
00369 DL((REACT,"delay (forever)\n"));
00370 }
00371 }
00372
00373 void
00374 Reactor::
00375 waitForEvents (void)
00376 {
00377 while ( m_active ) {
00378 waitForEvents ((TimeVal*) NULL);
00379 }
00380 }
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398 void
00399 Reactor::
00400 waitForEvents (TimeVal* tv_)
00401 {
00402 trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00403
00404 TimerCountdown traceTime (tv_);
00405 DL((REACT,"======================================\n"));
00406
00407
00408 m_tqueue.expire (TimeVal::gettimeofday ());
00409
00410
00411
00412
00413 if (!m_active) {
00414 return;
00415 }
00416
00417 int nReady;
00418 TimeVal delay;
00419 TimeVal* dlp = &delay;
00420
00421
00422
00423
00424
00425
00426
00427
00428 if ((nReady = isAnyReady ())) {
00429 DL((REACT,"isAnyReady returned: %d\n",nReady));
00430 dispatch (nReady);
00431 return;
00432 }
00433
00434 DL((REACT,"=== m_waitSet ===\n"));
00435 m_waitSet.dump ();
00436
00437 do {
00438 m_readySet.reset ();
00439 m_readySet = m_waitSet;
00440 calculateTimeout (dlp, tv_);
00441
00442 nReady = ::select (m_maxfd,
00443 &m_readySet.m_rset,
00444 &m_readySet.m_wset,
00445 &m_readySet.m_eset,
00446 dlp);
00447 DL((REACT,"::select() returned: %d\n",nReady));
00448 }
00449 while (nReady < 0 && handleError ());
00450
00451 dispatch (nReady);
00452 }
00453
00454 void
00455 Reactor::
00456 dispatchHandler (FdSet& mask_, EventHandler** fdSet_, EH_IO_Callback callback_)
00457 {
00458 trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00459
00460 register int fd;
00461 register int ret;
00462
00463
00464
00465
00466
00467
00468
00469
00470 for (fd = 0; m_active && fd < m_maxfd; fd++) {
00471 if (mask_.isSet (fd) && fdSet_[fd] != NULL) {
00472
00473 DL((REACT,"Data detected on connection %s (FD=%d)\n",
00474 fdSet_[fd]->get_id ().c_str (), fd));
00475
00476 if ((ret = (fdSet_[fd]->*callback_) (fd)) == -1) {
00477 removeIOHandler (fd);
00478 }
00479 else if (ret > 0) {
00480 DL((REACT,"More data (%d bytes) are pending on FD=%d\n",
00481 ret,fd));
00482
00483 }
00484 else {
00485 DL((REACT,"All data are consumed from FD=%d\n", fd));
00486 mask_.clear (fd);
00487 }
00488 }
00489 }
00490 }
00491
00492 bool
00493 Reactor::
00494 dispatch (int ready_)
00495 {
00496
00497
00498
00499
00500
00501 trace_with_mask("Reactor::dispatch", REACTTRACE);
00502
00503 m_tqueue.expire (TimeVal::gettimeofday ());
00504
00505 if ( ready_ < 0 ) {
00506 EL((ERROR,"::select(3) error\n"));
00507 return (false);
00508 }
00509 if ( ready_ == 0 ) {
00510 return (true);
00511 }
00512 DL((REACT,"Dispatching %d FDs\n",ready_));
00513
00514
00515 dispatchHandler (m_readySet.m_wset,
00516 m_writeSet,
00517 &EventHandler::handle_write);
00518
00519
00520 dispatchHandler (m_readySet.m_eset,
00521 m_exceptSet,
00522 &EventHandler::handle_except);
00523
00524
00525 dispatchHandler (m_readySet.m_rset,
00526 m_readSet,
00527 &EventHandler::handle_read);
00528 return (true);
00529 }
00530
00531 void
00532 Reactor::
00533 stopReactor (void)
00534 {
00535 trace_with_mask("Reactor::stopReactor", REACTTRACE);
00536
00537 m_active = false;
00538 register int i;
00539
00540 for (i = 0; i < m_maxfd; i++) {
00541 removeIOHandler (i);
00542 }
00543 }
00544
00545