From 603de3976c68495f9c71629236e53c69dc513984 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 3 Sep 2024 19:40:13 +0800 Subject: [PATCH] refactor transport --- include/libs/transport/trpc.h | 5 +- source/libs/transport/src/transCli.c | 683 ++++++++++++++------------- source/libs/transport/src/transSvr.c | 12 +- 3 files changed, 356 insertions(+), 344 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 486a5e35c3..cc9d789430 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -63,6 +63,7 @@ typedef struct SRpcHandleInfo { int8_t forbiddenIp; int8_t notFreeAhandle; int8_t compressed; + int32_t seqNum; } SRpcHandleInfo; typedef struct SRpcMsg { @@ -125,8 +126,8 @@ typedef struct SRpcInit { int32_t timeToGetConn; int8_t supportBatch; // 0: no batch, 1. batch int32_t batchSize; - int8_t shareConn; // 0: no share, 1. share - int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait + int8_t shareConn; // 0: no share, 1. share + int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait void *parent; } SRpcInit; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index edcf933aab..a3cd6eacd0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -169,7 +169,9 @@ static void doCloseIdleConn(void* param); static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int port); static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn); - +static void cliSendBatch_shareConnCb(uv_write_t* req, int status); +void cliSendBatch_shareConn(SCliConn* pConn); +int32_t cliSend2(SCliConn* conn); // register conn timer static void cliConnTimeout(uv_timer_t* handle); // register timer for read @@ -236,9 +238,9 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq); static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq); static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq); -static void cliDealReq(queue* h, SCliThrd* pThrd); -static void cliBatchDealReq(queue* h, SCliThrd* pThrd); -static void (*cliDealFunc[])(queue* h, SCliThrd* pThrd) = {cliDealReq, cliBatchDealReq}; +static void cliDoReq(queue* h, SCliThrd* pThrd); +static void cliDoBatchReq(queue* h, SCliThrd* pThrd); +static void (*cliDealFunc[])(queue* h, SCliThrd* pThrd) = {cliDoReq, cliDoBatchReq}; static void (*cliAsyncHandle[])(SCliThrd* pThrd, SCliReq* pReq) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, cliHandleUpdate, cliHandleFreeById}; @@ -392,7 +394,7 @@ bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->reqs)) { SCliReq* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); - (void)cliSend(conn); + (void)cliSend2(conn); return true; } return false; @@ -419,7 +421,7 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { (void)transQueuePush(&conn->reqs, t); tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); (void)transReleaseExHandle(transGetRefMgt(), refId); - (void)cliSend(conn); + (void)cliSend2(conn); return true; } taosWUnLockLatch(&exh->latch); @@ -457,20 +459,6 @@ void cliResetConnTimer(SCliConn* conn) { void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } -SCliReq* cliFindReqBySeq(SCliConn* conn, int32_t seq) { - SCliReq* pReq = NULL; - for (int i = 0; i < transQueueSize(&conn->reqs); i++) { - pReq = transQueueGet(&conn->reqs, i); - if (pReq->seq == seq) { - transQueueRm(&conn->reqs, i); - break; - } - } - if (pReq == NULL) { - ASSERT(0); - } - return pReq; -} bool cliShouldAddConnToPool(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; bool empty = transQueueEmpty(&conn->reqs); @@ -521,7 +509,6 @@ void cliHandleResp2(SCliConn* conn) { cliResetConnTimer(conn); - // int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1; STransMsgHead* pHead = NULL; int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0); if (msgLen < 0) { @@ -539,7 +526,7 @@ void cliHandleResp2(SCliConn* conn) { } SCliReq* pReq = NULL; - int32_t seq = pHead->seqNum; + int32_t seq = htonl(pHead->seqNum); code = cliGetReqBySeq(conn, seq, &pReq); if (code != 0) { tDebug("%s conn %p recv unexpected packet, reason:%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); @@ -574,206 +561,191 @@ void cliHandleResp2(SCliConn* conn) { (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } -void cliHandleResp(SCliConn* conn) { - int32_t code = 0; - SCliThrd* pThrd = conn->hostThrd; - STrans* pInst = pThrd->pInst; +// void cliHandleResp(SCliConn* conn) { +// int32_t code = 0; +// SCliThrd* pThrd = conn->hostThrd; +// STrans* pInst = pThrd->pInst; - cliResetConnTimer(conn); +// cliResetConnTimer(conn); - STransMsgHead* pHead = NULL; +// STransMsgHead* pHead = NULL; - int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1; - int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf); - if (msgLen <= 0) { - taosMemoryFree(pHead); - tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); - // TODO: notify cb - pThrd->notifyExceptCb(pThrd, NULL, NULL); - return; - } +// int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1; +// int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf); +// if (msgLen <= 0) { +// taosMemoryFree(pHead); +// tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); +// // TODO: notify cb +// pThrd->notifyExceptCb(pThrd, NULL, NULL); +// return; +// } - if (resetBuf == 0) { - tTrace("%s conn %p not reset read buf", transLabel(pInst), conn); - } +// if (resetBuf == 0) { +// tTrace("%s conn %p not reset read buf", transLabel(pInst), conn); +// } - if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) { - tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); - // TODO: notify cb - } - pHead->code = htonl(pHead->code); - pHead->msgLen = htonl(pHead->msgLen); - if (cliRecvReleaseReq(conn, pHead)) { - return; - } +// if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) { +// tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); +// // TODO: notify cb +// } +// pHead->code = htonl(pHead->code); +// pHead->msgLen = htonl(pHead->msgLen); +// if (cliRecvReleaseReq(conn, pHead)) { +// return; +// } - STransMsg transMsg = {0}; - transMsg.contLen = transContLenFromMsg(pHead->msgLen); - transMsg.pCont = transContFromHead((char*)pHead); - transMsg.code = pHead->code; - transMsg.msgType = pHead->msgType; - transMsg.info.ahandle = NULL; - transMsg.info.traceId = pHead->traceId; - transMsg.info.hasEpSet = pHead->hasEpSet; - transMsg.info.cliVer = htonl(pHead->compatibilityVer); +// STransMsg transMsg = {0}; +// transMsg.contLen = transContLenFromMsg(pHead->msgLen); +// transMsg.pCont = transContFromHead((char*)pHead); +// transMsg.code = pHead->code; +// transMsg.msgType = pHead->msgType; +// transMsg.info.ahandle = NULL; +// transMsg.info.traceId = pHead->traceId; +// transMsg.info.hasEpSet = pHead->hasEpSet; +// transMsg.info.cliVer = htonl(pHead->compatibilityVer); - SCliReq* pReq = NULL; - SReqCtx* pCtx = NULL; - if (CONN_NO_PERSIST_BY_APP(conn)) { - pReq = transQueuePop(&conn->reqs); +// SCliReq* pReq = NULL; +// SReqCtx* pCtx = NULL; +// if (CONN_NO_PERSIST_BY_APP(conn)) { +// pReq = transQueuePop(&conn->reqs); - pCtx = pReq ? pReq->ctx : NULL; - transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; - tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); - } else { - uint64_t ahandle = (uint64_t)pHead->ahandle; - CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); - if (pReq == NULL) { - transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); - tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn, - transMsg.info.ahandle, TMSG_INFO(transMsg.msgType)); - if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { - transMsg.code = TSDB_CODE_RPC_BROKEN_LINK; - transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); - tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn, - transMsg.info.ahandle); - } - } else { - pCtx = pReq->ctx; - transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; - tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); - } - } - // buf's mem alread translated to transMsg.pCont - if (!CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.info.handle = (void*)conn->refId; - transMsg.info.refId = (int64_t)(void*)conn->refId; - tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); - } +// pCtx = pReq ? pReq->ctx : NULL; +// transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; +// tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); +// } else { +// uint64_t ahandle = (uint64_t)pHead->ahandle; +// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); +// if (pReq == NULL) { +// transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); +// tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn, +// transMsg.info.ahandle, TMSG_INFO(transMsg.msgType)); +// if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { +// transMsg.code = TSDB_CODE_RPC_BROKEN_LINK; +// transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); +// tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn, +// transMsg.info.ahandle); +// } +// } else { +// pCtx = pReq->ctx; +// transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; +// tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); +// } +// } +// // buf's mem alread translated to transMsg.pCont +// if (!CONN_NO_PERSIST_BY_APP(conn)) { +// transMsg.info.handle = (void*)conn->refId; +// transMsg.info.refId = (int64_t)(void*)conn->refId; +// tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); +// } - STraceId* trace = &transMsg.info.traceId; - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, - TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code)); +// STraceId* trace = &transMsg.info.traceId; +// tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, +// TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code)); - if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { - tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); - transFreeMsg(transMsg.pCont); - return; - } - if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { - tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); - transFreeMsg(transMsg.pCont); - return; - } +// if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { +// tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); +// transFreeMsg(transMsg.pCont); +// return; +// } +// if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { +// tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); +// transFreeMsg(transMsg.pCont); +// return; +// } - if (pReq == NULL || (pReq && pReq->type != Release)) { - if (cliNotifyCb(conn, pReq, &transMsg) != 0) { - return; - } - } - int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle)); - tDebug("conn %p msg refId: %" PRId64 "", conn, refId); - destroyReq(pReq); +// if (pReq == NULL || (pReq && pReq->type != Release)) { +// if (cliNotifyCb(conn, pReq, &transMsg) != 0) { +// return; +// } +// } +// int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle)); +// tDebug("conn %p msg refId: %" PRId64 "", conn, refId); +// destroyReq(pReq); - if (cliConnSendSeqMsg(refId, conn)) { - return; - } +// if (cliConnSendSeqMsg(refId, conn)) { +// return; +// } - if (cliMaySendCachedMsg(conn) == true) { - return; - } +// if (cliMaySendCachedMsg(conn) == true) { +// return; +// } - if (CONN_NO_PERSIST_BY_APP(conn)) { - return addConnToPool(pThrd->pool, conn); - } +// if (CONN_NO_PERSIST_BY_APP(conn)) { +// return addConnToPool(pThrd->pool, conn); +// } - (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); -} -static void cliDestroyMsgInExhandle(int64_t refId) { - if (refId == 0) return; - SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); - if (exh) { - taosWLockLatch(&exh->latch); - while (!QUEUE_IS_EMPTY(&exh->q)) { - queue* h = QUEUE_HEAD(&exh->q); - QUEUE_REMOVE(h); - SCliReq* t = QUEUE_DATA(h, SCliReq, seqq); - destroyReq(t); - } - taosWUnLockLatch(&exh->latch); - (void)transReleaseExHandle(transGetRefMgt(), refId); - } -} +// (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); +// } void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { - if (transQueueEmpty(&pConn->reqs)) { - if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { - tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); - if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); - transUnrefCliHandle(pConn); - return; - } - } - SCliThrd* pThrd = pConn->hostThrd; - STrans* pInst = pThrd->pInst; - bool once = false; - do { - SCliReq* pReq = transQueuePop(&pConn->reqs); + // if (transQueueEmpty(&pConn->reqs)) { + // if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { + // tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); + // if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); + // transUnrefCliHandle(pConn); + // return; + // } + // } + // SCliThrd* pThrd = pConn->hostThrd; + // STrans* pInst = pThrd->pInst; + // bool once = false; + // do { + // SCliReq* pReq = transQueuePop(&pConn->reqs); - if (pReq == NULL && once) { - break; - } + // if (pReq == NULL && once) { + // break; + // } - if (pReq != NULL && REQUEST_NO_RESP(&pReq->msg)) { - destroyReq(pReq); - break; - } + // if (pReq != NULL && REQUEST_NO_RESP(&pReq->msg)) { + // destroyReq(pReq); + // break; + // } - SReqCtx* pCtx = pReq ? pReq->ctx : NULL; + // SReqCtx* pCtx = pReq ? pReq->ctx : NULL; - STransMsg transMsg = {0}; - transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; - transMsg.msgType = pReq ? pReq->msg.msgType + 1 : 0; - transMsg.info.ahandle = NULL; - transMsg.info.cliVer = pInst->compatibilityVer; + // STransMsg transMsg = {0}; + // transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; + // transMsg.msgType = pReq ? pReq->msg.msgType + 1 : 0; + // transMsg.info.ahandle = NULL; + // transMsg.info.cliVer = pInst->compatibilityVer; - if (pReq == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { - transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); - tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle, - TMSG_INFO(transMsg.msgType)); - if (transMsg.info.ahandle == NULL) { - int32_t msgType = 0; - transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType); - transMsg.msgType = msgType; - tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, - transMsg.info.ahandle); - } - } else { - transMsg.info.ahandle = (pReq != NULL && pReq->type != Release && pCtx) ? pCtx->ahandle : NULL; - } + // if (pReq == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { + // transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); + // tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle, + // TMSG_INFO(transMsg.msgType)); + // if (transMsg.info.ahandle == NULL) { + // int32_t msgType = 0; + // transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType); + // transMsg.msgType = msgType; + // tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, + // transMsg.info.ahandle); + // } + // } else { + // transMsg.info.ahandle = (pReq != NULL && pReq->type != Release && pCtx) ? pCtx->ahandle : NULL; + // } - if (pCtx == NULL || pCtx->pSem == NULL) { - if (transMsg.info.ahandle == NULL) { - if (pReq == NULL || REQUEST_NO_RESP(&pReq->msg) || pReq->type == Release) { - destroyReq(pReq); - once = true; - continue; - } - } - } + // if (pCtx == NULL || pCtx->pSem == NULL) { + // if (transMsg.info.ahandle == NULL) { + // if (pReq == NULL || REQUEST_NO_RESP(&pReq->msg) || pReq->type == Release) { + // destroyReq(pReq); + // once = true; + // continue; + // } + // } + // } - if (pReq == NULL || (pReq && pReq->type != Release)) { - int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle)); - cliDestroyMsgInExhandle(refId); - if (cliNotifyCb(pConn, pReq, &transMsg) != 0) { - return; - } - } - destroyReq(pReq); - tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn)); - } while (!transQueueEmpty(&pConn->reqs)); - if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); - transUnrefCliHandle(pConn); + // if (pReq == NULL || (pReq && pReq->type != Release)) { + // int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle)); + // cliDestroyMsgInExhandle(refId); + // if (cliNotifyCb(pConn, pReq, &transMsg) != 0) { + // return; + // } + // } + // destroyReq(pReq); + // tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn)); + // } while (!transQueueEmpty(&pConn->reqs)); + // if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); + // transUnrefCliHandle(pConn); } void cliHandleExcept(SCliConn* conn, int32_t code) { tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); @@ -946,6 +918,7 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p tDebug("conn %p get from pool, pool size:%d, dst:%s", conn, conn->list->size, conn->dstAddr); + *ppConn = conn; return 0; } @@ -989,79 +962,79 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliReq** pReq) { plist->list = nList; } - STraceId* trace = &(*pReq)->msg.info.traceId; - // no avaliable conn in pool - if (QUEUE_IS_EMPTY(&plist->conns)) { - SMsgList* list = plist->list; - if ((list)->numOfConn >= pInst->connLimitNum) { - STraceId* trace = &(*pReq)->msg.info.traceId; - if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pReq)->msg.msgType))) { - tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pReq)->msg.msgType), - tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); - doNotifyCb(*pReq, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); - *pReq = NULL; - return NULL; - } + // STraceId* trace = &(*pReq)->msg.info.traceId; + // // no avaliable conn in pool + // if (QUEUE_IS_EMPTY(&plist->conns)) { + // SMsgList* list = plist->list; + // if ((list)->numOfConn >= pInst->connLimitNum) { + // STraceId* trace = &(*pReq)->msg.info.traceId; + // if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pReq)->msg.msgType))) { + // tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pReq)->msg.msgType), + // tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); + // doNotifyCb(*pReq, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); + // *pReq = NULL; + // return NULL; + // } - STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); - if (arg == NULL) { - doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pReq = NULL; - return NULL; - } - arg->param1 = *pReq; - arg->param2 = pThrd; + // STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + // if (arg == NULL) { + // doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); + // *pReq = NULL; + // return NULL; + // } + // arg->param1 = *pReq; + // arg->param2 = pThrd; - SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); - if (task == NULL) { - taosMemoryFree(arg); - doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pReq = NULL; - return NULL; - } - (*pReq)->ctx->task = task; - tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); - QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q); - *pReq = NULL; - } else { - // send msg in delay queue - if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { - STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); - if (arg == NULL) { - doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pReq = NULL; - return NULL; - } - arg->param1 = *pReq; - arg->param2 = pThrd; + // SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); + // if (task == NULL) { + // taosMemoryFree(arg); + // doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); + // *pReq = NULL; + // return NULL; + // } + // (*pReq)->ctx->task = task; + // tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); + // QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q); + // *pReq = NULL; + // } else { + // // send msg in delay queue + // if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { + // STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + // if (arg == NULL) { + // doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); + // *pReq = NULL; + // return NULL; + // } + // arg->param1 = *pReq; + // arg->param2 = pThrd; - SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); - if (task == NULL) { - taosMemoryFree(arg); - doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); - *pReq = NULL; - return NULL; - } + // SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); + // if (task == NULL) { + // taosMemoryFree(arg); + // doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY); + // *pReq = NULL; + // return NULL; + // } - (*pReq)->ctx->task = task; - tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); + // (*pReq)->ctx->task = task; + // tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); - QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q); - queue* h = QUEUE_HEAD(&(list)->msgQ); - QUEUE_REMOVE(h); - SCliReq* ans = QUEUE_DATA(h, SCliReq, q); + // QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q); + // queue* h = QUEUE_HEAD(&(list)->msgQ); + // QUEUE_REMOVE(h); + // SCliReq* ans = QUEUE_DATA(h, SCliReq, q); - *pReq = ans; + // *pReq = ans; - trace = &(*pReq)->msg.info.traceId; - tGTrace("%s msg %s pop from delay queue, start to send", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); - transDQCancel(pThrd->waitConnQueue, ans->ctx->task); - } - list->numOfConn++; - } - tDebug("%s numOfConn: %d, limit: %d, dst:%s", pInst->label, list->numOfConn, pInst->connLimitNum, key); - return NULL; - } + // trace = &(*pReq)->msg.info.traceId; + // tGTrace("%s msg %s pop from delay queue, start to send", pInst->label, TMSG_INFO((*pReq)->msg.msgType)); + // transDQCancel(pThrd->waitConnQueue, ans->ctx->task); + // } + // list->numOfConn++; + // } + // tDebug("%s numOfConn: %d, limit: %d, dst:%s", pInst->label, list->numOfConn, pInst->connLimitNum, key); + // return NULL; + // } queue* h = QUEUE_TAIL(&plist->conns); plist->size -= 1; @@ -1081,6 +1054,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; } + + conn->seq = 0; int32_t code = allocConnRef(conn, true); if (code != 0) { cliDestroyConn(conn, true); @@ -1115,7 +1090,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { (void)transQueuePush(&conn->reqs, pReq); conn->status = ConnNormal; - (void)cliSend(conn); + (void)cliSend2(conn); return; } @@ -1219,7 +1194,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { cliHandleExcept(conn, -1); break; } else { - cliHandleResp(conn); + cliHandleResp2(conn); } } return; @@ -1373,59 +1348,59 @@ static void cliDestroy(uv_handle_t* handle) { taosMemoryFree(conn); } -static bool cliHandleNoResp(SCliConn* conn) { - bool res = false; - if (!transQueueEmpty(&conn->reqs)) { - SCliReq* pReq = transQueueGet(&conn->reqs, 0); - if (REQUEST_NO_RESP(&pReq->msg)) { - (void)transQueuePop(&conn->reqs); - destroyReq(pReq); - res = true; - } - if (res == true) { - if (cliMaySendCachedMsg(conn) == false) { - SCliThrd* thrd = conn->hostThrd; - addConnToPool(thrd->pool, conn); - res = false; - } else { - res = true; - } - } - } - return res; -} +// static bool cliHandleNoResp(SCliConn* conn) { +// bool res = false; +// if (!transQueueEmpty(&conn->reqs)) { +// SCliReq* pReq = transQueueGet(&conn->reqs, 0); +// if (REQUEST_NO_RESP(&pReq->msg)) { +// (void)transQueuePop(&conn->reqs); +// destroyReq(pReq); +// res = true; +// } +// if (res == true) { +// if (cliMaySendCachedMsg(conn) == false) { +// SCliThrd* thrd = conn->hostThrd; +// addConnToPool(thrd->pool, conn); +// res = false; +// } else { +// res = true; +// } +// } +// } +// return res; +// } static void cliSendCb(uv_write_t* req, int status) { STUB_RAND_NETWORK_ERR(status); - SCliConn* pConn = transReqQueueRemove(req); - if (pConn == NULL) return; + // SCliConn* pConn = transReqQueueRemove(req); + // if (pConn == NULL) return; - SCliReq* pReq = transQueueGet(&pConn->reqs, 0); - if (pReq != NULL) { - int64_t cost = taosGetTimestampUs() - pReq->st; - if (cost > 1000 * 50) { - tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost); - } - } - if (pReq != NULL && pReq->msg.contLen == 0 && pReq->msg.pCont != 0) { - rpcFreeCont(pReq->msg.pCont); - pReq->msg.pCont = 0; - } + // SCliReq* pReq = transQueueGet(&pConn->reqs, 0); + // if (pReq != NULL) { + // int64_t cost = taosGetTimestampUs() - pReq->st; + // if (cost > 1000 * 50) { + // tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost); + // } + // } + // if (pReq != NULL && pReq->msg.contLen == 0 && pReq->msg.pCont != 0) { + // rpcFreeCont(pReq->msg.pCont); + // pReq->msg.pCont = 0; + // } - if (status == 0) { - tDebug("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); - } else { - if (!uv_is_closing((uv_handle_t*)&pConn->stream)) { - tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); - cliHandleExcept(pConn, -1); - } - return; - } - if (cliHandleNoResp(pConn) == true) { - tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn); - return; - } - (void)uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); + // if (status == 0) { + // tDebug("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); + // } else { + // if (!uv_is_closing((uv_handle_t*)&pConn->stream)) { + // tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); + // cliHandleExcept(pConn, -1); + // } + // return; + // } + // if (cliHandleNoResp(pConn) == true) { + // tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn); + // return; + // } + // (void)uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } static void cliHandleBatch_shareConnExcept(SCliConn* conn) { @@ -1459,9 +1434,34 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { if (T_REF_VAL_GET(conn) > 1) transUnrefCliHandle(conn); transUnrefCliHandle(conn); } + +static void cliConnRmReqs(SCliConn* conn) { + for (int i = 0; i < transQueueSize(&conn->reqs); i++) { + SCliReq* pReq = transQueueGet(&conn->reqs, i); + if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) { + transQueueRm(&conn->reqs, i); + destroyReq(pReq); + } + } +} + +static int32_t cliShouldSendMsg(SCliConn* conn) { + for (int i = 0; i < transQueueSize(&conn->reqs); i++) { + SCliReq* pReq = transQueueGet(&conn->reqs, i); + if (pReq->sent == 0) { + // pReq->sent = 1; + // pReq->seq = conn->seq; + return 1; + } + } + return 0; +} static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { SCliConn* conn = req->data; + SCliThrd* pThrd = conn->hostThrd; conn->shareCnt -= 1; + + cliConnRmReqs(conn); if (status != 0) { tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); if (!uv_is_closing((uv_handle_t*)&conn->stream)) { @@ -1469,7 +1469,16 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { } return; } - uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); + int ret = uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); + + if (ret != 0) { + tError("%s conn %p failed to start read, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(ret)); + cliHandleBatch_shareConnExcept(conn); + } + + if (cliShouldSendMsg(conn) == 1) { + cliSendBatch_shareConn(conn); + } taosMemoryFree(req); } void cliSendBatch_shareConn(SCliConn* pConn) { @@ -1518,7 +1527,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { pHead->compatibilityVer = htonl(pInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); - pHead->seqNum = pConn->seq; + pHead->seqNum = htonl(pConn->seq); if (pHead->comp == 0) { if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { @@ -1532,7 +1541,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { totalLen += msgLen; pCliMsg->sent = 1; - pCliMsg->seq = pHead->seqNum; + pCliMsg->seq = pConn->seq; } uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); @@ -1577,7 +1586,7 @@ void cliSendBatch(SCliConn* pConn) { } pReq->contLen = 0; } - + pConn->seq++; int msgLen = transMsgLenFromCont(pReq->contLen); STransMsgHead* pHead = transHeadFromCont(pReq->pCont); @@ -1595,6 +1604,7 @@ void cliSendBatch(SCliConn* pConn) { pHead->compatibilityVer = htonl(pInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); + pHead->seqNum = htonl(pConn->seq); if (pHead->comp == 0 && pReq->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { @@ -1637,10 +1647,14 @@ _exception: int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t code = 0; transQueuePush(&pConn->reqs, pCliMsg); - code = cliSend(pConn); + code = cliSend2(pConn); return code; } +int32_t cliSend2(SCliConn* pConn) { + cliSendBatch_shareConn(pConn); + return 0; +} int32_t cliSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; @@ -1677,7 +1691,7 @@ int32_t cliSend(SCliConn* pConn) { pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->version = TRANS_VER; pHead->compatibilityVer = htonl(pInst->compatibilityVer); - pHead->seqNum = pConn->seq++; + pHead->seqNum = htonl(pConn->seq++); } pHead->timestamp = taosHton64(taosGetTimestampUs()); @@ -1957,7 +1971,7 @@ void cliConnCb(uv_connect_t* req, int status) { return cliSendBatch_shareConn(pConn); } - (void)cliSend(pConn); + (void)cliSend2(pConn); } static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) { @@ -2019,7 +2033,7 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { if (!transQueuePush(&conn->reqs, pReq)) { return; } - (void)cliSend(conn); + (void)cliSend2(conn); } else { tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn); destroyReq(pReq); @@ -2298,15 +2312,15 @@ _exception: } void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { - STrans* pInst = pThrd->pInst; - if (pInst->shareConn == 1) { - return cliHandleReq__shareConn(pThrd, pReq); - } else { - return cliHandleReq__noShareConn(pThrd, pReq); - } + // STrans* pInst = pThrd->pInst; + // if (pInst->shareConn == 1) { + // return cliHandleReq__shareConn(pThrd, pReq); + // } else { + return cliHandleReq__noShareConn(pThrd, pReq); + //} } -static void cliDealReq(queue* wq, SCliThrd* pThrd) { +static void cliDoReq(queue* wq, SCliThrd* pThrd) { int count = 0; while (!QUEUE_IS_EMPTY(wq)) { @@ -2464,7 +2478,7 @@ static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* p *ppBatch = pBatch; return 0; } -static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { +static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { STrans* pInst = pThrd->pInst; int32_t code = 0; @@ -2674,9 +2688,8 @@ _err: terrno = code; return NULL; } - int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { - SCliThrd* pThrd = pThrd; + SCliThrd* pThrd = thrd; return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); } int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { @@ -3264,12 +3277,6 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - // if (pReq == NULL || pReq->ctx == NULL) { - // tTrace("%s conn %p handle resp", pInst->label, pConn); - // pInst->cfp(pInst->parent, pResp, NULL); - // return 0; - // } - if (cliMayRetry(pConn, pReq, pResp)) { return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 8acbe9f273..6b5c15eae3 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -65,9 +65,9 @@ typedef struct SSvrMsg { STransMsg msg; queue q; STransMsgType type; - - void* arg; - FilteFunc func; + int32_t seqNum; + void* arg; + FilteFunc func; } SSvrMsg; @@ -471,7 +471,7 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - if (pHead->seqNum != 0) { + if (pHead->seqNum == 0) { ASSERT(0); } // pHead->noResp = 1, @@ -487,6 +487,7 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.info.cliVer = htonl(pHead->compatibilityVer); transMsg.info.forbiddenIp = forbiddenIp; transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0; + transMsg.info.seqNum = htonl(pHead->seqNum); uvMaySetConnAcquired(pConn, pHead); @@ -652,6 +653,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->compatibilityVer = htonl(((STrans*)pConn->pInst)->compatibilityVer); pHead->version = TRANS_VER; + pHead->seqNum = htonl(pMsg->info.seqNum); // handle invalid drop_task resp, TD-20098 if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { @@ -793,6 +795,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SExHandle* exh1 = transMsg.info.handle; int64_t refId = transMsg.info.refId; + msg->seqNum = transMsg.info.seqNum; + SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1);