From 620f4c58067749854c1892797b6cac60650ac6e6 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 3 Jul 2024 06:46:38 +0000 Subject: [PATCH] refactor transport --- source/libs/transport/src/transCli.c | 81 ++++++++++++++++++++++------ 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index aa04bc336f..dfeea77035 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -85,6 +85,7 @@ typedef struct SCliConn { int64_t refId; int32_t seq; + int32_t shareCnt; } SCliConn; typedef struct SCliMsg { @@ -230,6 +231,10 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); +static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); +static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); +static void delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); + // thread obj static SCliThrd* createThrdObj(void* trans); static void destroyThrdObj(SCliThrd* pThrd); @@ -420,6 +425,15 @@ SCliMsg* cliFindMsgBySeqnum(SCliConn* conn, int32_t seqNum) { } return pMsg; } +bool cliShouldAddConnToPool(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + bool empty = transQueueEmpty(&conn->cliMsgs); + if (empty) { + delConnFromHeapCache(pThrd->connHeapCache, conn); + } + + return empty; +} void cliHandleResp_shareConn(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -1113,12 +1127,42 @@ static void cliSendCb(uv_write_t* req, int status) { uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } +static void cliHandleBatch_shareConnExcept(SCliConn* conn) { + int32_t code = -1; + SCliThrd* pThrd = conn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { + SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, i); + ASSERT(pMsg->type != Release); + ASSERT(REQUEST_NO_RESP(&pMsg->msg) == 0); + + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; + STransMsg transMsg = {0}; + transMsg.code = code == -1 ? (conn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; + transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; + transMsg.info.ahandle = NULL; + transMsg.info.cliVer = pTransInst->compatibilityVer; + transMsg.info.ahandle = pCtx->ahandle; + + int32_t ret = cliAppCb(conn, &transMsg, pMsg); + } + + // SCliConn* conn = req->data; + // if (status != 0) { + // tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); + // return; + // } + + // uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); + // taosMemoryFree(req); +} static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { SCliConn* conn = req->data; + conn->shareCnt -= 1; if (status != 0) { tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); if (!uv_is_closing((uv_handle_t*)&conn->stream)) { - cliHandleExcept(conn); + cliHandleBatch_shareConnExcept(conn); } return; } @@ -1189,6 +1233,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { } uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); req->data = pConn; + pConn->shareCnt += 1; tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen); uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb); @@ -1359,7 +1404,12 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip); if (ipaddr == (uint32_t)(-1)) { cliResetConnTimer(conn); - cliHandleFastFail(conn, -1); + if (conn->pBatch != NULL) { + cliHandleFastFail(conn, -1); + } else { + cliHandleBatch_shareConnExcept(conn); + } + return; } @@ -1774,19 +1824,17 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { return pConn; } -static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* pConn) { - SHeap* p = getOrCreateHeapIfNotExist(pConnHeapCacahe, key); +static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { + SHeap* p = getOrCreateHeapIfNotExist(pConnHeapCacahe, pConn->dstAddr); if (p == NULL) { return 0; } return transHeapInsert(p, pConn); } -static void delConnFromHeapCache(SHashObj* pConnHeapCache, char* key, SCliConn* pConn) { - size_t klen = strlen(key); - - SHeap* p = taosHashGet(pConnHeapCache, key, klen); +static void delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { + SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr)); if (p == NULL) { - tDebug("failed to get heap cache for key:%s, no need to del", key); + tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr); return; } int ret = transHeapDelete(p, pConn); @@ -1814,19 +1862,22 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { bool ignore = false; pConn = getConnFromPool(pThrd, addr, &ignore); if (pConn != NULL) { + addConnToHeapCache(pThrd->connHeapCache, pConn); + transQueuePush(&pConn->cliMsgs, pMsg); return cliSendBatch_shareConn(pConn); } + } else { + tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); + transQueuePush(&pConn->cliMsgs, pMsg); + cliSendBatch_shareConn(pConn); + return; } pConn = cliCreateConn(pThrd); pConn->dstAddr = taosStrdup(addr); - code = addConnToHeapCache(pThrd->connHeapCache, addr, pConn); - if (code != 0) { - // do nothing - } else { - // do nothing - } + code = addConnToHeapCache(pThrd->connHeapCache, pConn); + transQueuePush(&pConn->cliMsgs, pMsg); return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); } void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {