From ee7b67e01833f46c497717e0b1b990bb4faeee0e Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 22 Jul 2024 06:50:01 +0000 Subject: [PATCH 01/20] refactor transport --- include/util/taoserror.h | 2 + source/libs/transport/inc/transComm.h | 18 +- source/libs/transport/src/thttp.c | 45 +++-- source/libs/transport/src/transCli.c | 228 ++++++++++++++++++++------ source/libs/transport/src/transComm.c | 35 ++-- source/libs/transport/src/transSvr.c | 213 ++++++++++++++++++------ source/util/src/terror.c | 4 +- 7 files changed, 397 insertions(+), 148 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 41c9184d27..741bd5ceb4 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -90,6 +90,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) #define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) #define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025) +#define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026) +#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index acb9bd20f3..cc744fe14f 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -103,11 +103,11 @@ typedef void* queue[2]; #define TRANS_MAGIC_NUM 0x5f375a86 #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) -typedef struct SRpcMsg STransMsg; -typedef SRpcCtx STransCtx; -typedef SRpcCtxVal STransCtxVal; -typedef SRpcInfo STrans; -typedef SRpcConnInfo STransHandleInfo; +typedef struct SRpcMsg STransMsg; +typedef SRpcCtx STransCtx; +typedef SRpcCtxVal STransCtxVal; +typedef SRpcInfo STrans; +typedef SRpcConnInfo STransHandleInfo; // ref mgt handle typedef struct SExHandle { @@ -250,10 +250,10 @@ typedef struct { int8_t stop; } SAsyncPool; -SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); -void transAsyncPoolDestroy(SAsyncPool* pool); -int transAsyncSend(SAsyncPool* pool, queue* mq); -bool transAsyncPoolIsEmpty(SAsyncPool* pool); +int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool); +void transAsyncPoolDestroy(SAsyncPool* pool); +int transAsyncSend(SAsyncPool* pool, queue* mq); +bool transAsyncPoolIsEmpty(SAsyncPool* pool); #define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \ do { \ diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index ba12774c18..10a96d8af0 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -76,9 +76,9 @@ static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); static int32_t httpSendQuit(SHttpModule* http, int64_t chanId); -static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg); -static void httpDestroyMsg(SHttpMsg* msg); +static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg); +static void httpDestroyMsg(SHttpMsg* msg); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ); @@ -91,27 +91,27 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, - EHttpCompFlag flag) { - int32_t code = 0; + EHttpCompFlag flag) { + int32_t code = 0; int32_t len = 0; if (flag == HTTP_FLAT) { len = snprintf(pHead, headLen, - "POST %s HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Length: %d\n\n", - uri, server, contLen); + "POST %s HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + uri, server, contLen); if (len < 0 || len >= headLen) { code = TSDB_CODE_OUT_OF_RANGE; } } else if (flag == HTTP_GZIP) { len = snprintf(pHead, headLen, - "POST %s HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Encoding: gzip\n" - "Content-Length: %d\n\n", - uri, server, contLen); + "POST %s HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Encoding: gzip\n" + "Content-Length: %d\n\n", + uri, server, contLen); if (len < 0 || len >= headLen) { code = TSDB_CODE_OUT_OF_RANGE; } @@ -127,7 +127,7 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { void* pDest = taosMemoryMalloc(destLen); if (pDest == NULL) { - code= TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } @@ -184,7 +184,7 @@ _OVER: if (code == 0) { memcpy(pSrc, pDest, gzipStream.total_out); code = gzipStream.total_out; - } + } taosMemoryFree(pDest); return code; } @@ -635,8 +635,8 @@ void httpModuleDestroy2(SHttpModule* http) { static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId) { SHttpModule* load = NULL; - SHttpMsg *msg = NULL; - int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId,&msg); + SHttpMsg* msg = NULL; + int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, &msg); if (code != 0) { goto _ERROR; } @@ -718,9 +718,8 @@ int64_t transInitHttpChanImpl() { goto _ERROR; } - http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); - if (http->asyncPool == NULL) { - code = terrno; + code = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb, &http->asyncPool); + if (code != 0) { goto _ERROR; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index cf466f3cd9..fa7ae8c376 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2063,6 +2063,7 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { } static SCliThrd* createThrdObj(void* trans) { + int32_t code = 0; STrans* pTransInst = trans; SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd)); @@ -2080,10 +2081,9 @@ static SCliThrd* createThrdObj(void* trans) { return NULL; } int32_t nSync = pTransInst->supportBatch ? 4 : 8; - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb); - - if (pThrd->asyncPool == NULL) { - tError("failed to init async pool"); + code = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb, &pThrd->asyncPool); + if (code != 0) { + tError("failed to init async pool since:%s", tstrerror(code)); uv_loop_close(pThrd->loop); taosMemoryFree(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); @@ -2561,9 +2561,7 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { return pThrd; } int transReleaseCliHandle(void* handle) { - int idx = -1; - bool valid = false; - + int32_t code = 0; SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle); if (pThrd == NULL) { return TSDB_CODE_RPC_BROKEN_LINK; @@ -2573,9 +2571,17 @@ int transReleaseCliHandle(void* handle) { TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCtx->ahandle = tmsg.info.ahandle; SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cmsg == NULL) { + taosMemoryFree(pCtx); + return TSDB_CODE_OUT_OF_MEMORY; + } cmsg->msg = tmsg; cmsg->st = taosGetTimestampUs(); cmsg->type = Release; @@ -2584,15 +2590,19 @@ int transReleaseCliHandle(void* handle) { STraceId* trace = &tmsg.info.traceId; tGDebug("send release request at thread:%08" PRId64 ", malloc memory:%p", pThrd->pid, cmsg); - if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { + if ((code = transAsyncSend(pThrd->asyncPool, &cmsg->q)) != 0) { destroyCmsg(cmsg); - return TSDB_CODE_RPC_BROKEN_LINK; + return code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code; } - return 0; + return code; } -static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { +static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliMsg** pCliMsg) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); @@ -2602,13 +2612,20 @@ static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pRe if (ctx != NULL) pCtx->appCtx = *ctx; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + return TSDB_CODE_OUT_OF_MEMORY; + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; cliMsg->refId = (int64_t)shandle; QUEUE_INIT(&cliMsg->seqq); - return cliMsg; + *pCliMsg = cliMsg; + + return 0; } int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { @@ -2617,7 +2634,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transFreeMsg(pReq->pCont); return TSDB_CODE_RPC_BROKEN_LINK; } - + int32_t code = 0; int64_t handle = (int64_t)pReq->info.handle; SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); if (pThrd == NULL) { @@ -2630,7 +2647,10 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran if (exh != NULL) { taosWLockLatch(&exh->latch); if (exh->handle == NULL && exh->inited != 0) { - SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + SCliMsg* pCliMsg = NULL; + code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); + ASSERT(code == 0); + QUEUE_PUSH(&exh->q, &pCliMsg->seqq); taosWUnLockLatch(&exh->latch); tDebug("msg refId: %" PRId64 "", handle); @@ -2642,43 +2662,65 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetRefMgt(), handle); } } - SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + + SCliMsg* pCliMsg = NULL; + code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); + if (code != 0) { + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; + } STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) { + if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { destroyCmsg(pCliMsg); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; } int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - taosMemoryFree(pTransRsp); return TSDB_CODE_RPC_BROKEN_LINK; } + int32_t code = 0; + + STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); + if (pTransRsp == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - taosMemoryFree(pTransRsp); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN1); } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); - tsem_init(sem, 0, 0); + if (sem == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } + + code = tsem_init(sem, 0, 0); + if (code != 0) { + taosMemoryFree(sem); + code = TAOS_SYSTEM_ERROR(errno); + TAOS_CHECK_GOTO(code, NULL, _RETURN1); + } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + sem_destroy(sem); + taosMemoryFree(sem); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); pCtx->ahandle = pReq->info.ahandle; @@ -2687,6 +2729,13 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs pCtx->pRsp = pTransRsp; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + sem_destroy(sem); + taosMemoryFree(sem); + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); @@ -2697,11 +2746,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); - if (ret != 0) { + code = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (code != 0) { destroyCmsg(cliMsg); - ret = TSDB_CODE_RPC_BROKEN_LINK; - goto _RETURN; + TAOS_CHECK_GOTO((code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code), NULL, _RETURN); } tsem_wait(sem); @@ -2712,13 +2760,28 @@ _RETURN: taosMemoryFree(sem); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); taosMemoryFree(pTransRsp); - return ret; + return code; +_RETURN1: + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + taosMemoryFree(pTransRsp); + taosMemoryFree(pReq->pCont); + return code; } -int64_t transCreateSyncMsg(STransMsg* pTransMsg) { +int32_t transCreateSyncMsg(STransMsg* pTransMsg, int64_t* refId) { + int32_t code = 0; tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t)); - tsem2_init(sem, 0, 0); + if (sem == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (tsem2_init(sem, 0, 0) != 0) { + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _EXIT); + } STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg)); + if (pSyncMsg == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _EXIT); + } taosInitRWLatch(&pSyncMsg->latch); pSyncMsg->inited = 0; @@ -2726,39 +2789,69 @@ int64_t transCreateSyncMsg(STransMsg* pTransMsg) { pSyncMsg->pSem = sem; pSyncMsg->hasEpSet = 0; - return taosAddRef(transGetSyncMsgMgt(), pSyncMsg); + int64_t id = taosAddRef(transGetSyncMsgMgt(), pSyncMsg); + if (id < 0) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _EXIT); + } else { + *refId = id; + } + return 0; + +_EXIT: + tsem2_destroy(sem); + taosMemoryFree(sem); + taosMemoryFree(pSyncMsg); + return code; } int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + int32_t code = 0; + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - taosMemoryFree(pTransMsg); return TSDB_CODE_RPC_BROKEN_LINK; } + STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + if (pTransMsg == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); + } + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - taosMemoryFree(pTransMsg); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN2); } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; - pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg); + + if ((code = transCreateSyncMsg(pTransMsg, &pCtx->syncMsgRef)) != 0) { + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(code, NULL, _RETURN2); + } int64_t ref = pCtx->syncMsgRef; STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref); + if (pSyncMsg == NULL) { + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _RETURN2); + } SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); @@ -2769,17 +2862,17 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); - if (ret != 0) { + code = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (code != 0) { destroyCmsg(cliMsg); - ret = TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code, NULL, _RETURN); goto _RETURN; } - ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs); - if (ret < 0) { + code = tsem2_timewait(pSyncMsg->pSem, timeoutMs); + if (code < 0) { pRsp->code = TSDB_CODE_TIMEOUT_ERROR; - ret = TSDB_CODE_TIMEOUT_ERROR; + code = TSDB_CODE_TIMEOUT_ERROR; } else { memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); pSyncMsg->pRsp->pCont = NULL; @@ -2787,13 +2880,18 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr epsetAssign(pEpSet, &pSyncMsg->epSet); *epUpdated = 1; } - ret = 0; + code = 0; } _RETURN: transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); taosReleaseRef(transGetSyncMsgMgt(), ref); taosRemoveRef(transGetSyncMsgMgt(), ref); - return ret; + return code; +_RETURN2: + transFreeMsg(pReq->pCont); + taosMemoryFree(pTransMsg); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; } /* * @@ -2810,11 +2908,24 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn)); cvtAddr.cvt = true; } - for (int i = 0; i < pTransInst->numOfThreads; i++) { + + int32_t code = 0; + int8_t i = 0; + for (i = 0; i < pTransInst->numOfThreads; i++) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + pCtx->cvtAddr = cvtAddr; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } cliMsg->ctx = pCtx; cliMsg->type = Update; cliMsg->refId = (int64_t)shandle; @@ -2822,21 +2933,30 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid); - if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) { + if ((code = transAsyncSend(thrd->asyncPool, &(cliMsg->q))) != 0) { destroyCmsg(cliMsg); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + break; } } + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return 0; + return code; } int64_t transAllocHandle() { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); + if (exh == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } exh->refId = transAddExHandle(transGetRefMgt(), exh); + if (exh->refId < 0) { + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); - ASSERT(exh == self); if (exh != self) { taosMemoryFree(exh); return TSDB_CODE_REF_INVALID_ID; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index b9223e7b39..d31ba34afe 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -67,7 +67,11 @@ int32_t transDecompressMsg(char** msg, int32_t len) { STransCompMsg* pComp = (STransCompMsg*)pCont; int32_t oriLen = htonl(pComp->contLen); - char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead)); + char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead)); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + STransMsgHead* pNewHead = (STransMsgHead*)buf; int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content, len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen); @@ -78,7 +82,7 @@ int32_t transDecompressMsg(char** msg, int32_t len) { taosMemoryFree(pHead); *msg = buf; if (decompLen != oriLen) { - return -1; + return TSDB_CODE_INVALID_MSG; } return 0; } @@ -222,19 +226,19 @@ int transSetConnOption(uv_tcp_t* stream, int keepalive) { // int ret = uv_tcp_keepalive(stream, 5, 60); } -SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { +int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); if (pool == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; + // return NULL; } + int32_t code = 0; pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); if (pool->asyncs == NULL) { taosMemoryFree(pool); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } int i = 0, err = 0; @@ -243,7 +247,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem)); if (item == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } item->pThrd = arg; @@ -254,7 +258,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) err = uv_async_init(loop, async, cb); if (err != 0) { tError("failed to init async, reason:%s", uv_err_name(err)); - terrno = TSDB_CODE_THIRDPARTY_ERROR; + code = TSDB_CODE_THIRDPARTY_ERROR; break; } } @@ -264,7 +268,9 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) pool = NULL; } - return pool; + *pPool = pool; + return 0; + // return pool; } void transAsyncPoolDestroy(SAsyncPool* pool) { @@ -289,7 +295,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool) { } int transAsyncSend(SAsyncPool* pool, queue* q) { if (atomic_load_8(&pool->stop) == 1) { - return -1; + return TSDB_CODE_RPC_ASYNC_MODULE_QUIT; } int idx = pool->index % pool->nAsync; @@ -303,7 +309,12 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { taosThreadMutexLock(&item->mtx); QUEUE_PUSH(&item->qmsg, q); taosThreadMutexUnlock(&item->mtx); - return uv_async_send(async); + int ret = uv_async_send(async); + if (ret != 0) { + tError("failed to send async,reason:%s", uv_err_name(ret)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + return 0; } void transCtxInit(STransCtx* ctx) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 674bb86fb5..48fce78e79 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -181,8 +181,8 @@ static void* transWorkerThread(void* arg); static void* transAcceptThread(void* arg); // add handle loop -static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); -static bool addHandleToAcceptloop(void* arg); +static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); +static int32_t addHandleToAcceptloop(void* arg); #define SRV_RELEASE_UV(loop) \ do { \ @@ -221,8 +221,16 @@ static bool uvCheckIp(SIpV4Range* pRange, int32_t ip) { } SIpWhiteListTab* uvWhiteListCreate() { SIpWhiteListTab* pWhiteList = taosMemoryCalloc(1, sizeof(SIpWhiteListTab)); + if (pWhiteList == NULL) { + return NULL; + } pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK); + if (pWhiteList->pList == NULL) { + taosMemoryFree(pWhiteList); + return NULL; + } + pWhiteList->ver = -1; return pWhiteList; } @@ -1005,17 +1013,36 @@ void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); } -static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { +static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { + int32_t code = 0; pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); - if (0 != uv_loop_init(pThrd->loop)) { - return false; + if (pThrd->loop == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if ((code = uv_loop_init(pThrd->loop)) != 0) { + tError("failed to init loop since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } #if defined(WINDOWS) || defined(DARWIN) - uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + if (code != 0) { + tError("failed to init pip since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } #else - uv_pipe_init(pThrd->loop, pThrd->pipe, 1); - uv_pipe_open(pThrd->pipe, pThrd->fd); + code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + if (code != 0) { + tError("failed to init pip since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + + code = uv_pipe_open(pThrd->pipe, pThrd->fd); + if (code != 0) { + tError("failed to open pip since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } #endif pThrd->pipe->data = pThrd; @@ -1023,50 +1050,90 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { QUEUE_INIT(&pThrd->msg); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); - uv_prepare_init(pThrd->loop, pThrd->prepare); - uv_prepare_start(pThrd->prepare, uvPrepareCb); + if (pThrd->prepare == NULL) { + tError("failed to init prepare"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + code = uv_prepare_init(pThrd->loop, pThrd->prepare); + if (code != 0) { + tError("failed to init prepare since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + + code = uv_prepare_start(pThrd->prepare, uvPrepareCb); + if (code != 0) { + tError("failed to start prepare since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } pThrd->prepare->data = pThrd; // conn set QUEUE_INIT(&pThrd->conn); - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb); + code = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb, &pThrd->asyncPool); + if (code != 0) { + tError("failed to init async pool since:%s", tstrerror(code)); + return code; + } #if defined(WINDOWS) || defined(DARWIN) - uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); + code = uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); + if (code != 0) { + tError("failed to start connect pipe:%s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + #else - uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + code = uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + if (code != 0) { + tError("failed to start read pipe:%s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } #endif - return true; + return 0; } -static bool addHandleToAcceptloop(void* arg) { +static int32_t addHandleToAcceptloop(void* arg) { // impl later SServerObj* srv = arg; - int err = 0; - if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) { - tError("failed to init accept server:%s", uv_err_name(err)); - return false; + int code = 0; + if ((code = uv_tcp_init(srv->loop, &srv->server)) != 0) { + tError("failed to init accept server since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } // register an async here to quit server gracefully srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t)); - uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb); + if (srv->pAcceptAsync == NULL) { + tError("failed to create async since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + code = uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb); + if (code != 0) { + tError("failed to init async since:%s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } srv->pAcceptAsync->data = srv; struct sockaddr_in bind_addr; - uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); - if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { - tError("failed to bind:%s", uv_err_name(err)); - return false; + if ((code = uv_ip4_addr("0.0.0.0", srv->port, &bind_addr)) != 0) { + tError("failed to bind addr since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } - if ((err = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) { - tError("failed to listen:%s", uv_err_name(err)); - terrno = TSDB_CODE_RPC_PORT_EADDRINUSE; - return false; + + if ((code = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { + tError("failed to bind since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } - return true; + if ((code = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) { + tError("failed to listen since %s", uv_err_name(code)); + return TSDB_CODE_RPC_PORT_EADDRINUSE; + } + return 0; } + void* transWorkerThread(void* arg) { setThreadName("trans-svr-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; @@ -1079,6 +1146,9 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { SWorkThrd* pThrd = hThrd; SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); + if (pConn == NULL) { + return NULL; + } transReqQueueInit(&pConn->wreqQueue); QUEUE_INIT(&pConn->queue); @@ -1204,24 +1274,41 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) { } void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + int32_t code = 0; + SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj)); - srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); + if (srv == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tError("failed to init server since: %s", tstrerror(code)); + return NULL; + } + + srv->ip = ip; + srv->port = port; srv->numOfThreads = numOfThreads; srv->workerIdx = 0; srv->numOfWorkerReady = 0; + srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); srv->pThreadObj = (SWorkThrd**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrd*)); srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*)); - srv->ip = ip; - srv->port = port; - uv_loop_init(srv->loop); + if (srv->loop == NULL || srv->pThreadObj == NULL || srv->pipe == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto End; + } - char pipeName[PATH_MAX]; + code = uv_loop_init(srv->loop); + if (code != 0) { + tError("failed to init server since: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } if (false == taosValidIpAndPort(srv->ip, srv->port)) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); goto End; } + char pipeName[PATH_MAX]; #if defined(WINDOWS) || defined(DARWIN) int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); @@ -1259,7 +1346,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); thrd->pipe = &(srv->pipe[i][1]); // init read - if (false == addHandleToWorkloop(thrd, pipeName)) { + if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) { goto End; } @@ -1276,27 +1363,53 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); + if (thrd == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto End; + } thrd->pTransInst = shandle; thrd->quit = false; thrd->pTransInst = shandle; thrd->pWhiteList = uvWhiteListCreate(); - - srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - srv->pThreadObj[i] = thrd; - - uv_os_sock_t fds[2]; - if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + if (thrd->pWhiteList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto End; } - uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - uv_pipe_open(&(srv->pipe[i][0]), fds[1]); + srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); + if (srv->pipe[i] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto End; + } + + srv->pThreadObj[i] = thrd; + + uv_os_sock_t fds[2]; + if ((code = uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE)) != 0) { + tError("failed to create pipe, errmsg: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } + + code = uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + if (code != 0) { + tError("failed to init pipe, errmsg: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } + + code = uv_pipe_open(&(srv->pipe[i][0]), fds[1]); + if (code != 0) { + tError("failed to init pipe, errmsg: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } thrd->pipe = &(srv->pipe[i][1]); // init read thrd->fd = fds[0]; - if (false == addHandleToWorkloop(thrd, pipeName)) { + if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) { goto End; } @@ -1311,15 +1424,17 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } #endif - if (false == addHandleToAcceptloop(srv)) { + if ((code = addHandleToAcceptloop(srv)) != 0) { goto End; } - int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); - if (err == 0) { + code = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); + if (code == 0) { tDebug("success to create accept-thread"); } else { - tError("failed to create accept-thread"); + code = TAOS_SYSTEM_ERROR(errno); + tError("failed to create accept-thread since %s", tstrerror(code)); + goto End; // clear all resource later } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4db1475fa9..69d6e6e131 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -55,8 +55,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") -TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") From 83449e4b70206c46fd1237603928b9f0945d08cf Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 22 Jul 2024 07:22:41 +0000 Subject: [PATCH 02/20] refactor transport --- source/libs/transport/src/transCli.c | 44 ++++++++++++++++------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fa7ae8c376..d0ce1c19bf 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -196,8 +196,8 @@ static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); -static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn); -static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); +static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); +static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later @@ -1225,6 +1225,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) { taosMemoryFree(pBatch); } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { + int32_t code = 0; if (pThrd->quit == true) { cliDestroyBatch(pBatch); return; @@ -1253,15 +1254,13 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { conn->pBatch = pBatch; conn->dstAddr = taosStrdup(pList->dst); - uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); - if (ipaddr == 0xffffffff) { + uint32_t ipaddr = 0; + if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) { uv_timer_stop(conn->timer); conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; - - cliHandleFastFail(conn, terrno); - terrno = 0; + cliHandleFastFail(conn, code); return; } struct sockaddr_in addr; @@ -1561,23 +1560,28 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { return 0; } -static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) { + +static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) { + int32_t code = 0; uint32_t addr = 0; size_t len = strlen(fqdn); uint32_t* v = taosHashGet(cache, fqdn, len); if (v == NULL) { addr = taosGetIpv4FromFqdn(fqdn); if (addr == 0xffffffff) { - terrno = TSDB_CODE_RPC_FQDN_ERROR; - tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr()); - return addr; + code = TSDB_CODE_RPC_FQDN_ERROR; + tError("failed to get ip from fqdn:%s since %s", fqdn, tstrerror(code)); + return code; } - taosHashPut(cache, fqdn, len, &addr, sizeof(addr)); + if ((code = taosHashPut(cache, fqdn, len, &addr, sizeof(addr)) != 0)) { + return code; + } + *ip = addr; } else { - addr = *v; + *ip = *v; } - return addr; + return 0; } static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later @@ -1622,6 +1626,7 @@ static void doFreeTimeoutMsg(void* param) { doNotifyApp(pMsg, pThrd, code); taosMemoryFree(arg); } + void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; @@ -1671,15 +1676,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { conn->dstAddr = taosStrdup(addr); - uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); - if (ipaddr == 0xffffffff) { + uint32_t ipaddr; + int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); + if (code != 0) { uv_timer_stop(conn->timer); conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; - cliHandleExcept(conn, terrno); - terrno = 0; + cliHandleExcept(conn, code); return; } @@ -1697,12 +1702,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { errno = 0; return; } + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); cliHandleExcept(conn, -1); return; } + ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle); if (ret != 0) { tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); @@ -2926,6 +2933,7 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { code = TSDB_CODE_OUT_OF_MEMORY; break; } + cliMsg->ctx = pCtx; cliMsg->type = Update; cliMsg->refId = (int64_t)shandle; From 10d6aeef8ddecdbf35c9b01973f1135bed0ce396 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 22 Jul 2024 09:38:19 +0000 Subject: [PATCH 03/20] refactor transport --- include/libs/transport/trpc.h | 14 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 3 +- source/libs/transport/inc/transComm.h | 22 +-- source/libs/transport/src/trans.c | 2 +- source/libs/transport/src/transCli.c | 75 ++++++-- source/libs/transport/src/transComm.c | 85 +++++---- source/libs/transport/src/transSvr.c | 164 +++++++++++++----- 7 files changed, 261 insertions(+), 104 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index b7a459f957..6c0d04354a 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -164,13 +164,13 @@ int rpcRegisterBrokenLinkArg(SRpcMsg *msg); int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock // These functions will not be called in the child process -int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); -int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated, - int32_t timeoutMs); -int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); -void *rpcAllocHandle(); -void rpcSetIpWhite(void *thandl, void *arg); +int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated, + int32_t timeoutMs); +int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); +void *rpcAllocHandle(); +int32_t rpcSetIpWhite(void *thandl, void *arg); int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index bf35319fae..40d070afc6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -64,10 +64,11 @@ static void dmConvertErrCode(tmsg_t msgType) { } } static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) { + int32_t code = 0; SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite)); tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite); - rpcSetIpWhite(pTrans, &ipWhite); + code = rpcSetIpWhite(pTrans, &ipWhite); pData->ipWhiteVer = ipWhite.ver; tFreeSUpdateIpWhiteReq(&ipWhite); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index cc744fe14f..47eeeee5cb 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -279,6 +279,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); if (exh2 == NULL || id != exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \ exh2 ? exh2->refId : 0, id); \ + code = terrno; \ goto _return1; \ } \ } else if (id == 0) { \ @@ -287,6 +288,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); if (exh2 == NULL || id == exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, id, \ exh2 ? exh2->refId : 0); \ + code = terrno; \ goto _return1; \ } else { \ id = exh1->refId; \ @@ -316,14 +318,14 @@ void transUnrefCliHandle(void* handle); int transReleaseCliHandle(void* handle); int transReleaseSrvHandle(void* handle); -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); -int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); -int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, - int32_t timeoutMs); -int transSendResponse(const STransMsg* msg); -int transRegisterMsg(const STransMsg* msg); -int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); -void transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func); +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); +int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); +int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, + int32_t timeoutMs); +int transSendResponse(const STransMsg* msg); +int transRegisterMsg(const STransMsg* msg); +int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func); int transSockInfo2Str(struct sockaddr* sockname, char* dst); @@ -363,7 +365,7 @@ typedef struct { * init queue * note: queue'size is small, default 1 */ -void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)); +int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)); /* * put arg into queue @@ -420,7 +422,7 @@ typedef struct SDelayQueue { uv_loop_t* loop; } SDelayQueue; -int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); +int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue); void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); void transDQCancel(SDelayQueue* queue, SDelayTask* task); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 5ed2e00acd..d5a50ccf20 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -186,7 +186,7 @@ int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { return transSetDefaultAddr(thandle, ip, fqdn); } // server only -void rpcSetIpWhite(void* thandle, void* arg) { transSetIpWhiteList(thandle, arg, NULL); } +int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); } void* rpcAllocHandle() { return (void*)transAllocHandle(); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d0ce1c19bf..2c583bb3c1 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -180,12 +180,12 @@ static int32_t allocConnRef(SCliConn* conn, bool update); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); -static SCliConn* cliCreateConn(SCliThrd* thrd); -static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void cliDestroy(uv_handle_t* handle); -static void cliSend(SCliConn* pConn); -static void cliSendBatch(SCliConn* pConn); -static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); +static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn); +static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); +static void cliDestroy(uv_handle_t* handle); +static void cliSend(SCliConn* pConn); +static void cliSendBatch(SCliConn* pConn); +static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void doFreeTimeoutMsg(void* param); static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg); @@ -909,16 +909,35 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { } } -static SCliConn* cliCreateConn(SCliThrd* pThrd) { +static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { + int32_t code = 0; SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); + if (conn == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + // read/write stream handle conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); - uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + if (conn->stream == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _failed); + } + + code = uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + if (code != 0) { + tError("failed to init tcp handle, code:%d, %s", code, uv_strerror(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + TAOS_CHECK_GOTO(code, NULL, _failed); + } conn->stream->data = conn; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + if (timer == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _failed); + } + tDebug("no available timer, create a timer %p", timer); uv_timer_init(pThrd->loop, timer); } @@ -927,8 +946,11 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->timer = timer; conn->connReq.data = conn; transReqQueueInit(&conn->wreqQueue); - transQueueInit(&conn->cliMsgs, NULL); - transInitBuffer(&conn->readBuf); + + TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed); + + TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); + QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -936,9 +958,19 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { transRefCliHandle(conn); atomic_add_fetch_32(&pThrd->connCount, 1); - allocConnRef(conn, false); - return conn; + TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed); + + *pCliConn = conn; + return code; +_failed: + if (conn) { + taosMemoryFree(conn->stream); + transReqQueueClear(&conn->wreqQueue); + transDestroyBuffer(&conn->readBuf); + } + taosMemoryFree(conn); + return code; } static void cliDestroyConn(SCliConn* conn, bool clear) { SCliThrd* pThrd = conn->hostThrd; @@ -1250,9 +1282,22 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { return; } if (conn == NULL) { - conn = cliCreateConn(pThrd); + code = cliCreateConn(pThrd, &conn); + if (code != 0) { + tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pTransInst->label, + pBatch->wLen, pBatch->batchSize, pTransInst->connLimitNum, tstrerror(code)); + cliDestroyBatch(pBatch); + return; + } + conn->pBatch = pBatch; conn->dstAddr = taosStrdup(pList->dst); + if (conn->dstAddr == NULL) { + tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + cliHandleFastFail(conn, -1); + return; + } uint32_t ipaddr = 0; if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) { @@ -1263,6 +1308,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { cliHandleFastFail(conn, code); return; } + struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = ipaddr; @@ -1628,6 +1674,7 @@ static void doFreeTimeoutMsg(void* param) { } void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { + int32_t code = 0; STrans* pTransInst = pThrd->pTransInst; cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); @@ -1666,7 +1713,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { transQueuePush(&conn->cliMsgs, pMsg); cliSend(conn); } else { - conn = cliCreateConn(pThrd); + code = cliCreateConn(pThrd, &conn); int64_t refId = (int64_t)pMsg->msg.info.handle; if (refId != 0) specifyConnRef(conn, true, refId); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index d31ba34afe..53ee1bbca7 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -103,8 +103,12 @@ int transSockInfo2Str(struct sockaddr* sockname, char* dst) { return r; } int transInitBuffer(SConnBuffer* buf) { - buf->cap = BUFFER_CAP; buf->buf = taosMemoryCalloc(1, BUFFER_CAP); + if (buf->buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + buf->cap = BUFFER_CAP; buf->left = -1; buf->len = 0; buf->total = 0; @@ -419,9 +423,14 @@ void transReqQueueClear(queue* q) { } } -void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) { +int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) { queue->q = taosArrayInit(2, sizeof(void*)); queue->freeFunc = (void (*)(const void*))freeFunc; + + if (queue->q == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return 0; } bool transQueuePush(STransQueue* queue, void* arg) { if (queue->q == NULL) { @@ -524,20 +533,44 @@ static void transDQTimeout(uv_timer_t* timer) { uv_timer_start(queue->timer, transDQTimeout, timeout, 0); } } -int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) { - uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); - uv_timer_init(loop, timer); +int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue) { + int32_t code = 0; + Heap* heap = NULL; + uv_timer_t* timer = NULL; + SDelayQueue* q = NULL; - Heap* heap = heapCreate(timeCompare); + timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + if (timer == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue)); + heap = heapCreate(timeCompare); + if (heap == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _return1); + } + + q = taosMemoryCalloc(1, sizeof(SDelayQueue)); + if (q == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _return1); + } q->heap = heap; q->timer = timer; q->loop = loop; q->timer->data = q; + int err = uv_timer_init(loop, timer); + if (err != 0) { + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _return1); + } + *queue = q; return 0; + +_return1: + taosMemoryFree(timer); + heapDestroy(heap); + taosMemoryFree(q); + return TSDB_CODE_OUT_OF_MEMORY; } void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { @@ -720,29 +753,6 @@ void transDestroySyncMsg(void* msg) { taosMemoryFree(pSyncMsg); } -// void subnetIp2int(const char* const ip_addr, uint8_t* dst) { -// char ip_addr_cpy[20]; -// char ip[5]; - -// tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy)); - -// char *s_start, *s_end; -// s_start = ip_addr_cpy; -// s_end = ip_addr_cpy; - -// int32_t k = 0; - -// for (k = 0; *s_start != '\0'; s_start = s_end) { -// for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) { -// } -// if (*s_end == '.') { -// *s_end = '\0'; -// s_end++; -// } -// dst[k++] = (char)atoi(s_start); -// } -// } - uint32_t subnetIpRang2Int(SIpV4Range* pRange) { uint32_t ip = pRange->ip; return ((ip & 0xFF) << 24) | ((ip & 0xFF00) << 8) | ((ip & 0xFF0000) >> 8) | ((ip >> 24) & 0xFF); @@ -789,7 +799,11 @@ int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { struct in_addr addr; addr.s_addr = pRange->ip; - uv_inet_ntop(AF_INET, &addr, buf, 32); + int32_t err = uv_inet_ntop(AF_INET, &addr, buf, 32); + if (err != 0) { + tError("failed to convert ip to string, reason:%s", uv_strerror(err)); + return TSDB_CODE_THIRDPARTY_ERROR; + } len = strlen(buf); @@ -805,14 +819,23 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) { *ppBuf = NULL; return 0; } + int32_t len = 0; char* pBuf = taosMemoryCalloc(1, pList->num * 36); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } for (int i = 0; i < pList->num; i++) { SIpV4Range* pRange = &pList->pIpRange[i]; char tbuf[32] = {0}; int tlen = transUtilSIpRangeToStr(pRange, tbuf); + if (tlen < 0) { + taosMemoryFree(pBuf); + return tlen; + } + len += sprintf(pBuf + len, "%s,", tbuf); } if (len > 0) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 48fce78e79..df8928bd02 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -122,7 +122,7 @@ typedef struct SServerObj { SIpWhiteListTab* uvWhiteListCreate(); void uvWhiteListDestroy(SIpWhiteListTab* pWhite); -void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver); +int32_t uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver); void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable); bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn); bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver); @@ -248,17 +248,26 @@ void uvWhiteListDestroy(SIpWhiteListTab* pWhite) { taosMemoryFree(pWhite); } -void uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) { +int32_t uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) { char* tmp = NULL; int32_t tlen = transUtilSWhiteListToStr(plist->pList, &tmp); + if (tlen < 0) { + return tlen; + } + + char* pBuf = taosMemoryCalloc(1, tlen + 64); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - char* pBuf = taosMemoryCalloc(1, tlen + 64); int32_t len = sprintf(pBuf, "user: %s, ver: %" PRId64 ", ip: {%s}", user, plist->ver, tmp); taosMemoryFree(tmp); *ppBuf = pBuf; + return len; } void uvWhiteListDebug(SIpWhiteListTab* pWrite) { + int32_t code = 0; SHashObj* pWhiteList = pWrite->pList; void* pIter = taosHashIterate(pWhiteList, NULL); while (pIter) { @@ -270,23 +279,35 @@ void uvWhiteListDebug(SIpWhiteListTab* pWrite) { SWhiteUserList* pUserList = *(SWhiteUserList**)pIter; char* buf = NULL; - uvWhiteListToStr(pUserList, user, &buf); - tDebug("ip-white-list %s", buf); + + code = uvWhiteListToStr(pUserList, user, &buf); + if (code != 0) { + tDebug("ip-white-list failed to debug to str since %s", buf); + } taosMemoryFree(buf); pIter = taosHashIterate(pWhiteList, pIter); } } -void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) { +int32_t uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) { + int32_t code = 0; SHashObj* pWhiteList = pWhite->pList; SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user)); if (ppUserList == NULL || *ppUserList == NULL) { SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList)); + if (pUserList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pUserList->ver = ver; pUserList->pList = plist; - taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*)); + code = taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*)); + if (code != 0) { + taosMemoryFree(pUserList); + return code; + } } else { SWhiteUserList* pUserList = *ppUserList; @@ -295,6 +316,7 @@ void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, in pUserList->pList = plist; } uvWhiteListDebug(pWhite); + return 0; } void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable) { @@ -1502,30 +1524,41 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { } void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { SUpdateIpWhite* req = msg->arg; - if (req != NULL) { - for (int i = 0; i < req->numOfUser; i++) { - SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i]; - - int32_t sz = pUser->numOfRange * sizeof(SIpV4Range); - SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList)); - pList->num = pUser->numOfRange; - - memcpy(pList->pIpRange, pUser->pIpRanges, sz); - uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver); - } - - thrd->pWhiteList->ver = req->ver; - thrd->enableIpWhiteList = 1; - - tFreeSUpdateIpWhiteReq(req); - taosMemoryFree(req); - } else { + if (req == NULL) { tDebug("ip-white-list disable on trans"); thrd->enableIpWhiteList = 0; + taosMemoryFree(msg); } - taosMemoryFree(msg); - return; + int32_t code = 0; + for (int i = 0; i < req->numOfUser; i++) { + SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i]; + + int32_t sz = pUser->numOfRange * sizeof(SIpV4Range); + + SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList)); + if (pList == NULL) { + tError("failed to create ip-white-list since %s", tstrerror(code)); + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + pList->num = pUser->numOfRange; + memcpy(pList->pIpRange, pUser->pIpRanges, sz); + code = uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver); + if (code != 0) { + break; + } + } + + if (code == 0) { + thrd->pWhiteList->ver = req->ver; + thrd->enableIpWhiteList = 1; + } else { + tError("failed to update ip-white-list since %s", tstrerror(code)); + } + tFreeSUpdateIpWhiteReq(req); + taosMemoryFree(req); } + void destroyWorkThrd(SWorkThrd* pThrd) { if (pThrd == NULL) { return; @@ -1598,6 +1631,7 @@ void transUnrefSrvHandle(void* handle) { } int transReleaseSrvHandle(void* handle) { + int32_t code = 0; SRpcHandleInfo* info = handle; SExHandle* exh = info->handle; int64_t refId = info->refId; @@ -1610,12 +1644,19 @@ int transReleaseSrvHandle(void* handle) { STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId}; SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + if (m == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return1; + } + m->msg = tmsg; m->type = Release; tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); - if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) { + if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); + transReleaseExHandle(transGetRefMgt(), refId); + return code; } transReleaseExHandle(transGetRefMgt(), refId); @@ -1623,13 +1664,15 @@ int transReleaseSrvHandle(void* handle) { _return1: tDebug("handle %p failed to send to release handle", exh); transReleaseExHandle(transGetRefMgt(), refId); - return -1; + return code; _return2: tDebug("handle %p failed to send to release handle", exh); - return -1; + return code; } int transSendResponse(const STransMsg* msg) { + int32_t code = 0; + if (msg->info.noResp) { rpcFreeCont(msg->pCont); tTrace("no need send resp"); @@ -1651,13 +1694,20 @@ int transSendResponse(const STransMsg* msg) { ASYNC_ERR_JRET(pThrd); SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + if (m == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return1; + } + m->msg = tmsg; m->type = Normal; STraceId* trace = (STraceId*)&msg->info.traceId; tGDebug("conn %p start to send resp (1/2)", exh->handle); - if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) { + if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); + transReleaseExHandle(transGetRefMgt(), refId); + return code; } transReleaseExHandle(transGetRefMgt(), refId); @@ -1667,13 +1717,15 @@ _return1: tDebug("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); transReleaseExHandle(transGetRefMgt(), refId); - return -1; + return code; _return2: tDebug("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - return -1; + return code; } int transRegisterMsg(const STransMsg* msg) { + int32_t code = 0; + SExHandle* exh = msg->info.handle; int64_t refId = msg->info.refId; ASYNC_CHECK_HANDLE(exh, refId); @@ -1687,13 +1739,20 @@ int transRegisterMsg(const STransMsg* msg) { ASYNC_ERR_JRET(pThrd); SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + if (m == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return1; + } + m->msg = tmsg; m->type = Register; STrans* pTransInst = pThrd->pTransInst; tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); - if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) { + if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); + transReleaseExHandle(transGetRefMgt(), refId); + return code; } transReleaseExHandle(transGetRefMgt(), refId); @@ -1703,29 +1762,54 @@ _return1: tDebug("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); transReleaseExHandle(transGetRefMgt(), refId); - return -1; + return code; _return2: tDebug("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); - return -1; + return code; } -void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { + +int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle); + if (pTransInst == NULL) { + return TSDB_CODE_RPC_MODULE_QUIT; + } + + int32_t code = 0; tDebug("ip-white-list update on rpc"); SServerObj* svrObj = pTransInst->tcphandle; for (int i = 0; i < svrObj->numOfThreads; i++) { SWorkThrd* pThrd = svrObj->pThreadObj[i]; - SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); - SUpdateIpWhite* pReq = (arg != NULL ? cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg) : NULL); + SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + if (msg == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + + SUpdateIpWhite* pReq = NULL; + if (arg != NULL) { + if ((pReq = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg)) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + } msg->type = Update; msg->arg = pReq; - transAsyncSend(pThrd->asyncPool, &msg->q); + if ((code = transAsyncSend(pThrd->asyncPool, &msg->q)) != 0) { + code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + break; + } } transReleaseExHandle(transGetInstMgt(), (int64_t)thandle); + + if (code != 0) { + tError("ip-white-list update failed since %s", tstrerror(code)); + } + return code; } int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } From 32c0c40c4d49c5cc4673e533fb3f52551390cce4 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 22 Jul 2024 10:12:30 +0000 Subject: [PATCH 04/20] refactor transport --- include/common/tmsg.h | 8 +++---- source/common/src/tmsg.c | 36 ++++++++++++++++++++++++---- source/libs/transport/src/transSvr.c | 18 ++++++++------ 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a5dea8a44e..9ab4fffb79 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1051,10 +1051,10 @@ typedef struct { SUpdateUserIpWhite* pUserIpWhite; } SUpdateIpWhite; -int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); -int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); -void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); -SUpdateIpWhite* cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq); +int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); +int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq, SUpdateIpWhite** pUpdate); typedef struct { int64_t ipWhiteVer; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 98e6530be0..a86b8e29fe 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1774,20 +1774,35 @@ int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pR return 0; } void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { - for (int i = 0; i < pReq->numOfUser; i++) { - SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; - taosMemoryFree(pUserWhite->pIpRanges); + if (pReq == NULL) return; + + if (pReq->pUserIpWhite) { + for (int i = 0; i < pReq->numOfUser; i++) { + SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; + taosMemoryFree(pUserWhite->pIpRanges); + } } taosMemoryFree(pReq->pUserIpWhite); // impl later return; } -SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { +int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq, SUpdateIpWhite **pUpdateMsg) { + int32_t code = 0; + if (pReq == NULL) { + return 0; + } SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite)); + if (pClone == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } pClone->numOfUser = pReq->numOfUser; pClone->ver = pReq->ver; pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser); + if (pClone->pUserIpWhite == NULL) { + taosMemoryFree(pClone); + return TSDB_CODE_OUT_OF_MEMORY; + } for (int i = 0; i < pReq->numOfUser; i++) { SUpdateUserIpWhite *pNew = &pClone->pUserIpWhite[i]; @@ -1799,9 +1814,20 @@ SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { int32_t sz = pOld->numOfRange * sizeof(SIpV4Range); pNew->pIpRanges = taosMemoryCalloc(1, sz); + if (pNew->pIpRanges == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } memcpy(pNew->pIpRanges, pOld->pIpRanges, sz); } - return pClone; +_return: + if (code < 0) { + tFreeSUpdateIpWhiteReq(pClone); + taosMemoryFree(pClone); + } else { + *pUpdateMsg = pClone; + } + return code; } int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) { SEncoder encoder = {0}; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index df8928bd02..82366f52b9 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -614,6 +614,10 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); + if (pMsg->pCont == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); @@ -628,7 +632,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { transQueuePop(&pConn->srvMsgs); destroySmsg(smsg); - return -1; + return TSDB_CODE_INVALID_MSG; } if (pConn->status == ConnNormal) { @@ -1528,6 +1532,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { tDebug("ip-white-list disable on trans"); thrd->enableIpWhiteList = 0; taosMemoryFree(msg); + return; } int32_t code = 0; for (int i = 0; i < req->numOfUser; i++) { @@ -1557,6 +1562,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { } tFreeSUpdateIpWhiteReq(req); taosMemoryFree(req); + taosMemoryFree(msg); } void destroyWorkThrd(SWorkThrd* pThrd) { @@ -1787,13 +1793,11 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { code = TSDB_CODE_OUT_OF_MEMORY; break; } - SUpdateIpWhite* pReq = NULL; - if (arg != NULL) { - if ((pReq = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg)) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - break; - } + code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq); + if (code != 0) { + taosMemoryFree(msg); + break; } msg->type = Update; From ae7d4f1f85e6a013ed796500876e6abda573b501 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 05:46:08 +0000 Subject: [PATCH 05/20] refactor transport --- source/libs/transport/inc/transComm.h | 6 +++--- source/libs/transport/src/trans.c | 12 +++++++----- source/libs/transport/src/transComm.c | 8 ++++++-- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 47eeeee5cb..73c0fb6228 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -435,9 +435,9 @@ bool transEpSetIsEqual2(SEpSet* a, SEpSet* b); */ void transThreadOnce(); -void transInit(); -void transCleanup(); -void transPrintEpSet(SEpSet* pEpSet); +int32_t transInit(); +void transCleanup(); +void transPrintEpSet(SEpSet* pEpSet); void transFreeMsg(void* msg); int32_t transCompressMsg(char* msg, int32_t len); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index d5a50ccf20..58b1d76d23 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -34,10 +34,14 @@ static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { return 0; } void* rpcOpen(const SRpcInit* pInit) { - rpcInit(); + int32_t code = rpcInit(); + if (code != 0) { + return NULL; + } SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { + // return TSDB_CODE_OUT_OF_MEMORY; return NULL; } if (pInit->label) { @@ -202,10 +206,8 @@ int32_t rpcCvtErrCode(int32_t code) { return code; } -int32_t rpcInit() { - transInit(); - return 0; -} +int32_t rpcInit() { return transInit(); } + void rpcCleanup(void) { transCleanup(); transHttpEnvDestroy(); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 53ee1bbca7..3acdef7f5e 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -692,9 +692,13 @@ static void transDestroyEnv() { transCloseRefMgt(transSyncMsgMgt); } -void transInit() { +int32_t transInit() { // init env - taosThreadOnce(&transModuleInit, transInitEnv); + int32_t code = taosThreadOnce(&transModuleInit, transInitEnv); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + } + return code; } int32_t transGetRefMgt() { return refMgt; } From 254dbd617192ee98bb319f94e5ed5cb82cfb428d Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 08:06:48 +0000 Subject: [PATCH 06/20] refactor transport --- source/libs/transport/src/transCli.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2c583bb3c1..01013d59c0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2650,6 +2650,7 @@ int transReleaseCliHandle(void* handle) { } return code; } + static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliMsg** pCliMsg) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); From 19a03b2eb27cf2049b0df9a2c9ae44cadf3204bb Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 08:55:08 +0000 Subject: [PATCH 07/20] refactor transport --- source/libs/transport/src/trans.c | 24 ++--- source/libs/transport/src/transCli.c | 136 ++++++++++++++++++++------ source/libs/transport/src/transComm.c | 2 + source/libs/transport/src/transSvr.c | 1 + 4 files changed, 121 insertions(+), 42 deletions(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 58b1d76d23..266b6cb3a7 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -28,21 +28,19 @@ int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transRelease static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { *ip = taosGetIpv4FromFqdn(localFqdn); if (*ip == 0xFFFFFFFF) { - terrno = TSDB_CODE_RPC_FQDN_ERROR; - return -1; + return TSDB_CODE_RPC_FQDN_ERROR; } return 0; } void* rpcOpen(const SRpcInit* pInit) { int32_t code = rpcInit(); if (code != 0) { - return NULL; + TAOS_CHECK_GOTO(code, NULL, _end); } SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { - // return TSDB_CODE_OUT_OF_MEMORY; - return NULL; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } if (pInit->label) { int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label); @@ -88,10 +86,9 @@ void* rpcOpen(const SRpcInit* pInit) { uint32_t ip = 0; if (pInit->connType == TAOS_CONN_SERVER) { - if (transValidLocalFqdn(pInit->localFqdn, &ip) != 0) { - tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, terrstr()); - taosMemoryFree(pRpc); - return NULL; + if ((code = transValidLocalFqdn(pInit->localFqdn, &ip)) != 0) { + tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, tstrerror(code)); + TAOS_CHECK_GOTO(code, NULL, _end); } } @@ -109,14 +106,19 @@ void* rpcOpen(const SRpcInit* pInit) { (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); if (pRpc->tcphandle == NULL) { - taosMemoryFree(pRpc); - return NULL; + tError("failed to init rpc handle"); + TAOS_CHECK_GOTO(terrno, NULL, _end); } int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); transAcquireExHandle(transGetInstMgt(), refId); pRpc->refId = refId; return (void*)refId; +_end: + taosMemoryFree(pRpc); + terrno = code; + + return NULL; } void rpcClose(void* arg) { tInfo("start to close rpc"); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 01013d59c0..362f8cef81 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -226,10 +226,9 @@ static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); // thread obj -static SCliThrd* createThrdObj(void* trans); -static void destroyThrdObj(SCliThrd* pThrd); - -static void cliWalkCb(uv_handle_t* handle, void* arg); +static int32_t createThrdObj(void* trans, SCliThrd** pThrd); +static void destroyThrdObj(SCliThrd* pThrd); +static void cliWalkCb(uv_handle_t* handle, void* arg); #define CLI_RELEASE_UV(loop) \ do { \ @@ -2048,22 +2047,32 @@ static void* cliWorkThread(void* arg) { } void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + int32_t code = 0; SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj)); + if (cli == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); + } STrans* pTransInst = shandle; memcpy(cli->label, label, TSDB_LABEL_LEN); cli->numOfThreads = numOfThreads; + cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*)); + if (cli->pThreadObj == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); + } for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrd* pThrd = createThrdObj(shandle); - if (pThrd == NULL) { + SCliThrd* pThrd = NULL; + code = createThrdObj(shandle, &pThrd); + if (code != 0) { goto _err; } int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); if (err != 0) { - goto _err; + code = TAOS_SYSTEM_ERROR(errno); + TAOS_CHECK_GOTO(code, NULL, _err); } else { tDebug("success to create tranport-cli thread:%d", i); } @@ -2072,8 +2081,11 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, return cli; _err: - taosMemoryFree(cli->pThreadObj); - taosMemoryFree(cli); + if (cli) { + taosMemoryFree(cli->pThreadObj); + taosMemoryFree(cli); + } + terrno = code; return NULL; } @@ -2116,67 +2128,129 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { taosMemoryFree(pMsg); } -static SCliThrd* createThrdObj(void* trans) { +static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { int32_t code = 0; STrans* pTransInst = trans; SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd)); + if (pThrd == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } QUEUE_INIT(&pThrd->msg); taosThreadMutexInit(&pThrd->msgMtx, NULL); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); - int err = uv_loop_init(pThrd->loop); - if (err != 0) { - tError("failed to init uv_loop, reason:%s", uv_err_name(err)); - taosMemoryFree(pThrd->loop); - taosThreadMutexDestroy(&pThrd->msgMtx); - taosMemoryFree(pThrd); - return NULL; + if (pThrd->loop == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } + + code = uv_loop_init(pThrd->loop); + if (code != 0) { + tError("failed to init uv_loop, reason:%s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + TAOS_CHECK_GOTO(code, NULL, _end); + } + int32_t nSync = pTransInst->supportBatch ? 4 : 8; code = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb, &pThrd->asyncPool); if (code != 0) { tError("failed to init async pool since:%s", tstrerror(code)); - uv_loop_close(pThrd->loop); - taosMemoryFree(pThrd->loop); - taosThreadMutexDestroy(&pThrd->msgMtx); - taosMemoryFree(pThrd); - return NULL; + TAOS_CHECK_GOTO(code, NULL, _end); } pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); - uv_prepare_init(pThrd->loop, pThrd->prepare); + if (pThrd->prepare == NULL) { + tError("failed to create prepre since:%s", tstrerror(code)); + TAOS_CHECK_GOTO(code, NULL, _end); + } + + code = uv_prepare_init(pThrd->loop, pThrd->prepare); + if (code != 0) { + tError("failed to create prepre since:%s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + TAOS_CHECK_GOTO(code, NULL, _end); + } pThrd->prepare->data = pThrd; - // uv_prepare_start(pThrd->prepare, cliPrepareCb); int32_t timerSize = 64; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); + if (pThrd->timerList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } + for (int i = 0; i < timerSize; i++) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + if (timer == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } uv_timer_init(pThrd->loop, timer); taosArrayPush(pThrd->timerList, &timer); } pThrd->pool = createConnPool(4); - transDQCreate(pThrd->loop, &pThrd->delayQueue); + if (pThrd->pool == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } + if ((code = transDQCreate(pThrd->loop, &pThrd->delayQueue)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } - transDQCreate(pThrd->loop, &pThrd->timeoutQueue); + if ((code = transDQCreate(pThrd->loop, &pThrd->timeoutQueue)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } - transDQCreate(pThrd->loop, &pThrd->waitConnQueue); - - pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); - pThrd->pTransInst = trans; + if ((code = transDQCreate(pThrd->loop, &pThrd->waitConnQueue)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->fqdn2ipCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->failFastCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->batchCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); + pThrd->pTransInst = trans; pThrd->quit = false; - return pThrd; + return code; + +_end: + if (pThrd) { + uv_loop_close(pThrd->loop); + taosMemoryFree(pThrd->loop); + taosMemoryFree(pThrd->prepare); + taosThreadMutexDestroy(&pThrd->msgMtx); + transAsyncPoolDestroy(pThrd->asyncPool); + for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { + uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i); + taosMemoryFree(timer); + } + taosArrayDestroy(pThrd->timerList); + taosMemoryFree(pThrd->prepare); + taosHashCleanup(pThrd->fqdn2ipCache); + taosHashCleanup(pThrd->failFastCache); + taosHashCleanup(pThrd->batchCache); + + taosMemoryFree(pThrd); + } + return code; } static void destroyThrdObj(SCliThrd* pThrd) { if (pThrd == NULL) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 3acdef7f5e..8eda130b02 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -278,6 +278,8 @@ int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAs } void transAsyncPoolDestroy(SAsyncPool* pool) { + if (pool == NULL) return; + for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); SAsyncItem* item = async->data; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 82366f52b9..549eb849d5 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1469,6 +1469,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, return srv; End: transCloseServer(srv); + terrno = code; return NULL; } From 4a5c2f661cb3ba163d0eee2dc3c7d395a70f0875 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 09:54:22 +0000 Subject: [PATCH 08/20] refactor transport --- source/libs/transport/src/transCli.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 362f8cef81..0b837d8141 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2229,6 +2229,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { pThrd->pTransInst = trans; pThrd->quit = false; + *ppThrd = pThrd; return code; _end: From 4561a888b6071e35b330197bbf9f7fbc480d25cc Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 11:09:13 +0000 Subject: [PATCH 09/20] refactor transport --- source/common/src/tmsg.c | 1 - source/libs/transport/inc/transComm.h | 14 +-- source/libs/transport/src/transCli.c | 11 ++- source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSvr.c | 128 ++++++++++++++++++++------ 5 files changed, 116 insertions(+), 40 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a86b8e29fe..313a7f501a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1783,7 +1783,6 @@ void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { } } taosMemoryFree(pReq->pUserIpWhite); - // impl later return; } int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq, SUpdateIpWhite **pUpdateMsg) { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 73c0fb6228..e66941244c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -299,13 +299,13 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); } \ } while (0) -int transInitBuffer(SConnBuffer* buf); -int transClearBuffer(SConnBuffer* buf); -int transDestroyBuffer(SConnBuffer* buf); -int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); -bool transReadComplete(SConnBuffer* connBuf); -int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); +int32_t transInitBuffer(SConnBuffer* buf); +int32_t transClearBuffer(SConnBuffer* buf); +int32_t transDestroyBuffer(SConnBuffer* buf); +int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); +bool transReadComplete(SConnBuffer* connBuf); +int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); int transSetConnOption(uv_tcp_t* stream, int keepalive); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0b837d8141..10c643f644 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -197,7 +197,7 @@ static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); -static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); +static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later @@ -1628,8 +1628,9 @@ static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, u } return 0; } -static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { +static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later + int32_t code = 0; uint32_t addr = taosGetIpv4FromFqdn(fqdn); if (addr != 0xffffffff) { size_t len = strlen(fqdn); @@ -1639,10 +1640,12 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { tinet_ntoa(old, *v); tinet_ntoa(new, addr); tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); - taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); + code = taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); } + } else { + code = TSDB_CODE_RPC_FQDN_ERROR; // TSDB_CODE_RPC_INVALID_FQDN; } - return; + return code; } static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 8eda130b02..2818a767b8 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -102,7 +102,7 @@ int transSockInfo2Str(struct sockaddr* sockname, char* dst) { sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); return r; } -int transInitBuffer(SConnBuffer* buf) { +int32_t transInitBuffer(SConnBuffer* buf) { buf->buf = taosMemoryCalloc(1, BUFFER_CAP); if (buf->buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 549eb849d5..a5ef2341be 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -164,7 +164,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); -static int reallocConnRef(SSvrConn* conn); +static int32_t reallocConnRef(SSvrConn* conn); static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); @@ -798,7 +798,11 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { } static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { - reallocConnRef(pConn); + int32_t code = reallocConnRef(pConn); + if (code != 0) { + destroyConn(pConn, true); + return true; + } tTrace("conn %p received release request", pConn); STraceId traceId = pHead->traceId; @@ -969,23 +973,21 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { return; } - // uv_handle_type pending = uv_pipe_pending_type(pipe); - SSvrConn* pConn = createConn(pThrd); + if (pConn == NULL) { + uv_close((uv_handle_t*)q, NULL); + return; + } - pConn->pTransInst = pThrd->pTransInst; - /* init conn timer*/ - // uv_timer_init(pThrd->loop, &pConn->pTimer); - // pConn->pTimer.data = pConn; - - pConn->hostThrd = pThrd; - - // init client handle - pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); - uv_tcp_init(pThrd->loop, pConn->pTcp); - pConn->pTcp->data = pConn; - - // transSetConnOption((uv_tcp_t*)pConn->pTcp); + // pConn->pTransInst = pThrd->pTransInst; + // /* init conn timer*/ + // // uv_timer_init(pThrd->loop, &pConn->pTimer); + // // pConn->pTimer.data = pConn; + // pConn->hostThrd = pThrd; + // // init client handle + // pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); + // uv_tcp_init(pThrd->loop, pConn->pTcp); + // pConn->pTcp->data = pConn; if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; @@ -1169,38 +1171,84 @@ void* transWorkerThread(void* arg) { } static FORCE_INLINE SSvrConn* createConn(void* hThrd) { + int32_t code = 0; SWorkThrd* pThrd = hThrd; SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); if (pConn == NULL) { - return NULL; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } transReqQueueInit(&pConn->wreqQueue); QUEUE_INIT(&pConn->queue); - QUEUE_PUSH(&pThrd->conn, &pConn->queue); - transQueueInit(&pConn->srvMsgs, NULL); + if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } + + if ((code = transInitBuffer(&pConn->readBuf)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; - transInitBuffer(&pConn->readBuf); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); + if (exh == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + exh->handle = pConn; exh->pThrd = pThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + if (exh->refId < 0) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); + } + QUEUE_INIT(&exh->q); - transAcquireExHandle(transGetRefMgt(), exh->refId); + + SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + if (pSelf != exh) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); + } STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; QUEUE_INIT(&exh->q); transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId); + + pConn->pTransInst = pThrd->pTransInst; + /* init conn timer*/ + // uv_timer_init(pThrd->loop, &pConn->pTimer); + // pConn->pTimer.data = pConn; + pConn->hostThrd = pThrd; + // init client handle + pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); + if (pConn->pTcp == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + + code = uv_tcp_init(pThrd->loop, pConn->pTcp); + if (code != 0) { + tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code)); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); + } + pConn->pTcp->data = pConn; + return pConn; +_end: + if (pConn) { + transQueueDestroy(&pConn->srvMsgs); + transDestroyBuffer(&pConn->readBuf); + taosMemoryFree(pConn->pTcp); + taosMemoryFree(pConn); + pConn = NULL; + } + tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), tstrerror(code)); + return NULL; } static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) { @@ -1221,16 +1269,33 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) { conn->regArg.init = 0; } } -static int reallocConnRef(SSvrConn* conn) { - transReleaseExHandle(transGetRefMgt(), conn->refId); - transRemoveExHandle(transGetRefMgt(), conn->refId); +static int32_t reallocConnRef(SSvrConn* conn) { + if (conn->refId > 0) { + transReleaseExHandle(transGetRefMgt(), conn->refId); + transRemoveExHandle(transGetRefMgt(), conn->refId); + } // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); + if (exh == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + if (exh->refId < 0) { + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + QUEUE_INIT(&exh->q); - transAcquireExHandle(transGetRefMgt(), exh->refId); + SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + if (pSelf != exh) { + tError("conn %p failed to acquire handle", conn); + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + conn->refId = exh->refId; return 0; @@ -1483,9 +1548,14 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) { taosMemoryFree(msg); } void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { + int32_t code = 0; SSvrConn* conn = msg->pConn; if (conn->status == ConnAcquire) { - reallocConnRef(conn); + code = reallocConnRef(conn); + if (code != 0) { + destroyConn(conn, true); + return; + } if (!transQueuePush(&conn->srvMsgs, msg)) { return; } @@ -1794,6 +1864,7 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { code = TSDB_CODE_OUT_OF_MEMORY; break; } + SUpdateIpWhite* pReq = NULL; code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq); if (code != 0) { @@ -1806,6 +1877,9 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { if ((code = transAsyncSend(pThrd->asyncPool, &msg->q)) != 0) { code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + tFreeSUpdateIpWhiteReq(pReq); + taosMemoryFree(pReq); + taosMemoryFree(msg); break; } } From 7880800beea977eee6f140e3a02f7630b9e76393 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 11:25:36 +0000 Subject: [PATCH 10/20] refactor transport --- source/libs/transport/src/transCli.c | 5 +++- source/libs/transport/src/transComm.c | 35 ++++++++++++++++++--------- source/libs/transport/src/transSvr.c | 6 ++++- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 10c643f644..dc5e16e671 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -870,7 +870,10 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; - transAllocBuffer(pBuf, buf); + int32_t code = transAllocBuffer(pBuf, buf); + if (code < 0) { + cliDestroyConn(conn, true); + } } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 2818a767b8..8a64b7d808 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -115,17 +115,20 @@ int32_t transInitBuffer(SConnBuffer* buf) { buf->invalid = 0; return 0; } -int transDestroyBuffer(SConnBuffer* p) { +int32_t transDestroyBuffer(SConnBuffer* p) { taosMemoryFree(p->buf); p->buf = NULL; return 0; } -int transClearBuffer(SConnBuffer* buf) { +int32_t transClearBuffer(SConnBuffer* buf) { SConnBuffer* p = buf; if (p->cap > BUFFER_CAP) { p->cap = BUFFER_CAP; p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP); + if (p->buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } p->left = -1; p->len = 0; @@ -134,27 +137,31 @@ int transClearBuffer(SConnBuffer* buf) { return 0; } -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) { +int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) { static const int HEADSIZE = sizeof(STransMsgHead); - - SConnBuffer* p = connBuf; + int32_t code = 0; + SConnBuffer* p = connBuf; if (p->left != 0 || p->total <= 0) { - return -1; + return TSDB_CODE_INVALID_MSG; } int total = p->total; if (total >= HEADSIZE && !p->invalid) { *buf = taosMemoryCalloc(1, total); + if (*buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } memcpy(*buf, p->buf, total); - if (transResetBuffer(connBuf, resetBuf) < 0) { - return -1; + if ((code = transResetBuffer(connBuf, resetBuf)) < 0) { + return code; } } else { total = -1; + return TSDB_CODE_INVALID_MSG; } return total; } -int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { +int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { SConnBuffer* p = connBuf; if (p->total < p->len) { int left = p->len - p->total; @@ -170,15 +177,18 @@ int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { if (resetBuf) { p->cap = BUFFER_CAP; p->buf = taosMemoryRealloc(p->buf, p->cap); + if (p->buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } } } else { ASSERTS(0, "invalid read from sock buf"); - return -1; + return TSDB_CODE_INVALID_MSG; } return 0; } -int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { +int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { /* * formate of data buffer: * |<--------------------------data from socket------------------------------->| @@ -195,6 +205,9 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { } else { p->cap = p->left + p->len; p->buf = taosMemoryRealloc(p->buf, p->cap); + if (p->buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } uvBuf->base = p->buf + p->len; uvBuf->len = p->left; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a5ef2341be..abffdac7a4 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -202,7 +202,11 @@ static int32_t addHandleToAcceptloop(void* arg); void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SSvrConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; - transAllocBuffer(pBuf, buf); + int32_t code = transAllocBuffer(pBuf, buf); + if (code < 0) { + tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code)); + destroyConn(conn, true); + } } // refers specifically to query or insert timeout From 0b4031ed6a9e66872641783b265a4c025c799466 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 11:49:31 +0000 Subject: [PATCH 11/20] refactor tq backend --- source/libs/transport/src/transCli.c | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dc5e16e671..20355cc10f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1601,7 +1601,9 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { memset(pResp, 0, sizeof(STransMsg)); - pResp->code = TSDB_CODE_RPC_BROKEN_LINK; + if (pResp->code == 0) { + pResp->code = TSDB_CODE_RPC_BROKEN_LINK; + } pResp->msgType = pMsg->msg.msgType + 1; pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL; pResp->info.traceId = pMsg->msg.info.traceId; @@ -1697,7 +1699,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr); if (ignore == true) { // persist conn already release by server - STransMsg resp; + STransMsg resp = {0}; cliBuildExceptResp(pMsg, &resp); // refactorr later resp.info.cliVer = pTransInst->compatibilityVer; @@ -1719,6 +1721,18 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliSend(conn); } else { code = cliCreateConn(pThrd, &conn); + if (code != 0) { + tError("%s failed to create conn, reason:%s", pTransInst->label, tstrerror(code)); + STransMsg resp = {.code = code}; + cliBuildExceptResp(pMsg, &resp); + + resp.info.cliVer = pTransInst->compatibilityVer; + if (pMsg->type != Release) { + pTransInst->cfp(pTransInst->parent, &resp, NULL); + } + destroyCmsg(pMsg); + return; + } int64_t refId = (int64_t)pMsg->msg.info.handle; if (refId != 0) specifyConnRef(conn, true, refId); From 6eb9774edb340c67968688424bd6fbf6eb23a364 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 11:52:20 +0000 Subject: [PATCH 12/20] refactor transport --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 20355cc10f..5f1c19a95b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1599,7 +1599,7 @@ FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { if (pMsg == NULL) return -1; - memset(pResp, 0, sizeof(STransMsg)); + // memset(pResp, 0, sizeof(STransMsg)); if (pResp->code == 0) { pResp->code = TSDB_CODE_RPC_BROKEN_LINK; From bd2e12968668fe638ee2ef7dacd4d07d53558598 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 24 Jul 2024 01:00:20 +0000 Subject: [PATCH 13/20] refactor transport --- source/libs/transport/src/transCli.c | 58 ++++++++++++++++------------ 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5f1c19a95b..2d537132c4 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -114,7 +114,7 @@ typedef struct SCliThrd { SDelayQueue* timeoutQueue; SDelayQueue* waitConnQueue; uint64_t nextTimeout; // next timeout - void* pTransInst; // + STrans* pTransInst; // int connCount; void (*destroyAhandleFp)(void* ahandle); @@ -2052,12 +2052,12 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { } static void* cliWorkThread(void* arg) { + char threadName[TSDB_LABEL_LEN] = {0}; + SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); - char threadName[TSDB_LABEL_LEN] = {0}; - STrans* pInst = pThrd->pTransInst; - strtolower(threadName, pInst->label); + strtolower(threadName, pThrd->pTransInst->label); setThreadName(threadName); uv_run(pThrd->loop, UV_RUN_DEFAULT); @@ -2121,11 +2121,9 @@ static FORCE_INLINE void destroyCmsg(void* arg) { taosMemoryFree(pMsg); } static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) { - SCliMsg* pMsg = arg; - if (pMsg == NULL) { - return; - } + if (arg == NULL) return; + SCliMsg* pMsg = arg; SCliThrd* pThrd = param; if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) { if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle); @@ -2168,8 +2166,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { code = uv_loop_init(pThrd->loop); if (code != 0) { tError("failed to init uv_loop, reason:%s", uv_err_name(code)); - code = TSDB_CODE_THIRDPARTY_ERROR; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); } int32_t nSync = pTransInst->supportBatch ? 4 : 8; @@ -2188,8 +2185,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { code = uv_prepare_init(pThrd->loop, pThrd->prepare); if (code != 0) { tError("failed to create prepre since:%s", uv_err_name(code)); - code = TSDB_CODE_THIRDPARTY_ERROR; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); } pThrd->prepare->data = pThrd; @@ -2197,14 +2193,13 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); if (pThrd->timerList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } for (int i = 0; i < timerSize; i++) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); if (timer == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } uv_timer_init(pThrd->loop, timer); taosArrayPush(pThrd->timerList, &timer); @@ -2213,7 +2208,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { pThrd->pool = createConnPool(4); if (pThrd->pool == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } if ((code = transDQCreate(pThrd->loop, &pThrd->delayQueue)) != 0) { TAOS_CHECK_GOTO(code, NULL, _end); @@ -2230,19 +2225,16 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->fqdn2ipCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->failFastCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->batchCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); @@ -2324,12 +2316,23 @@ static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) { taosMemoryFree(ctx); } -void cliSendQuit(SCliThrd* thrd) { +int32_t cliSendQuit(SCliThrd* thrd) { // cli can stop gracefully + int32_t code = 0; SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (msg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + msg->type = Quit; - transAsyncSend(thrd->asyncPool, &msg->q); + if ((code = transAsyncSend(thrd->asyncPool, &msg->q)) != 0) { + code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + taosMemoryFree(msg); + return code; + } + atomic_store_8(&thrd->asyncPool->stop, 1); + return 0; } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { @@ -2653,9 +2656,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } void transCloseClient(void* arg) { + int32_t code = 0; SCliObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { - cliSendQuit(cli->pThreadObj[i]); + code = cliSendQuit(cli->pThreadObj[i]); + if (code != 0) { + tError("failed to send quit to thread:%d, reason:%s", i, tstrerror(code)); + } + destroyThrdObj(cli->pThreadObj[i]); } taosMemoryFree(cli->pThreadObj); From 2a66965863d48db332ad90b2ee151a13495e81be Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 24 Jul 2024 02:20:59 +0000 Subject: [PATCH 14/20] refactor transport --- source/libs/transport/src/transCli.c | 2 +- source/libs/transport/src/transSvr.c | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2d537132c4..327598d181 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2888,7 +2888,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); if (cliMsg == NULL) { - sem_destroy(sem); + tsem_destroy(sem); taosMemoryFree(sem); taosMemoryFree(pCtx); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index abffdac7a4..7964a81afb 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1109,11 +1109,7 @@ static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { return code; } #if defined(WINDOWS) || defined(DARWIN) - code = uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); - if (code != 0) { - tError("failed to start connect pipe:%s", uv_err_name(code)); - return TSDB_CODE_THIRDPARTY_ERROR; - } + uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); #else code = uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); From 54476daaba7a92e5f08a2f265ed26db42779dd35 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 24 Jul 2024 02:48:13 +0000 Subject: [PATCH 15/20] refactor transport --- source/libs/transport/src/transCli.c | 3 ++- source/libs/transport/src/transSvr.c | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 327598d181..f27ab807a7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -872,7 +872,8 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_ SConnBuffer* pBuf = &conn->readBuf; int32_t code = transAllocBuffer(pBuf, buf); if (code < 0) { - cliDestroyConn(conn, true); + tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code)); + // cliDestroyConn(conn, true); } } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 7964a81afb..2f6c687df2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -205,7 +205,7 @@ void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b int32_t code = transAllocBuffer(pBuf, buf); if (code < 0) { tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code)); - destroyConn(conn, true); + // destroyConn(conn, true); } } From c4abac2cfa96f1ee285d7848b2ec551acd604c99 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 24 Jul 2024 05:15:16 +0000 Subject: [PATCH 16/20] refactor transport --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f27ab807a7..1e5207c92e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2875,7 +2875,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); if (pCtx == NULL) { - sem_destroy(sem); + tsem_destroy(sem); taosMemoryFree(sem); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); } From cd030c186ae6c1622a43710bb38989e65a1b69a3 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 25 Jul 2024 00:42:01 +0000 Subject: [PATCH 17/20] refactor transport --- source/libs/transport/src/transCli.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1e5207c92e..d5739cde58 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -971,6 +971,7 @@ _failed: taosMemoryFree(conn->stream); transReqQueueClear(&conn->wreqQueue); transDestroyBuffer(&conn->readBuf); + transQueueDestroy(&conn->cliMsgs); } taosMemoryFree(conn); return code; From cc9e8cf14b7bd406b861d57e040c736bc8828c57 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 25 Jul 2024 07:01:50 +0000 Subject: [PATCH 18/20] refactor transport --- source/libs/transport/src/transCli.c | 52 +++++++++++++++++++++++---- source/libs/transport/src/transComm.c | 4 +++ 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d5739cde58..8f83b1bde2 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -713,10 +713,22 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + if (arg == NULL) { + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } arg->param1 = *pMsg; arg->param2 = pThrd; - (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + if (task == NULL) { + taosMemoryFree(arg); + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } + (*pMsg)->ctx->task = task; tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); *pMsg = NULL; @@ -724,9 +736,23 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { // send msg in delay queue if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + if (arg == NULL) { + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } arg->param1 = *pMsg; arg->param2 = pThrd; - (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + + if (task == NULL) { + taosMemoryFree(arg); + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } + + (*pMsg)->ctx->task = task; tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); @@ -766,7 +792,11 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; } - allocConnRef(conn, true); + int32_t code = allocConnRef(conn, true); + if (code != 0) { + cliDestroyConn(conn, true); + return; + } SCliThrd* thrd = conn->hostThrd; if (conn->timer != NULL) { @@ -811,6 +841,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->list->size >= 10) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + if (arg == NULL) return; arg->param1 = conn; arg->param2 = thrd; @@ -826,9 +857,12 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { } SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); + if (exh == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + exh->refId = transAddExHandle(transGetRefMgt(), exh); - SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); - if (self != exh) { + if (exh->refId < 0) { taosMemoryFree(exh); return TSDB_CODE_REF_INVALID_ID; } @@ -838,8 +872,14 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { exh->handle = conn; exh->pThrd = conn->hostThrd; + SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); + if (self != exh) { + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + conn->refId = exh->refId; - if (conn->refId == -1) { + if (conn->refId < 0) { taosMemoryFree(exh); } return 0; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 8a64b7d808..aa6b377423 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -637,6 +637,10 @@ void transDQCancel(SDelayQueue* queue, SDelayTask* task) { SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { uint64_t now = taosGetTimestampMs(); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); + if (task == NULL) { + return NULL; + } + task->func = func; task->arg = arg; task->execTime = now + timeoutMs; From eb4a8f1988cc9859061d651ca98a07e787595ba0 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 25 Jul 2024 07:40:58 +0000 Subject: [PATCH 19/20] refactor transport --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8f83b1bde2..69c359621c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -743,8 +743,8 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } arg->param1 = *pMsg; arg->param2 = pThrd; - SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); if (task == NULL) { taosMemoryFree(arg); doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); From caea4f84b7712f01a3682385bb9c6a98766a57e5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 Jul 2024 08:51:23 +0800 Subject: [PATCH 20/20] refactor transport --- source/libs/transport/src/transCli.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2d3827ee56..b51964083f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -3166,6 +3166,5 @@ int64_t transAllocHandle() { QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); tDebug("pre alloc refId %" PRId64 "", exh->refId); - return exh->refId; }