Main Page   Namespace List   Class Hierarchy   Data Structures   File List   Namespace Members   Data Fields   Globals   Related Pages  

MsgTransport.cpp

Go to the documentation of this file.
00001 /*
00002  * MsgTransport.cpp
00003  *
00004  * Copyright 2003, MobileSpear Inc. (www.mobilespear.com). All rights reserved.
00005  * Copyright 2003, David Resnick. All rights reserved.
00006  *
00007  * See the file doc\license.txt for the terms of usage and distribution.
00008  */
00009 
00010 #include <bogotel/Portability.h>
00011 #include <bogotel/MsgTransport.h>
00012 #include <bogotel/BgtRt.h>
00013 #include <bogotel/BgtErrors.h>
00014 #include <bogotel/util.h>
00015 
00016 #include <boost/thread/xtime.hpp>
00017 #include <log4cpp/NDC.hh>
00018 
00019 namespace bogotel {
00020 
00021     #define MAXBUFLEN 500
00022 
00024     // CMsgTransport
00026 
00027     CMsgTransport::CMsgTransport(CBgtRt *pBgtRt) : 
00028     m_usMyPort(0), 
00029     m_usOppPort(0),
00030     m_pBgtRt(pBgtRt),
00031     m_bListenerStarted(false),
00032     m_bStopTalker(false),
00033     m_bStopListener(false)
00034     {
00035 
00036     }
00037 
00038     CMsgTransport::~CMsgTransport()
00039     {
00040         /*
00041         m_bStopListener = true;
00042         m_bStopTalker = true;
00043 
00044         m_pthrdListener->join();
00045         delete m_pthrdListener;
00046 
00047         m_pthrdTalker->join();
00048         delete m_pthrdTalker;
00049         */
00050 
00051         /*
00052         shutdown(m_fdListener, SD_BOTH);
00053         shutdown(m_fdTalker, SD_BOTH);
00054         */
00055     }
00056 
00057     int CMsgTransport::init()
00058     {
00059         char method[] = "CMsgTransport::init()";
00060 
00061         std::string strInstanceNum = g_util->getInstanceNum();
00062 
00063         if ((m_usMyPort = (u_short)g_util->m_propIni.getProperty("MyPort" + strInstanceNum, (unsigned long)0)) == 0) {
00064             g_util->log(3, -1, "%s: No MyPort in ini file", method);
00065             return resultERROR;
00066         }
00067 
00068         if ((m_usOppPort = (u_short)g_util->m_propIni.getProperty("OppPort" + strInstanceNum, (unsigned long)0)) == 0) {
00069             g_util->log(3, -1, "%s: No OppAddress in ini file", method);
00070             return resultERROR;
00071         }
00072 
00073         m_strOppIp = g_util->m_propIni.getProperty("OppAddress" + strInstanceNum, "127.0.0.1");
00074         if (m_strOppIp == "") {
00075             g_util->log(3, -1, "%s: No OppAddress in ini file", method);
00076             return resultERROR;
00077         }
00078 
00079         {
00080             boost::mutex::scoped_lock lock(m_mtxStartup);
00081         
00082             // start the listener on a separate thread
00083             m_pthrdListener = new boost::thread(thread_adapter(&CMsgTransport::do_thread_listen, this));
00084 
00085             boost::xtime xt;
00086             boost::xtime_get(&xt, boost::TIME_UTC);
00087             xt.sec += 25;
00088             if (! m_condStartup.timed_wait(lock, xt, cond_predicate(m_bListenerStarted, true))) {
00089                 // this is taking too long...
00090                 g_util->log(3, -1, "%s: Timed out waiting for listener thread.", method);
00091                 return resultERROR;
00092             }
00093         }
00094 
00095         // start the talker on a separate thread
00096         m_pthrdListener = new boost::thread(thread_adapter(&CMsgTransport::do_thread_talk, this));
00097 
00098         g_util->log(9, -1, "CMsgTransport::init() completed.");
00099         return resultSUCCESS;
00100     }
00101 
00102     void CMsgTransport::do_thread_listen(void* param)
00103     {
00104         log4cpp::NDC::push("ListenThread");
00105         try {
00106             static_cast<CMsgTransport*>(param)->runListener();
00107         } catch(...) {
00108             g_util->log(1, -1, "CMsgTransport::do_thread_listen: Exception");
00109         }
00110     }
00111 
00112     void CMsgTransport::do_thread_talk(void* param)
00113     {
00114         log4cpp::NDC::push("TalkThread");
00115         try {
00116             static_cast<CMsgTransport*>(param)->runTalker();
00117         } catch(...) {
00118             g_util->log(1, -1, "CMsgTransport::do_thread_talk: Exception");
00119         }
00120     }
00121 
00122     void CMsgTransport::runTalker()
00123     {
00124         int rc;
00125         int numbytes;
00126 
00127         if ((rc = openTalkerSocket()) != resultSUCCESS) {
00128             g_util->log(3, -1, "CMsgTransport::talkerRun(): openTalkerSocket() failed. rc = %d", rc);
00129             return;
00130         }
00131 
00132         while (!m_bStopTalker) {
00133             {
00134                 boost::mutex::scoped_lock lock(m_mtxAddedToQueue);
00135                 ContainerNotEmpty<DEQUE_STRING> notEmpty(m_deqMsg);
00136 
00137                 // wait until a new message is posted
00138                 m_condEvent.wait(lock, notEmpty);
00139             }
00140 
00141             while (true) {
00142                 std::string strMsg;
00143 
00144                 // get an item from the queue (if any)
00145                 {
00146                     boost::mutex::scoped_lock lock(m_mtxDeque);
00147                     if (! m_deqMsg.empty()) {
00148                         strMsg = m_deqMsg.front();
00149                         m_deqMsg.pop_front();
00150                     } else {
00151                         // nothing was in the queue, wait for another message
00152                         break;
00153                     }
00154                 }
00155 
00156                 if ((numbytes = sendto(m_fdTalker,
00157                                        strMsg.c_str(),
00158                                        strlen(strMsg.c_str()) + 1,
00159                                        0,
00160                                        (struct sockaddr *)&m_addrTalker,
00161                                        sizeof(struct sockaddr))) == -1) {
00162                     g_util->log(3, -1, "CMsgTransport::talkerRun(): sendto() failed. "
00163                         "::WSAGetLastError() is %d", ::WSAGetLastError());
00164                     return;
00165                 }
00166             }
00167         }
00168     }
00169 
00170     void CMsgTransport::runListener()
00171     {
00172        int rc;
00173        int addr_len, numbytes;
00174        char buf[MAXBUFLEN];
00175 
00176         // open socket for listening
00177         if ((rc = openListenerSocket(&m_addrListener, &m_fdListener, m_usMyPort)) != resultSUCCESS) {
00178             g_util->log(3, -1, "CMsgTransport::listenerRun(): openListenerSocket() failed. rc = %d", rc);
00179             return;
00180         }
00181 
00182         // inform main thread that we've finished initializing successfully
00183         m_bListenerStarted = true;
00184         m_condStartup.notify_one();
00185 
00186         CMsg *pMsg = new CMsg();
00187 
00188         // listen for packets on socket
00189         while (! m_bStopListener) {
00190             addr_len = sizeof(struct sockaddr);
00191             if ((numbytes = recv(m_fdListener, buf, MAXBUFLEN, 0)) == -1) {
00192                 g_util->log(3, -1, "CMsgTransport::listenerRun(): recvfrom() failed. "
00193                     "::WSAGetLastError() is %d", ::WSAGetLastError());
00194                 continue;
00195             }
00196 
00197             if ((rc = pMsg->init(buf)) != resultSUCCESS) {
00198                 g_util->log(3, -1, "CMsgTransport::listenerRun(): CMsg::init() failed. rc is %d", rc);
00199             } else {
00200                 m_pBgtRt->incomingMsg(pMsg);
00201             }
00202         }
00203     }
00204 
00205     int CMsgTransport::openListenerSocket(struct sockaddr_in *pAddr,
00206                                                 int *pFd,
00207                                                 u_short usPort)
00208     {
00209         char method[] = "CMsgTransport::openListenerSocket()";
00210         if (((*pFd) = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
00211             g_util->log(3, -1, "%s: socket() failed. ::WSAGetLastError() is %d", 
00212                 method, ::WSAGetLastError());
00213             return resultSOCKET_ERROR;
00214         }
00215         pAddr->sin_family = AF_INET;         // host byte order 
00216         pAddr->sin_addr.s_addr = INADDR_ANY; // auto-fill with my IP 
00217         memset(&(pAddr->sin_zero), 0, 8);    // zero the rest of the struct 
00218         pAddr->sin_port = htons(usPort);     // short, network byte order 
00219 
00220         if (bind((*pFd), (struct sockaddr *)pAddr, sizeof(struct sockaddr)) != 0) {
00221             // some error, check if the port is in use
00222             int iErr = ::WSAGetLastError();
00223             if (iErr == WSAEADDRINUSE) {
00224                 g_util->log(5, -1, "%s: bind() failed. Socket in use.", method);
00225                 return resultSOCKET_IN_USE;
00226             }
00227             g_util->log(3, -1, "%s: bind() failed. ::WSAGetLastError() is %d", 
00228                 method, ::WSAGetLastError());
00229             return resultSOCKET_ERROR;
00230         }
00231         g_util->log(9, -1, "%s completed.", method);
00232         return resultSUCCESS;
00233     }
00234 
00235     int CMsgTransport::openTalkerSocket()
00236     {
00237         struct hostent *pheTalker = NULL;
00238 
00239         // prepare the address to be talked to
00240         if ((pheTalker = gethostbyname(m_strOppIp.c_str())) == NULL) {  /* get the host info */
00241             g_util->log(3, -1, "CMsgTransport::openTalkerSocket(): gethostbyname() failed. "
00242                 "::WSAGetLastError() is %d", ::WSAGetLastError());
00243             return resultSOCKET_ERROR;
00244         }
00245         m_addrTalker.sin_family = AF_INET;      /* host byte order */
00246         m_addrTalker.sin_port = htons(m_usOppPort);  /* short, network byte order */
00247         m_addrTalker.sin_addr = *((struct in_addr *)pheTalker->h_addr);
00248         memset(&(m_addrTalker.sin_zero), 0, 8);     /* zero the rest of the struct */
00249 
00250         if ((m_fdTalker = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
00251             g_util->log(3, -1, "CMsgTransport::openTalkerSocket(): socket() failed. "
00252                 "::WSAGetLastError() is %d", ::WSAGetLastError());
00253             return resultSOCKET_ERROR;
00254         }
00255 
00256         g_util->log(9, -1, "CMsgTransport::openTalkerSocket() completed.");
00257         return resultSUCCESS;
00258     }
00259 
00260     int CMsgTransport::sendMsg(CMsg *pMsg)
00261     {
00262         char szMsg[MAXBUFLEN];
00263 
00264         if (pMsg->toString(szMsg, sizeof(szMsg)) == NULL){
00265             g_util->log(3, -1, "CMsgTransport::sendMsg(): CMsg::toString() failed. ");
00266             return resultFUNC_BAD_PARAMETER;
00267         }
00268 
00269         // put the string in the queue
00270         {
00271             boost::mutex::scoped_lock lock(m_mtxDeque);
00272             m_deqMsg.push_back(szMsg);
00273         }
00274 
00275         // wake the talker thread up so that it sends the message
00276         m_condEvent.notify_one();
00277 
00278         g_util->log(9, -1, "CMsgTransport::sendMsg() completed.");
00279 
00280         return resultSUCCESS;
00281     }
00282 
00284     // CMsg
00286 
00287     CMsg::CMsg()
00288     {
00289         clear();
00290     }
00291 
00292     CMsg::~CMsg()
00293     {
00294 
00295     }
00296 
00297     void CMsg::clear()
00298     {
00299         m_type = MT_INVALID;
00300         m_class = MC_INVALID;
00301         m_id = -1;
00302         m_crn = INVALID_CRN;
00303         if (! m_mapParam.empty()) {
00304             m_mapParam.clear();
00305         }
00306     }
00307 
00308     void CMsg::init(MessageType type, MessageClass cls, long lId)
00309     {
00310         clear();
00311         m_class = cls;
00312         m_type = type;
00313         m_id = lId;
00314     }
00315 
00316     void CMsg::addParam(enum ParmType pt, std::string strParm)
00317     {
00318         m_mapParam.insert(MAP_LONG2STR::value_type(pt, strParm));
00319     }
00320 
00321     void CMsg::addParam(enum ParmType pt, char *szParm)
00322     {
00323         m_mapParam.insert(MAP_LONG2STR::value_type(pt, szParm));
00324     }
00325 
00326     std::string CMsg::getParam(enum ParmType pt)
00327     {
00328         MAP_LONG2STR::iterator itL2S;
00329         itL2S = m_mapParam.find(pt);
00330         if (itL2S == m_mapParam.end()) {
00331             return "";
00332         }
00333         return itL2S->second;
00334     }
00335 
00336     // serializes the message to a null terminated string
00337     char *CMsg::toString(char *szDmp, size_t iDmpLen)
00338     {
00339         char szTmp[MAXBUFLEN];
00340         int rc;
00341         rc = _snprintf(szDmp, iDmpLen, "Cls:%d Crn:%d Id:%d Type:%d", m_class, m_crn, m_id, m_type);
00342         if (rc < 0) {
00343             return NULL;
00344         }
00345         MAP_LONG2STR::iterator it;
00346         it = m_mapParam.begin();
00347         while (it != m_mapParam.end()) {
00348             long p1 = it->first;
00349             std::string p2 = it->second;
00350             if (p2 == "") {
00351                 g_util->log(3, -1, "CMsg::toString(): parm data empty. parm type is %d", p1);
00352                 return NULL;
00353             }
00354             rc = _snprintf(szTmp, sizeof(szTmp), " ;PT:%d PC:%s", p1, p2.c_str());
00355             if (rc < 0) {
00356                 g_util->log(3, -1, "CMsg::toString(): _snprintf failed. (probably should increase size of szTmp)");
00357                 return NULL;
00358             }
00359             if (strlen(szTmp) + strlen(szDmp) + 1 > iDmpLen) {
00360                 g_util->log(3, -1, "CMsg::toString(): szDmp is too small "
00361                     "(must be inceased to at least %d)", strlen(szTmp) + strlen(szDmp) + 1);
00362                 return NULL;
00363             }
00364             strcat(szDmp, szTmp);
00365             it++;
00366         }
00367         return szDmp;
00368     }
00369 
00370     int CMsg::init(char *szMsg)
00371     {
00372         char _func[] = "CMsg::init";
00373         // first clear the class in case it's being reused
00374         if (m_class != MC_INVALID) {
00375             clear();
00376         }
00377 
00378         int rc = sscanf(szMsg, "Cls:%d Crn:%d Id:%d Type:%d", &m_class, &m_crn, &m_id, &m_type);
00379         if (rc != 4) {
00380             g_util->log(3, -1, "%s: failed to retrieve 4 vars from msg \"%s\"", _func, szMsg);
00381             return resultFUNC_BAD_PARAMETER;
00382         }
00383 
00384         // check for parameters
00385         char *szTok = strtok(szMsg, ";");
00386         if (szTok == NULL) {
00387             // there are no parameters, return
00388             return resultSUCCESS;
00389         }
00390 
00391         enum ParmType pt;
00392         char szTemp[MAXBUFLEN];
00393         // get the next token and discard the initial token (it was already handled above)
00394         while ((szTok = strtok(NULL, ";")) != NULL) {
00395             // get the parm type and data from the token
00396             rc = sscanf(szTok, "PT:%d PC:%s", &pt, szTemp);
00397             if (rc != 2) {
00398                 g_util->log(3, -1, "%s: failed to retrieve parm type/data from token \"%s\"", _func, szTok);
00399                 return resultFUNC_BAD_PARAMETER;
00400             }
00401             addParam(pt, szTemp);
00402         }
00403         return resultSUCCESS;
00404     }
00405 
00406     void CMsg::getMsgTypeString(char *szType, int iMaxLen)
00407     {
00408         switch(m_type) {
00409         case MT_INVALID:           strncpy(szType,"INVALID", iMaxLen);     break;
00410         case MT_MAKE_CALL:         strncpy(szType,"MAKE_CALL", iMaxLen);   break;
00411         case MT_ACCEPT_CALL:       strncpy(szType,"ACCEPT_CALL", iMaxLen); break;
00412         case MT_ANSWER_CALL:       strncpy(szType,"ANSWER_CALL", iMaxLen); break;
00413         case MT_DROPCALL:          strncpy(szType,"DROP_CALL", iMaxLen);   break;
00414         case MT_RELEASE_CALL:      strncpy(szType,"RELEASE_CALL", iMaxLen);break;
00415         case MT_PLAY_WAV:          strncpy(szType,"PLAY_WAV", iMaxLen);    break;
00416         case MT_PLAY_WAV_FINISHED: strncpy(szType,"PLAY_WAV_FINISHED", iMaxLen);break;
00417         case MT_PLAY_DTMF:         strncpy(szType,"PLAY_DTMF", iMaxLen);    break;
00418         case MT_PLAY_DTMF_FINISHED:strncpy(szType,"PLAY_DTMF_FINISHED", iMaxLen);break;
00419         default:                   strncpy(szType,"Unknown", iMaxLen);     break;
00420         }
00421     }
00422 
00423 }

Generated on Tue Aug 12 12:41:30 2003 for bogotel by doxygen 1.3. Hosted by SourceForge.net Logo