From 88c07244688d61c74e700c744e2beaaf241b4081 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 18 Sep 2024 21:15:46 +0800 Subject: [PATCH] merge 3.0 --- source/libs/transport/src/transCli.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8417c89565..16afa3628c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -405,20 +405,28 @@ void destroyCliConnQTable(SCliConn* conn) { taosHashCleanup(conn->pQTable); conn->pQTable = NULL; } + +typedef struct { + int64_t seq; + int32_t msgType; +} SFiterArg; + bool filteBySeq(void* key, void* arg) { - int64_t* seq = arg; - SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); - if (pReq->seq == *seq) { + SFiterArg* targ = arg; + SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); + if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) { return true; } else { return false; } } -int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, SCliReq** pReq) { +int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, int32_t msgType, SCliReq** pReq) { int32_t code = 0; queue set; QUEUE_INIT(&set) - transQueueRemoveByFilter(&conn->reqsSentOut, filteBySeq, &seq, &set, 1); + + SFiterArg arg = {.seq = seq, .msgType = msgType}; + transQueueRemoveByFilter(&conn->reqsSentOut, filteBySeq, &arg, &set, 1); if (QUEUE_IS_EMPTY(&set)) { return TSDB_CODE_OUT_OF_RANGE; @@ -578,7 +586,7 @@ void cliHandleResp(SCliConn* conn) { } return; } - code = cliGetReqBySeq(conn, seq, &pReq); + code = cliGetReqBySeq(conn, seq, pHead->msgType, &pReq); if (code == TSDB_CODE_OUT_OF_RANGE) { code = cliHandleState_mayCreateAhandle(conn, pHead, &resp); if (code == 0) {