00001
00002
00003
00004
00005
00006
00007
#include "Comm/MessageServer.h"

#include <utility>

#include "Comm/MessageClient.h"
00010
00011
00012
00013
00014
00015 MessageServer::MessageServer (const ACE_INET_Addr &addrServer, const ACE_INET_Addr &localAddr)
00016 : m_pAddrClient(new AddressClient(addrServer)),
00017 m_addrClientOwned(true),
00018 m_localAddr()
00019 {
00020
00021
00022 m_socket.open(localAddr);
00023
00024
00025
00026 m_socket.get_local_addr(m_localAddr);
00027 }
00028
00029 MessageServer::MessageServer (AddressClient *addrClient, bool takeOwnership,
00030 const ACE_INET_Addr &localAddr)
00031 : m_pAddrClient(addrClient),
00032 m_addrClientOwned(takeOwnership),
00033 m_localAddr()
00034 {
00035
00036
00037 m_socket.open(localAddr);
00038
00039
00040
00041 m_socket.get_local_addr(m_localAddr);
00042 }
00043
00044
00045
00046 MessageServer::~MessageServer (void)
00047 {
00048 m_socket.close();
00049
00050 if (m_addrClientOwned)
00051 {
00052 delete m_pAddrClient;
00053 }
00054 }
00055
00056
00057
00058 int MessageServer::RegisterWithReactor(ACE_Reactor &theReactor)
00059 {
00060 int retVal = theReactor.register_handler (this,
00061 ACE_Event_Handler::READ_MASK);
00062
00063 if (retVal == -1)
00064 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n", "register_handler"), -1);
00065
00066
00067 if (m_addrClientOwned)
00068 {
00069 retVal = m_pAddrClient->RegisterWithReactor(theReactor);
00070
00071 if (retVal == -1)
00072 {
00073 theReactor.remove_handler (this, ACE_Event_Handler::READ_MASK);
00074 }
00075 }
00076
00077 return retVal;
00078 }
00079
00080
00081
00082 int MessageServer::RemoveFromReactor(ACE_Reactor &theReactor, bool immediate)
00083 {
00084 int retVal = theReactor.remove_handler (this,
00085 ACE_Event_Handler::READ_MASK);
00086
00087 if (retVal == -1)
00088 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "remove_handler"));
00089
00090 if (m_addrClientOwned)
00091 {
00092 if (m_pAddrClient->RemoveFromReactor(theReactor, immediate) == -1)
00093 retVal = -1;
00094 }
00095
00096 return retVal;
00097 }
00098
00099
00100
00101 int MessageServer::RegisterClient(u_int id, MessageClient *client)
00102 {
00103
00104
00105
00106
00107
00108
00109
00110 ClientMap_t::iterator it = m_clients.find(id);
00111
00112
00113
00114 bool addressBindingNeeded = (it == m_clients.end());
00115
00116 for (; it != m_clients.end() && it->first == id; ++it)
00117 {
00118 if (it->second == client)
00119 {
00120 ACE_ERROR_RETURN((LM_ERROR,
00121 "(%N:%l) Attempt to register the same client for the same id twice.\n"),
00122 -1);
00123 }
00124 }
00125
00126
00127
00128 m_clients.insert(ClientValue_t(id, client));
00129 client->m_pServer = this;
00130
00131 if (addressBindingNeeded)
00132 {
00133 char binding[MAXHOSTNAMELEN+8];
00134 m_localAddr.addr_to_string(binding, sizeof(binding), 0);
00135 m_pAddrClient->AddBinding(id, binding);
00136 }
00137
00138 return 0;
00139 }
00140
00141
00142 int MessageServer::RemoveClient(u_int id, MessageClient *client)
00143 {
00144
00145
00146 ACE_Guard<ACE_Thread_Mutex> g(m_clientMutex);
00147
00148 ClientMap_t::iterator it = m_clients.find(id);
00149 for (; it != m_clients.end() && it->first == id; ++it)
00150 {
00151 if (it->second == client)
00152 {
00153 RemoveClient_i(it);
00154 return 0;
00155 }
00156 }
00157
00158 ACE_ERROR_RETURN((LM_ERROR,
00159 "(%N:%l) Attempt to remove a bad client/id registration.\n"),
00160 -1);
00161 }
00162
00163
00164
00165 MessageServer::ClientMap_t::iterator
00166 MessageServer::RemoveClient_i(MessageServer::ClientMap_t::iterator it)
00167 {
00168
00169
00170 u_int id = it->first;
00171
00172
00173
00174 it->second->m_pServer = NULL;
00175 ClientMap_t::iterator next = it;
00176 ++next;
00177 m_clients.erase(it);
00178
00179
00180
00181
00182 ClientMap_t::iterator remain = m_clients.find(id);
00183 if (remain == m_clients.end())
00184 {
00185 char binding[MAXHOSTNAMELEN+8];
00186 m_localAddr.addr_to_string(binding, sizeof(binding), 0);
00187
00188 m_pAddrClient->RemoveBinding(id, binding);
00189 }
00190
00191 return next;
00192 }
00193
00194
00195 ACE_HANDLE MessageServer::get_handle (void) const
00196 {
00197 return this->m_socket.get_handle ();
00198 }
00199
00200
00201
00202 int MessageServer::handle_input (ACE_HANDLE)
00203 {
00204 ACE_INET_Addr remoteAddr;
00205 MsgHdr_t hdr;
00206
00207 iovec bufs[2];
00208
00209 bufs[0].iov_len = sizeof(hdr);
00210 bufs[0].iov_base = (char *)(&hdr);
00211
00212 bufs[1].iov_len = sizeof(m_buf);
00213 bufs[1].iov_base = m_buf;
00214
00215
00216
00217
00218 int status =
00219 this->m_socket.recv(bufs, 2, remoteAddr);
00220
00221
00222
00223
00224 if (status == -1)
00225 {
00226 ACE_ERROR_RETURN((LM_ERROR, "(%N:%l) %p\n", "recv"), -1);
00227 }
00228
00229
00230
00231
00232 if ((u_int)status < sizeof(hdr))
00233 {
00234 ACE_ERROR_RETURN((LM_WARNING,
00235 "(%N:%l) Received message is shorter than a header. Dropping.\n"),
00236 0);
00237 }
00238
00239
00240 else if (hdr.m_magicNumber == MsgHdr_t::MAGIC_NUMBER)
00241 {
00242 }
00243
00244
00245 else if ( hdr.m_magicNumber == ACE_SWAP_LONG(MsgHdr_t::MAGIC_NUMBER) )
00246 {
00247 ACE_ERROR((LM_WARNING,
00248 "(%N:%l) Sender and receiver don't have the same endianness --"
00249 " hope nobody sent any integers...\n"));
00250
00251 hdr.m_msgLength = ACE_SWAP_LONG(hdr.m_msgLength);
00252 hdr.m_destination = ACE_SWAP_LONG(hdr.m_destination);
00253 }
00254
00255
00256
00257 else
00258 {
00259 ACE_ERROR_RETURN((LM_WARNING,
00260 "(%N:%l) Got message with bad/missing header. Dropping.\n"),
00261 0);
00262 }
00263
00264
00265
00266
00267
00268 size_t actualLen = status - sizeof(hdr);
00269 if (hdr.m_msgLength != actualLen)
00270 {
00271 ACE_ERROR_RETURN((LM_WARNING,
00272 "(%N:%l) Message length (%d) is different than header says it"
00273 " should be (%d). Dropping.\n",
00274 actualLen, hdr.m_msgLength), 0);
00275 }
00276
00277
00278
00279
00280 ACE_Guard<ACE_Thread_Mutex> g(m_clientMutex);
00281
00282
00283
00284 ClientMap_t::iterator it = m_clients.find(hdr.m_destination);
00285 while (it != m_clients.end() && it->first == hdr.m_destination)
00286 {
00287
00288
00289 if (it->second->ReceiveMessage(m_buf, actualLen) == -1)
00290 {
00291 it->second->m_pServer = NULL;
00292
00293
00294
00295
00296
00297
00298 it = RemoveClient_i(it);
00299 }
00300 else
00301 {
00302 ++it;
00303 }
00304 }
00305
00306 return 0;
00307 }
00308
00309
00310
00311
00312
00313 int MessageServer::SendMessage(u_int originator, const void *msg, size_t len,
00314 u_int dest)
00315 {
00316 int retVal = 0;
00317
00318 AddrList_t addrList;
00319
00320 if (m_pAddrClient->GetBindings(dest, addrList) == -1)
00321 {
00322 ACE_ERROR_RETURN((LM_ERROR,
00323 "(%N:%l) Unable to retrieve bindings from AddressServer.\n"),
00324 -1);
00325 }
00326
00327 iovec bufs[2];
00328
00329 MsgHdr_t hdr;
00330
00331 hdr.m_msgLength = len;
00332 hdr.m_destination = dest;
00333
00334 bufs[0].iov_len = sizeof(hdr);
00335 bufs[0].iov_base = (char *)(&hdr);
00336
00337 bufs[1].iov_len = len;
00338 bufs[1].iov_base = (char *)(msg);
00339
00340 AddrList_t::const_iterator i = addrList.begin();
00341
00342 for (; i != addrList.end(); ++i)
00343 {
00344 int status = this->m_socket.send(bufs, 2, *i);
00345 if (status == -1)
00346 {
00347 ACE_ERROR((LM_ERROR, "(%N:%l) %p\n","send"));
00348 retVal = -1;
00349 }
00350 }
00351
00352 return retVal;
00353 }
00354