00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include <bogotel/Portability.h>
00011 #include <bogotel/MsgTransport.h>
00012 #include <bogotel/BgtRt.h>
00013 #include <bogotel/BgtErrors.h>
00014 #include <bogotel/util.h>
00015
00016 #include <boost/thread/xtime.hpp>
00017 #include <log4cpp/NDC.hh>
00018
00019 namespace bogotel {
00020
00021 #define MAXBUFLEN 500
00022
00024
00026
00027 CMsgTransport::CMsgTransport(CBgtRt *pBgtRt) :
00028 m_usMyPort(0),
00029 m_usOppPort(0),
00030 m_pBgtRt(pBgtRt),
00031 m_bListenerStarted(false),
00032 m_bStopTalker(false),
00033 m_bStopListener(false)
00034 {
00035
00036 }
00037
00038 CMsgTransport::~CMsgTransport()
00039 {
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 }
00056
00057 int CMsgTransport::init()
00058 {
00059 char method[] = "CMsgTransport::init()";
00060
00061 std::string strInstanceNum = g_util->getInstanceNum();
00062
00063 if ((m_usMyPort = (u_short)g_util->m_propIni.getProperty("MyPort" + strInstanceNum, (unsigned long)0)) == 0) {
00064 g_util->log(3, -1, "%s: No MyPort in ini file", method);
00065 return resultERROR;
00066 }
00067
00068 if ((m_usOppPort = (u_short)g_util->m_propIni.getProperty("OppPort" + strInstanceNum, (unsigned long)0)) == 0) {
00069 g_util->log(3, -1, "%s: No OppAddress in ini file", method);
00070 return resultERROR;
00071 }
00072
00073 m_strOppIp = g_util->m_propIni.getProperty("OppAddress" + strInstanceNum, "127.0.0.1");
00074 if (m_strOppIp == "") {
00075 g_util->log(3, -1, "%s: No OppAddress in ini file", method);
00076 return resultERROR;
00077 }
00078
00079 {
00080 boost::mutex::scoped_lock lock(m_mtxStartup);
00081
00082
00083 m_pthrdListener = new boost::thread(thread_adapter(&CMsgTransport::do_thread_listen, this));
00084
00085 boost::xtime xt;
00086 boost::xtime_get(&xt, boost::TIME_UTC);
00087 xt.sec += 25;
00088 if (! m_condStartup.timed_wait(lock, xt, cond_predicate(m_bListenerStarted, true))) {
00089
00090 g_util->log(3, -1, "%s: Timed out waiting for listener thread.", method);
00091 return resultERROR;
00092 }
00093 }
00094
00095
00096 m_pthrdListener = new boost::thread(thread_adapter(&CMsgTransport::do_thread_talk, this));
00097
00098 g_util->log(9, -1, "CMsgTransport::init() completed.");
00099 return resultSUCCESS;
00100 }
00101
00102 void CMsgTransport::do_thread_listen(void* param)
00103 {
00104 log4cpp::NDC::push("ListenThread");
00105 try {
00106 static_cast<CMsgTransport*>(param)->runListener();
00107 } catch(...) {
00108 g_util->log(1, -1, "CMsgTransport::do_thread_listen: Exception");
00109 }
00110 }
00111
00112 void CMsgTransport::do_thread_talk(void* param)
00113 {
00114 log4cpp::NDC::push("TalkThread");
00115 try {
00116 static_cast<CMsgTransport*>(param)->runTalker();
00117 } catch(...) {
00118 g_util->log(1, -1, "CMsgTransport::do_thread_talk: Exception");
00119 }
00120 }
00121
00122 void CMsgTransport::runTalker()
00123 {
00124 int rc;
00125 int numbytes;
00126
00127 if ((rc = openTalkerSocket()) != resultSUCCESS) {
00128 g_util->log(3, -1, "CMsgTransport::talkerRun(): openTalkerSocket() failed. rc = %d", rc);
00129 return;
00130 }
00131
00132 while (!m_bStopTalker) {
00133 {
00134 boost::mutex::scoped_lock lock(m_mtxAddedToQueue);
00135 ContainerNotEmpty<DEQUE_STRING> notEmpty(m_deqMsg);
00136
00137
00138 m_condEvent.wait(lock, notEmpty);
00139 }
00140
00141 while (true) {
00142 std::string strMsg;
00143
00144
00145 {
00146 boost::mutex::scoped_lock lock(m_mtxDeque);
00147 if (! m_deqMsg.empty()) {
00148 strMsg = m_deqMsg.front();
00149 m_deqMsg.pop_front();
00150 } else {
00151
00152 break;
00153 }
00154 }
00155
00156 if ((numbytes = sendto(m_fdTalker,
00157 strMsg.c_str(),
00158 strlen(strMsg.c_str()) + 1,
00159 0,
00160 (struct sockaddr *)&m_addrTalker,
00161 sizeof(struct sockaddr))) == -1) {
00162 g_util->log(3, -1, "CMsgTransport::talkerRun(): sendto() failed. "
00163 "::WSAGetLastError() is %d", ::WSAGetLastError());
00164 return;
00165 }
00166 }
00167 }
00168 }
00169
00170 void CMsgTransport::runListener()
00171 {
00172 int rc;
00173 int addr_len, numbytes;
00174 char buf[MAXBUFLEN];
00175
00176
00177 if ((rc = openListenerSocket(&m_addrListener, &m_fdListener, m_usMyPort)) != resultSUCCESS) {
00178 g_util->log(3, -1, "CMsgTransport::listenerRun(): openListenerSocket() failed. rc = %d", rc);
00179 return;
00180 }
00181
00182
00183 m_bListenerStarted = true;
00184 m_condStartup.notify_one();
00185
00186 CMsg *pMsg = new CMsg();
00187
00188
00189 while (! m_bStopListener) {
00190 addr_len = sizeof(struct sockaddr);
00191 if ((numbytes = recv(m_fdListener, buf, MAXBUFLEN, 0)) == -1) {
00192 g_util->log(3, -1, "CMsgTransport::listenerRun(): recvfrom() failed. "
00193 "::WSAGetLastError() is %d", ::WSAGetLastError());
00194 continue;
00195 }
00196
00197 if ((rc = pMsg->init(buf)) != resultSUCCESS) {
00198 g_util->log(3, -1, "CMsgTransport::listenerRun(): CMsg::init() failed. rc is %d", rc);
00199 } else {
00200 m_pBgtRt->incomingMsg(pMsg);
00201 }
00202 }
00203 }
00204
00205 int CMsgTransport::openListenerSocket(struct sockaddr_in *pAddr,
00206 int *pFd,
00207 u_short usPort)
00208 {
00209 char method[] = "CMsgTransport::openListenerSocket()";
00210 if (((*pFd) = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
00211 g_util->log(3, -1, "%s: socket() failed. ::WSAGetLastError() is %d",
00212 method, ::WSAGetLastError());
00213 return resultSOCKET_ERROR;
00214 }
00215 pAddr->sin_family = AF_INET;
00216 pAddr->sin_addr.s_addr = INADDR_ANY;
00217 memset(&(pAddr->sin_zero), 0, 8);
00218 pAddr->sin_port = htons(usPort);
00219
00220 if (bind((*pFd), (struct sockaddr *)pAddr, sizeof(struct sockaddr)) != 0) {
00221
00222 int iErr = ::WSAGetLastError();
00223 if (iErr == WSAEADDRINUSE) {
00224 g_util->log(5, -1, "%s: bind() failed. Socket in use.", method);
00225 return resultSOCKET_IN_USE;
00226 }
00227 g_util->log(3, -1, "%s: bind() failed. ::WSAGetLastError() is %d",
00228 method, ::WSAGetLastError());
00229 return resultSOCKET_ERROR;
00230 }
00231 g_util->log(9, -1, "%s completed.", method);
00232 return resultSUCCESS;
00233 }
00234
00235 int CMsgTransport::openTalkerSocket()
00236 {
00237 struct hostent *pheTalker = NULL;
00238
00239
00240 if ((pheTalker = gethostbyname(m_strOppIp.c_str())) == NULL) {
00241 g_util->log(3, -1, "CMsgTransport::openTalkerSocket(): gethostbyname() failed. "
00242 "::WSAGetLastError() is %d", ::WSAGetLastError());
00243 return resultSOCKET_ERROR;
00244 }
00245 m_addrTalker.sin_family = AF_INET;
00246 m_addrTalker.sin_port = htons(m_usOppPort);
00247 m_addrTalker.sin_addr = *((struct in_addr *)pheTalker->h_addr);
00248 memset(&(m_addrTalker.sin_zero), 0, 8);
00249
00250 if ((m_fdTalker = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
00251 g_util->log(3, -1, "CMsgTransport::openTalkerSocket(): socket() failed. "
00252 "::WSAGetLastError() is %d", ::WSAGetLastError());
00253 return resultSOCKET_ERROR;
00254 }
00255
00256 g_util->log(9, -1, "CMsgTransport::openTalkerSocket() completed.");
00257 return resultSUCCESS;
00258 }
00259
00260 int CMsgTransport::sendMsg(CMsg *pMsg)
00261 {
00262 char szMsg[MAXBUFLEN];
00263
00264 if (pMsg->toString(szMsg, sizeof(szMsg)) == NULL){
00265 g_util->log(3, -1, "CMsgTransport::sendMsg(): CMsg::toString() failed. ");
00266 return resultFUNC_BAD_PARAMETER;
00267 }
00268
00269
00270 {
00271 boost::mutex::scoped_lock lock(m_mtxDeque);
00272 m_deqMsg.push_back(szMsg);
00273 }
00274
00275
00276 m_condEvent.notify_one();
00277
00278 g_util->log(9, -1, "CMsgTransport::sendMsg() completed.");
00279
00280 return resultSUCCESS;
00281 }
00282
00284
00286
00287 CMsg::CMsg()
00288 {
00289 clear();
00290 }
00291
00292 CMsg::~CMsg()
00293 {
00294
00295 }
00296
00297 void CMsg::clear()
00298 {
00299 m_type = MT_INVALID;
00300 m_class = MC_INVALID;
00301 m_id = -1;
00302 m_crn = INVALID_CRN;
00303 if (! m_mapParam.empty()) {
00304 m_mapParam.clear();
00305 }
00306 }
00307
00308 void CMsg::init(MessageType type, MessageClass cls, long lId)
00309 {
00310 clear();
00311 m_class = cls;
00312 m_type = type;
00313 m_id = lId;
00314 }
00315
00316 void CMsg::addParam(enum ParmType pt, std::string strParm)
00317 {
00318 m_mapParam.insert(MAP_LONG2STR::value_type(pt, strParm));
00319 }
00320
00321 void CMsg::addParam(enum ParmType pt, char *szParm)
00322 {
00323 m_mapParam.insert(MAP_LONG2STR::value_type(pt, szParm));
00324 }
00325
00326 std::string CMsg::getParam(enum ParmType pt)
00327 {
00328 MAP_LONG2STR::iterator itL2S;
00329 itL2S = m_mapParam.find(pt);
00330 if (itL2S == m_mapParam.end()) {
00331 return "";
00332 }
00333 return itL2S->second;
00334 }
00335
00336
00337 char *CMsg::toString(char *szDmp, size_t iDmpLen)
00338 {
00339 char szTmp[MAXBUFLEN];
00340 int rc;
00341 rc = _snprintf(szDmp, iDmpLen, "Cls:%d Crn:%d Id:%d Type:%d", m_class, m_crn, m_id, m_type);
00342 if (rc < 0) {
00343 return NULL;
00344 }
00345 MAP_LONG2STR::iterator it;
00346 it = m_mapParam.begin();
00347 while (it != m_mapParam.end()) {
00348 long p1 = it->first;
00349 std::string p2 = it->second;
00350 if (p2 == "") {
00351 g_util->log(3, -1, "CMsg::toString(): parm data empty. parm type is %d", p1);
00352 return NULL;
00353 }
00354 rc = _snprintf(szTmp, sizeof(szTmp), " ;PT:%d PC:%s", p1, p2.c_str());
00355 if (rc < 0) {
00356 g_util->log(3, -1, "CMsg::toString(): _snprintf failed. (probably should increase size of szTmp)");
00357 return NULL;
00358 }
00359 if (strlen(szTmp) + strlen(szDmp) + 1 > iDmpLen) {
00360 g_util->log(3, -1, "CMsg::toString(): szDmp is too small "
00361 "(must be inceased to at least %d)", strlen(szTmp) + strlen(szDmp) + 1);
00362 return NULL;
00363 }
00364 strcat(szDmp, szTmp);
00365 it++;
00366 }
00367 return szDmp;
00368 }
00369
00370 int CMsg::init(char *szMsg)
00371 {
00372 char _func[] = "CMsg::init";
00373
00374 if (m_class != MC_INVALID) {
00375 clear();
00376 }
00377
00378 int rc = sscanf(szMsg, "Cls:%d Crn:%d Id:%d Type:%d", &m_class, &m_crn, &m_id, &m_type);
00379 if (rc != 4) {
00380 g_util->log(3, -1, "%s: failed to retrieve 4 vars from msg \"%s\"", _func, szMsg);
00381 return resultFUNC_BAD_PARAMETER;
00382 }
00383
00384
00385 char *szTok = strtok(szMsg, ";");
00386 if (szTok == NULL) {
00387
00388 return resultSUCCESS;
00389 }
00390
00391 enum ParmType pt;
00392 char szTemp[MAXBUFLEN];
00393
00394 while ((szTok = strtok(NULL, ";")) != NULL) {
00395
00396 rc = sscanf(szTok, "PT:%d PC:%s", &pt, szTemp);
00397 if (rc != 2) {
00398 g_util->log(3, -1, "%s: failed to retrieve parm type/data from token \"%s\"", _func, szTok);
00399 return resultFUNC_BAD_PARAMETER;
00400 }
00401 addParam(pt, szTemp);
00402 }
00403 return resultSUCCESS;
00404 }
00405
00406 void CMsg::getMsgTypeString(char *szType, int iMaxLen)
00407 {
00408 switch(m_type) {
00409 case MT_INVALID: strncpy(szType,"INVALID", iMaxLen); break;
00410 case MT_MAKE_CALL: strncpy(szType,"MAKE_CALL", iMaxLen); break;
00411 case MT_ACCEPT_CALL: strncpy(szType,"ACCEPT_CALL", iMaxLen); break;
00412 case MT_ANSWER_CALL: strncpy(szType,"ANSWER_CALL", iMaxLen); break;
00413 case MT_DROPCALL: strncpy(szType,"DROP_CALL", iMaxLen); break;
00414 case MT_RELEASE_CALL: strncpy(szType,"RELEASE_CALL", iMaxLen);break;
00415 case MT_PLAY_WAV: strncpy(szType,"PLAY_WAV", iMaxLen); break;
00416 case MT_PLAY_WAV_FINISHED: strncpy(szType,"PLAY_WAV_FINISHED", iMaxLen);break;
00417 case MT_PLAY_DTMF: strncpy(szType,"PLAY_DTMF", iMaxLen); break;
00418 case MT_PLAY_DTMF_FINISHED:strncpy(szType,"PLAY_DTMF_FINISHED", iMaxLen);break;
00419 default: strncpy(szType,"Unknown", iMaxLen); break;
00420 }
00421 }
00422
00423 }