commit
5e33854b37
|
@ -721,7 +721,7 @@ int32_t dnodeGetDnodeId() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
|
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
|
||||||
SRpcConnInfo connInfo;
|
SRpcConnInfo connInfo = {0};
|
||||||
rpcGetConnInfo(rpcMsg->handle, &connInfo);
|
rpcGetConnInfo(rpcMsg->handle, &connInfo);
|
||||||
|
|
||||||
SRpcIpSet ipSet = {0};
|
SRpcIpSet ipSet = {0};
|
||||||
|
|
|
@ -121,8 +121,8 @@ void mnodeCleanupSystem() {
|
||||||
dnodeFreeMnodeWqueue();
|
dnodeFreeMnodeWqueue();
|
||||||
dnodeFreeMnodeRqueue();
|
dnodeFreeMnodeRqueue();
|
||||||
dnodeFreeMnodePqueue();
|
dnodeFreeMnodePqueue();
|
||||||
mnodeCleanupTimer();
|
|
||||||
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
|
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
|
||||||
|
mnodeCleanupTimer();
|
||||||
|
|
||||||
mPrint("mnode is cleaned up");
|
mPrint("mnode is cleaned up");
|
||||||
}
|
}
|
||||||
|
|
|
@ -819,6 +819,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
tTrace("%s, authentication shall be restarted", pConn->info);
|
tTrace("%s, authentication shall be restarted", pConn->info);
|
||||||
pConn->secured = 0;
|
pConn->secured = 0;
|
||||||
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
|
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
|
||||||
|
if (pConn->connType != RPC_CONN_TCPC)
|
||||||
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
|
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
|
||||||
return TSDB_CODE_RPC_ALREADY_PROCESSED;
|
return TSDB_CODE_RPC_ALREADY_PROCESSED;
|
||||||
}
|
}
|
||||||
|
@ -828,6 +829,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
tTrace("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
|
tTrace("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
|
||||||
pConn->tretry++;
|
pConn->tretry++;
|
||||||
rpcSendReqHead(pConn);
|
rpcSendReqHead(pConn);
|
||||||
|
if (pConn->connType != RPC_CONN_TCPC)
|
||||||
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
|
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
|
||||||
return TSDB_CODE_RPC_ALREADY_PROCESSED;
|
return TSDB_CODE_RPC_ALREADY_PROCESSED;
|
||||||
} else {
|
} else {
|
||||||
|
@ -896,8 +898,12 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
terrno = rpcProcessReqHead(pConn, pHead);
|
terrno = rpcProcessReqHead(pConn, pHead);
|
||||||
pConn->connType = pRecv->connType;
|
pConn->connType = pRecv->connType;
|
||||||
|
|
||||||
// client shall send the request within tsRpcTime again, double it
|
// stop idle timer
|
||||||
taosTmrReset(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
taosTmrStopA(&pConn->pIdleTimer);
|
||||||
|
|
||||||
|
// client shall send the request within tsRpcTime again for UDP, double it
|
||||||
|
if (pConn->connType != RPC_CONN_TCPS)
|
||||||
|
pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
|
||||||
} else {
|
} else {
|
||||||
terrno = rpcProcessRspHead(pConn, pHead);
|
terrno = rpcProcessRspHead(pConn, pHead);
|
||||||
}
|
}
|
||||||
|
@ -1024,6 +1030,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
rpcAddRef(pRpc); // add the refCount for requests
|
rpcAddRef(pRpc); // add the refCount for requests
|
||||||
|
|
||||||
// start the progress timer to monitor the response from server app
|
// start the progress timer to monitor the response from server app
|
||||||
|
if (pConn->connType != RPC_CONN_TCPS)
|
||||||
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
|
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
|
||||||
|
|
||||||
// notify the server app
|
// notify the server app
|
||||||
|
@ -1056,9 +1063,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
for (int i=0; i<pContext->ipSet.numOfIps; ++i)
|
for (int i=0; i<pContext->ipSet.numOfIps; ++i)
|
||||||
pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]);
|
pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]);
|
||||||
rpcSendReqToServer(pRpc, pContext);
|
rpcSendReqToServer(pRpc, pContext);
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY) {
|
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY) {
|
||||||
pContext->code = pHead->code;
|
pContext->code = pHead->code;
|
||||||
rpcProcessConnError(pContext, NULL);
|
rpcProcessConnError(pContext, NULL);
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
} else {
|
} else {
|
||||||
rpcNotifyClient(pContext, &rpcMsg);
|
rpcNotifyClient(pContext, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
@ -1187,6 +1196,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
|
||||||
pConn->pContext = pContext;
|
pConn->pContext = pContext;
|
||||||
|
|
||||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||||
|
if (pConn->connType != RPC_CONN_TCPC)
|
||||||
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||||
|
|
||||||
rpcUnlockConn(pConn);
|
rpcUnlockConn(pConn);
|
||||||
|
|
Loading…
Reference in New Issue