Add RpcNoDelayfp function to handle specific message types

This commit is contained in:
Yihao Deng 2024-04-26 08:10:39 +00:00
parent e678d2508f
commit ac15015cb8
7 changed files with 32 additions and 9 deletions

View File

@ -78,6 +78,7 @@ typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcFFfp)(tmsg_t msgType); typedef bool (*RpcFFfp)(tmsg_t msgType);
typedef bool (*RpcNoDelayfp)(tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle); typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit { typedef struct SRpcInit {
@ -118,6 +119,8 @@ typedef struct SRpcInit {
// fail fast fp // fail fast fp
RpcFFfp ffp; RpcFFfp ffp;
RpcNoDelayfp noDelayFp;
int32_t connLimitNum; int32_t connLimitNum;
int32_t connLimitLock; int32_t connLimitLock;
int32_t timeToGetConn; int32_t timeToGetConn;

View File

@ -77,6 +77,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) // #define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) //
#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) //
#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023)
#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024)

View File

@ -330,7 +330,13 @@ static bool rpcRfp(int32_t code, tmsg_t msgType) {
return false; return false;
} }
} }
static bool rpcNoDelayMsg(tmsg_t msgType) {
if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE || msgType == TDMT_VND_S3MIGRATE ||
msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
return true;
}
return false;
}
int32_t dmInitClient(SDnode *pDnode) { int32_t dmInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
@ -356,6 +362,8 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.failFastThreshold = 3; // failed threshold rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp; rpcInit.ffp = dmFailFastFp;
rpcInit.noDelayFp = rpcNoDelayMsg;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2; int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500); connLimitNum = TMIN(connLimitNum, 500);
@ -365,6 +373,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.supportBatch = 1; rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024; rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);

View File

@ -63,6 +63,7 @@ typedef struct {
bool (*startTimer)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType);
void (*destroyFp)(void* ahandle); void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType); bool (*failFastFp)(tmsg_t msgType);
bool (*noDelayFp)(tmsg_t msgType);
int32_t connLimitNum; int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock int8_t connLimitLock; // 0: no lock. 1. lock

View File

@ -67,6 +67,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->startTimer = pInit->tfp; pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp; pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp; pRpc->failFastFp = pInit->ffp;
pRpc->noDelayFp = pInit->noDelayFp;
pRpc->connLimitNum = pInit->connLimitNum; pRpc->connLimitNum = pInit->connLimitNum;
if (pRpc->connLimitNum == 0) { if (pRpc->connLimitNum == 0) {
pRpc->connLimitNum = 20; pRpc->connLimitNum = 20;

View File

@ -204,7 +204,7 @@ static void cliHandleExcept(SCliConn* conn);
static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn);
static void cliHandleFastFail(SCliConn* pConn, int status); static void cliHandleFastFail(SCliConn* pConn, int status);
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd); static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code);
// handle req from app // handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
@ -617,7 +617,7 @@ void* destroyConnPool(SCliThrd* pThrd) {
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
pMsg->ctx->task = NULL; pMsg->ctx->task = NULL;
doNotifyApp(pMsg, pThrd); doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS);
} }
taosMemoryFree(msglist); taosMemoryFree(msglist);
@ -692,13 +692,20 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
SMsgList* list = plist->list; SMsgList* list = plist->list;
if ((list)->numOfConn >= pTransInst->connLimitNum) { if ((list)->numOfConn >= pTransInst->connLimitNum) {
STraceId* trace = &(*pMsg)->msg.info.traceId; STraceId* trace = &(*pMsg)->msg.info.traceId;
if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) {
tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType),
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
*pMsg = NULL;
return NULL;
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = *pMsg; arg->param1 = *pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
*pMsg = NULL; *pMsg = NULL;
} else { } else {
@ -1394,14 +1401,14 @@ void cliConnCb(uv_connect_t* req, int status) {
} }
} }
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransMsg transMsg = {0}; STransMsg transMsg = {0};
transMsg.contLen = 0; transMsg.contLen = 0;
transMsg.pCont = NULL; transMsg.pCont = NULL;
transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS; transMsg.code = code;
transMsg.msgType = pMsg->msg.msgType + 1; transMsg.msgType = pMsg->msg.msgType + 1;
transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.ahandle = pMsg->ctx->ahandle;
transMsg.info.traceId = pMsg->msg.info.traceId; transMsg.info.traceId = pMsg->msg.info.traceId;
@ -1578,11 +1585,11 @@ static void doFreeTimeoutMsg(void* param) {
SCliMsg* pMsg = arg->param1; SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2; SCliThrd* pThrd = arg->param2;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
int32_t code = TSDB_CODE_RPC_MAX_SESSIONS;
QUEUE_REMOVE(&pMsg->q); QUEUE_REMOVE(&pMsg->q);
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
doNotifyApp(pMsg, pThrd); doNotifyApp(pMsg, pThrd, code);
taosMemoryFree(arg); taosMemoryFree(arg);
} }
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {

View File

@ -58,6 +58,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")