opt transport

This commit is contained in:
yihaoDeng 2024-09-14 09:47:31 +08:00
parent ff60d50cbc
commit 45bb65336c
2 changed files with 34 additions and 28 deletions

View File

@ -601,7 +601,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
// tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, CFG_SCOPE_BOTH, CFG_DYN_NONE));

View File

@ -231,9 +231,7 @@ static void cliDestroyBatch(SCliBatch* pBatch);
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx);
static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr);
static FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* resp);
// static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code);
static FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* thrd, SCliReq* pReq, STransMsg* resp);
static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr);
static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
@ -519,6 +517,7 @@ void cliHandleResp(SCliConn* conn) {
taosMemoryFree(pHead);
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
// TODO: notify cb
ASSERT(0);
pThrd->notifyExceptCb(pThrd, NULL, NULL);
return;
}
@ -1411,7 +1410,12 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) {
(void)uv_walk(pThrd->loop, cliWalkCb, NULL);
}
static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { return; }
static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { return; }
static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) {
SReqCtx* pCtx = pReq->ctx;
pThrd->cvtAddr = pCtx->cvtAddr;
destroyReq(pReq);
return;
}
FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) {
if (pCvtAddr->cvt == false) {
@ -1437,33 +1441,33 @@ FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx) {
return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true;
}
FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* pResp) {
FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* pThrd, SCliReq* pReq, STransMsg* pResp) {
if (pReq == NULL) return -1;
STrans* pInst = pThrd->pInst;
SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
STransMsg resp = {0};
// resp.code = (conn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL);
pResp->msgType = pReq ? pReq->msg.msgType + 1 : 0;
pResp->info.cliVer = pInst->compatibilityVer;
pResp->info.ahandle = pCtx ? pCtx->ahandle : 0;
if (pReq) {
pResp->info.traceId = pReq->msg.info.traceId;
}
// handle noresp and inter manage msg
if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) {
destroyReq(pReq);
return 0;
}
if (pResp->code == 0) {
pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
}
pResp->msgType = pReq->msg.msgType + 1;
pResp->info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL;
pResp->info.traceId = pReq->msg.info.traceId;
return 0;
}
// FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code) {
// STrans* pInst = pThrd->pInst;
// STransMsg resp = {.code = code};
// code = cliBuildExceptResp(pReq, &resp);
// if (code != 0) {
// return code;
// }
// resp.info.cliVer = pInst->compatibilityVer;
// pInst->cfp(pInst->parent, &resp, NULL);
// return 0;
// }
static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) {
int32_t code = 0;
uint32_t addr = 0;
@ -1641,6 +1645,8 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
_exception:
resp.code = code;
STraceId* trace = &pReq->msg.info.traceId;
tGWarn("%s failed to process req, reason:%s", pInst->label, tstrerror(code));
(void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp);
return;
}
@ -1923,14 +1929,14 @@ int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr);
}
int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
STrans* pInst = ((SCliThrd*)thrd)->pInst;
int32_t code = cliBuildExceptResp(pReq, pResp);
SCliThrd* pThrd = thrd;
STrans* pInst = pThrd->pInst;
int32_t code = cliBuildExceptResp(pThrd, pReq, pResp);
if (code != 0) {
return code;
}
pResp->info.cliVer = pInst->compatibilityVer;
pInst->cfp(pInst->parent, pResp, NULL);
destroyReq(pReq);
return code;
}
@ -3122,7 +3128,7 @@ static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHea
static FORCE_INLINE int8_t shouldSWitchToOtherConn(int32_t reqNum, int32_t sentNum, int32_t stateNum) {
int32_t total = reqNum + sentNum + stateNum;
if (total >= 16) {
if (total >= 4) {
return 1;
}