From 4c60448baf9438d512dc0c252a955aa8c9bcb99b Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 27 Mar 2020 20:51:51 +0800 Subject: [PATCH] fix the bug for code add the code to handle idle timer --- src/rpc/src/rpcMain.c | 70 +++++++++++++++++++++++++++++++++--------- src/rpc/test/rserver.c | 36 ++++++++-------------- 2 files changed, 68 insertions(+), 38 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index c6f527a7d2..3b0137231f 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -182,6 +182,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); +static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); @@ -412,12 +413,12 @@ void rpcSendResponse(SRpcMsg *pMsg) { rpcFreeMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; - if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; rpcUnlockConn(pConn); taosTmrStopA(&pConn->pTimer); - taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); + // taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); rpcSendMsgToPeer(pConn, msg, msgLen); pConn->secured = 1; // connection shall be secured @@ -653,8 +654,12 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { if (pConn->inTranId == pHead->tranId) { if (pConn->inType == pHead->msgType) { - tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); - rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); + if (pHead->code == 0) { + tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); + rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); + } else { + // do nothing, it is heart beat from client + } } else if (pConn->inType == 0) { tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->inTranId); @@ -695,22 +700,23 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_INVALID_RESPONSE_TYPE; } - if (*pHead->content == TSDB_CODE_NOT_READY) { + if (pHead->code == TSDB_CODE_NOT_READY) { return TSDB_CODE_ALREADY_PROCESSED; } taosTmrStopA(&pConn->pTimer); pConn->retry = 0; - if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS) { + if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { - pConn->tretry++; tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); - taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->tretry++; + rpcSendReqHead(pConn); + taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); return TSDB_CODE_ALREADY_PROCESSED; } else { // peer still in processing, give up - *pHead->content = TSDB_CODE_TOO_SLOW; + return TSDB_CODE_TOO_SLOW; } } @@ -771,6 +777,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { if ( rpcIsReq(pHead->msgType) ) { terrno = rpcProcessReqHead(pConn, pHead); pConn->connType = pRecv->connType; + taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); } else { terrno = rpcProcessRspHead(pConn, pHead); } @@ -816,7 +823,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d", + tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); } @@ -825,7 +832,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { if (terrno != 0) { // parsing error if ( rpcIsReq(pHead->msgType) ) { rpcSendErrorMsgToPeer(pRecv, terrno); - tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], terrno); + tTrace("%s %p, %s is sent with error code:0x%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], terrno); } } else { // parsing OK rpcProcessIncomingMsg(pConn, pHead); @@ -885,7 +892,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { pHead = (SRpcHead *)msg; pHead->version = 1; pHead->msgType = pConn->inType+1; - pHead->spi = 0; + pHead->spi = pConn->spi; pHead->encrypt = 0; pHead->tranId = pConn->inTranId; pHead->sourceId = pConn->ownId; @@ -894,7 +901,29 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { memcpy(pHead->user, pConn->user, tListLen(pHead->user)); pHead->code = htonl(code); - rpcSendMsgToPeer(pConn, msg, 0); + rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); + pConn->secured = 1; // connection shall be secured +} + +static void rpcSendReqHead(SRpcConn *pConn) { + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; + + // set msg header + memset(msg, 0, sizeof(SRpcHead)); + pHead = (SRpcHead *)msg; + pHead->version = 1; + pHead->msgType = pConn->outType; + pHead->spi = pConn->spi; + pHead->encrypt = 0; + pHead->tranId = pConn->outTranId; + pHead->sourceId = pConn->ownId; + pHead->destId = pConn->peerId; + pHead->linkUid = pConn->linkUid; + memcpy(pHead->user, pConn->user, tListLen(pHead->user)); + pHead->code = 1; + + rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); } static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { @@ -990,9 +1019,9 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d", + tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, - (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + pHead->code, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); @@ -1070,6 +1099,17 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->user[0]) { tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); + if (pConn->inType && pRpc->cfp) { + // if there are pending request, notify the app + tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); + SRpcMsg rpcMsg; + rpcMsg.pCont = NULL; + rpcMsg.contLen = 0; + rpcMsg.handle = pConn; + rpcMsg.msgType = pConn->inType; + rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; + (*(pRpc->cfp))(&rpcMsg); + } rpcCloseConn(pConn); } else { tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 6c5b320809..deb6135cef 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -28,25 +28,23 @@ void *qhandle = NULL; void processShellMsg() { static int num = 0; taos_qall qall; - SRpcMsg *pRpcMsg, rpcMsg; - int type; - - qall = taosAllocateQall(); + SRpcMsg rpcMsg; while (1) { - int numOfMsgs = taosReadAllQitems(qhandle, qall); + int numOfMsgs = taosReadAllQitems(qhandle, &qall); if (numOfMsgs <= 0) { usleep(1000); continue; } tTrace("%d shell msgs are received", numOfMsgs); + sleep(5); for (int i=0; i=0) { - if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) { + if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) { tPrint("failed to write data file, reason:%s", strerror(errno)); } } @@ -65,22 +63,19 @@ void processShellMsg() { taosResetQitems(qall); for (int i=0; ipCont); - + rpcFreeCont(rpcMsg.pCont); rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; - rpcMsg.handle = pRpcMsg->handle; + rpcMsg.handle = rpcMsg.handle; rpcMsg.code = 1; rpcSendResponse(&rpcMsg); - - taosFreeQitem(pRpcMsg); } + taosFreeQitems(qall); } - taosFreeQall(qall); /* SRpcIpSet ipSet; ipSet.numOfIps = 1; @@ -114,21 +109,17 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char } void processRequestMsg(SRpcMsg *pMsg) { - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - - tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); - taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); + tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen); + taosWriteQitem(qhandle, pMsg); } int main(int argc, char *argv[]) { SRpcInit rpcInit; char dataName[20] = "server.data"; + char localIp[40] = "0.0.0.0"; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localIp = "0.0.0.0"; + rpcInit.localIp = localIp; rpcInit.localPort = 7000; rpcInit.label = "SER"; rpcInit.numOfThreads = 1; @@ -154,7 +145,6 @@ int main(int argc, char *argv[]) { commit = atoi(argv[++i]); } else if (strcmp(argv[i], "-d")==0 && i < argc-1) { rpcDebugFlag = atoi(argv[++i]); - ddebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag; } else { printf("\nusage: %s [options] \n", argv[0]);