merge 3.0

This commit is contained in:
yihaoDeng 2024-09-18 21:15:46 +08:00
parent bffc70cd5a
commit 88c0724468
1 changed files with 14 additions and 6 deletions

View File

@ -405,20 +405,28 @@ void destroyCliConnQTable(SCliConn* conn) {
taosHashCleanup(conn->pQTable); taosHashCleanup(conn->pQTable);
conn->pQTable = NULL; conn->pQTable = NULL;
} }
typedef struct {
int64_t seq;
int32_t msgType;
} SFiterArg;
bool filteBySeq(void* key, void* arg) { bool filteBySeq(void* key, void* arg) {
int64_t* seq = arg; SFiterArg* targ = arg;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->seq == *seq) { if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) {
return true; return true;
} else { } else {
return false; return false;
} }
} }
int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, SCliReq** pReq) { int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, int32_t msgType, SCliReq** pReq) {
int32_t code = 0; int32_t code = 0;
queue set; queue set;
QUEUE_INIT(&set) QUEUE_INIT(&set)
transQueueRemoveByFilter(&conn->reqsSentOut, filteBySeq, &seq, &set, 1);
SFiterArg arg = {.seq = seq, .msgType = msgType};
transQueueRemoveByFilter(&conn->reqsSentOut, filteBySeq, &arg, &set, 1);
if (QUEUE_IS_EMPTY(&set)) { if (QUEUE_IS_EMPTY(&set)) {
return TSDB_CODE_OUT_OF_RANGE; return TSDB_CODE_OUT_OF_RANGE;
@ -578,7 +586,7 @@ void cliHandleResp(SCliConn* conn) {
} }
return; return;
} }
code = cliGetReqBySeq(conn, seq, &pReq); code = cliGetReqBySeq(conn, seq, pHead->msgType, &pReq);
if (code == TSDB_CODE_OUT_OF_RANGE) { if (code == TSDB_CODE_OUT_OF_RANGE) {
code = cliHandleState_mayCreateAhandle(conn, pHead, &resp); code = cliHandleState_mayCreateAhandle(conn, pHead, &resp);
if (code == 0) { if (code == 0) {