From f570ac22cffb28b97b312bba4dce1f7a514b7c63 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Feb 2023 18:48:12 +0800 Subject: [PATCH 1/6] fix: fix invalid read-write --- source/libs/transport/src/transCli.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7e1aeafaad..e470d989a7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -727,7 +727,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; - conn->broken = 0; + conn->broken = false; transRefCliHandle(conn); atomic_add_fetch_32(&pThrd->connCount, 1); @@ -997,6 +997,11 @@ static void cliDestroyBatch(SCliBatch* pBatch) { taosMemoryFree(pBatch); } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { + if (pThrd->quit == true) { + cliDestroyBatch(pBatch); + return; + } + if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { return; } @@ -1087,12 +1092,15 @@ static void cliSendBatchCb(uv_write_t* req, int status) { } else { tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize); - - if (nxtBatch != NULL) { - conn->pBatch = nxtBatch; - cliSendBatch(conn); + if (conn->broken != true) { + if (nxtBatch != NULL) { + conn->pBatch = nxtBatch; + cliSendBatch(conn); + } else { + addConnToPool(thrd->pool, conn); + } } else { - addConnToPool(thrd->pool, conn); + // release by other callback } } From 328cde55d5e8161d049de705305ba509eef4dc78 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Feb 2023 18:57:53 +0800 Subject: [PATCH 2/6] fix: fix invalid read-write --- source/libs/transport/src/transCli.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e470d989a7..0ede6f9b2d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1087,7 +1087,9 @@ static void cliSendBatchCb(uv_write_t* req, int status) { if (status != 0) { tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize, uv_err_name(status)); - cliHandleExcept(conn); + + if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn); + cliHandleBatchReq(nxtBatch, thrd); } else { tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, @@ -1100,7 +1102,8 @@ static void cliSendBatchCb(uv_write_t* req, int status) { addConnToPool(thrd->pool, conn); } } else { - // release by other callback + cliDestroyBatch(nxtBatch); + // conn release by other callback } } From 2bd38f453cd78c3117f16248066c4d1496933dca Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Feb 2023 19:13:29 +0800 Subject: [PATCH 3/6] fix: fix invalid read-write --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0ede6f9b2d..65e16eb618 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1094,7 +1094,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) { } else { tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize); - if (conn->broken != true) { + if (!uv_is_closing((uv_handle_t*)&conn->stream)) { if (nxtBatch != NULL) { conn->pBatch = nxtBatch; cliSendBatch(conn); From 409e9a9ac8f6d350be10705f57582407aef7698a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Feb 2023 20:45:16 +0800 Subject: [PATCH 4/6] fix: fix invalid read-write --- source/libs/transport/src/transCli.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 65e16eb618..9c3ca20387 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1465,6 +1465,11 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + if (pMsg->type == Quit) { + pThrd->stopMsg = pMsg; + continue; + } (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); count++; @@ -1496,6 +1501,12 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + if (pMsg->type == Quit) { + pThrd->stopMsg = pMsg; + continue; + } + if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { STransConnCtx* pCtx = pMsg->ctx; From 8fa55d38446e3d7dd6bc4dd35c7884fad7450495 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Feb 2023 21:40:24 +0800 Subject: [PATCH 5/6] fix: fix invalid read-write --- source/libs/transport/src/transCli.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9c3ca20387..b3b8e8b230 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1604,7 +1604,6 @@ static void cliAsyncCb(uv_async_t* handle) { SCliThrd* pThrd = item->pThrd; STrans* pTransInst = pThrd->pTransInst; - SCliMsg* pMsg = NULL; // batch process to avoid to lock/unlock frequently queue wq; taosThreadMutexLock(&item->mtx); From 487ca893b1cdd6dbaf3064873a3d001f4be04c77 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Feb 2023 22:41:49 +0800 Subject: [PATCH 6/6] fix: fix invalid read-write --- source/libs/transport/src/transCli.c | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b3b8e8b230..9e74825a1f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2306,22 +2306,8 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } - /*if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) { - char key[TSDB_FQDN_LEN + 64] = {0}; - char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet); - uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet); - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - - int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key)); - if (val != NULL && *val >= pTransInst->connLimitNum) { - transFreeMsg(pReq->pCont); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_MAX_SESSIONS; - } - }*/ TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); - STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; pCtx->ahandle = pReq->info.ahandle;