From 4561a888b6071e35b330197bbf9f7fbc480d25cc Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 11:09:13 +0000 Subject: [PATCH] refactor transport --- source/common/src/tmsg.c | 1 - source/libs/transport/inc/transComm.h | 14 +-- source/libs/transport/src/transCli.c | 11 ++- source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSvr.c | 128 ++++++++++++++++++++------ 5 files changed, 116 insertions(+), 40 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a86b8e29fe..313a7f501a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1783,7 +1783,6 @@ void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { } } taosMemoryFree(pReq->pUserIpWhite); - // impl later return; } int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq, SUpdateIpWhite **pUpdateMsg) { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 73c0fb6228..e66941244c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -299,13 +299,13 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); } \ } while (0) -int transInitBuffer(SConnBuffer* buf); -int transClearBuffer(SConnBuffer* buf); -int transDestroyBuffer(SConnBuffer* buf); -int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); -bool transReadComplete(SConnBuffer* connBuf); -int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); +int32_t transInitBuffer(SConnBuffer* buf); +int32_t transClearBuffer(SConnBuffer* buf); +int32_t transDestroyBuffer(SConnBuffer* buf); +int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); +bool transReadComplete(SConnBuffer* connBuf); +int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); int transSetConnOption(uv_tcp_t* stream, int keepalive); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0b837d8141..10c643f644 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -197,7 +197,7 @@ static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); -static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); +static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later @@ -1628,8 +1628,9 @@ static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, u } return 0; } -static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { +static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later + int32_t code = 0; uint32_t addr = taosGetIpv4FromFqdn(fqdn); if (addr != 0xffffffff) { size_t len = strlen(fqdn); @@ -1639,10 +1640,12 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { tinet_ntoa(old, *v); tinet_ntoa(new, addr); tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); - taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); + code = taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); } + } else { + code = TSDB_CODE_RPC_FQDN_ERROR; // TSDB_CODE_RPC_INVALID_FQDN; } - return; + return code; } static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 8eda130b02..2818a767b8 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -102,7 +102,7 @@ int transSockInfo2Str(struct sockaddr* sockname, char* dst) { sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); return r; } -int transInitBuffer(SConnBuffer* buf) { +int32_t transInitBuffer(SConnBuffer* buf) { buf->buf = taosMemoryCalloc(1, BUFFER_CAP); if (buf->buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 549eb849d5..a5ef2341be 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -164,7 +164,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); -static int reallocConnRef(SSvrConn* conn); +static int32_t reallocConnRef(SSvrConn* conn); static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); @@ -798,7 +798,11 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { } static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { - reallocConnRef(pConn); + int32_t code = reallocConnRef(pConn); + if (code != 0) { + destroyConn(pConn, true); + return true; + } tTrace("conn %p received release request", pConn); STraceId traceId = pHead->traceId; @@ -969,23 +973,21 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { return; } - // uv_handle_type pending = uv_pipe_pending_type(pipe); - SSvrConn* pConn = createConn(pThrd); + if (pConn == NULL) { + uv_close((uv_handle_t*)q, NULL); + return; + } - pConn->pTransInst = pThrd->pTransInst; - /* init conn timer*/ - // uv_timer_init(pThrd->loop, &pConn->pTimer); - // pConn->pTimer.data = pConn; - - pConn->hostThrd = pThrd; - - // init client handle - pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); - uv_tcp_init(pThrd->loop, pConn->pTcp); - pConn->pTcp->data = pConn; - - // transSetConnOption((uv_tcp_t*)pConn->pTcp); + // pConn->pTransInst = pThrd->pTransInst; + // /* init conn timer*/ + // // uv_timer_init(pThrd->loop, &pConn->pTimer); + // // pConn->pTimer.data = pConn; + // pConn->hostThrd = pThrd; + // // init client handle + // pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); + // uv_tcp_init(pThrd->loop, pConn->pTcp); + // pConn->pTcp->data = pConn; if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; @@ -1169,38 +1171,84 @@ void* transWorkerThread(void* arg) { } static FORCE_INLINE SSvrConn* createConn(void* hThrd) { + int32_t code = 0; SWorkThrd* pThrd = hThrd; SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); if (pConn == NULL) { - return NULL; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } transReqQueueInit(&pConn->wreqQueue); QUEUE_INIT(&pConn->queue); - QUEUE_PUSH(&pThrd->conn, &pConn->queue); - transQueueInit(&pConn->srvMsgs, NULL); + if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } + + if ((code = transInitBuffer(&pConn->readBuf)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; - transInitBuffer(&pConn->readBuf); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); + if (exh == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + exh->handle = pConn; exh->pThrd = pThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + if (exh->refId < 0) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); + } + QUEUE_INIT(&exh->q); - transAcquireExHandle(transGetRefMgt(), exh->refId); + + SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + if (pSelf != exh) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); + } STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; QUEUE_INIT(&exh->q); transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId); + + pConn->pTransInst = pThrd->pTransInst; + /* init conn timer*/ + // uv_timer_init(pThrd->loop, &pConn->pTimer); + // pConn->pTimer.data = pConn; + pConn->hostThrd = pThrd; + // init client handle + pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); + if (pConn->pTcp == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + + code = uv_tcp_init(pThrd->loop, pConn->pTcp); + if (code != 0) { + tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code)); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); + } + pConn->pTcp->data = pConn; + return pConn; +_end: + if (pConn) { + transQueueDestroy(&pConn->srvMsgs); + transDestroyBuffer(&pConn->readBuf); + taosMemoryFree(pConn->pTcp); + taosMemoryFree(pConn); + pConn = NULL; + } + tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), tstrerror(code)); + return NULL; } static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) { @@ -1221,16 +1269,33 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) { conn->regArg.init = 0; } } -static int reallocConnRef(SSvrConn* conn) { - transReleaseExHandle(transGetRefMgt(), conn->refId); - transRemoveExHandle(transGetRefMgt(), conn->refId); +static int32_t reallocConnRef(SSvrConn* conn) { + if (conn->refId > 0) { + transReleaseExHandle(transGetRefMgt(), conn->refId); + transRemoveExHandle(transGetRefMgt(), conn->refId); + } // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); + if (exh == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + if (exh->refId < 0) { + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + QUEUE_INIT(&exh->q); - transAcquireExHandle(transGetRefMgt(), exh->refId); + SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + if (pSelf != exh) { + tError("conn %p failed to acquire handle", conn); + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + conn->refId = exh->refId; return 0; @@ -1483,9 +1548,14 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) { taosMemoryFree(msg); } void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { + int32_t code = 0; SSvrConn* conn = msg->pConn; if (conn->status == ConnAcquire) { - reallocConnRef(conn); + code = reallocConnRef(conn); + if (code != 0) { + destroyConn(conn, true); + return; + } if (!transQueuePush(&conn->srvMsgs, msg)) { return; } @@ -1794,6 +1864,7 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { code = TSDB_CODE_OUT_OF_MEMORY; break; } + SUpdateIpWhite* pReq = NULL; code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq); if (code != 0) { @@ -1806,6 +1877,9 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { if ((code = transAsyncSend(pThrd->asyncPool, &msg->q)) != 0) { code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + tFreeSUpdateIpWhiteReq(pReq); + taosMemoryFree(pReq); + taosMemoryFree(msg); break; } }