Merge pull request #2091 from taosdata/hotfix/rpcMaxTime
Hotfix/rpc max time
This commit is contained in:
commit
fead65bf00
|
@ -124,8 +124,7 @@ typedef struct SRpcConn {
|
|||
} SRpcConn;
|
||||
|
||||
int tsRpcMaxUdpSize = 15000; // bytes
|
||||
int tsRpcProgressTime = 10; // milliseocnds
|
||||
|
||||
int tsProgressTimer = 100;
|
||||
// not configurable
|
||||
int tsRpcMaxRetry;
|
||||
int tsRpcHeadSize;
|
||||
|
@ -204,7 +203,8 @@ static void rpcUnlockConn(SRpcConn *pConn);
|
|||
void *rpcOpen(const SRpcInit *pInit) {
|
||||
SRpcInfo *pRpc;
|
||||
|
||||
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
|
||||
tsProgressTimer = tsRpcTimer/2;
|
||||
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
|
||||
tsRpcHeadSize = RPC_MSG_OVERHEAD;
|
||||
tsRpcOverhead = sizeof(SRpcReqContext);
|
||||
|
||||
|
@ -420,8 +420,11 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
|
|||
pConn->rspMsgLen = msgLen;
|
||||
if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
|
||||
|
||||
SRpcInfo *pRpc = pConn->pRpc;
|
||||
taosTmrStopA(&pConn->pTimer);
|
||||
// taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||
|
||||
// set the idle timer to monitor the activity
|
||||
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||
pConn->secured = 1; // connection shall be secured
|
||||
|
||||
|
@ -683,6 +686,7 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
|
|||
tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
|
||||
}
|
||||
|
||||
pConn->tretry = 0;
|
||||
return pConn;
|
||||
}
|
||||
|
||||
|
@ -748,20 +752,28 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
|
|||
taosTmrStopA(&pConn->pTimer);
|
||||
pConn->retry = 0;
|
||||
|
||||
if (pHead->code == TSDB_CODE_AUTH_REQUIRED && pRpc->spi) {
|
||||
tTrace("%s, authentication shall be restarted", pConn->info);
|
||||
pConn->secured = 0;
|
||||
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
|
||||
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
|
||||
return TSDB_CODE_ALREADY_PROCESSED;
|
||||
}
|
||||
|
||||
if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
if (pConn->tretry <= tsRpcMaxRetry) {
|
||||
tTrace("%s, peer is still processing the transaction", pConn->info);
|
||||
tTrace("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry);
|
||||
pConn->tretry++;
|
||||
rpcSendReqHead(pConn);
|
||||
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
|
||||
return TSDB_CODE_ALREADY_PROCESSED;
|
||||
} else {
|
||||
// peer still in processing, give up
|
||||
return TSDB_CODE_TOO_SLOW;
|
||||
tTrace("%s, server processing takes too long time, give up", pConn->info);
|
||||
pHead->code = TSDB_CODE_TOO_SLOW;
|
||||
}
|
||||
}
|
||||
|
||||
pConn->tretry = 0;
|
||||
pConn->outType = 0;
|
||||
pConn->pReqMsg = NULL;
|
||||
pConn->reqMsgLen = 0;
|
||||
|
@ -820,7 +832,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
|||
if ( rpcIsReq(pHead->msgType) ) {
|
||||
terrno = rpcProcessReqHead(pConn, pHead);
|
||||
pConn->connType = pRecv->connType;
|
||||
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||
|
||||
// client shall send the request within tsRpcTime again, put 20 mseconds tolerance
|
||||
taosTmrReset(rpcProcessIdleTimer, tsRpcTimer+20, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||
} else {
|
||||
terrno = rpcProcessRspHead(pConn, pHead);
|
||||
}
|
||||
|
@ -935,7 +949,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
|||
|
||||
if ( rpcIsReq(pHead->msgType) ) {
|
||||
rpcMsg.handle = pConn;
|
||||
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
|
||||
(*(pRpc->cfp))(&rpcMsg, NULL);
|
||||
} else {
|
||||
// it's a response
|
||||
|
@ -943,14 +957,12 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
|||
rpcMsg.handle = pContext->ahandle;
|
||||
pConn->pContext = NULL;
|
||||
|
||||
if (pHead->code == TSDB_CODE_AUTH_REQUIRED) {
|
||||
pConn->secured = 0;
|
||||
rpcSendReqToServer(pRpc, pContext);
|
||||
return;
|
||||
}
|
||||
|
||||
// for UDP, port may be changed by server, the port in ipSet shall be used for cache
|
||||
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType);
|
||||
if (pHead->code != TSDB_CODE_TOO_SLOW) {
|
||||
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType);
|
||||
} else {
|
||||
rpcCloseConn(pConn);
|
||||
}
|
||||
|
||||
if (pHead->code == TSDB_CODE_REDIRECT) {
|
||||
pContext->redirect++;
|
||||
|
@ -1039,6 +1051,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
|
|||
pReplyHead->sourceId = pRecvHead->destId;
|
||||
pReplyHead->destId = pRecvHead->sourceId;
|
||||
pReplyHead->linkUid = pRecvHead->linkUid;
|
||||
pReplyHead->ahandle = pRecvHead->ahandle;
|
||||
|
||||
pReplyHead->code = htonl(code);
|
||||
msgLen = sizeof(SRpcHead);
|
||||
|
@ -1095,8 +1108,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
|
|||
pConn->reqMsgLen = msgLen;
|
||||
pConn->pContext = pContext;
|
||||
|
||||
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||
|
||||
rpcUnlockConn(pConn);
|
||||
}
|
||||
|
@ -1172,7 +1185,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
|
|||
if (pConn->retry < 4) {
|
||||
tTrace("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
|
||||
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
|
||||
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
|
||||
} else {
|
||||
// close the connection
|
||||
tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
|
||||
|
@ -1225,7 +1238,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
|
|||
if (pConn->inType && pConn->user[0]) {
|
||||
tTrace("%s, progress timer expired, send progress", pConn->info);
|
||||
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
|
||||
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
|
||||
} else {
|
||||
tTrace("%s, progress timer:%p not processed", pConn->info, tmrId);
|
||||
}
|
||||
|
@ -1357,15 +1370,17 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
|
|||
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){
|
||||
// secured link, or no authentication
|
||||
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
|
||||
// tTrace("%s, secured link, no auth is required", pConn->info);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ( !rpcIsReq(pHead->msgType) ) {
|
||||
// for response, if code is auth failure, it shall bypass the auth process
|
||||
code = htonl(pHead->code);
|
||||
if (code==TSDB_CODE_INVALID_TIME_STAMP || code==TSDB_CODE_AUTH_FAILURE ||
|
||||
if (code==TSDB_CODE_INVALID_TIME_STAMP || code==TSDB_CODE_AUTH_FAILURE || code == TSDB_CODE_AUTH_REQUIRED ||
|
||||
code==TSDB_CODE_INVALID_USER || code == TSDB_CODE_NOT_READY) {
|
||||
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
|
||||
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -1388,12 +1403,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
|
|||
} else {
|
||||
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
|
||||
if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client
|
||||
//tTrace("%s, message is authenticated", pConn->info);
|
||||
// tTrace("%s, message is authenticated", pConn->info);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tError("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
|
||||
code = TSDB_CODE_AUTH_FAILURE;
|
||||
code = pHead->spi ? TSDB_CODE_AUTH_FAILURE : TSDB_CODE_AUTH_REQUIRED;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -342,6 +342,7 @@ static void taosTimerLoopFunc(int signo) {
|
|||
int64_t now = taosGetTimestampMs();
|
||||
|
||||
for (int i = 0; i < tListLen(wheels); i++) {
|
||||
tmrTrace("begin processing wheel %d", i);
|
||||
// `expried` is a temporary expire list.
|
||||
// expired timers are first add to this list, then move
|
||||
// to expired queue as a batch to improve performance.
|
||||
|
@ -389,6 +390,7 @@ static void taosTimerLoopFunc(int signo) {
|
|||
}
|
||||
|
||||
addToExpired(expired);
|
||||
tmrTrace("end processing wheel %d", i);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue