Merge remote-tracking branch 'origin/3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-27 09:41:26 +08:00
parent f534731cd9
commit 34f86d9686
3 changed files with 24 additions and 27 deletions

View File

@ -132,6 +132,7 @@ typedef struct SRpcInit {
int8_t shareConn; // 0: no share, 1. share
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
int8_t startReadTimer;
void *parent;
} SRpcInit;

View File

@ -263,7 +263,11 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn);
static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq);
static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead);
static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp);
static int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq);
int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key);
@ -911,7 +915,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
code = cliHandleState_mayUpdateState(pThrd, pReq, pConn);
code = cliHandleState_mayUpdateState(pConn, pReq);
(void)addConnToHeapCache(pThrd->connHeapCache, pConn);
(void)transQueuePush(&pConn->reqsToSend, &pReq->q);
@ -1648,9 +1652,10 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
}
}
int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) {
int32_t code = 0;
int64_t qid = pReq->msg.info.qId;
int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq) {
SCliThrd* pThrd = pConn->hostThrd;
int32_t code = 0;
int64_t qid = pReq->msg.info.qId;
if (qid == 0) {
return TSDB_CODE_RPC_NO_STATE;
}
@ -1703,7 +1708,7 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
return;
}
}
code = cliHandleState_mayUpdateState(pThrd, pReq, pConn);
code = cliHandleState_mayUpdateState(pConn, pReq);
}
code = cliSendReq(pConn, pReq);

View File

@ -437,8 +437,13 @@ static int8_t uvValidConn(SSvrConn* pConn) {
static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
int32_t code = 0;
STrans* pInst = pConn->pInst;
int64_t qId = taosHton64(pHead->qid);
if (pHead->msgType == TDMT_SCH_TASK_RELEASE && qId > 0) {
if (pHead->msgType == TDMT_SCH_TASK_RELEASE) {
int64_t qId = taosHton64(pHead->qid);
if (qId <= 0) {
tError("conn %p recv release, but invalid qid:%" PRId64 "", pConn, qId);
return TSDB_CODE_RPC_NO_STATE;
}
void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId));
if (p == NULL) {
code = TSDB_CODE_RPC_NO_STATE;
@ -1644,22 +1649,8 @@ void uvHandleQuit(SSvrRespMsg* msg, SWorkThrd* thrd) {
}
taosMemoryFree(msg);
}
void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd) {
return;
// int32_t code = 0;
// SSvrConn* conn = msg->pConn;
// if (conn->status == ConnAcquire) {
// if (!transQueuePush(&conn->resps, msg)) {
// return;
// }
// uvStartSendRespImpl(msg);
// return;
// } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
// tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn);
// }
void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd) { return; }
// destroySmsg(msg);
}
void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd) {
// send msg to client
tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn);
@ -1669,16 +1660,16 @@ void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd) {
int32_t uvHandleStateReq(SSvrRespMsg* msg) {
int32_t code = 0;
SSvrConn* conn = msg->pConn;
tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn,
msg->msg.info.qId);
int64_t qid = msg->msg.info.qId;
tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn, qid);
SSvrRegArg arg = {.notifyCount = 0, .init = 1, .msg = msg->msg};
SSvrRegArg* p = taosHashGet(conn->pQTable, &msg->msg.info.qId, sizeof(msg->msg.info.qId));
SSvrRegArg* p = taosHashGet(conn->pQTable, &qid, sizeof(qid));
if (p != NULL) {
transFreeMsg(p->msg.pCont);
}
code = taosHashPut(conn->pQTable, &msg->msg.info.qId, sizeof(msg->msg.info.qId), &arg, sizeof(arg));
code = taosHashPut(conn->pQTable, &qid, sizeof(qid), &arg, sizeof(arg));
if (code == 0) tDebug("conn %p register brokenlink callback succ", conn);
return code;
}