00001
00002
00003
00004
00005
00006
00007
00008 #include "Comm/AddressServer.h"
00009
00010
00011 AddressServer::AddressServer (u_short port, const char *mcast_addr,
00012 const char * mcast_iface)
00013 : m_mcastAddr (port, mcast_addr),
00014 m_nextObserverID(0)
00015 {
00016 ACE_INET_Addr in_addr_any;
00017 m_replySocket.open(in_addr_any);
00018
00019 if (this->m_mcastDgram.subscribe (this->m_mcastAddr, 1, mcast_iface) == -1)
00020 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "subscribe"));
00021 }
00022
00023 AddressServer::AddressServer (const ACE_INET_Addr &mcast_addr,
00024 const char *mcast_iface)
00025 : m_mcastAddr (mcast_addr),
00026 m_nextObserverID(0)
00027 {
00028 ACE_INET_Addr in_addr_any;
00029 m_replySocket.open(in_addr_any);
00030
00031 if (this->m_mcastDgram.subscribe (this->m_mcastAddr, 1, mcast_iface) == -1)
00032 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "subscribe"));
00033 }
00034
00035 AddressServer::~AddressServer (void)
00036 {
00037 this->m_mcastDgram.close ();
00038 }
00039
00040
00041
00042 int AddressServer::SetLoopbackEnable(bool enabled)
00043 {
00044 u_char arg = enabled;
00045
00046 int retVal = m_mcastDgram.set_option(IP_MULTICAST_LOOP, arg);
00047
00048 if (retVal < 0)
00049 ACE_ERROR((LM_NOTICE, "(%N:%l): set_option: %p"));
00050
00051 return retVal;
00052
00053 }
00054
00055
00056
00057 int AddressServer::SetTimeToLive(u_char ttl)
00058 {
00059 int retVal = m_mcastDgram.set_option(IP_MULTICAST_TTL, ttl);
00060
00061 if (retVal < 0)
00062 ACE_ERROR((LM_INFO, "(%N:%l): set_option: %p"));
00063
00064 return retVal;
00065 }
00066
00067
00068
00069 int AddressServer::RegisterWithReactor(ACE_Reactor &theReactor)
00070 {
00071 int retVal = theReactor.register_handler (this,
00072 ACE_Event_Handler::READ_MASK);
00073
00074 if (retVal == -1)
00075 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "register_handler"));
00076
00077 return retVal;
00078 }
00079
00080
00081
00082 int AddressServer::RemoveFromReactor(ACE_Reactor &theReactor)
00083 {
00084 int retVal = theReactor.remove_handler (this,
00085 ACE_Event_Handler::READ_MASK);
00086
00087 if (retVal == -1)
00088 ACE_ERROR ((LM_ERROR, "(%N:%l) %p\n", "remove_handler"));
00089
00090 return retVal;
00091 }
00092
00093
00094
00095 ACE_HANDLE AddressServer::get_handle (void) const
00096 {
00097 return this->m_mcastDgram.get_handle ();
00098 }
00099
00100
00101
00102
00103
00104 int AddressServer::handle_input (ACE_HANDLE)
00105 {
00106
00107 ssize_t retVal =
00108 this->m_mcastDgram.recv(m_buf, sizeof(m_buf)-1, m_remoteAddr);
00109
00110 if (retVal == -1)
00111 {
00112 return retVal;
00113 }
00114
00115
00116
00117 m_buf[retVal] = '\0';
00118
00119
00120
00121
00122 u_int objAddr;
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132 char binding[MAXHOSTNAMELEN+8];
00133
00134
00135
00136
00137 if (sscanf(m_buf, "GET %u", &objAddr) == 1)
00138 {
00139
00140
00141
00142
00143 char *idx = m_buf;
00144 u_int remaining = sizeof(m_buf);
00145
00146
00147 idx[0] = '\0';
00148
00149 bool firstTime = true;
00150 BindMap_t::iterator i = m_bindings.find(objAddr);
00151
00152 for(;i != m_bindings.end() && objAddr == i->first; ++i)
00153 {
00154
00155
00156 if (firstTime)
00157 {
00158 firstTime = false;
00159 }
00160
00161
00162
00163 else if (remaining > 3)
00164 {
00165 *(idx++) = ',';
00166 --remaining;
00167 }
00168
00169
00170 else
00171 {
00172 ACE_ERROR((LM_WARNING,
00173 "(%N:%l) Binding list for address %d is too long to send!\n",
00174 objAddr));
00175 break;
00176 }
00177
00178
00179
00180
00181 std::string &addr = i->second;
00182 if (addr.length() >= remaining)
00183 {
00184 ACE_ERROR((LM_WARNING,
00185 "(%N:%l) Binding list for address %d is too long to send!\n",
00186 objAddr));
00187 break;
00188 }
00189
00190
00191 else
00192 {
00193 strcpy(idx, addr.c_str());
00194 size_t chunk = strlen(idx);
00195 remaining -= chunk;
00196 idx += chunk;
00197 }
00198 }
00199
00200
00201
00202
00203 SendString(m_buf, m_remoteAddr);
00204 }
00205
00206
00207 else if (sscanf(m_buf, "ADD %u %s", &objAddr, binding) == 2)
00208 {
00209 m_bindings.insert(BindValue_t(objAddr, binding));
00210
00211 SendString("OK", m_remoteAddr);
00212
00213 NotifyChange(objAddr);
00214 }
00215
00216
00217 else if (sscanf(m_buf, "REMOVE %u %s", &objAddr, binding) == 2)
00218 {
00219 BindMap_t::iterator i = m_bindings.find(objAddr);
00220
00221 for(;; ++i)
00222 {
00223 if (i == m_bindings.end() || objAddr != i->first)
00224 {
00225 SendString("NOT_FOUND", m_remoteAddr);
00226 break;
00227 }
00228
00229 std::string &addr = i->second;
00230
00231 if (!strcmp(addr.c_str(), binding))
00232 {
00233 m_bindings.erase(i);
00234
00235 SendString("OK", m_remoteAddr);
00236
00237 NotifyChange(objAddr);
00238
00239 break;
00240 }
00241 }
00242
00243 }
00244
00245
00246 else if (strncmp(m_buf, "CLEAR_BINDINGS", 14) == 0)
00247 {
00248 m_bindings.clear();
00249
00250 SendString("OK", m_remoteAddr);
00251
00252 NotifyChange();
00253 }
00254
00255
00256 else if (strncmp(m_buf, "ADD_NOTIFY", 10) == 0)
00257 {
00258
00259
00260
00261 if (m_observers.size() == UINT_MAX)
00262 {
00263 SendString("TOO_MANY", m_remoteAddr);
00264 }
00265 else
00266 {
00267
00268
00269
00270 NotificationMap_t::iterator i = m_observers.find(m_nextObserverID);
00271 while (i != m_observers.end())
00272 {
00273 m_observers.find(++m_nextObserverID);
00274 }
00275
00276
00277
00278 m_observers.insert(NotificationValue_t(m_nextObserverID, m_remoteAddr));
00279 sprintf(m_buf, "YOU_ARE %u", m_nextObserverID++);
00280 SendString(m_buf, m_remoteAddr);
00281 }
00282 }
00283
00284
00285 else if (sscanf(m_buf, "REMOVE_NOTIFY %u", &objAddr) == 1)
00286 {
00287 NotificationMap_t::iterator i = m_observers.find(objAddr);
00288
00289 if (i == m_observers.end())
00290 {
00291 SendString("BAD_ID", m_remoteAddr);
00292 }
00293 else
00294 {
00295 m_observers.erase(i);
00296 SendString("OK", m_remoteAddr);
00297 }
00298 }
00299
00300
00301 else if (strncmp(m_buf, "CLEAR_NOTIFY", 12) == 0)
00302 {
00303 m_observers.clear();
00304
00305 SendString("OK", m_remoteAddr);
00306 }
00307
00308
00309 else if (strncmp(m_buf, "PING", 4) == 0)
00310 {
00311 SendString("ACK", m_remoteAddr);
00312 }
00313 else
00314 {
00315 ACE_ERROR((LM_WARNING,
00316 "(%N:%l) Received unrecognized AddressServer message: %s\n",
00317 m_buf));
00318
00319 SendString("NAK", m_remoteAddr);
00320
00321 }
00322
00323 return 0;
00324 }
00325
00326
00327
00328 int AddressServer::SendString(const char *str, const ACE_INET_Addr &dest)
00329 {
00330 int retVal = this->m_replySocket.send(str, strlen(str), dest);
00331
00332 if (retVal == -1)
00333 {
00334 ACE_ERROR((LM_ERROR, "(%N:%l) %p\n","send"));
00335 }
00336
00337 return retVal;
00338 }
00339
00340
00341
00342 int AddressServer::NotifyChange()
00343 {
00344 int retVal = 0;
00345
00346 NotificationMap_t::iterator i = m_observers.begin();
00347
00348 for (; i != m_observers.end(); ++i)
00349 {
00350 if (SendString("CHANGED_ALL", i->second) == -1)
00351 {
00352 ACE_ERROR((LM_WARNING,
00353 "(%N:%l) Change notification failed for a client. Dropping the client.\n"));
00354 m_observers.erase(i);
00355 retVal = -1;
00356 }
00357 }
00358
00359 return retVal;
00360 }
00361
00362
00363 int AddressServer::NotifyChange(u_int id)
00364 {
00365 int retVal = 0;
00366
00367 char buf[25];
00368
00369 sprintf(buf, "CHANGED %u", id);
00370
00371 NotificationMap_t::iterator i = m_observers.begin();
00372
00373 for (; i != m_observers.end(); ++i)
00374 {
00375 if (SendString(buf, i->second) == -1)
00376 {
00377 ACE_ERROR((LM_WARNING,
00378 "(%N:%l) Change notification failed for a client. Dropping the client.\n"));
00379 m_observers.erase(i);
00380 retVal = -1;
00381 }
00382 }
00383
00384 return retVal;
00385 }
00386