diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 36fc9a7846..6f843d7fc3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -132,6 +132,7 @@ typedef struct SCliReq { uint64_t st; int64_t seq; int32_t sent; //(0: no send, 1: alread sent) + int8_t inSendQ; STransMsg msg; int8_t inRetry; @@ -275,6 +276,8 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pInst); static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx); +static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq); + static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq); static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead); static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp); @@ -701,6 +704,7 @@ void cliHandleResp(SCliConn* conn) { tstrerror(code)); } } + removeReqFromSendQ(pReq); code = cliBuildRespFromCont(pReq, &resp, pHead); STraceId* trace = &resp.info.traceId; @@ -1279,7 +1283,7 @@ static void cliHandleException(SCliConn* conn) { bool filterToRmReq(void* h, void* arg) { queue* el = h; SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); - if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) { + if (pReq->sent == 1 && pReq->inSendQ == 0 && REQUEST_NO_RESP(&pReq->msg)) { return true; } return false; @@ -1311,7 +1315,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) { QUEUE_REMOVE(h); SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ); - pReq->sent = 1; + pReq->inSendQ = 0; } freeWReqToWQ(&conn->wq, wrapper); @@ -1473,6 +1477,7 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { totalLen += msgLen; pCliMsg->seq = pConn->seq; + pCliMsg->inSendQ = 1; STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), @@ -1482,6 +1487,8 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); QUEUE_INIT(&pCliMsg->sendQ); QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ); + + pCliMsg->inSendQ = 1; if (j >= batchLimit) { break; } @@ -1491,6 +1498,14 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { if (req == NULL) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno)); + while (!QUEUE_IS_EMPTY(&reqToSend)) { + queue* h = QUEUE_HEAD(&reqToSend); + QUEUE_REMOVE(h); + + SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ); + pCliMsg->inSendQ = 0; + } + transRefCliHandle(pConn); return terrno; } @@ -1503,6 +1518,14 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb); if (ret != 0) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); + while (!QUEUE_IS_EMPTY(&pWreq->node)) { + queue* h = QUEUE_HEAD(&pWreq->node); + QUEUE_REMOVE(h); + + SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ); + pCliMsg->inSendQ = 0; + } + freeWReqToWQ(&pConn->wq, req->data); code = TSDB_CODE_THIRDPARTY_ERROR; TAOS_UNUSED(transUnrefCliHandle(pConn)); @@ -2214,11 +2237,21 @@ static void cliAsyncCb(uv_async_t* handle) { if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg); } +static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq) { + if (pReq == NULL || pReq->inSendQ == 0) { + return; + } + QUEUE_REMOVE(&pReq->sendQ); + pReq->inSendQ = 0; +} + static FORCE_INLINE void destroyReq(void* arg) { SCliReq* pReq = arg; if (pReq == NULL) { return; } + + removeReqFromSendQ(pReq); STraceId* trace = &pReq->msg.info.traceId; tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); @@ -2993,6 +3026,7 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { STrans* pInst = pThrd->pInst; if (pReq != NULL) { + removeReqFromSendQ(pReq); if (pResp->code != TSDB_CODE_SUCCESS) { if (cliMayRetry(pConn, pReq, pResp)) { return TSDB_CODE_RPC_ASYNC_IN_PROCESS;