Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages | Examples

MessageServer.cpp

Go to the documentation of this file.
00001 ////////////////////////////////////////////////////////////////////////////////
00002 /*! \file MessageServer.cpp
00003 *  \brief Defines the class that routes messages around the system.
00004 *  \author $Author: rsharo $
00005 *  \version $Revision: 1.1 $
00006 *  \date    $Date: 2004/04/07 13:45:04 $
00007 *///////////////////////////////////////////////////////////////////////////////
00008 #include "Comm/MessageServer.h"
00009 #include "Comm/MessageClient.h"
00010 
00011 
00012 
00013 
00014 
00015 MessageServer::MessageServer (const ACE_INET_Addr &addrServer, const ACE_INET_Addr &localAddr)
00016         : m_pAddrClient(new AddressClient(addrServer)),
00017           m_addrClientOwned(true),
00018           m_localAddr()
00019 {
00020         //
00021         // Open with the specified binding (which could have wildcards).
00022         m_socket.open(localAddr);
00023 
00024         //
00025         // Get the complete binding from the socket.
00026         m_socket.get_local_addr(m_localAddr);
00027 }
00028 
00029 MessageServer::MessageServer (AddressClient *addrClient, bool takeOwnership,
00030                 const ACE_INET_Addr &localAddr)
00031         : m_pAddrClient(addrClient),
00032           m_addrClientOwned(takeOwnership),
00033           m_localAddr()
00034 {
00035         //
00036         // Open with the specified binding (which could have wildcards).
00037         m_socket.open(localAddr);
00038 
00039         //
00040         // Get the complete binding from the socket.
00041         m_socket.get_local_addr(m_localAddr);
00042 }
00043 
00044 
00045 
00046 MessageServer::~MessageServer (void)
00047 {
00048         m_socket.close();
00049 
00050         if (m_addrClientOwned)
00051         {
00052                 delete m_pAddrClient;
00053         }
00054 }
00055 
00056 
00057 
00058 int MessageServer::RegisterWithReactor(ACE_Reactor &theReactor)
00059 {
00060         int retVal = theReactor.register_handler (this,
00061                 ACE_Event_Handler::READ_MASK);
00062                 
00063         if (retVal == -1)
00064                 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n", "register_handler"), -1);
00065 
00066 
00067         if (m_addrClientOwned)
00068         {
00069                 retVal = m_pAddrClient->RegisterWithReactor(theReactor);
00070 
00071                 if (retVal == -1)
00072                 {
00073                         theReactor.remove_handler (this, ACE_Event_Handler::READ_MASK);
00074                 }
00075         }
00076 
00077         return retVal;
00078 }
00079 
00080 
00081 
00082 int MessageServer::RemoveFromReactor(ACE_Reactor &theReactor, bool immediate)
00083 {
00084         int retVal = theReactor.remove_handler (this,
00085                 ACE_Event_Handler::READ_MASK);
00086                 
00087         if (retVal == -1)
00088                 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "remove_handler"));
00089 
00090         if (m_addrClientOwned)
00091         {
00092                 if (m_pAddrClient->RemoveFromReactor(theReactor, immediate) == -1)
00093                         retVal = -1;
00094         }
00095 
00096         return retVal;
00097 }
00098 
00099 
00100 
00101 int MessageServer::RegisterClient(u_int id, MessageClient *client)
00102 {
00103         //
00104         // NOTE: Insertion of elements does not invalidate iterators,
00105         // so this method need not be guarded by the mutex.
00106         //ACE_Guard<ACE_Thread_Mutex> g(m_clientMutex);
00107 
00108         //
00109         // Check to make sure this client isn't already registered somehow.
00110         ClientMap_t::iterator it = m_clients.find(id);
00111 
00112         //
00113         // If this is an entirely new ID, we'll need to bind the address.
00114         bool addressBindingNeeded = (it == m_clients.end());
00115 
00116         for (; it != m_clients.end() && it->first == id; ++it)
00117         {
00118                 if (it->second == client)
00119                 {
00120                         ACE_ERROR_RETURN((LM_ERROR,
00121                                 "(%N:%l) Attempt to register the same client for the same id twice.\n"),
00122                                 -1);
00123                 }
00124         }
00125 
00126         //
00127         // Add the client. They will start receiving messages immediately.
00128         m_clients.insert(ClientValue_t(id, client));
00129         client->m_pServer = this;
00130 
00131         if (addressBindingNeeded)
00132         {
00133                 char binding[MAXHOSTNAMELEN+8];
00134                 m_localAddr.addr_to_string(binding, sizeof(binding), 0);
00135                 m_pAddrClient->AddBinding(id, binding);
00136         }
00137 
00138         return 0;
00139 }
00140 
00141 
00142 int MessageServer::RemoveClient(u_int id, MessageClient *client)
00143 {
00144         //
00145         // Lock out the reactor so we don't corrupt its iterator.
00146         ACE_Guard<ACE_Thread_Mutex> g(m_clientMutex);
00147 
00148         ClientMap_t::iterator it = m_clients.find(id);
00149         for (; it != m_clients.end() && it->first == id; ++it)
00150         {
00151                 if (it->second == client)
00152                 {
00153                         RemoveClient_i(it);
00154                         return 0;
00155                 }
00156         }
00157 
00158         ACE_ERROR_RETURN((LM_ERROR,
00159                 "(%N:%l) Attempt to remove a bad client/id registration.\n"),
00160                 -1);
00161 }
00162 
00163 
00164 
00165 MessageServer::ClientMap_t::iterator
00166 MessageServer::RemoveClient_i(MessageServer::ClientMap_t::iterator it)
00167 {
00168         //
00169         // Remember the id of this client.
00170         u_int id = it->first;
00171 
00172         //
00173         // Clear the client's server pointer and remove it from the client list.
00174         it->second->m_pServer = NULL;
00175         ClientMap_t::iterator next = it;
00176         ++next;
00177         m_clients.erase(it);
00178 
00179         //
00180         // If this was the last client with the given ID, then remove our binding
00181         // to this ID from the address server.
00182         ClientMap_t::iterator remain = m_clients.find(id);
00183         if (remain == m_clients.end())
00184         {
00185                 char binding[MAXHOSTNAMELEN+8];
00186                 m_localAddr.addr_to_string(binding, sizeof(binding), 0);
00187 
00188                 m_pAddrClient->RemoveBinding(id, binding);
00189         }
00190 
00191         return next;
00192 }
00193 
00194 
00195 ACE_HANDLE MessageServer::get_handle (void) const
00196 {
00197         return this->m_socket.get_handle ();
00198 }
00199 
00200 
00201 
00202 int MessageServer::handle_input (ACE_HANDLE)
00203 {
00204         ACE_INET_Addr remoteAddr;
00205         MsgHdr_t hdr;
00206 
00207         iovec bufs[2];
00208         
00209         bufs[0].iov_len = sizeof(hdr);
00210         bufs[0].iov_base = (char *)(&hdr);
00211 
00212         bufs[1].iov_len = sizeof(m_buf);
00213         bufs[1].iov_base = m_buf;
00214 
00215 
00216         //
00217         // Receive message.
00218         int status =
00219                 this->m_socket.recv(bufs, 2, remoteAddr);
00220 
00221         //
00222         // If reception failed, log the error and return -1.
00223         // NOTE: Returning -1 deregisters the server.
00224         if (status == -1)
00225         {
00226                 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n", "recv"), -1);
00227         }
00228 
00229         //
00230         // Check to see if the message was too short...
00231         // NOTE: This error doesn't deregister the server -- its just a user error.
00232         if ((u_int)status < sizeof(hdr))
00233         {
00234                 ACE_ERROR_RETURN((LM_WARNING,
00235                         "(%N:%l) Received message is shorter than a header. Dropping.\n"),
00236                         0);
00237         }
00238         //
00239         // Check to see if the header is intact...
00240         else if (hdr.m_magicNumber == MsgHdr_t::MAGIC_NUMBER)
00241         {
00242         }
00243         //
00244         // If not intact, see if its just reversed due to an endianness mismatch...
00245         else if ( hdr.m_magicNumber == ACE_SWAP_LONG(MsgHdr_t::MAGIC_NUMBER) )
00246         {
00247                 ACE_ERROR((LM_WARNING,
00248                         "(%N:%l) Sender and receiver don't have the same endianness --"
00249                         " hope nobody sent any integers...\n"));
00250 
00251                 hdr.m_msgLength = ACE_SWAP_LONG(hdr.m_msgLength);
00252                 hdr.m_destination = ACE_SWAP_LONG(hdr.m_destination);
00253         }
00254         //
00255         // Otherwise, its just a bad header.
00256         // NOTE: This error doesn't deregister the server -- its just a user error.
00257         else
00258         {
00259                 ACE_ERROR_RETURN((LM_WARNING,
00260                         "(%N:%l) Got message with bad/missing header. Dropping.\n"),
00261                         0);
00262         }
00263 
00264         //
00265         // Check to see if the received message length matches what the header
00266         // tells us it should be...
00267         // NOTE: This error doesn't deregister the server -- its just a user error.
00268         size_t actualLen = status - sizeof(hdr);
00269         if (hdr.m_msgLength != actualLen)
00270         {
00271                 ACE_ERROR_RETURN((LM_WARNING,
00272                         "(%N:%l) Message length (%d) is different than header says it"
00273                         " should be (%d). Dropping.\n",
00274                         actualLen, hdr.m_msgLength), 0);
00275         }
00276 
00277 
00278         //
00279         // Prevent removal of clients, since this could corrupt our iterator.
00280         ACE_Guard<ACE_Thread_Mutex> g(m_clientMutex);
00281 
00282         //
00283         // Iterate through all the clients and show them this message.
00284         ClientMap_t::iterator it = m_clients.find(hdr.m_destination);
00285         while (it != m_clients.end() && it->first == hdr.m_destination)
00286         {
00287                 //
00288                 // If a client returns -1, then remove them from the server.
00289                 if (it->second->ReceiveMessage(m_buf, actualLen) == -1)
00290                 {
00291                         it->second->m_pServer = NULL;
00292 
00293                         //
00294                         // Note that the following line will block until 
00295                         // the address server's event handler can run --
00296                         // if the reactor isn't multithreaded this will hang
00297                         // forever!  Use a thread pool or a WFMO reactor.
00298                         it = RemoveClient_i(it);
00299                 }
00300                 else
00301                 {
00302                         ++it;
00303                 }
00304         }
00305         
00306         return 0;
00307 }
00308 
00309 
00310 
00311 
00312 
00313 int MessageServer::SendMessage(u_int originator, const void *msg, size_t len,
00314                                                            u_int dest)
00315 {
00316         int retVal = 0;
00317 
00318         AddrList_t addrList;
00319 
00320         if (m_pAddrClient->GetBindings(dest, addrList) == -1)
00321         {
00322                 ACE_ERROR_RETURN((LM_ERROR,
00323                         "(%N:%l) Unable to retrieve bindings from AddressServer.\n"),
00324                         -1);
00325         }
00326 
00327         iovec bufs[2];
00328         
00329         MsgHdr_t hdr;
00330 
00331         hdr.m_msgLength = len;
00332         hdr.m_destination = dest;
00333 
00334         bufs[0].iov_len = sizeof(hdr);
00335         bufs[0].iov_base = (char *)(&hdr);
00336 
00337         bufs[1].iov_len = len;
00338         bufs[1].iov_base = (char *)(msg);
00339 
00340         AddrList_t::const_iterator i = addrList.begin();
00341 
00342         for (; i != addrList.end(); ++i)
00343         {
00344                 int status = this->m_socket.send(bufs, 2, *i);
00345                 if (status == -1)
00346                 {
00347                         ACE_ERROR((LM_ERROR, "(%N:%l) %p\n","send"));
00348                         retVal = -1;
00349                 }
00350         }
00351         
00352         return retVal;
00353 }
00354 

Generated on Wed Sep 5 12:54:22 2007 for DSACSS Operational Code by  doxygen 1.3.9.1