This commit is contained in:
Jeff Tao 2020-10-07 02:00:38 +00:00
parent 62fe32678f
commit 756aa48077
1 changed files with 17 additions and 18 deletions

View File

@ -819,9 +819,18 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
}
if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
tDebug("%s, message body is empty, ignore", pConn->info);
return TSDB_CODE_RPC_APP_ERROR;
}
pConn->inTranId = pHead->tranId;
pConn->inType = pHead->msgType;
// start the progress timer to monitor the response from server app
if (pConn->connType != RPC_CONN_TCPS)
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);
return 0;
}
@ -960,11 +969,10 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
} else {
terrno = rpcProcessRspHead(pConn, pHead);
if (terrno == 0) {
SRpcReqContext *pContext = pConn->pContext;
*ppContext = pContext;
pConn->pContext = NULL;
}
SRpcReqContext *pContext = pConn->pContext;
*ppContext = pContext;
pConn->pContext = NULL;
}
}
@ -1094,20 +1102,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.ahandle = pConn->ahandle;
if (rpcMsg.contLen > 0) {
rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests
rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests
// start the progress timer to monitor the response from server app
if (pConn->connType != RPC_CONN_TCPS)
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
// notify the server app
(*(pRpc->cfp))(&rpcMsg, NULL);
} else {
tDebug("%s, message body is empty, ignore", pConn->info);
rpcFreeCont(rpcMsg.pCont);
}
// notify the server app
(*(pRpc->cfp))(&rpcMsg, NULL);
} else {
// it's a response
rpcMsg.handle = pContext;