fix the bug for code

add the code to handle idle timer
This commit is contained in:
Jeff Tao 2020-03-27 20:51:51 +08:00
parent eabc5511d1
commit 4c60448baf
2 changed files with 68 additions and 38 deletions

View File

@ -182,6 +182,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn);
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
@ -412,12 +413,12 @@ void rpcSendResponse(SRpcMsg *pMsg) {
rpcFreeMsg(pConn->pRspMsg); rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg; pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen; pConn->rspMsgLen = msgLen;
if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); // taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
rpcSendMsgToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
pConn->secured = 1; // connection shall be secured pConn->secured = 1; // connection shall be secured
@ -653,8 +654,12 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pConn->inTranId == pHead->tranId) { if (pConn->inTranId == pHead->tranId) {
if (pConn->inType == pHead->msgType) { if (pConn->inType == pHead->msgType) {
if (pHead->code == 0) {
tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
} else {
// do nothing, it is heart beat from client
}
} else if (pConn->inType == 0) { } else if (pConn->inType == 0) {
tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHead->msgType], pConn->inTranId); taosMsg[pHead->msgType], pConn->inTranId);
@ -695,22 +700,23 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
return TSDB_CODE_INVALID_RESPONSE_TYPE; return TSDB_CODE_INVALID_RESPONSE_TYPE;
} }
if (*pHead->content == TSDB_CODE_NOT_READY) { if (pHead->code == TSDB_CODE_NOT_READY) {
return TSDB_CODE_ALREADY_PROCESSED; return TSDB_CODE_ALREADY_PROCESSED;
} }
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
pConn->retry = 0; pConn->retry = 0;
if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS) { if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) {
if (pConn->tretry <= tsRpcMaxRetry) { if (pConn->tretry <= tsRpcMaxRetry) {
pConn->tretry++;
tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn);
taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer); pConn->tretry++;
rpcSendReqHead(pConn);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED; return TSDB_CODE_ALREADY_PROCESSED;
} else { } else {
// peer still in processing, give up // peer still in processing, give up
*pHead->content = TSDB_CODE_TOO_SLOW; return TSDB_CODE_TOO_SLOW;
} }
} }
@ -771,6 +777,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
terrno = rpcProcessReqHead(pConn, pHead); terrno = rpcProcessReqHead(pConn, pHead);
pConn->connType = pRecv->connType; pConn->connType = pRecv->connType;
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
} else { } else {
terrno = rpcProcessRspHead(pConn, pHead); terrno = rpcProcessRspHead(pConn, pHead);
} }
@ -816,7 +823,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pConn = rpcProcessMsgHead(pRpc, pRecv); pConn = rpcProcessMsgHead(pRpc, pRecv);
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d", tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
} }
@ -825,7 +832,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
if (terrno != 0) { // parsing error if (terrno != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcSendErrorMsgToPeer(pRecv, terrno); rpcSendErrorMsgToPeer(pRecv, terrno);
tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], terrno); tTrace("%s %p, %s is sent with error code:0x%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], terrno);
} }
} else { // parsing OK } else { // parsing OK
rpcProcessIncomingMsg(pConn, pHead); rpcProcessIncomingMsg(pConn, pHead);
@ -885,7 +892,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
pHead = (SRpcHead *)msg; pHead = (SRpcHead *)msg;
pHead->version = 1; pHead->version = 1;
pHead->msgType = pConn->inType+1; pHead->msgType = pConn->inType+1;
pHead->spi = 0; pHead->spi = pConn->spi;
pHead->encrypt = 0; pHead->encrypt = 0;
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
@ -894,7 +901,29 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code); pHead->code = htonl(code);
rpcSendMsgToPeer(pConn, msg, 0); rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
pConn->secured = 1; // connection shall be secured
}
static void rpcSendReqHead(SRpcConn *pConn) {
char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead;
// set msg header
memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg;
pHead->version = 1;
pHead->msgType = pConn->outType;
pHead->spi = pConn->spi;
pHead->encrypt = 0;
pHead->tranId = pConn->outTranId;
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->linkUid = pConn->linkUid;
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = 1;
rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
} }
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
@ -990,9 +1019,9 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else { } else {
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d", tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); pHead->code, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
@ -1070,6 +1099,17 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
if (pConn->user[0]) { if (pConn->user[0]) {
tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); tTrace("%s %p, close the connection since no activity", pRpc->label, pConn);
if (pConn->inType && pRpc->cfp) {
// if there are pending request, notify the app
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn);
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
(*(pRpc->cfp))(&rpcMsg);
}
rpcCloseConn(pConn); rpcCloseConn(pConn);
} else { } else {
tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId);

View File

@ -28,25 +28,23 @@ void *qhandle = NULL;
void processShellMsg() { void processShellMsg() {
static int num = 0; static int num = 0;
taos_qall qall; taos_qall qall;
SRpcMsg *pRpcMsg, rpcMsg; SRpcMsg rpcMsg;
int type;
qall = taosAllocateQall();
while (1) { while (1) {
int numOfMsgs = taosReadAllQitems(qhandle, qall); int numOfMsgs = taosReadAllQitems(qhandle, &qall);
if (numOfMsgs <= 0) { if (numOfMsgs <= 0) {
usleep(1000); usleep(1000);
continue; continue;
} }
tTrace("%d shell msgs are received", numOfMsgs); tTrace("%d shell msgs are received", numOfMsgs);
sleep(5);
for (int i=0; i<numOfMsgs; ++i) { for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &type, (void **)&pRpcMsg); taosGetQitem(qall, &rpcMsg);
if (dataFd >=0) { if (dataFd >=0) {
if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) { if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno)); tPrint("failed to write data file, reason:%s", strerror(errno));
} }
} }
@ -65,22 +63,19 @@ void processShellMsg() {
taosResetQitems(qall); taosResetQitems(qall);
for (int i=0; i<numOfMsgs; ++i) { for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg);
taosGetQitem(qall, &type, (void **)&pRpcMsg); rpcFreeCont(rpcMsg.pCont);
rpcFreeCont(pRpcMsg->pCont);
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle; rpcMsg.handle = rpcMsg.handle;
rpcMsg.code = 1; rpcMsg.code = 1;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
taosFreeQitem(pRpcMsg);
} }
taosFreeQitems(qall);
} }
taosFreeQall(qall);
/* /*
SRpcIpSet ipSet; SRpcIpSet ipSet;
ipSet.numOfIps = 1; ipSet.numOfIps = 1;
@ -114,21 +109,17 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
} }
void processRequestMsg(SRpcMsg *pMsg) { void processRequestMsg(SRpcMsg *pMsg) {
SRpcMsg *pTemp; tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen);
taosWriteQitem(qhandle, pMsg);
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SRpcInit rpcInit; SRpcInit rpcInit;
char dataName[20] = "server.data"; char dataName[20] = "server.data";
char localIp[40] = "0.0.0.0";
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0"; rpcInit.localIp = localIp;
rpcInit.localPort = 7000; rpcInit.localPort = 7000;
rpcInit.label = "SER"; rpcInit.label = "SER";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
@ -154,7 +145,6 @@ int main(int argc, char *argv[]) {
commit = atoi(argv[++i]); commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) { } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]); rpcDebugFlag = atoi(argv[++i]);
ddebugFlag = rpcDebugFlag;
uDebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag;
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);