From cdaa4fa47e295b50b6a78d1edc953b8dc6357b36 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 3 Jul 2024 00:57:44 +0000 Subject: [PATCH] refactor transport --- source/libs/transport/src/transCli.c | 115 ++++++++++++++++++++------- source/util/src/theap.c | 46 ++++------- 2 files changed, 102 insertions(+), 59 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d625b30752..adb6223a0f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -247,7 +247,7 @@ typedef struct { } SHeap; int32_t compareHeapNode(const HeapNode* a, const HeapNode* b); -int transHeapCreate(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); +int transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); void transHeapDestroy(SHeap* heap); int transHeapGet(SHeap* heap, SCliConn** p); int transHeapInsert(SHeap* heap, SCliConn* p); @@ -1702,12 +1702,11 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) uint32_t* v = taosHashGet(cache, fqdn, len); if (v == NULL) { addr = taosGetIpv4FromFqdn(fqdn); - if (addr == 0xffffffff) { + if (addr == (uint32_t)-1) { terrno = TSDB_CODE_RPC_FQDN_ERROR; tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr()); return addr; } - taosHashPut(cache, fqdn, len, &addr, sizeof(addr)); } else { addr = *v; @@ -1717,7 +1716,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later uint32_t addr = taosGetIpv4FromFqdn(fqdn); - if (addr != 0xffffffff) { + if (addr != (uint32_t)-1) { size_t len = strlen(fqdn); uint32_t* v = taosHashGet(cache, fqdn, len); if (addr != *v) { @@ -1725,7 +1724,7 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { tinet_ntoa(old, *v); tinet_ntoa(new, addr); tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); - taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr)); + taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); } } return; @@ -1759,29 +1758,50 @@ static void doFreeTimeoutMsg(void* param) { taosMemoryFree(arg); } -static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { +static SHeap* getOrCreateHeapIfNotExist(SHashObj* pConnHeapCache, char* key) { + int ret = 0; size_t klen = strlen(key); + SHeap* p = taosHashGet(pConnHeapCache, key, klen); if (p == NULL) { SHeap heap = {0}; - transHeapCreate(&heap, compareHeapNode); - taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap)); + ret = transHeapInit(&heap, compareHeapNode); + if (ret != 0) { + tError("failed to init heap cache for key:%s, reason: %s", key, tstrerror(terrno)); + terrno = 0; + return NULL; + } + + ret = taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap)); + if (ret != 0) { + transHeapDestroy(&heap); + terrno = TSDB_CODE_OUT_OF_MEMORY; + tError("failed to put heap to cache for key:%s, reason: %s", key, tstrerror(terrno)); + terrno = 0; + } + p = taosHashGet(pConnHeapCache, key, klen); + return p; + } + return p; +} + +static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { + int ret = 0; + SCliConn* pConn = NULL; + SHeap* p = getOrCreateHeapIfNotExist(pConnHeapCache, key); + if (p == NULL) { return NULL; } - SCliConn* pConn = NULL; - transHeapGet(p, &pConn); + ret = transHeapGet(p, &pConn); + return pConn; } -static void addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* pConn) { - size_t klen = strlen(key); - SHeap* p = taosHashGet(pConnHeapCacahe, key, klen); +static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* pConn) { + SHeap* p = getOrCreateHeapIfNotExist(pConnHeapCacahe, key); if (p == NULL) { - SHeap heap = {0}; - transHeapCreate(&heap, compareHeapNode); - taosHashPut(pConnHeapCacahe, key, klen, &heap, sizeof(heap)); - p = taosHashGet(pConnHeapCacahe, key, klen); + return 0; } - transHeapInsert(p, pConn); + return transHeapInsert(p, pConn); } static void delConnFromHeapCache(SHashObj* pConnHeapCache, char* key, SCliConn* pConn) { size_t klen = strlen(key); @@ -1797,9 +1817,10 @@ static void delConnFromHeapCache(SHashObj* pConnHeapCache, char* key, SCliConn* } } void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { - STraceId* trace = &pMsg->msg.info.traceId; + int32_t code = 0; - STrans* pTransInst = pThrd->pTransInst; + STraceId* trace = &pMsg->msg.info.traceId; + STrans* pTransInst = pThrd->pTransInst; cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { @@ -1821,7 +1842,12 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { pConn = cliCreateConn(pThrd); pConn->dstAddr = taosStrdup(addr); - addConnToHeapCache(pThrd->connHeapCache, addr, pConn); + code = addConnToHeapCache(pThrd->connHeapCache, addr, pConn); + if (code != 0) { + // do nothing + } else { + // do nothing + } return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); } @@ -2858,7 +2884,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); - if (pTransInst == NULL) { + if (pTransInst == NULL || pTransRsp == NULL) { transFreeMsg(pReq->pCont); taosMemoryFree(pTransRsp); return TSDB_CODE_RPC_BROKEN_LINK; @@ -2873,6 +2899,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); + if (sem == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tsem_init(sem, 0, 0); TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); @@ -2915,9 +2945,25 @@ _RETURN: } int64_t transCreateSyncMsg(STransMsg* pTransMsg) { tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); - tsem_init(sem, 0, 0); + if (sem == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int ret = tsem_init(sem, 0, 0); + if (ret != 0) { + taosMemoryFree(sem); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg)); + if (pSyncMsg == NULL) { + tsem_destroy(sem); + taosMemoryFree(sem); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } taosInitRWLatch(&pSyncMsg->latch); pSyncMsg->inited = 0; @@ -2929,13 +2975,16 @@ int64_t transCreateSyncMsg(STransMsg* pTransMsg) { } int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - taosMemoryFree(pTransMsg); return TSDB_CODE_RPC_BROKEN_LINK; } + STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + if (pTransMsg == NULL) { + transFreeMsg(pReq->pCont); + return TSDB_CODE_OUT_OF_MEMORY; + } SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { @@ -2953,6 +3002,13 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg); + if (pCtx->syncMsgRef < 0) { + transFreeMsg(pReq->pCont); + taosMemoryFree(pTransMsg); + taosMemoryFree(pCtx); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return terrno; + } int64_t ref = pCtx->syncMsgRef; STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref); @@ -3048,8 +3104,12 @@ int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { } return 1; } -int transHeapCreate(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)) { +int transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)) { heap->heap = heapCreate(cmpFunc); + if (heap->heap == NULL) { + return -1; + } + heap->cmpFunc = cmpFunc; return 0; } @@ -3063,7 +3123,6 @@ int transHeapGet(SHeap* heap, SCliConn** p) { *p = NULL; return -1; } - // HeapNode* minNode = headMin(heap->heap); HeapNode* minNode = heapMin(heap->heap); if (minNode == NULL) { *p = NULL; diff --git a/source/util/src/theap.c b/source/util/src/theap.c index 315ddf9367..ed117efd62 100644 --- a/source/util/src/theap.c +++ b/source/util/src/theap.c @@ -21,6 +21,7 @@ size_t heapSize(Heap* heap) { return heap->nelts; } Heap* heapCreate(HeapCompareFn fn) { Heap* heap = taosMemoryCalloc(1, sizeof(Heap)); if (heap == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -188,7 +189,6 @@ void heapRemove(Heap* heap, HeapNode* node) { void heapDequeue(Heap* heap) { heapRemove(heap, heap->min); } - struct PriorityQueue { SArray* container; pq_comp_fn fn; @@ -204,9 +204,7 @@ PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param) return pq; } -void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn) { - pq->fn = fn; -} +void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn) { pq->fn = fn; } void destroyPriorityQueue(PriorityQueue* pq) { if (pq->deleteFn) @@ -218,15 +216,15 @@ void destroyPriorityQueue(PriorityQueue* pq) { static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ } static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ } -static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */} -static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) { - void * tmp = a->data; - a->data = b->data; - b->data = tmp; +static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */ } +static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) { + void* tmp = a->data; + a->data = b->data; + b->data = tmp; } #define pqContainerGetEle(pq, i) ((PriorityQueueNode*)taosArrayGet((pq)->container, (i))) -#define pqContainerSize(pq) (taosArrayGetSize((pq)->container)) +#define pqContainerSize(pq) (taosArrayGetSize((pq)->container)) size_t taosPQSize(PriorityQueue* pq) { return pqContainerSize(pq); } @@ -288,9 +286,7 @@ static void pqRemove(PriorityQueue* pq, size_t i) { pqUpdate(pq, i); } -PriorityQueueNode* taosPQTop(PriorityQueue* pq) { - return pqContainerGetEle(pq, 0); -} +PriorityQueueNode* taosPQTop(PriorityQueue* pq) { return pqContainerGetEle(pq, 0); } PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) { taosArrayPush(pq->container, node); @@ -316,9 +312,7 @@ BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete delete return q; } -void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) { - taosPQSetFn(q->queue, fn); -} +void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) { taosPQSetFn(q->queue, fn); } void destroyBoundedQueue(BoundedQueue* q) { if (!q) return; @@ -343,22 +337,12 @@ PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n) { } } -PriorityQueueNode* taosBQTop(BoundedQueue* q) { - return taosPQTop(q->queue); -} +PriorityQueueNode* taosBQTop(BoundedQueue* q) { return taosPQTop(q->queue); } -void taosBQBuildHeap(BoundedQueue *q) { - pqBuildHeap(q->queue); -} +void taosBQBuildHeap(BoundedQueue* q) { pqBuildHeap(q->queue); } -size_t taosBQMaxSize(BoundedQueue* q) { - return q->maxSize; -} +size_t taosBQMaxSize(BoundedQueue* q) { return q->maxSize; } -size_t taosBQSize(BoundedQueue* q) { - return taosPQSize(q->queue); -} +size_t taosBQSize(BoundedQueue* q) { return taosPQSize(q->queue); } -void taosBQPop(BoundedQueue* q) { - taosPQPop(q->queue); -} +void taosBQPop(BoundedQueue* q) { taosPQPop(q->queue); }