Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages | Examples

AddressClient.cpp

Go to the documentation of this file.
00001 ////////////////////////////////////////////////////////////////////////////////
00002 /*! \file AddressClient.cpp
00003 *  \brief Defines the class that communicates with an address server.
00004 *  \author $Author: rsharo $
00005 *  \version $Revision: 1.1 $
00006 *  \date    $Date: 2004/04/07 13:45:03 $
00007 *///////////////////////////////////////////////////////////////////////////////
00008 #include "Comm/AddressClient.h"
00009 
00010 #include <string>
00011 
00012 AddressClient::AddressClient (const ACE_INET_Addr &addrServer)
00013         : m_addrServer(addrServer),
00014           m_bindingMutex(),
00015           m_bindings(),
00016           m_pendingReplyMutex(),
00017           m_pendingReplyQueue(m_pendingReplyMutex),
00018           m_notificationID(-1),
00019           m_pingMutex(),
00020           m_pingCondition(m_pingMutex),
00021           m_timeout(0,0)
00022 {
00023         m_socket.open(ACE_INET_Addr());
00024 }
00025 
00026 
00027 
00028 AddressClient::~AddressClient (void)
00029 {
00030         m_socket.close();
00031 }
00032 
00033 
00034 int AddressClient::RegisterWithReactor(ACE_Reactor &theReactor)
00035 {
00036         int retVal = theReactor.register_handler (this,
00037                 ACE_Event_Handler::READ_MASK);
00038                 
00039         if (retVal == -1)
00040         {
00041                 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "register_handler"));
00042         }
00043 
00044         return retVal;
00045 }
00046 
00047 
00048 int AddressClient::RemoveFromReactor(ACE_Reactor &theReactor, bool immediate)
00049 {
00050         if (!immediate)
00051         {
00052                 RemoveServerCallback();
00053         }
00054 
00055         int retVal = theReactor.remove_handler (this,
00056                 ACE_Event_Handler::READ_MASK);
00057                 
00058         if (retVal == -1)
00059                 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "remove_handler"));
00060 
00061         return retVal;
00062 }
00063 
00064 
00065 
00066 void AddressClient::SetTimeout(const ACE_Time_Value &tv)
00067 {
00068         m_timeout = tv;
00069 }
00070 
00071 
00072 
00073 int AddressClient::PingServer()
00074 {
00075         //
00076         // Unlike other commands that use the m_timeout member to determine
00077         // how long to wait, PingServer() always waits two seconds.  This should
00078         // be plenty of time and prevents deadlock when m_timeout is zero (wait
00079         // forever).
00080         static const ACE_Time_Value PING_TIMEOUT(1,0);
00081 
00082         //
00083         // Acquire the mutex to lock out other callers.
00084         ACE_Guard<ACE_Thread_Mutex> g(m_pingMutex);
00085 
00086         //
00087         // Try to send the ping command to the address server.
00088         int retVal = this->m_socket.send("PING", 4, m_addrServer);
00089         if (retVal == -1)
00090         {
00091                 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n","send"), -1);
00092         }
00093 
00094         //
00095         // Set the deadline for one second from now.
00096         ACE_Time_Value deadline = ACE_OS::gettimeofday() + PING_TIMEOUT;
00097 
00098         //
00099         // Return 0 if handle_input() reports an "ACK" response,
00100         // -1 if we timeout or experience an error.
00101         return m_pingCondition.wait(&deadline);
00102 }
00103 
00104 
00105 
00106 int AddressClient::RegisterServerCallback()
00107 {
00108         //
00109         // If we are already registered, don't try to register again.
00110         if (m_notificationID != -1)
00111         {
00112                 return -1;
00113         }
00114 
00115         //
00116         // First, make sure the server is there so we don't deadlock.
00117         int status = -1;
00118         for (int i = 0; status == -1 && i < 1; ++i)
00119         {
00120                 status = PingServer();
00121         }
00122 
00123         //
00124         // If we can't get an answer from the server, then give up.
00125         if (status == -1)
00126         {
00127                 ACE_ERROR_RETURN((LM_ERROR,
00128                         "(%N:%l) RegisterServerCallback failed because the address server"
00129                         " won't respond.\n"), -1);
00130         }
00131 
00132         //
00133         // Send the command and wait for reply.
00134         std::string theReply;
00135         status = SendCommand("ADD_NOTIFY", theReply);
00136 
00137         //
00138         // Try to get our id from the reply.
00139         u_int myID;
00140         if (status == -1 || sscanf(theReply.c_str(), "YOU_ARE %u", &myID) != 1)
00141         {
00142                 return -1;
00143         }
00144 
00145         //
00146         // Store our id.
00147         m_notificationID = myID;
00148 
00149         return 0;
00150 }
00151 
00152 
00153 
00154 int AddressClient::RemoveServerCallback()
00155 {
00156         //
00157         // If we're aren't registered, then don't try to remove.
00158         if (m_notificationID == -1)
00159         {
00160                 return -1;
00161         }
00162 
00163         //
00164         // First, make sure the server is there so we don't deadlock.
00165         int status = -1;
00166         for (int i = 0; status == -1 && i < 5; ++i)
00167         {
00168                 status = PingServer();
00169         }
00170 
00171         //
00172         // If we can't get an answer from the server, then give up.
00173         if (status == -1)
00174         {
00175                 ACE_ERROR_RETURN((LM_WARNING,
00176                         "(%N:%l) RemoveServerCallback failed because the address server"
00177                         " won't respond.\n"), -1);
00178         }
00179 
00180         //
00181         // We'll be sending the "REMOVE_NOTIFY" command with our ID.
00182         u_int id = m_notificationID;
00183         char sendBuf[32];
00184         sprintf(sendBuf, "REMOVE_NOTIFY %u", id);
00185 
00186         //
00187         // Send the command to the address server and wait for the reply.
00188         std::string theReply;
00189         status = SendCommand(sendBuf, theReply);
00190 
00191         //
00192         // If the command failed, return an error.
00193         if (status == -1 || theReply != "OK")
00194         {
00195                 return -1;
00196         }
00197 
00198         //
00199         // Clear our id.
00200         m_notificationID = -1;
00201 
00202         return 0;
00203 }
00204 
00205 
00206 int AddressClient::ClearServerCallbacks()
00207 {
00208         int retVal = -1;
00209 
00210         std::string theReply;
00211         retVal = SendCommand("CLEAR_NOTIFY", theReply);
00212 
00213         if (theReply == "OK")
00214         {
00215                 m_notificationID = -1;
00216                 retVal = 0;
00217         }
00218 
00219         return retVal;
00220 }
00221 
00222 
00223 
00224 int AddressClient::GetBindings(u_int dest, AddrList_t &theList)
00225 {
00226 
00227         //
00228         // Try to get the binding from the cache.
00229         {
00230                 ACE_Guard<ACE_Thread_Mutex> g(m_bindingMutex);
00231 
00232                 BindMap_t::iterator it = m_bindings.find(dest);
00233 
00234                 if (it != m_bindings.end())
00235                 {
00236                         theList = it->second;
00237                         return 0;
00238                 }
00239         }
00240 
00241         
00242         //
00243         // We didn't get the binding from the cache. Try to get it
00244         // from the address server (and write it to the cache at the
00245         // same time).
00246         //
00247         // Send a "GET" command to the address server.
00248         char sendBuf[25];
00249         sprintf(sendBuf, "GET %u", dest);
00250         
00251         std::string theReply;
00252         if (SendCommand(sendBuf, theReply) == -1)
00253         {
00254                 return -1;
00255         }
00256         
00257         
00258         if (theReply == "NAK")
00259         {
00260                 return -1;
00261         }
00262         
00263         theList.clear();
00264         
00265         //
00266         // Iterate through the reply and fill the vector with the bindings.
00267         if (theReply.size() > 0)
00268         {
00269                 //
00270                 // Scan backward through the comma-separated entries.
00271                 size_t comma = theReply.find_last_of(',');
00272                 
00273                 while (comma != std::string::npos)
00274                 {
00275                         //
00276                         // The binding is everything following the comma.
00277                         const char *binding = theReply.c_str() + (comma + 1);
00278                         
00279                         //
00280                         // Store the binding
00281                         theList.push_back(ACE_INET_Addr(binding));
00282                         
00283                         //
00284                         // Remove the comma and everything following it.
00285                         theReply.erase(comma);
00286                         
00287                         //
00288                         // Prepare for next iteration.
00289                         comma = theReply.find_last_of(',');
00290                 }
00291                 
00292                 // Store the final binding.
00293                 theList.push_back(ACE_INET_Addr(theReply.c_str()));
00294         }
00295         
00296         {
00297                 ACE_Guard<ACE_Thread_Mutex> g(m_bindingMutex);
00298                 
00299                 //
00300                 // Insert an entry into the bindings and assign the list into it.
00301                 // By passing an empty address list and then
00302                 // assigning it, we save one copy construction.
00303                 BindMap_t::iterator it = m_bindings.insert( BindValue_t(dest, AddrList_t()) ).first;
00304                 it->second = theList;
00305         }
00306 
00307         return 0;
00308 }
00309 
00310 
00311 
00312 int AddressClient::AddBinding(u_int id, const char *binding)
00313 {
00314         int retVal = -1;
00315 
00316         char sendBuf[25];
00317         sprintf(sendBuf, "ADD %u %s", id, binding);
00318 
00319         std::string theReply;
00320         if (SendCommand(sendBuf, theReply) == -1)
00321         {
00322                 return -1;
00323         }
00324 
00325         if (theReply == "OK")
00326         {
00327                 retVal = 0;
00328         }
00329 
00330         return retVal;
00331 }
00332 
00333 
00334 
00335 int AddressClient::RemoveBinding(u_int id, const char *binding)
00336 {
00337         int retVal = -1;
00338 
00339         char sendBuf[25];
00340         sprintf(sendBuf, "REMOVE %u %s", id, binding);
00341 
00342         std::string theReply;
00343         if (SendCommand(sendBuf, theReply) == -1)
00344         {
00345                 return -1;
00346         }
00347 
00348         if (theReply == "OK")
00349         {
00350                 retVal = 0;
00351         }
00352 
00353         return retVal;
00354 }
00355 
00356 
00357 
00358 int AddressClient::ClearBindings()
00359 {
00360         int retVal = -1;
00361 
00362         std::string theReply;
00363         if (SendCommand("CLEAR_BINDINGS", theReply) == -1)
00364         {
00365                 return -1;
00366         }
00367 
00368         if (theReply == "OK")
00369         {
00370                 retVal = 0;
00371         }
00372 
00373         return retVal;
00374 }
00375 
00376 
00377 
00378 
00379 ACE_HANDLE AddressClient::get_handle (void) const
00380 {
00381         return this->m_socket.get_handle ();
00382 }
00383 
00384 
00385 
00386 int AddressClient::handle_input (ACE_HANDLE)
00387 {
00388         ACE_INET_Addr remoteAddr;
00389 
00390         int len = m_socket.recv(m_buf, sizeof(m_buf)-1, remoteAddr);
00391 
00392         //
00393         // If the receive call failed, report the error and
00394         // return -1.
00395         // NOTE: A return value of -1 means the AddressClient is 
00396         // deregistered with the Reactor.
00397         if (len == -1)
00398         {
00399                 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n", "recv"), -1);
00400         }
00401 
00402         //
00403         // Null-terminate the received string.
00404         m_buf[len] = '\0';
00405 
00406         //
00407         // This object address is needed for "CHANGED" commands.
00408         u_int objAddr;
00409 
00410         //
00411         // If the received message is "CHANGED_ALL" then all the cached
00412         // bindings are invalidated.
00413         if (strncmp("CHANGED_ALL", m_buf, 11) == 0)
00414         {
00415                 ACE_DEBUG((LM_INFO,
00416                         "(%N:%l) Address server changed. Clearing cache.\n"));
00417 
00418                 ACE_Guard<ACE_Thread_Mutex> g(m_bindingMutex);
00419                 
00420                 //
00421                 // Clear all the bindings.
00422                 m_bindings.clear();
00423         }
00424         //
00425         // If the received message is "CHANGED" then the cached bindings
00426         // with the given object address are invalidated.
00427         else if (sscanf(m_buf, "CHANGED %u", &objAddr) == 1)
00428         {
00429                 ACE_DEBUG((LM_INFO,
00430                         "(%N:%l) Address server changed. Clearing entries for %u.\n",
00431                         objAddr));
00432 
00433                 ACE_Guard<ACE_Thread_Mutex> g(m_bindingMutex);
00434                 
00435                 //
00436                 // If we have bindings for this address, remove them.
00437                 m_bindings.erase(objAddr);
00438         }
00439         //
00440         // If the message is "ACK", then someone pinged the server and wants to
00441         // know its alive. Tell them it is.
00442         else if (strncmp("ACK", m_buf, 3) == 0)
00443         {
00444                 ACE_Guard<ACE_Thread_Mutex> g(m_pingMutex);
00445 
00446                 m_pingCondition.broadcast();
00447         }
00448         //
00449         // Otherwise, assume the command is solicited. Route it to the
00450         // first pending reader.
00451         else 
00452         {
00453                 ACE_Guard<ACE_Thread_Mutex> g(m_pendingReplyQueue.GetMutex());
00454 
00455                 if (m_pendingReplyQueue.IsEmpty_i())
00456                 {
00457                         ACE_ERROR((LM_WARNING, "Got spurious message: %s\n", m_buf));
00458                 }
00459                 else
00460                 {
00461                         std::string buf(m_buf);
00462                         m_pendingReplyQueue.SetReply_i(buf);
00463                 }
00464         }
00465 
00466         return 0;
00467 }
00468 
00469 
00470 
00471 int AddressClient::SendCommand(const char *str, std::string &reply)
00472 {
00473         //
00474         // A place to hold the result of method calls.
00475         int status = -1;
00476 
00477         {
00478                 ACE_Guard<ACE_Thread_Mutex> g(m_pendingReplyQueue.GetMutex());
00479 
00480                 //
00481                 // Try to send the command over the network.
00482                 status = this->m_socket.send(str, strlen(str), m_addrServer);
00483                 if (status == -1)
00484                 {
00485                         ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n","send"), -1);
00486                 }       
00487                 
00488                 //
00489                 // If we have no timeout, wait indefinitely for the reply to arrive.
00490                 if (m_timeout == ACE_Time_Value::zero)
00491                 {
00492                         status = m_pendingReplyQueue.GetReply_i(reply);
00493                 }
00494                 //
00495                 // Otherwise, wait until either the reply arrives or the timeout
00496                 // expires.
00497                 else
00498                 {
00499                         ACE_Time_Value deadline = ACE_OS::gettimeofday() + m_timeout;
00500                         status = m_pendingReplyQueue.GetReply_i(reply, &deadline);
00501                 }
00502         }
00503 
00504         //
00505         // If we failed to get the semaphore for any reason, log the error
00506         // and return failure.
00507         if (status == -1)
00508         {
00509                 //
00510                 // If the command failed due to timeout, log a more descriptive
00511                 // error.  Other errors get the system-defined error message.
00512                 if (ACE_OS::last_error() == ETIME)
00513                 {
00514                         ACE_ERROR_RETURN((LM_ERROR,
00515                                 "(%N:%l) Reply to command '%s' timed out.\n", str), -1);
00516                 }
00517                 else
00518                 {
00519                         ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n", "acquire"), -1);
00520                 }
00521         }
00522 
00523         return 0;
00524 }
00525 
00526 

Generated on Wed Sep 5 12:54:17 2007 for DSACSS Operational Code by  doxygen 1.3.9.1