Merge pull request #25524 from taosdata/feat/TD-29823

Feat/TD-29823
This commit is contained in:
Hongze Cheng 2024-04-28 17:51:39 +08:00 committed by GitHub
commit 13cd60f17f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 32 additions and 9 deletions

View File

@ -78,6 +78,7 @@ typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcFFfp)(tmsg_t msgType);
typedef bool (*RpcNoDelayfp)(tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit {
@ -118,6 +119,8 @@ typedef struct SRpcInit {
// fail fast fp
RpcFFfp ffp;
RpcNoDelayfp noDelayFp;
int32_t connLimitNum;
int32_t connLimitLock;
int32_t timeToGetConn;

View File

@ -77,6 +77,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) //
#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) //
#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023)
#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024)

View File

@ -330,7 +330,13 @@ static bool rpcRfp(int32_t code, tmsg_t msgType) {
return false;
}
}
static bool rpcNoDelayMsg(tmsg_t msgType) {
if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE || msgType == TDMT_VND_S3MIGRATE ||
msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
return true;
}
return false;
}
int32_t dmInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
@ -356,6 +362,8 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
rpcInit.noDelayFp = rpcNoDelayMsg;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
@ -365,6 +373,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->clientRpc = rpcOpen(&rpcInit);

View File

@ -63,6 +63,7 @@ typedef struct {
bool (*startTimer)(int32_t code, tmsg_t msgType);
void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType);
bool (*noDelayFp)(tmsg_t msgType);
int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock

View File

@ -67,6 +67,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp;
pRpc->noDelayFp = pInit->noDelayFp;
pRpc->connLimitNum = pInit->connLimitNum;
if (pRpc->connLimitNum == 0) {
pRpc->connLimitNum = 20;

View File

@ -204,7 +204,7 @@ static void cliHandleExcept(SCliConn* conn);
static void cliReleaseUnfinishedMsg(SCliConn* conn);
static void cliHandleFastFail(SCliConn* pConn, int status);
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code);
// handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
@ -617,7 +617,7 @@ void* destroyConnPool(SCliThrd* pThrd) {
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
pMsg->ctx->task = NULL;
doNotifyApp(pMsg, pThrd);
doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS);
}
taosMemoryFree(msglist);
@ -692,13 +692,20 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
SMsgList* list = plist->list;
if ((list)->numOfConn >= pTransInst->connLimitNum) {
STraceId* trace = &(*pMsg)->msg.info.traceId;
if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) {
tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType),
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
*pMsg = NULL;
return NULL;
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = *pMsg;
arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
*pMsg = NULL;
} else {
@ -1394,14 +1401,14 @@ void cliConnCb(uv_connect_t* req, int status) {
}
}
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) {
STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst;
STransMsg transMsg = {0};
transMsg.contLen = 0;
transMsg.pCont = NULL;
transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS;
transMsg.code = code;
transMsg.msgType = pMsg->msg.msgType + 1;
transMsg.info.ahandle = pMsg->ctx->ahandle;
transMsg.info.traceId = pMsg->msg.info.traceId;
@ -1578,11 +1585,11 @@ static void doFreeTimeoutMsg(void* param) {
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
STrans* pTransInst = pThrd->pTransInst;
int32_t code = TSDB_CODE_RPC_MAX_SESSIONS;
QUEUE_REMOVE(&pMsg->q);
STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
doNotifyApp(pMsg, pThrd);
doNotifyApp(pMsg, pThrd, code);
taosMemoryFree(arg);
}
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {

View File

@ -58,6 +58,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")