From 32c0c40c4d49c5cc4673e533fb3f52551390cce4 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 22 Jul 2024 10:12:30 +0000 Subject: [PATCH] refactor transport --- include/common/tmsg.h | 8 +++---- source/common/src/tmsg.c | 36 ++++++++++++++++++++++++---- source/libs/transport/src/transSvr.c | 18 ++++++++------ 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a5dea8a44e..9ab4fffb79 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1051,10 +1051,10 @@ typedef struct { SUpdateUserIpWhite* pUserIpWhite; } SUpdateIpWhite; -int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); -int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); -void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); -SUpdateIpWhite* cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq); +int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); +int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq, SUpdateIpWhite** pUpdate); typedef struct { int64_t ipWhiteVer; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 98e6530be0..a86b8e29fe 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1774,20 +1774,35 @@ int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pR return 0; } void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { - for (int i = 0; i < pReq->numOfUser; i++) { - SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; - taosMemoryFree(pUserWhite->pIpRanges); + if (pReq == NULL) return; + + if (pReq->pUserIpWhite) { + for (int i = 0; i < pReq->numOfUser; i++) { + SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; + taosMemoryFree(pUserWhite->pIpRanges); + } } taosMemoryFree(pReq->pUserIpWhite); // impl later return; } -SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { +int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq, SUpdateIpWhite **pUpdateMsg) { + int32_t code = 0; + if (pReq == NULL) { + return 0; + } SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite)); + if (pClone == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } pClone->numOfUser = pReq->numOfUser; pClone->ver = pReq->ver; pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser); + if (pClone->pUserIpWhite == NULL) { + taosMemoryFree(pClone); + return TSDB_CODE_OUT_OF_MEMORY; + } for (int i = 0; i < pReq->numOfUser; i++) { SUpdateUserIpWhite *pNew = &pClone->pUserIpWhite[i]; @@ -1799,9 +1814,20 @@ SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { int32_t sz = pOld->numOfRange * sizeof(SIpV4Range); pNew->pIpRanges = taosMemoryCalloc(1, sz); + if (pNew->pIpRanges == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } memcpy(pNew->pIpRanges, pOld->pIpRanges, sz); } - return pClone; +_return: + if (code < 0) { + tFreeSUpdateIpWhiteReq(pClone); + taosMemoryFree(pClone); + } else { + *pUpdateMsg = pClone; + } + return code; } int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) { SEncoder encoder = {0}; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index df8928bd02..82366f52b9 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -614,6 +614,10 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); + if (pMsg->pCont == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); @@ -628,7 +632,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { transQueuePop(&pConn->srvMsgs); destroySmsg(smsg); - return -1; + return TSDB_CODE_INVALID_MSG; } if (pConn->status == ConnNormal) { @@ -1528,6 +1532,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { tDebug("ip-white-list disable on trans"); thrd->enableIpWhiteList = 0; taosMemoryFree(msg); + return; } int32_t code = 0; for (int i = 0; i < req->numOfUser; i++) { @@ -1557,6 +1562,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { } tFreeSUpdateIpWhiteReq(req); taosMemoryFree(req); + taosMemoryFree(msg); } void destroyWorkThrd(SWorkThrd* pThrd) { @@ -1787,13 +1793,11 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { code = TSDB_CODE_OUT_OF_MEMORY; break; } - SUpdateIpWhite* pReq = NULL; - if (arg != NULL) { - if ((pReq = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg)) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - break; - } + code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq); + if (code != 0) { + taosMemoryFree(msg); + break; } msg->type = Update;