From 13b1e5ee4ec5c460199bb6743195335192de12b3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 11 Sep 2024 22:17:38 +0800 Subject: [PATCH] opt parameter --- source/libs/transport/inc/transComm.h | 6 +- source/libs/transport/src/transCli.c | 164 ++++++++++++++++---------- source/libs/transport/src/transComm.c | 17 +-- 3 files changed, 113 insertions(+), 74 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 6fd3e830ae..dabed050ec 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -94,6 +94,10 @@ typedef void* queue[2]; /* Return the structure holding the given element. */ #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) +typedef struct { + queue q; +} queueWrapper; + // #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit // #define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms) @@ -364,7 +368,7 @@ void transReqQueueClear(queue* q); // queue sending msgs typedef struct { queue node; - void (*freeFunc)(const void* arg); + void (*freeFunc)(void* arg); int32_t size; } STransQueue; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 63ef272533..d5ec6e7e33 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -274,7 +274,7 @@ static void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq); static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq); static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq); static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq); -static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq); +static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq) { return; } static void cliDoReq(queue* h, SCliThrd* pThrd); static void cliDoBatchReq(queue* h, SCliThrd* pThrd); @@ -393,17 +393,30 @@ void cliResetConnTimer(SCliConn* conn) { void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } -int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { - int32_t code = 0; - for (int i = 0; i < transQueueSize(&conn->reqsSentOut); i++) { - SCliReq* p = transQueueGet(&conn->reqsSentOut, i); - if (p->seq == seq) { - transQueueRm(&conn->reqsSentOut, i); - *pReq = p; - return 0; - } +bool filteBySeq(void* key, void* arg) { + int32_t* seq = arg; + SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); + if (pReq->seq == *seq) { + return true; + } else { + return false; } - return TSDB_CODE_OUT_OF_RANGE; +} +int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { + int32_t code = 0; + queueWrapper set; + QUEUE_INIT(&set.q) + transQueueRemoveByFilter(&conn->reqsSentOut, filteBySeq, &seq, &set, 1); + + if (QUEUE_IS_EMPTY(&set.q)) { + return TSDB_CODE_OUT_OF_RANGE; + } + + queue* e = QUEUE_HEAD(&set.q); + SCliReq* p = QUEUE_DATA(e, SCliReq, q); + + *pReq = p; + return 0; } int8_t cliMayRecycleConn(SCliConn* conn) { @@ -468,13 +481,13 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) { tDebug("%s %p reqToSend:%d, sentOut:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqsToSend), transQueueSize(&conn->reqsSentOut)); - queue set; - QUEUE_INIT(&set); + queueWrapper set; + QUEUE_INIT(&set.q); transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1); transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1); - while (!QUEUE_IS_EMPTY(&set)) { - queue* el = QUEUE_HEAD(&set); + while (!QUEUE_IS_EMPTY(&set.q)) { + queue* el = QUEUE_HEAD(&set.q); SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); QUEUE_REMOVE(el); if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { @@ -1105,7 +1118,14 @@ _exception: // free conn return code; } - +void cliDestroyMsg(void* arg) { + queue* e = arg; + SCliReq* pReq = QUEUE_DATA(e, SCliReq, q); + if (pReq->msg.info.notFreeAhandle == 0) { + taosMemoryFree(pReq->ctx->ahandle); + } + destroyReq(pReq); +} static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int32_t port) { int32_t code = 0; int32_t lino = 0; @@ -1129,7 +1149,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int transReqQueueInit(&conn->wreqQueue); - TAOS_CHECK_GOTO(transQueueInit(&conn->reqsToSend, NULL), NULL, _failed); + TAOS_CHECK_GOTO(transQueueInit(&conn->reqsToSend, cliDestroyMsg), NULL, _failed); + TAOS_CHECK_GOTO(transQueueInit(&conn->reqsSentOut, cliDestroyMsg), NULL, _failed); TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); @@ -1287,15 +1308,25 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { transUnrefCliHandle(conn); } +bool fileToRmReq(void* h, void* arg) { + queue* el = h; + SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); + if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) { + return true; + } + return false; +} static void cliConnRmReqs(SCliConn* conn) { - // for (int i = 0; i < transQueueSize(&conn->reqsSentOut); i++) { - // SCliReq* pReq = transQueueGet(&conn->reqsSentOut, i); - // if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) { - // transQueueRm(&conn->reqsToSend, i); - // destroyReq(pReq); - // i--; - // } - // } + queueWrapper set; + QUEUE_INIT(&set.q); + + transQueueRemoveByFilter(&conn->reqsSentOut, fileToRmReq, NULL, &set, -1); + while (!QUEUE_IS_EMPTY(&set.q)) { + queue* el = QUEUE_HEAD(&set.q); + QUEUE_REMOVE(el); + SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); + destroyReq(pReq); + } return; } @@ -1382,7 +1413,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) { STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); - transQueuePush(&pConn->reqsSentOut, pCliMsg->q); + + transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); } if (j == 0) { taosMemoryFree(wb); @@ -1399,7 +1431,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t code = 0; - transQueuePush(&pConn->reqsToSend, pCliMsg); + transQueuePush(&pConn->reqsToSend, &pCliMsg->q); if (pConn->connnected) { code = cliSend2(pConn); } else { @@ -1496,11 +1528,11 @@ _exception2: static void cliHandleFastFail_resp(SCliConn* pConn, int status) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - SCliReq* pReq = transQueueGet(&pConn->reqsToSend, 0); + // //kSCliReq* pReq = transQueueGet(&pConn->reqsToSend, 0); - STraceId* trace = &pReq->msg.info.traceId; - tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - TMSG_INFO(pReq->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); + // STraceId* trace = &pReq->msg.info.traceId; + // tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), + // TMSG_INFO(pReq->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); } static void cliHandleFastFail_noresp(SCliConn* pConn, int status) { @@ -1616,31 +1648,32 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) { (void)uv_walk(pThrd->loop, cliWalkCb, NULL); } static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { - int64_t refId = (int64_t)(pReq->msg.info.handle); - SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); - if (exh == NULL) { - tDebug("%" PRId64 " already released", refId); - destroyReq(pReq); - return; - } + return; + // int64_t refId = (int64_t)(pReq->msg.info.handle); + // SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + // if (exh == NULL) { + // tDebug("%" PRId64 " already released", refId); + // destroyReq(pReq); + // return; + // } - taosRLockLatch(&exh->latch); - SCliConn* conn = exh->handle; - taosRUnLockLatch(&exh->latch); + // taosRLockLatch(&exh->latch); + // SCliConn* conn = exh->handle; + // taosRUnLockLatch(&exh->latch); - (void)transReleaseExHandle(transGetRefMgt(), refId); - tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); + // (void)transReleaseExHandle(transGetRefMgt(), refId); + // tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); - if (TRANS_CONN_REF_GET(conn) == 2) { - transUnrefCliHandle(conn); - if (!transQueuePush(&conn->reqsToSend, pReq)) { - return; - } - (void)cliSend2(conn); - } else { - tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn); - destroyReq(pReq); - } + // if (TRANS_CONN_REF_GET(conn) == 2) { + // transUnrefCliHandle(conn); + // if (!transQueuePush(&conn->reqsToSend, &pReq->q)) { + // return; + // } + // (void)cliSend2(conn); + // } else { + // tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn); + // destroyReq(pReq); + // } } static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { return; } @@ -2111,31 +2144,32 @@ static void cliAsyncCb(uv_async_t* handle) { } void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { - transCtxCleanup(&conn->ctx); - // cliReleaseUnfinishedMsg(conn); + // transCtxCleanup(&conn->ctx); + // cliReleaseUnfinishedMsg(conn); if (destroy == 1) { transQueueDestroy(&conn->reqsToSend); } else { transQueueClear(&conn->reqsToSend); } + transQueueDestroy(&conn->reqsSentOut); } void cliConnFreeMsgs(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - for (int i = 0; i < transQueueSize(&conn->reqsToSend); i++) { - SCliReq* cmsg = transQueueGet(&conn->reqsToSend, i); - if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) { - continue; - } + // for (int i = 0; i < transQueueSize(&conn->reqsToSend); i++) { + // SCliReq* cmsg = transQueueGet(&conn->reqsToSend, i); + // if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) { + // continue; + // } - if (cliBuildExceptRespAndNotifyCb(pThrd, cmsg, 0) != 0) { - continue; - } + // if (cliBuildExceptRespAndNotifyCb(pThrd, cmsg, 0) != 0) { + // continue; + // } - cmsg->ctx->ahandle = NULL; - } + // cmsg->ctx->ahandle = NULL; + // } } static FORCE_INLINE void destroyReq(void* arg) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 64ca99ad46..e85ce2091a 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -439,7 +439,7 @@ void transReqQueueClear(queue* q) { int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) { QUEUE_INIT(&wq->node); - wq->freeFunc = (void (*)(const void*))freeFunc; + wq->freeFunc = (void (*)(void*))freeFunc; wq->size = 0; return 0; } @@ -453,9 +453,10 @@ int32_t transQueuePush(STransQueue* q, void* arg) { void* transQueuePop(STransQueue* q) { if (q->size == 0) return NULL; - queue* tail = QUEUE_HEAD(&q->node); - QUEUE_REMOVE(tail); - return tail; + queue* head = QUEUE_HEAD(&q->node); + QUEUE_REMOVE(head); + q->size--; + return head; } int32_t transQueueSize(STransQueue* q) { return q->size; } @@ -470,14 +471,14 @@ void* transQueueGet(STransQueue* q, int idx) { } void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size) { - queue* d = dst; - queue* node = QUEUE_NEXT(&q->node); + queueWrapper* d = dst; + queue* node = QUEUE_NEXT(&q->node); while (node != &q->node) { queue* next = QUEUE_NEXT(node); if (filter(node, arg)) { QUEUE_REMOVE(node); q->size--; - QUEUE_PUSH(d, node); + QUEUE_PUSH(&d->q, node); if (--size == 0) { break; } @@ -516,7 +517,7 @@ void transQueueRemove(STransQueue* q, void* e) { bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; } void transQueueClear(STransQueue* q) { - while (!QUEUE_IS_EMPTY(q->node)) { + while (!QUEUE_IS_EMPTY(&q->node)) { queue* h = QUEUE_HEAD(&q->node); QUEUE_REMOVE(h); if (q->freeFunc != NULL) (q->freeFunc)(h);