00001
00002
00003
00004
00005
00006
00007
00008 #include "Comm/AddressClient.h"
00009
00010 #include <string>
00011
00012 AddressClient::AddressClient (const ACE_INET_Addr &addrServer)
00013 : m_addrServer(addrServer),
00014 m_bindingMutex(),
00015 m_bindings(),
00016 m_pendingReplyMutex(),
00017 m_pendingReplyQueue(m_pendingReplyMutex),
00018 m_notificationID(-1),
00019 m_pingMutex(),
00020 m_pingCondition(m_pingMutex),
00021 m_timeout(0,0)
00022 {
00023 m_socket.open(ACE_INET_Addr());
00024 }
00025
00026
00027
00028 AddressClient::~AddressClient (void)
00029 {
00030 m_socket.close();
00031 }
00032
00033
00034 int AddressClient::RegisterWithReactor(ACE_Reactor &theReactor)
00035 {
00036 int retVal = theReactor.register_handler (this,
00037 ACE_Event_Handler::READ_MASK);
00038
00039 if (retVal == -1)
00040 {
00041 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "register_handler"));
00042 }
00043
00044 return retVal;
00045 }
00046
00047
00048 int AddressClient::RemoveFromReactor(ACE_Reactor &theReactor, bool immediate)
00049 {
00050 if (!immediate)
00051 {
00052 RemoveServerCallback();
00053 }
00054
00055 int retVal = theReactor.remove_handler (this,
00056 ACE_Event_Handler::READ_MASK);
00057
00058 if (retVal == -1)
00059 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "remove_handler"));
00060
00061 return retVal;
00062 }
00063
00064
00065
00066 void AddressClient::SetTimeout(const ACE_Time_Value &tv)
00067 {
00068 m_timeout = tv;
00069 }
00070
00071
00072
00073 int AddressClient::PingServer()
00074 {
00075
00076
00077
00078
00079
00080 static const ACE_Time_Value PING_TIMEOUT(1,0);
00081
00082
00083
00084 ACE_Guard<ACE_Thread_Mutex> g(m_pingMutex);
00085
00086
00087
00088 int retVal = this->m_socket.send("PING", 4, m_addrServer);
00089 if (retVal == -1)
00090 {
00091 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n","send"), -1);
00092 }
00093
00094
00095
00096 ACE_Time_Value deadline = ACE_OS::gettimeofday() + PING_TIMEOUT;
00097
00098
00099
00100
00101 return m_pingCondition.wait(&deadline);
00102 }
00103
00104
00105
00106 int AddressClient::RegisterServerCallback()
00107 {
00108
00109
00110 if (m_notificationID != -1)
00111 {
00112 return -1;
00113 }
00114
00115
00116
00117 int status = -1;
00118 for (int i = 0; status == -1 && i < 1; ++i)
00119 {
00120 status = PingServer();
00121 }
00122
00123
00124
00125 if (status == -1)
00126 {
00127 ACE_ERROR_RETURN((LM_ERROR,
00128 "(%N:%l) RegisterServerCallback failed because the address server"
00129 " won't respond.\n"), -1);
00130 }
00131
00132
00133
00134 std::string theReply;
00135 status = SendCommand("ADD_NOTIFY", theReply);
00136
00137
00138
00139 u_int myID;
00140 if (status == -1 || sscanf(theReply.c_str(), "YOU_ARE %u", &myID) != 1)
00141 {
00142 return -1;
00143 }
00144
00145
00146
00147 m_notificationID = myID;
00148
00149 return 0;
00150 }
00151
00152
00153
00154 int AddressClient::RemoveServerCallback()
00155 {
00156
00157
00158 if (m_notificationID == -1)
00159 {
00160 return -1;
00161 }
00162
00163
00164
00165 int status = -1;
00166 for (int i = 0; status == -1 && i < 5; ++i)
00167 {
00168 status = PingServer();
00169 }
00170
00171
00172
00173 if (status == -1)
00174 {
00175 ACE_ERROR_RETURN((LM_WARNING,
00176 "(%N:%l) RemoveServerCallback failed because the address server"
00177 " won't respond.\n"), -1);
00178 }
00179
00180
00181
00182 u_int id = m_notificationID;
00183 char sendBuf[32];
00184 sprintf(sendBuf, "REMOVE_NOTIFY %u", id);
00185
00186
00187
00188 std::string theReply;
00189 status = SendCommand(sendBuf, theReply);
00190
00191
00192
00193 if (status == -1 || theReply != "OK")
00194 {
00195 return -1;
00196 }
00197
00198
00199
00200 m_notificationID = -1;
00201
00202 return 0;
00203 }
00204
00205
00206 int AddressClient::ClearServerCallbacks()
00207 {
00208 int retVal = -1;
00209
00210 std::string theReply;
00211 retVal = SendCommand("CLEAR_NOTIFY", theReply);
00212
00213 if (theReply == "OK")
00214 {
00215 m_notificationID = -1;
00216 retVal = 0;
00217 }
00218
00219 return retVal;
00220 }
00221
00222
00223
00224 int AddressClient::GetBindings(u_int dest, AddrList_t &theList)
00225 {
00226
00227
00228
00229 {
00230 ACE_Guard<ACE_Thread_Mutex> g(m_bindingMutex);
00231
00232 BindMap_t::iterator it = m_bindings.find(dest);
00233
00234 if (it != m_bindings.end())
00235 {
00236 theList = it->second;
00237 return 0;
00238 }
00239 }
00240
00241
00242
00243
00244
00245
00246
00247
00248 char sendBuf[25];
00249 sprintf(sendBuf, "GET %u", dest);
00250
00251 std::string theReply;
00252 if (SendCommand(sendBuf, theReply) == -1)
00253 {
00254 return -1;
00255 }
00256
00257
00258 if (theReply == "NAK")
00259 {
00260 return -1;
00261 }
00262
00263 theList.clear();
00264
00265
00266
00267 if (theReply.size() > 0)
00268 {
00269
00270
00271 size_t comma = theReply.find_last_of(',');
00272
00273 while (comma != std::string::npos)
00274 {
00275
00276
00277 const char *binding = theReply.c_str() + (comma + 1);
00278
00279
00280
00281 theList.push_back(ACE_INET_Addr(binding));
00282
00283
00284
00285 theReply.erase(comma);
00286
00287
00288
00289 comma = theReply.find_last_of(',');
00290 }
00291
00292
00293 theList.push_back(ACE_INET_Addr(theReply.c_str()));
00294 }
00295
00296 {
00297 ACE_Guard<ACE_Thread_Mutex> g(m_bindingMutex);
00298
00299
00300
00301
00302
00303 BindMap_t::iterator it = m_bindings.insert( BindValue_t(dest, AddrList_t()) ).first;
00304 it->second = theList;
00305 }
00306
00307 return 0;
00308 }
00309
00310
00311
00312 int AddressClient::AddBinding(u_int id, const char *binding)
00313 {
00314 int retVal = -1;
00315
00316 char sendBuf[25];
00317 sprintf(sendBuf, "ADD %u %s", id, binding);
00318
00319 std::string theReply;
00320 if (SendCommand(sendBuf, theReply) == -1)
00321 {
00322 return -1;
00323 }
00324
00325 if (theReply == "OK")
00326 {
00327 retVal = 0;
00328 }
00329
00330 return retVal;
00331 }
00332
00333
00334
00335 int AddressClient::RemoveBinding(u_int id, const char *binding)
00336 {
00337 int retVal = -1;
00338
00339 char sendBuf[25];
00340 sprintf(sendBuf, "REMOVE %u %s", id, binding);
00341
00342 std::string theReply;
00343 if (SendCommand(sendBuf, theReply) == -1)
00344 {
00345 return -1;
00346 }
00347
00348 if (theReply == "OK")
00349 {
00350 retVal = 0;
00351 }
00352
00353 return retVal;
00354 }
00355
00356
00357
00358 int AddressClient::ClearBindings()
00359 {
00360 int retVal = -1;
00361
00362 std::string theReply;
00363 if (SendCommand("CLEAR_BINDINGS", theReply) == -1)
00364 {
00365 return -1;
00366 }
00367
00368 if (theReply == "OK")
00369 {
00370 retVal = 0;
00371 }
00372
00373 return retVal;
00374 }
00375
00376
00377
00378
00379 ACE_HANDLE AddressClient::get_handle (void) const
00380 {
00381 return this->m_socket.get_handle ();
00382 }
00383
00384
00385
00386 int AddressClient::handle_input (ACE_HANDLE)
00387 {
00388 ACE_INET_Addr remoteAddr;
00389
00390 int len = m_socket.recv(m_buf, sizeof(m_buf)-1, remoteAddr);
00391
00392
00393
00394
00395
00396
00397 if (len == -1)
00398 {
00399 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n", "recv"), -1);
00400 }
00401
00402
00403
00404 m_buf[len] = '\0';
00405
00406
00407
00408 u_int objAddr;
00409
00410
00411
00412
00413 if (strncmp("CHANGED_ALL", m_buf, 11) == 0)
00414 {
00415 ACE_DEBUG((LM_INFO,
00416 "(%N:%l) Address server changed. Clearing cache.\n"));
00417
00418 ACE_Guard<ACE_Thread_Mutex> g(m_bindingMutex);
00419
00420
00421
00422 m_bindings.clear();
00423 }
00424
00425
00426
00427 else if (sscanf(m_buf, "CHANGED %u", &objAddr) == 1)
00428 {
00429 ACE_DEBUG((LM_INFO,
00430 "(%N:%l) Address server changed. Clearing entries for %u.\n",
00431 objAddr));
00432
00433 ACE_Guard<ACE_Thread_Mutex> g(m_bindingMutex);
00434
00435
00436
00437 m_bindings.erase(objAddr);
00438 }
00439
00440
00441
00442 else if (strncmp("ACK", m_buf, 3) == 0)
00443 {
00444 ACE_Guard<ACE_Thread_Mutex> g(m_pingMutex);
00445
00446 m_pingCondition.broadcast();
00447 }
00448
00449
00450
00451 else
00452 {
00453 ACE_Guard<ACE_Thread_Mutex> g(m_pendingReplyQueue.GetMutex());
00454
00455 if (m_pendingReplyQueue.IsEmpty_i())
00456 {
00457 ACE_ERROR((LM_WARNING, "Got spurious message: %s\n", m_buf));
00458 }
00459 else
00460 {
00461 std::string buf(m_buf);
00462 m_pendingReplyQueue.SetReply_i(buf);
00463 }
00464 }
00465
00466 return 0;
00467 }
00468
00469
00470
00471 int AddressClient::SendCommand(const char *str, std::string &reply)
00472 {
00473
00474
00475 int status = -1;
00476
00477 {
00478 ACE_Guard<ACE_Thread_Mutex> g(m_pendingReplyQueue.GetMutex());
00479
00480
00481
00482 status = this->m_socket.send(str, strlen(str), m_addrServer);
00483 if (status == -1)
00484 {
00485 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n","send"), -1);
00486 }
00487
00488
00489
00490 if (m_timeout == ACE_Time_Value::zero)
00491 {
00492 status = m_pendingReplyQueue.GetReply_i(reply);
00493 }
00494
00495
00496
00497 else
00498 {
00499 ACE_Time_Value deadline = ACE_OS::gettimeofday() + m_timeout;
00500 status = m_pendingReplyQueue.GetReply_i(reply, &deadline);
00501 }
00502 }
00503
00504
00505
00506
00507 if (status == -1)
00508 {
00509
00510
00511
00512 if (ACE_OS::last_error() == ETIME)
00513 {
00514 ACE_ERROR_RETURN((LM_ERROR,
00515 "(%N:%l) Reply to command '%s' timed out.\n", str), -1);
00516 }
00517 else
00518 {
00519 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n", "acquire"), -1);
00520 }
00521 }
00522
00523 return 0;
00524 }
00525
00526