opt parameter

This commit is contained in:
yihaoDeng 2024-09-13 16:30:21 +08:00
parent 290c6f89df
commit 12b08ecc09
1 changed files with 21 additions and 16 deletions

View File

@ -264,7 +264,7 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst); static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx); static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn); int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn);
int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key);
@ -435,7 +435,7 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe
pResp->info.handle = (void*)qid; pResp->info.handle = (void*)qid;
return 0; return 0;
} }
int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) { int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead) {
int32_t code = 0; int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) { if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) {
@ -480,7 +480,7 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) {
} }
return 0; return 0;
} }
int32_t cliMayHandleState(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp) { int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp) {
int32_t code = 0; int32_t code = 0;
int64_t qId = taosHton64(pHead->qid); int64_t qId = taosHton64(pHead->qid);
if (qId == 0) { if (qId == 0) {
@ -496,15 +496,20 @@ int32_t cliMayHandleState(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp
TMSG_INFO(pHead->msgType)); TMSG_INFO(pHead->msgType));
return 0; return 0;
} }
static FORCE_INLINE void cliConnClearInitUserMsg(SCliConn* conn) {
if (conn->pInitUserReq) {
taosMemoryFree(conn->pInitUserReq);
conn->pInitUserReq = NULL;
}
}
void cliHandleResp(SCliConn* conn) { void cliHandleResp(SCliConn* conn) {
int32_t code = 0; int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
if (conn->pInitUserReq) { cliConnClearInitUserMsg(conn);
taosMemoryFree(conn->pInitUserReq);
conn->pInitUserReq = NULL;
}
cliResetConnTimer(conn); cliResetConnTimer(conn);
SCliReq* pReq = NULL; SCliReq* pReq = NULL;
@ -529,7 +534,7 @@ void cliHandleResp(SCliConn* conn) {
int32_t seq = htonl(pHead->seqNum); int32_t seq = htonl(pHead->seqNum);
STransMsg resp = {0}; STransMsg resp = {0};
if (cliConnMayHandleState_releaseReq(conn, pHead)) { if (cliHandleState_mayHandleReleaseResp(conn, pHead)) {
if (cliMayRecycleConn(conn)) { if (cliMayRecycleConn(conn)) {
return; return;
} }
@ -537,7 +542,7 @@ void cliHandleResp(SCliConn* conn) {
} }
code = cliGetReqBySeq(conn, seq, &pReq); code = cliGetReqBySeq(conn, seq, &pReq);
if (code == TSDB_CODE_OUT_OF_RANGE) { if (code == TSDB_CODE_OUT_OF_RANGE) {
code = cliMayHandleState(conn, pHead, &resp); code = cliHandleState_mayCreateAhandle(conn, pHead, &resp);
if (code == 0) { if (code == 0) {
code = cliBuildRespFromCont(NULL, &resp, pHead); code = cliBuildRespFromCont(NULL, &resp, pHead);
code = cliNotifyCb(conn, NULL, &resp); code = cliNotifyCb(conn, NULL, &resp);
@ -871,7 +876,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
code = cliMayUpdateState(pThrd, pReq, pConn); code = cliHandleState_mayUpdateState(pThrd, pReq, pConn);
addConnToHeapCache(pThrd->connHeapCache, pConn); addConnToHeapCache(pThrd->connHeapCache, pConn);
transQueuePush(&pConn->reqsToSend, &pReq->q); transQueuePush(&pConn->reqsToSend, &pReq->q);
@ -1530,7 +1535,7 @@ static void doFreeTimeoutMsg(void* param) {
taosMemoryFree(arg); taosMemoryFree(arg);
} }
int32_t clConnMayUpdateReqCtx(SCliConn* pConn, SCliReq* pReq) { int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) {
int32_t code = 0; int32_t code = 0;
int64_t qid = pReq->msg.info.qId; int64_t qid = pReq->msg.info.qId;
SReqCtx* pCtx = pReq->ctx; SReqCtx* pCtx = pReq->ctx;
@ -1572,7 +1577,7 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
return 0; return 0;
} }
int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) { int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) {
int32_t code = 0; int32_t code = 0;
int64_t qid = pReq->msg.info.qId; int64_t qid = pReq->msg.info.qId;
if (qid == 0) { if (qid == 0) {
@ -1587,7 +1592,7 @@ int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) {
tDebug("%s conn %p succ to add statue, qid:%ld (1)", transLabel(pThrd->pInst), pConn, qid); tDebug("%s conn %p succ to add statue, qid:%ld (1)", transLabel(pThrd->pInst), pConn, qid);
} }
(void)clConnMayUpdateReqCtx(pConn, pReq); (void)cliHandleState_mayUpdateStateCtx(pConn, pReq);
return code; return code;
} }
void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
@ -1600,7 +1605,7 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
SCliConn* pConn = NULL; SCliConn* pConn = NULL;
code = cliMayGetStateByQid(pThrd, pReq, &pConn); code = cliMayGetStateByQid(pThrd, pReq, &pConn);
if (code == 0) { if (code == 0) {
(void)clConnMayUpdateReqCtx(pConn, pReq); (void)cliHandleState_mayUpdateStateCtx(pConn, pReq);
} else if (code == TSDB_CODE_RPC_STATE_DROPED) { } else if (code == TSDB_CODE_RPC_STATE_DROPED) {
STraceId* trace = &pReq->msg.info.traceId; STraceId* trace = &pReq->msg.info.traceId;
tWarn("%s failed to get statue, qid:%ld", pInst->label, pReq->msg.info.qId); tWarn("%s failed to get statue, qid:%ld", pInst->label, pReq->msg.info.qId);
@ -1627,7 +1632,7 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
addConnToHeapCache(pThrd->connHeapCache, pConn); addConnToHeapCache(pThrd->connHeapCache, pConn);
} }
} }
code = cliMayUpdateState(pThrd, pReq, pConn); code = cliHandleState_mayUpdateState(pThrd, pReq, pConn);
} }
code = cliSendReq(pConn, pReq); code = cliSendReq(pConn, pReq);
@ -2017,7 +2022,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
} }
pThrd->pIdConnTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); pThrd->pIdConnTable = taosHashInit(512, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pThrd->connHeapCache == NULL) { if (pThrd->connHeapCache == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
} }