00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #ifndef CONNECTOR_H
00013 #define CONNECTOR_H
00014
00015 #include <unistd.h>
00016 #include <fcntl.h>
00017 #include <sys/types.h>
00018 #include <sys/socket.h>
00019 #include <string.h>
00020 #include <errno.h>
00021
00022 #include "assa/Logger.h"
00023 #include "assa/EventHandler.h"
00024 #include "assa/Reactor.h"
00025 #include "assa/TimeVal.h"
00026 #include "assa/Address.h"
00027
00028 namespace ASSA {
00029
00037 enum ConnectMode {
00038 sync,
00039 async
00040 };
00041
00056 template<class SERVICE_HANDLER, class PEER_CONNECTOR>
00057 class Connector : public virtual EventHandler
00058 {
00059 public:
00061 Connector ();
00062
00064 virtual ~Connector ();
00065
00077 virtual int open (const TimeVal& tv_ = TimeVal (5.0),
00078 ConnectMode mode_ = sync,
00079 Reactor* r_ = (Reactor*)NULL);
00080
00085 virtual int close (void);
00086
00111 virtual int connect (SERVICE_HANDLER* sh_,
00112 Address& addr_,
00113 int protocol_ = AF_INET);
00114
00116 virtual int handle_write (int fd);
00117
00119 virtual int handle_timeout (TimerId tid);
00120
00121 protected:
00125 enum ProgressState {
00126 idle,
00127 waiting,
00128 conned,
00129 failed
00130 };
00131
00139 virtual SERVICE_HANDLER* makeServiceHandler (SERVICE_HANDLER* sh_);
00140
00146 virtual int connectServiceHandler (Address& addr, int protocol);
00147
00151 virtual int activateServiceHandler ();
00152
00153 protected:
00155 TimeVal m_timeout;
00156
00158 TimerId m_tid;
00159
00161 Reactor* m_reactor;
00162
00164 ProgressState m_state;
00165
00167 int m_flags;
00168
00170 SERVICE_HANDLER* m_sh;
00171
00173 int m_fd;
00174
00176 ConnectMode m_mode;
00177
00178 private:
00180 void doAsync (void);
00181
00185 int doSync (void);
00186 };
00187
00188
00189
00190 #define SH SERVICE_HANDLER
00191 #define PC PEER_CONNECTOR
00192
00193
00194
00195
00196
00197 template<class SH, class PC>
00198 Connector<SH, PC>::
00199 Connector ()
00200 : m_tid (0), m_reactor (0), m_state (idle),
00201 m_flags (0), m_sh ((SERVICE_HANDLER*)NULL), m_fd (-1), m_mode (sync)
00202 {
00203 trace_with_mask("Connector::Connector",SOCKTRACE);
00204 set_id ("Connector");
00205 }
00206
00207 template<class SH, class PC>
00208 Connector<SH, PC>::
00209 ~Connector ()
00210 {
00211 trace_with_mask("Connector::~Connector",SOCKTRACE);
00212
00213 }
00214
00215 template<class SH, class PC> int
00216 Connector<SH, PC>::
00217 open (const TimeVal& tv_, ConnectMode mode_, Reactor* r_)
00218 {
00219 trace_with_mask("Connector::open", SOCKTRACE);
00220
00221 m_timeout = tv_;
00222 if (async == mode_ && (Reactor*) NULL == r_)
00223 return -1;
00224 m_mode = mode_;
00225 m_reactor = r_;
00226 return 0;
00227 }
00228
00229 template<class SH, class PC> int
00230 Connector<SH, PC>::
00231 close ()
00232 {
00233 trace_with_mask("Connector::close",SOCKTRACE);
00234 return 0;
00235 }
00236
00237 template<class SH, class PC> int
00238 Connector<SH, PC>::
00239 connect (SH* sh_, Address& addr_, int protocol_family_)
00240 {
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251 trace_with_mask("Connector::connect",SOCKTRACE);
00252 errno = 0;
00253
00254 m_sh = makeServiceHandler (sh_);
00255 PEER_CONNECTOR& s = *m_sh;
00256
00257 if (addr_.bad ()) {
00258 errno = EFAULT;
00259 EL((ERROR,"Bad address (errno %d)\n", errno));
00260 return -1;
00261 }
00262
00263 if (connectServiceHandler (addr_, protocol_family_) == -1) {
00264
00265 if (errno == EINPROGRESS) {
00266
00267 if (async == m_mode) {
00268 doAsync ();
00269 return 0;
00270 }
00271 return doSync ();
00272 }
00273 return -1;
00274 }
00275
00276 fcntl (s.getHandler (), F_SETFL, m_flags);
00277
00278 return activateServiceHandler ();
00279 }
00280
00281 template<class SH, class PC> SERVICE_HANDLER*
00282 Connector<SH, PC>::
00283 makeServiceHandler (SERVICE_HANDLER* sh_)
00284 {
00285 trace_with_mask("Connector::makeServiceHandler",SOCKTRACE);
00286
00287 SERVICE_HANDLER* new_sh = sh_;
00288
00289 if (sh_ == 0) {
00290 new_sh = new SERVICE_HANDLER;
00291 }
00292 return new_sh;
00293 }
00294
00295 template<class SH, class PC> int
00296 Connector<SH, PC>::
00297 connectServiceHandler (Address& addr_, int protocol_family_)
00298 {
00299 trace_with_mask("Connector::connectServiceHandler",SOCKTRACE);
00300
00301 PEER_CONNECTOR& s = *m_sh;
00302
00303 if ( !s.open (protocol_family_) ) {
00304 EL((ERROR,"Socket::open (protocol=%d) failed\n",
00305 protocol_family_));
00306 return -1;
00307 }
00308
00309
00310 m_fd = s.getHandler ();
00311 m_flags = fcntl (m_fd, F_GETFL, 0);
00312 fcntl (m_fd, F_SETFL, m_flags | O_NONBLOCK);
00313
00314 return s.connect (addr_) ? 0 : -1;
00315 }
00316
00317 template<class SH, class PC> int
00318 Connector<SH, PC>::
00319 activateServiceHandler ()
00320 {
00321 trace_with_mask("Connector::activateServiceHandler",SOCKTRACE);
00322
00323 return m_sh->open ();
00324 }
00325
00326 template<class SH, class PC> void
00327 Connector<SH, PC>::
00328 doAsync (void)
00329 {
00330 trace_with_mask("Connector::doAsync",SOCKTRACE);
00331
00332
00333
00334
00335
00336
00337 m_reactor->registerIOHandler (this, m_fd, WRITE_EVENT);
00338
00339 m_tid = m_reactor->registerTimerHandler (this, m_timeout, "ASYNC Connect");
00340 m_state = waiting;
00341 }
00342
00343 template<class SH, class PC> int
00344 Connector<SH, PC>::
00345 doSync (void)
00346 {
00347 trace_with_mask("Connector::doSync",SOCKTRACE);
00348
00349 m_reactor = new Reactor;
00350
00351 m_reactor->registerIOHandler (this, m_fd, WRITE_EVENT);
00352 m_reactor->registerTimerHandler (this, m_timeout, "SYNC Connect");
00353 m_state = waiting;
00354
00355 m_reactor->waitForEvents (&m_timeout);
00356
00357 m_reactor->removeHandler (this);
00358 delete m_reactor;
00359 m_reactor = 0;
00360
00361 if (conned == m_state) {
00362 DL((SOCKTRACE,"Synchronous connect() completed\n"));
00363 fcntl (m_fd, F_SETFL, m_flags);
00364 return 0;
00365 }
00366 EL((ERROR,"Synchronous connect() timed out\n"));
00367 errno = ETIMEDOUT;
00368 return -1;
00369 }
00370
00371 template<class SH, class PC> int
00372 Connector<SH, PC>::
00373 handle_write (int fd_)
00374 {
00375 trace_with_mask("Connector::handle_write",SOCKTRACE);
00376
00377
00378
00379 if (fd_ != m_fd) {
00380 return -1;
00381 }
00382
00383
00384
00385
00386
00387
00388
00389 if (async == m_mode) {
00390 m_reactor->removeTimerHandler (m_tid);
00391 m_tid = 0;
00392 }
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407 int error;
00408 int ret;
00409 error = ret = errno = 0;
00410 socklen_t n = sizeof (error);
00411
00414 m_reactor->removeHandler (this, WRITE_EVENT);
00415
00416 #ifdef __CYGWIN32__
00417 ret = getsockopt (m_fd, SOL_SOCKET, SO_ERROR, (void*)&error, (int*)&n);
00418 #else
00419 ret = getsockopt (m_fd, SOL_SOCKET, SO_ERROR, (void*)&error, &n);
00420 #endif
00421
00422 if (ret == 0) {
00423 if (error == 0) {
00424 if (activateServiceHandler () == 0) {
00425 DL((SOCKTRACE,"Nonblocking connect() completed\n"));
00426 m_state = conned;
00427 }
00428 else {
00429 DL((SOCKTRACE,"Nonblocking connect() failed\n"));
00430 m_state = failed;
00431 }
00432 return (0);
00433 }
00434
00435 EL((ERROR,"Socket pending error: %d\n",error));
00436 errno = error;
00437 }
00438 else {
00439
00440 EL((ERROR,"getsockopt(3) = %d\n", ret));
00441 EL((ERROR,"Solaris pending error!\n"));
00442 }
00443 m_state = failed;
00444
00445 EL((ERROR,"Nonblocking connect (2) failed\n"));
00446
00447 if (errno == ECONNREFUSED) {
00448 EL((ERROR,"Try to compare port "
00449 "numbers on client and service hosts.\n"));
00450 }
00451
00452
00453 if (async == m_mode) {
00454 m_sh->close ();
00455 }
00456
00457
00458
00459
00460 return 0;
00461 }
00462
00463 template<class SH, class PC> int
00464 Connector<SH, PC>::
00465 handle_timeout (TimerId tid_)
00466 {
00467 trace_with_mask("Connector::handle_timeout",SOCKTRACE);
00468
00469 m_state = failed;
00470 errno = ETIMEDOUT;
00471
00472 if (async == m_mode) {
00473 m_reactor->removeHandler (this, WRITE_EVENT);
00474 }
00475 return -1;
00476 }
00477
00478 }
00479
00480 #endif