From 65a5dad3cbd6632f59abd26b752488e8e0fc8715 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 11 Sep 2024 20:18:34 +0800 Subject: [PATCH] opt transport --- source/libs/transport/inc/transComm.h | 6 +- source/libs/transport/src/transCli.c | 364 ++++++-------------------- source/libs/transport/src/transComm.c | 24 +- source/libs/transport/src/transSvr.c | 140 +++++----- 4 files changed, 173 insertions(+), 361 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 19322c1327..6fd3e830ae 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -372,7 +372,7 @@ typedef struct { * init queue * note: queue'size is small, default 1 */ -int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)); +int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(void* arg)); /* * put arg into queue @@ -396,6 +396,10 @@ void* transQueueGet(STransQueue* queue, int i); */ void* tranQueueHead(STransQueue* q); +/* + * remove all match elm from queue + */ +void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size); /* * rm ith from queue */ diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7c09082e5b..63ef272533 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -65,7 +65,8 @@ typedef struct SCliConn { void* hostThrd; SConnBuffer readBuf; - STransQueue reqs; + STransQueue reqsToSend; + STransQueue reqsSentOut; SHashObj* pQueryTable; queue q; @@ -340,57 +341,8 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p); #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pInst))->label) -// #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ -// do { \ -// int i = 0, sz = transQueueSize(&conn->reqs); \ -// for (; i < sz; i++) { \ -// pReq = transQueueGet(&conn->reqs, i); \ -// if (pReq->ctx != NULL && (uint64_t)pReq->ctx->ahandle == ahandle) { \ -// break; \ -// } \ -// } \ -// if (i == sz) { \ -// pReq = NULL; \ -// } else { \ -// pReq = transQueueRm(&conn->reqs, i); \ -// } \ -// } while (0) - -// #define CONN_GET_NEXT_SENDMSG(conn) \ -// do { \ -// int i = 0; \ -// do { \ -// pCliMsg = transQueueGet(&conn->reqs, i++); \ -// if (pCliMsg && 0 == pCliMsg->sent) { \ -// break; \ -// } \ -// } while (pCliMsg != NULL); \ -// if (pCliMsg == NULL) { \ -// goto _RETURN; \ -// } \ -// } while (0) - -// static int32_t cliConnFindToSendMsg(SCliConn* pConn, SCliReq** pReq) { -// int32_t code = 0; -// for (int32_t i = 0; i < transQueueSize(&pConn->reqs); i++) { -// SCliReq* p = transQueueGet(&pConn->reqs, i); -// if (p->sent == 0) { -// *pReq = p; -// return 0; -// } -// } -// return TSDB_CODE_OUT_OF_RANGE; -// } -// #define CONN_SET_PERSIST_BY_APP(conn) \ -// do { \ -// if (conn->status == ConnNormal) { \ -// conn->status = ConnAcquire; \ -// transRefCliHandle(conn); \ -// } \ -// } while (0) - -#define CONN_NO_PERSIST_BY_APP(conn) \ - (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1) +// #define CONN_NO_PERSIST_BY_APP(conn) \ +// (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1) // #define CONN_RELEASE_BY_SERVER(conn) \ // (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1) @@ -412,64 +364,6 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p); static void* cliWorkThread(void* arg); -// static void cliReleaseUnfinishedMsg(SCliConn* conn) { -// SCliThrd* pThrd = conn->hostThrd; - -// for (int i = 0; i < transQueueSize(&conn->reqs); i++) { -// SCliReq* msg = transQueueGet(&conn->reqs, i); -// if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { -// if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { -// conn->ctx.freeFunc(msg->ctx->ahandle); -// } else if (msg->msg.info.notFreeAhandle == 0 && msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) { -// tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle); -// pThrd->destroyAhandleFp(msg->ctx->ahandle); -// } -// } -// destroyReq(msg); -// } -// transQueueClear(&conn->reqs); -// memset(&conn->ctx, 0, sizeof(conn->ctx)); -// } -// bool cliMaySendCachedMsg(SCliConn* conn) { -// if (!transQueueEmpty(&conn->reqs)) { -// SCliReq* pCliMsg = NULL; -// CONN_GET_NEXT_SENDMSG(conn); -// (void)cliSend2(conn); -// return true; -// } -// return false; -// _RETURN: -// return false; -// } -// bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { -// if (refId == 0) return false; -// SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); -// if (exh == NULL) { -// tDebug("release conn %p, refId: %" PRId64 "", conn, refId); -// return false; -// } -// taosWLockLatch(&exh->latch); -// if (exh->handle == NULL) exh->handle = conn; -// exh->inited = 1; -// exh->pThrd = conn->hostThrd; -// if (!QUEUE_IS_EMPTY(&exh->q)) { -// queue* h = QUEUE_HEAD(&exh->q); -// QUEUE_REMOVE(h); -// taosWUnLockLatch(&exh->latch); -// SCliReq* t = QUEUE_DATA(h, SCliReq, seqq); -// transCtxMerge(&conn->ctx, &t->ctx->userCtx); -// (void)transQueuePush(&conn->reqs, t); -// tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); -// (void)transReleaseExHandle(transGetRefMgt(), refId); -// (void)cliSend2(conn); -// return true; -// } -// taosWUnLockLatch(&exh->latch); -// tDebug("empty conn %p, refId: %" PRId64 "", conn, refId); -// (void)transReleaseExHandle(transGetRefMgt(), refId); -// return false; -// } - int32_t cliGetConnTimer(SCliThrd* pThrd, SCliConn* pConn) { uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { @@ -501,10 +395,10 @@ void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { int32_t code = 0; - for (int i = 0; i < transQueueSize(&conn->reqs); i++) { - SCliReq* p = transQueueGet(&conn->reqs, i); + for (int i = 0; i < transQueueSize(&conn->reqsSentOut); i++) { + SCliReq* p = transQueueGet(&conn->reqsSentOut, i); if (p->seq == seq) { - transQueueRm(&conn->reqs, i); + transQueueRm(&conn->reqsSentOut, i); *pReq = p; return 0; } @@ -514,7 +408,8 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { int8_t cliMayRecycleConn(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - if (transQueueSize(&conn->reqs) == 0 && taosHashGetSize(conn->pQTable) == 0) { + if (transQueueSize(&conn->reqsToSend) == 0 && transQueueSize(&conn->reqsSentOut) == 0 && + taosHashGetSize(conn->pQTable) == 0) { (void)delConnFromHeapCache(pThrd->connHeapCache, conn); addConnToPool(pThrd->pool, conn); return 1; @@ -522,6 +417,16 @@ int8_t cliMayRecycleConn(SCliConn* conn) { return 0; } +bool filterByQid(void* key, void* arg) { + int64_t* qid = arg; + SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); + + if (pReq->msg.info.qId == *qid) { + return true; + } else { + return false; + } +} int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHead) { pResp->contLen = transContLenFromMsg(pHead->msgLen); pResp->pCont = transContFromHead((char*)pHead); @@ -560,18 +465,22 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) { tDebug("%s conn %p failed to release req %ld from thrd ", CONN_GET_INST_LABEL(conn), conn, qId); } - tDebug("%s %p req size:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqs)); - for (int32_t i = 0; i < transQueueSize(&conn->reqs); i++) { - SCliReq* pReq = transQueueGet(&conn->reqs, i); - if (pReq->msg.info.qId == qId) { - transQueueRm(&conn->reqs, i); + tDebug("%s %p reqToSend:%d, sentOut:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqsToSend), + transQueueSize(&conn->reqsSentOut)); - if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - pThrd->destroyAhandleFp(pReq->ctx->ahandle); - } - destroyReq(pReq); - i--; + queue set; + QUEUE_INIT(&set); + transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1); + transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1); + + while (!QUEUE_IS_EMPTY(&set)) { + queue* el = QUEUE_HEAD(&set); + SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); + QUEUE_REMOVE(el); + if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { + pThrd->destroyAhandleFp(pReq->ctx->ahandle); } + destroyReq(pReq); } taosMemoryFree(pHead); return 1; @@ -674,76 +583,7 @@ void cliHandleResp2(SCliConn* conn) { (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } -void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { - // if (transQueueEmpty(&pConn->reqs)) { - // if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { - // tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); - // if (TRANS_CONN_REF_GET(pConn) > 1) transUnrefCliHandle(pConn); - // transUnrefCliHandle(pConn); - // return; - // } - // } - // SCliThrd* pThrd = pConn->hostThrd; - // STrans* pInst = pThrd->pInst; - // bool once = false; - // do { - // SCliReq* pReq = transQueuePop(&pConn->reqs); - - // if (pReq == NULL && once) { - // break; - // } - - // if (pReq != NULL && REQUEST_NO_RESP(&pReq->msg)) { - // destroyReq(pReq); - // break; - // } - - // SReqCtx* pCtx = pReq ? pReq->ctx : NULL; - - // STransMsg transMsg = {0}; - // transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; - // transMsg.msgType = pReq ? pReq->msg.msgType + 1 : 0; - // transMsg.info.ahandle = NULL; - // transMsg.info.cliVer = pInst->compatibilityVer; - - // if (pReq == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { - // transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); - // tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle, - // TMSG_INFO(transMsg.msgType)); - // if (transMsg.info.ahandle == NULL) { - // int32_t msgType = 0; - // transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType); - // transMsg.msgType = msgType; - // tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, - // transMsg.info.ahandle); - // } - // } else { - // transMsg.info.ahandle = (pReq != NULL && pReq->type != Release && pCtx) ? pCtx->ahandle : NULL; - // } - - // if (pCtx == NULL || pCtx->pSem == NULL) { - // if (transMsg.info.ahandle == NULL) { - // if (pReq == NULL || REQUEST_NO_RESP(&pReq->msg) || pReq->type == Release) { - // destroyReq(pReq); - // once = true; - // continue; - // } - // } - // } - - // if (pReq == NULL || (pReq && pReq->type != Release)) { - // int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle)); - // cliDestroyMsgInExhandle(refId); - // if (cliNotifyCb(pConn, pReq, &transMsg) != 0) { - // return; - // } - // } - // destroyReq(pReq); - // tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, TRANS_CONN_REF_GET(pConn)); - // } while (!transQueueEmpty(&pConn->reqs)); - // if (TRANS_CONN_REF_GET(pConn) > 1) transUnrefCliHandle(pConn); - // transUnrefCliHandle(pConn); -} +void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { return; } void cliHandleExcept(SCliConn* conn, int32_t code) { tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn)); if (code != TSDB_CODE_RPC_FQDN_ERROR) { @@ -1068,22 +908,22 @@ static void addConnToPool(void* pool, SCliConn* conn) { SConnList* pList = conn->list; SMsgList* msgList = pList->list; - if (!QUEUE_IS_EMPTY(&msgList->msgQ)) { - queue* h = QUEUE_HEAD(&(msgList)->msgQ); - QUEUE_REMOVE(h); + // if (!QUEUE_IS_EMPTY(&msgList->msgQ)) { + // queue* h = QUEUE_HEAD(&(msgList)->msgQ); + // QUEUE_REMOVE(h); - SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); + // SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - transDQCancel(thrd->waitConnQueue, pReq->ctx->task); - pReq->ctx->task = NULL; + // transDQCancel(thrd->waitConnQueue, pReq->ctx->task); + // pReq->ctx->task = NULL; - transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); - (void)transQueuePush(&conn->reqs, pReq); + // transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); + // (void)transQueuePush(&conn->reqsToSend, pReq); - conn->status = ConnNormal; - (void)cliSend2(conn); - return; - } + // conn->status = ConnNormal; + // (void)cliSend2(conn); + // return; + // } conn->status = ConnInPool; QUEUE_PUSH(&conn->list->conns, &conn->q); @@ -1210,7 +1050,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { } // static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) { -// if (transQueuePush(&conn->reqs, pReq) != 0) { +// if (transQueuePush(&conn->reqsToSend, pReq) != 0) { // return TSDB_CODE_OUT_OF_MEMORY; // } // return 0; @@ -1218,7 +1058,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) { // // do nothing -// SCliReq* pTail = transQueuePop(&conn->reqs); +// SCliReq* pTail = transQueuePop(&conn->reqsToSend); // if (pTail == NULL) { // return TSDB_CODE_INVALID_PARA; // } @@ -1259,7 +1099,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) code = cliMayUpdateState(pThrd, pReq, pConn); addConnToHeapCache(pThrd->connHeapCache, pConn); - transQueuePush(&pConn->reqs, pReq); + transQueuePush(&pConn->reqsToSend, &pReq->q); return cliDoConn(pThrd, pConn); _exception: // free conn @@ -1289,7 +1129,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int transReqQueueInit(&conn->wreqQueue); - TAOS_CHECK_GOTO(transQueueInit(&conn->reqs, NULL), NULL, _failed); + TAOS_CHECK_GOTO(transQueueInit(&conn->reqsToSend, NULL), NULL, _failed); TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); @@ -1331,7 +1171,8 @@ _failed: destroyCliConnQTable(conn); taosHashCleanup(conn->pQTable); (void)transDestroyBuffer(&conn->readBuf); - transQueueDestroy(&conn->reqs); + transQueueDestroy(&conn->reqsToSend); + transQueueDestroy(&conn->reqsSentOut); taosMemoryFree(conn->dstAddr); (void)transReleaseExHandle(transGetRefMgt(), conn->refId); @@ -1417,8 +1258,9 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - while (!transQueueEmpty(&conn->reqs)) { - SCliReq* pReq = transQueuePop(&conn->reqs); + while (!transQueueEmpty(&conn->reqsToSend)) { + queue* el = transQueuePop(&conn->reqsToSend); + SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); // ASSERT(pReq->type != Release); // ASSERT(REQUEST_NO_RESP(&pReq->msg) == 0); @@ -1446,25 +1288,17 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { } static void cliConnRmReqs(SCliConn* conn) { - for (int i = 0; i < transQueueSize(&conn->reqs); i++) { - SCliReq* pReq = transQueueGet(&conn->reqs, i); - if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) { - transQueueRm(&conn->reqs, i); - destroyReq(pReq); - i--; - } - } + // for (int i = 0; i < transQueueSize(&conn->reqsSentOut); i++) { + // SCliReq* pReq = transQueueGet(&conn->reqsSentOut, i); + // if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) { + // transQueueRm(&conn->reqsToSend, i); + // destroyReq(pReq); + // i--; + // } + // } + return; } -static int32_t cliShouldSendMsg(SCliConn* conn) { - for (int i = 0; i < transQueueSize(&conn->reqs); i++) { - SCliReq* pReq = transQueueGet(&conn->reqs, i); - if (pReq->sent == 0) { - return 1; - } - } - return 0; -} static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { SCliConn* conn = req->data; SCliThrd* pThrd = conn->hostThrd; @@ -1488,7 +1322,7 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { void cliSendBatch_shareConn(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - int32_t size = transQueueSize(&pConn->reqs); + int32_t size = transQueueSize(&pConn->reqsToSend); int32_t totalLen = 0; if (size == 0) { @@ -1498,11 +1332,9 @@ void cliSendBatch_shareConn(SCliConn* pConn) { uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); int j = 0; - for (int i = 0; i < size; i++) { - SCliReq* pCliMsg = transQueueGet(&pConn->reqs, i); - if (pCliMsg->sent == 1) { - continue; - } + while (!transQueueEmpty(&pConn->reqsToSend)) { + queue* h = transQueuePop(&pConn->reqsToSend); + SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q); SReqCtx* pCtx = pCliMsg->ctx; pConn->seq++; @@ -1550,6 +1382,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); + transQueuePush(&pConn->reqsSentOut, pCliMsg->q); } if (j == 0) { taosMemoryFree(wb); @@ -1566,7 +1399,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t code = 0; - transQueuePush(&pConn->reqs, pCliMsg); + transQueuePush(&pConn->reqsToSend, pCliMsg); if (pConn->connnected) { code = cliSend2(pConn); } else { @@ -1663,7 +1496,7 @@ _exception2: static void cliHandleFastFail_resp(SCliConn* pConn, int status) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - SCliReq* pReq = transQueueGet(&pConn->reqs, 0); + SCliReq* pReq = transQueueGet(&pConn->reqsToSend, 0); STraceId* trace = &pReq->msg.info.traceId; tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), @@ -1800,7 +1633,7 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { if (TRANS_CONN_REF_GET(conn) == 2) { transUnrefCliHandle(conn); - if (!transQueuePush(&conn->reqs, pReq)) { + if (!transQueuePush(&conn->reqsToSend, pReq)) { return; } (void)cliSend2(conn); @@ -1809,48 +1642,7 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { destroyReq(pReq); } } -static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { - SReqCtx* pCtx = pReq->ctx; - pThrd->cvtAddr = pCtx->cvtAddr; - destroyReq(pReq); -} -static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq) { - int32_t code = 0; - int64_t refId = (int64_t)(pReq->msg.info.handle); - SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); - if (exh == NULL) { - tDebug("id %" PRId64 " already released", refId); - destroyReq(pReq); - return; - } - - taosRLockLatch(&exh->latch); - SCliConn* conn = exh->handle; - taosRUnLockLatch(&exh->latch); - - if (conn == NULL || conn->refId != refId) { - TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception); - } - tDebug("do free conn %p by id %" PRId64 "", conn, refId); - - int32_t size = transQueueSize(&conn->reqs); - if (size == 0) { - // already recv, and notify upper layer - TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception); - } else { - while (TRANS_CONN_REF_GET(conn) >= 1) { - transUnrefCliHandle(conn); - } - return; - } -_exception: - tDebug("already free conn %p by id %" PRId64 "", conn, refId); - - (void)transReleaseExHandle(transGetRefMgt(), refId); - (void)transReleaseExHandle(transGetRefMgt(), refId); - (void)transRemoveExHandle(transGetRefMgt(), refId); - destroyReq(pReq); -} +static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { return; } SCliConn* cliGetConn(SCliReq** pReq, SCliThrd* pThrd, bool* ignore, char* addr) { SReqCtx* pCtx = (*pReq)->ctx; @@ -2322,9 +2114,9 @@ void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { transCtxCleanup(&conn->ctx); // cliReleaseUnfinishedMsg(conn); if (destroy == 1) { - transQueueDestroy(&conn->reqs); + transQueueDestroy(&conn->reqsToSend); } else { - transQueueClear(&conn->reqs); + transQueueClear(&conn->reqsToSend); } } @@ -2332,8 +2124,8 @@ void cliConnFreeMsgs(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - for (int i = 0; i < transQueueSize(&conn->reqs); i++) { - SCliReq* cmsg = transQueueGet(&conn->reqs, i); + for (int i = 0; i < transQueueSize(&conn->reqsToSend); i++) { + SCliReq* cmsg = transQueueGet(&conn->reqsToSend, i); if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) { continue; } @@ -3697,7 +3489,7 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { SCliConn* args1 = container_of(a, SCliConn, node); SCliConn* args2 = container_of(b, SCliConn, node); - if (transQueueSize(&args1->reqs) > transQueueSize(&args2->reqs)) { + if (transQueueSize(&args1->reqsToSend) > transQueueSize(&args2->reqsToSend)) { return 0; } return 1; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 19196106d3..64ca99ad46 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -437,7 +437,7 @@ void transReqQueueClear(queue* q) { } } -int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(const void* arg)) { +int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) { QUEUE_INIT(&wq->node); wq->freeFunc = (void (*)(const void*))freeFunc; wq->size = 0; @@ -453,7 +453,7 @@ int32_t transQueuePush(STransQueue* q, void* arg) { void* transQueuePop(STransQueue* q) { if (q->size == 0) return NULL; - queue* tail = QUEUE_TAIL(&q->node); + queue* tail = QUEUE_HEAD(&q->node); QUEUE_REMOVE(tail); return tail; } @@ -469,6 +469,23 @@ void* transQueueGet(STransQueue* q, int idx) { return NULL; } +void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size) { + queue* d = dst; + queue* node = QUEUE_NEXT(&q->node); + while (node != &q->node) { + queue* next = QUEUE_NEXT(node); + if (filter(node, arg)) { + QUEUE_REMOVE(node); + q->size--; + QUEUE_PUSH(d, node); + if (--size == 0) { + break; + } + } + node = next; + } +} + void* tranQueueHead(STransQueue* q) { if (q->size == 0) return NULL; @@ -490,6 +507,7 @@ void* transQueueRm(STransQueue* q, int i) { } void transQueueRemove(STransQueue* q, void* e) { + if (q->size == 0) return; queue* node = e; QUEUE_REMOVE(node); q->size--; @@ -501,7 +519,7 @@ void transQueueClear(STransQueue* q) { while (!QUEUE_IS_EMPTY(q->node)) { queue* h = QUEUE_HEAD(&q->node); QUEUE_REMOVE(h); - q->freeFunc(h); + if (q->freeFunc != NULL) (q->freeFunc)(h); q->size--; } } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 51b0784f73..fcaf82a824 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -36,7 +36,7 @@ typedef struct SSvrConn { void* pInst; // rpc init void* ahandle; // void* hostThrd; - STransQueue srvMsgs; + STransQueue resps; // SSvrRegArg regArg; bool broken; // conn broken; @@ -63,7 +63,7 @@ typedef struct SSvrConn { SHashObj* pQTable; } SSvrConn; -typedef struct SSvrMsg { +typedef struct SSvrRespMsg { SSvrConn* pConn; STransMsg msg; queue q; @@ -73,9 +73,7 @@ typedef struct SSvrMsg { FilteFunc func; int8_t sent; - queue sendReq; - -} SSvrMsg; +} SSvrRespMsg; typedef struct { int64_t ver; @@ -158,14 +156,14 @@ static void uvWorkAfterTask(uv_work_t* req, int status); static void uvWalkCb(uv_handle_t* handle, void* arg); static void uvFreeCb(uv_handle_t* handle); -static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg); +static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg); -static int uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb); -static void uvStartSendResp(SSvrMsg* msg); +static int uvPrepareSendData(SSvrRespMsg* msg, uv_buf_t* wb); +static void uvStartSendResp(SSvrRespMsg* msg); static void uvNotifyLinkBrokenToApp(SSvrConn* conn); -static FORCE_INLINE void destroySmsg(SSvrMsg* smsg); +static FORCE_INLINE void destroySmsg(SSvrRespMsg* smsg); static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); // static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); @@ -174,13 +172,13 @@ static int32_t reallocConnRef(SSvrConn* conn); int32_t uvGetConnRefOfThrd(SWorkThrd* thrd) { return thrd ? thrd->connRefMgt : -1; } -static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd); -static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); -static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd); -static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd); -static void uvHandleUpdate(SSvrMsg* pMsg, SWorkThrd* thrd); -static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, - uvHandleRegister, uvHandleUpdate}; +static void uvHandleQuit(SSvrRespMsg* msg, SWorkThrd* thrd); +static void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd); +static void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd); +static void uvHandleRegister(SSvrRespMsg* msg, SWorkThrd* thrd); +static void uvHandleUpdate(SSvrRespMsg* pMsg, SWorkThrd* thrd); +static void (*transAsyncHandle[])(SSvrRespMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, + uvHandleRegister, uvHandleUpdate}; static void uvDestroyConn(uv_handle_t* handle); @@ -447,17 +445,17 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { (void)taosHashRemove(pConn->pQTable, &qId, sizeof(qId)); } - STransMsg tmsg = {.code = code, - .msgType = pHead->msgType + 1, - .info.qId = qId, - .info.traceId = pHead->traceId, - .info.seqNum = htonl(pHead->seqNum)}; - SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + STransMsg tmsg = {.code = code, + .msgType = pHead->msgType + 1, + .info.qId = qId, + .info.traceId = pHead->traceId, + .info.seqNum = htonl(pHead->seqNum)}; + SSvrRespMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrRespMsg)); srvMsg->msg = tmsg; srvMsg->type = Normal; srvMsg->pConn = pConn; - transQueuePush(&pConn->srvMsgs, srvMsg); + transQueuePush(&pConn->resps, &srvMsg->q); uvStartSendRespImpl(srvMsg); return 1; @@ -635,8 +633,8 @@ void uvOnSendCb(uv_write_t* req, int status) { queue* head = QUEUE_HEAD(&src); QUEUE_REMOVE(head); - SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); - STraceId* trace = &smsg->msg.info.traceId; + SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q); + STraceId* trace = &smsg->msg.info.traceId; tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn, smsg->msg.info.seqNum, smsg->msg.info.qId); destroySmsg(smsg); @@ -646,8 +644,8 @@ void uvOnSendCb(uv_write_t* req, int status) { queue* head = QUEUE_HEAD(&src); QUEUE_REMOVE(head); - SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); - STraceId* trace = &smsg->msg.info.traceId; + SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q); + STraceId* trace = &smsg->msg.info.traceId; tGDebug("%s conn %p failed to send, seqNum:%d, qid:%ld, reason:%s", transLabel(conn->pInst), conn, smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status)); destroySmsg(smsg); @@ -676,7 +674,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { taosMemoryFree(req); } -static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { +static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { SSvrConn* pConn = smsg->pConn; STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { @@ -699,7 +697,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { // handle invalid drop_task resp, TD-20098 if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { ASSERT(0); - // (void)transQueuePop(&pConn->srvMsgs); + // (void)transQueuePop(&pConn->resps); // destroySmsg(smsg); // return TSDB_CODE_INVALID_MSG; } @@ -728,30 +726,27 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { wb->len = len; return 0; } -static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* sendReqNode) { +static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* toSendQ) { + int32_t size = transQueueSize(&pConn->resps); + tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size); + if (size == 0) { + return 0; + } int32_t count = 0; - int32_t size = transQueueSize(&pConn->srvMsgs); uv_buf_t* pWb = taosMemoryCalloc(size, sizeof(uv_buf_t)); if (pWb == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size); - for (int32_t i = 0; i < transQueueSize(&pConn->srvMsgs); i++) { - SSvrMsg* pMsg = transQueueGet(&pConn->srvMsgs, i); - if (pMsg->sent == 1) { - continue; - } - uv_buf_t wb; + while (transQueueSize(&pConn->resps) > 0) { + queue* el = transQueuePop(&pConn->resps); + SSvrRespMsg* pMsg = QUEUE_DATA(el, SSvrRespMsg, q); + uv_buf_t wb; (void)uvPrepareSendData(pMsg, &wb); pWb[count] = wb; pMsg->sent = 1; - - QUEUE_PUSH(sendReqNode, &pMsg->sendReq); - - transQueueRm(&pConn->srvMsgs, i); - i--; + QUEUE_PUSH(toSendQ, &pMsg->q); count++; } @@ -766,7 +761,7 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf return 0; } -static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { +static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) { int32_t code = 0; SSvrConn* pConn = smsg->pConn; if (pConn->broken) { @@ -804,7 +799,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { (void)uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb); taosMemoryFree(pBuf); } -int32_t uvMayHandleReleaseResp(SSvrMsg* pMsg) { +int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) { SSvrConn* pConn = pMsg->pConn; int64_t qid = pMsg->msg.info.qId; if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) { @@ -819,7 +814,7 @@ int32_t uvMayHandleReleaseResp(SSvrMsg* pMsg) { } return 0; } -static void uvStartSendResp(SSvrMsg* smsg) { +static void uvStartSendResp(SSvrRespMsg* smsg) { // impl SSvrConn* pConn = smsg->pConn; if (uvMayHandleReleaseResp(smsg) == TSDB_CODE_RPC_NO_STATE) { @@ -827,19 +822,19 @@ static void uvStartSendResp(SSvrMsg* smsg) { return; } - transQueuePush(&pConn->srvMsgs, smsg); + transQueuePush(&pConn->resps, &smsg->q); uvStartSendRespImpl(smsg); return; } -static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) { +static FORCE_INLINE void destroySmsg(SSvrRespMsg* smsg) { if (smsg == NULL) { return; } transFreeMsg(smsg->msg.pCont); taosMemoryFree(smsg); } -static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); } +static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrRespMsg*)smsg); } static void destroyAllConn(SWorkThrd* pThrd) { tTrace("thread %p destroy all conn ", pThrd); @@ -870,7 +865,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { queue* head = QUEUE_HEAD(&wq); QUEUE_REMOVE(head); - SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q); + SSvrRespMsg* msg = QUEUE_DATA(head, SSvrRespMsg, q); if (msg == NULL) { tError("unexcept occurred, continue"); continue; @@ -1183,7 +1178,10 @@ void* transWorkerThread(void* arg) { return NULL; } - +void uvDestroyResp(void* e) { + SSvrRespMsg* pMsg = QUEUE_DATA(e, SSvrRespMsg, q); + destroySmsg(pMsg); +} static FORCE_INLINE SSvrConn* createConn(void* hThrd) { int32_t code = 0; SWorkThrd* pThrd = hThrd; @@ -1197,7 +1195,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { transReqQueueInit(&pConn->wreqQueue); QUEUE_INIT(&pConn->queue); - if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) { + if ((code = transQueueInit(&pConn->resps, uvDestroyResp)) != 0) { TAOS_CHECK_GOTO(code, &lino, _end); } @@ -1260,7 +1258,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { return pConn; _end: if (pConn) { - transQueueDestroy(&pConn->srvMsgs); + transQueueDestroy(&pConn->resps); (void)transDestroyBuffer(&pConn->readBuf); taosHashCleanup(pConn->pQTable); taosMemoryFree(pConn->pTcp); @@ -1334,11 +1332,11 @@ static void uvDestroyConn(uv_handle_t* handle) { STrans* pInst = thrd->pInst; tDebug("%s conn %p destroy", transLabel(pInst), conn); - for (int i = 0; i < transQueueSize(&conn->srvMsgs); i++) { - SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); - destroySmsg(msg); - } - transQueueDestroy(&conn->srvMsgs); + // for (int i = 0; i < transQueueSize(&conn->resps); i++) { + // SSvrRespMsg* msg = transQueueGet(&conn->resps, i); + // destroySmsg(msg); + // } + transQueueDestroy(&conn->resps); transReqQueueClear(&conn->wreqQueue); QUEUE_REMOVE(&conn->queue); @@ -1566,7 +1564,7 @@ End: return NULL; } -void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) { +void uvHandleQuit(SSvrRespMsg* msg, SWorkThrd* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { uv_walk(thrd->loop, uvWalkCb, NULL); @@ -1575,12 +1573,12 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) { } taosMemoryFree(msg); } -void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { +void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd) { ASSERT(0); // int32_t code = 0; // SSvrConn* conn = msg->pConn; // if (conn->status == ConnAcquire) { - // if (!transQueuePush(&conn->srvMsgs, msg)) { + // if (!transQueuePush(&conn->resps, msg)) { // return; // } // uvStartSendRespImpl(msg); @@ -1591,13 +1589,13 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { // destroySmsg(msg); } -void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) { +void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd) { // send msg to client tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn); uvStartSendResp(msg); } -int32_t uvHandleStateReq(SSvrMsg* msg) { +int32_t uvHandleStateReq(SSvrRespMsg* msg) { int32_t code = 0; SSvrConn* conn = msg->pConn; tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn, @@ -1613,14 +1611,14 @@ int32_t uvHandleStateReq(SSvrMsg* msg) { if (code == 0) tDebug("conn %p register brokenlink callback succ", conn); return code; } -void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { +void uvHandleRegister(SSvrRespMsg* msg, SWorkThrd* thrd) { SSvrConn* conn = msg->pConn; tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pInst), conn); int32_t code = uvHandleStateReq(msg); taosMemoryFree(msg); } -void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { +void uvHandleUpdate(SSvrRespMsg* msg, SWorkThrd* thrd) { SUpdateIpWhite* req = msg->arg; if (req == NULL) { tDebug("ip-white-list disable on trans"); @@ -1665,7 +1663,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { } (void)taosThreadJoin(pThrd->thread, NULL); SRV_RELEASE_UV(pThrd->loop); - TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsgWrapper, NULL); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrRespMsg, destroySmsgWrapper, NULL); transAsyncPoolDestroy(pThrd->asyncPool); uvWhiteListDestroy(pThrd->pWhiteList); @@ -1675,7 +1673,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { taosMemoryFree(pThrd); } void sendQuitToWorkThrd(SWorkThrd* pThrd) { - SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + SSvrRespMsg* msg = taosMemoryCalloc(1, sizeof(SSvrRespMsg)); msg->type = Quit; tDebug("server send quit msg to work thread"); (void)transAsyncSend(pThrd->asyncPool, &msg->q); @@ -1752,7 +1750,7 @@ int32_t transReleaseSrvHandle(void* handle) { .info.qId = qId, .info.traceId = info->traceId}; - SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + SSvrRespMsg* m = taosMemoryCalloc(1, sizeof(SSvrRespMsg)); if (m == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _return1; @@ -1803,7 +1801,7 @@ int32_t transSendResponse(const STransMsg* msg) { SWorkThrd* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); - SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + SSvrRespMsg* m = taosMemoryCalloc(1, sizeof(SSvrRespMsg)); if (m == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _return1; @@ -1851,7 +1849,7 @@ int32_t transRegisterMsg(const STransMsg* msg) { SWorkThrd* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); - SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + SSvrRespMsg* m = taosMemoryCalloc(1, sizeof(SSvrRespMsg)); if (m == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _return1; @@ -1895,7 +1893,7 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { for (int i = 0; i < svrObj->numOfThreads; i++) { SWorkThrd* pThrd = svrObj->pThreadObj[i]; - SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + SSvrRespMsg* msg = taosMemoryCalloc(1, sizeof(SSvrRespMsg)); if (msg == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; break;