From 19a03b2eb27cf2049b0df9a2c9ae44cadf3204bb Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 08:55:08 +0000 Subject: [PATCH] refactor transport --- source/libs/transport/src/trans.c | 24 ++--- source/libs/transport/src/transCli.c | 136 ++++++++++++++++++++------ source/libs/transport/src/transComm.c | 2 + source/libs/transport/src/transSvr.c | 1 + 4 files changed, 121 insertions(+), 42 deletions(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 58b1d76d23..266b6cb3a7 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -28,21 +28,19 @@ int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transRelease static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { *ip = taosGetIpv4FromFqdn(localFqdn); if (*ip == 0xFFFFFFFF) { - terrno = TSDB_CODE_RPC_FQDN_ERROR; - return -1; + return TSDB_CODE_RPC_FQDN_ERROR; } return 0; } void* rpcOpen(const SRpcInit* pInit) { int32_t code = rpcInit(); if (code != 0) { - return NULL; + TAOS_CHECK_GOTO(code, NULL, _end); } SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { - // return TSDB_CODE_OUT_OF_MEMORY; - return NULL; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } if (pInit->label) { int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label); @@ -88,10 +86,9 @@ void* rpcOpen(const SRpcInit* pInit) { uint32_t ip = 0; if (pInit->connType == TAOS_CONN_SERVER) { - if (transValidLocalFqdn(pInit->localFqdn, &ip) != 0) { - tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, terrstr()); - taosMemoryFree(pRpc); - return NULL; + if ((code = transValidLocalFqdn(pInit->localFqdn, &ip)) != 0) { + tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, tstrerror(code)); + TAOS_CHECK_GOTO(code, NULL, _end); } } @@ -109,14 +106,19 @@ void* rpcOpen(const SRpcInit* pInit) { (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); if (pRpc->tcphandle == NULL) { - taosMemoryFree(pRpc); - return NULL; + tError("failed to init rpc handle"); + TAOS_CHECK_GOTO(terrno, NULL, _end); } int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); transAcquireExHandle(transGetInstMgt(), refId); pRpc->refId = refId; return (void*)refId; +_end: + taosMemoryFree(pRpc); + terrno = code; + + return NULL; } void rpcClose(void* arg) { tInfo("start to close rpc"); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 01013d59c0..362f8cef81 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -226,10 +226,9 @@ static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); // thread obj -static SCliThrd* createThrdObj(void* trans); -static void destroyThrdObj(SCliThrd* pThrd); - -static void cliWalkCb(uv_handle_t* handle, void* arg); +static int32_t createThrdObj(void* trans, SCliThrd** pThrd); +static void destroyThrdObj(SCliThrd* pThrd); +static void cliWalkCb(uv_handle_t* handle, void* arg); #define CLI_RELEASE_UV(loop) \ do { \ @@ -2048,22 +2047,32 @@ static void* cliWorkThread(void* arg) { } void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + int32_t code = 0; SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj)); + if (cli == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); + } STrans* pTransInst = shandle; memcpy(cli->label, label, TSDB_LABEL_LEN); cli->numOfThreads = numOfThreads; + cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*)); + if (cli->pThreadObj == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); + } for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrd* pThrd = createThrdObj(shandle); - if (pThrd == NULL) { + SCliThrd* pThrd = NULL; + code = createThrdObj(shandle, &pThrd); + if (code != 0) { goto _err; } int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); if (err != 0) { - goto _err; + code = TAOS_SYSTEM_ERROR(errno); + TAOS_CHECK_GOTO(code, NULL, _err); } else { tDebug("success to create tranport-cli thread:%d", i); } @@ -2072,8 +2081,11 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, return cli; _err: - taosMemoryFree(cli->pThreadObj); - taosMemoryFree(cli); + if (cli) { + taosMemoryFree(cli->pThreadObj); + taosMemoryFree(cli); + } + terrno = code; return NULL; } @@ -2116,67 +2128,129 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { taosMemoryFree(pMsg); } -static SCliThrd* createThrdObj(void* trans) { +static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { int32_t code = 0; STrans* pTransInst = trans; SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd)); + if (pThrd == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } QUEUE_INIT(&pThrd->msg); taosThreadMutexInit(&pThrd->msgMtx, NULL); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); - int err = uv_loop_init(pThrd->loop); - if (err != 0) { - tError("failed to init uv_loop, reason:%s", uv_err_name(err)); - taosMemoryFree(pThrd->loop); - taosThreadMutexDestroy(&pThrd->msgMtx); - taosMemoryFree(pThrd); - return NULL; + if (pThrd->loop == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } + + code = uv_loop_init(pThrd->loop); + if (code != 0) { + tError("failed to init uv_loop, reason:%s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + TAOS_CHECK_GOTO(code, NULL, _end); + } + int32_t nSync = pTransInst->supportBatch ? 4 : 8; code = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb, &pThrd->asyncPool); if (code != 0) { tError("failed to init async pool since:%s", tstrerror(code)); - uv_loop_close(pThrd->loop); - taosMemoryFree(pThrd->loop); - taosThreadMutexDestroy(&pThrd->msgMtx); - taosMemoryFree(pThrd); - return NULL; + TAOS_CHECK_GOTO(code, NULL, _end); } pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); - uv_prepare_init(pThrd->loop, pThrd->prepare); + if (pThrd->prepare == NULL) { + tError("failed to create prepre since:%s", tstrerror(code)); + TAOS_CHECK_GOTO(code, NULL, _end); + } + + code = uv_prepare_init(pThrd->loop, pThrd->prepare); + if (code != 0) { + tError("failed to create prepre since:%s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + TAOS_CHECK_GOTO(code, NULL, _end); + } pThrd->prepare->data = pThrd; - // uv_prepare_start(pThrd->prepare, cliPrepareCb); int32_t timerSize = 64; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); + if (pThrd->timerList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } + for (int i = 0; i < timerSize; i++) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + if (timer == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } uv_timer_init(pThrd->loop, timer); taosArrayPush(pThrd->timerList, &timer); } pThrd->pool = createConnPool(4); - transDQCreate(pThrd->loop, &pThrd->delayQueue); + if (pThrd->pool == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } + if ((code = transDQCreate(pThrd->loop, &pThrd->delayQueue)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } - transDQCreate(pThrd->loop, &pThrd->timeoutQueue); + if ((code = transDQCreate(pThrd->loop, &pThrd->timeoutQueue)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } - transDQCreate(pThrd->loop, &pThrd->waitConnQueue); - - pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); - pThrd->pTransInst = trans; + if ((code = transDQCreate(pThrd->loop, &pThrd->waitConnQueue)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->fqdn2ipCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->failFastCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->batchCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _end); + } + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); + pThrd->pTransInst = trans; pThrd->quit = false; - return pThrd; + return code; + +_end: + if (pThrd) { + uv_loop_close(pThrd->loop); + taosMemoryFree(pThrd->loop); + taosMemoryFree(pThrd->prepare); + taosThreadMutexDestroy(&pThrd->msgMtx); + transAsyncPoolDestroy(pThrd->asyncPool); + for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { + uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i); + taosMemoryFree(timer); + } + taosArrayDestroy(pThrd->timerList); + taosMemoryFree(pThrd->prepare); + taosHashCleanup(pThrd->fqdn2ipCache); + taosHashCleanup(pThrd->failFastCache); + taosHashCleanup(pThrd->batchCache); + + taosMemoryFree(pThrd); + } + return code; } static void destroyThrdObj(SCliThrd* pThrd) { if (pThrd == NULL) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 3acdef7f5e..8eda130b02 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -278,6 +278,8 @@ int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAs } void transAsyncPoolDestroy(SAsyncPool* pool) { + if (pool == NULL) return; + for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); SAsyncItem* item = async->data; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 82366f52b9..549eb849d5 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1469,6 +1469,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, return srv; End: transCloseServer(srv); + terrno = code; return NULL; }