refactor transport

This commit is contained in:
Yihao Deng 2024-07-03 00:57:44 +00:00
parent 1ae87fe097
commit cdaa4fa47e
2 changed files with 102 additions and 59 deletions

View File

@ -247,7 +247,7 @@ typedef struct {
} SHeap;
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b);
int transHeapCreate(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b));
int transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b));
void transHeapDestroy(SHeap* heap);
int transHeapGet(SHeap* heap, SCliConn** p);
int transHeapInsert(SHeap* heap, SCliConn* p);
@ -1702,12 +1702,11 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
uint32_t* v = taosHashGet(cache, fqdn, len);
if (v == NULL) {
addr = taosGetIpv4FromFqdn(fqdn);
if (addr == 0xffffffff) {
if (addr == (uint32_t)-1) {
terrno = TSDB_CODE_RPC_FQDN_ERROR;
tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr());
return addr;
}
taosHashPut(cache, fqdn, len, &addr, sizeof(addr));
} else {
addr = *v;
@ -1717,7 +1716,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
// impl later
uint32_t addr = taosGetIpv4FromFqdn(fqdn);
if (addr != 0xffffffff) {
if (addr != (uint32_t)-1) {
size_t len = strlen(fqdn);
uint32_t* v = taosHashGet(cache, fqdn, len);
if (addr != *v) {
@ -1725,7 +1724,7 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
tinet_ntoa(old, *v);
tinet_ntoa(new, addr);
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
}
}
return;
@ -1759,29 +1758,50 @@ static void doFreeTimeoutMsg(void* param) {
taosMemoryFree(arg);
}
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
static SHeap* getOrCreateHeapIfNotExist(SHashObj* pConnHeapCache, char* key) {
int ret = 0;
size_t klen = strlen(key);
SHeap* p = taosHashGet(pConnHeapCache, key, klen);
if (p == NULL) {
SHeap heap = {0};
transHeapCreate(&heap, compareHeapNode);
taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap));
ret = transHeapInit(&heap, compareHeapNode);
if (ret != 0) {
tError("failed to init heap cache for key:%s, reason: %s", key, tstrerror(terrno));
terrno = 0;
return NULL;
}
ret = taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap));
if (ret != 0) {
transHeapDestroy(&heap);
terrno = TSDB_CODE_OUT_OF_MEMORY;
tError("failed to put heap to cache for key:%s, reason: %s", key, tstrerror(terrno));
terrno = 0;
}
p = taosHashGet(pConnHeapCache, key, klen);
return p;
}
return p;
}
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
int ret = 0;
SCliConn* pConn = NULL;
SHeap* p = getOrCreateHeapIfNotExist(pConnHeapCache, key);
if (p == NULL) {
return NULL;
}
SCliConn* pConn = NULL;
transHeapGet(p, &pConn);
ret = transHeapGet(p, &pConn);
return pConn;
}
static void addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* pConn) {
size_t klen = strlen(key);
SHeap* p = taosHashGet(pConnHeapCacahe, key, klen);
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* pConn) {
SHeap* p = getOrCreateHeapIfNotExist(pConnHeapCacahe, key);
if (p == NULL) {
SHeap heap = {0};
transHeapCreate(&heap, compareHeapNode);
taosHashPut(pConnHeapCacahe, key, klen, &heap, sizeof(heap));
p = taosHashGet(pConnHeapCacahe, key, klen);
return 0;
}
transHeapInsert(p, pConn);
return transHeapInsert(p, pConn);
}
static void delConnFromHeapCache(SHashObj* pConnHeapCache, char* key, SCliConn* pConn) {
size_t klen = strlen(key);
@ -1797,9 +1817,10 @@ static void delConnFromHeapCache(SHashObj* pConnHeapCache, char* key, SCliConn*
}
}
void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) {
STraceId* trace = &pMsg->msg.info.traceId;
int32_t code = 0;
STrans* pTransInst = pThrd->pTransInst;
STraceId* trace = &pMsg->msg.info.traceId;
STrans* pTransInst = pThrd->pTransInst;
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
@ -1821,7 +1842,12 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) {
pConn = cliCreateConn(pThrd);
pConn->dstAddr = taosStrdup(addr);
addConnToHeapCache(pThrd->connHeapCache, addr, pConn);
code = addConnToHeapCache(pThrd->connHeapCache, addr, pConn);
if (code != 0) {
// do nothing
} else {
// do nothing
}
return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet));
}
@ -2858,7 +2884,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg));
if (pTransInst == NULL) {
if (pTransInst == NULL || pTransRsp == NULL) {
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransRsp);
return TSDB_CODE_RPC_BROKEN_LINK;
@ -2873,6 +2899,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
}
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
if (sem == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(sem, 0, 0);
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
@ -2915,9 +2945,25 @@ _RETURN:
}
int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0);
if (sem == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int ret = tsem_init(sem, 0, 0);
if (ret != 0) {
taosMemoryFree(sem);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg));
if (pSyncMsg == NULL) {
tsem_destroy(sem);
taosMemoryFree(sem);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosInitRWLatch(&pSyncMsg->latch);
pSyncMsg->inited = 0;
@ -2929,13 +2975,16 @@ int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
}
int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
int32_t timeoutMs) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransMsg);
return TSDB_CODE_RPC_BROKEN_LINK;
}
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
if (pTransMsg == NULL) {
transFreeMsg(pReq->pCont);
return TSDB_CODE_OUT_OF_MEMORY;
}
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
@ -2953,6 +3002,13 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;
pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg);
if (pCtx->syncMsgRef < 0) {
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransMsg);
taosMemoryFree(pCtx);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return terrno;
}
int64_t ref = pCtx->syncMsgRef;
STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref);
@ -3048,8 +3104,12 @@ int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) {
}
return 1;
}
int transHeapCreate(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)) {
int transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)) {
heap->heap = heapCreate(cmpFunc);
if (heap->heap == NULL) {
return -1;
}
heap->cmpFunc = cmpFunc;
return 0;
}
@ -3063,7 +3123,6 @@ int transHeapGet(SHeap* heap, SCliConn** p) {
*p = NULL;
return -1;
}
// HeapNode* minNode = headMin(heap->heap);
HeapNode* minNode = heapMin(heap->heap);
if (minNode == NULL) {
*p = NULL;

View File

@ -21,6 +21,7 @@ size_t heapSize(Heap* heap) { return heap->nelts; }
Heap* heapCreate(HeapCompareFn fn) {
Heap* heap = taosMemoryCalloc(1, sizeof(Heap));
if (heap == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@ -188,7 +189,6 @@ void heapRemove(Heap* heap, HeapNode* node) {
void heapDequeue(Heap* heap) { heapRemove(heap, heap->min); }
struct PriorityQueue {
SArray* container;
pq_comp_fn fn;
@ -204,9 +204,7 @@ PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param)
return pq;
}
void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn) {
pq->fn = fn;
}
void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn) { pq->fn = fn; }
void destroyPriorityQueue(PriorityQueue* pq) {
if (pq->deleteFn)
@ -218,15 +216,15 @@ void destroyPriorityQueue(PriorityQueue* pq) {
static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ }
static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ }
static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */}
static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) {
void * tmp = a->data;
a->data = b->data;
b->data = tmp;
static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */ }
static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) {
void* tmp = a->data;
a->data = b->data;
b->data = tmp;
}
#define pqContainerGetEle(pq, i) ((PriorityQueueNode*)taosArrayGet((pq)->container, (i)))
#define pqContainerSize(pq) (taosArrayGetSize((pq)->container))
#define pqContainerSize(pq) (taosArrayGetSize((pq)->container))
size_t taosPQSize(PriorityQueue* pq) { return pqContainerSize(pq); }
@ -288,9 +286,7 @@ static void pqRemove(PriorityQueue* pq, size_t i) {
pqUpdate(pq, i);
}
PriorityQueueNode* taosPQTop(PriorityQueue* pq) {
return pqContainerGetEle(pq, 0);
}
PriorityQueueNode* taosPQTop(PriorityQueue* pq) { return pqContainerGetEle(pq, 0); }
PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) {
taosArrayPush(pq->container, node);
@ -316,9 +312,7 @@ BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete delete
return q;
}
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) {
taosPQSetFn(q->queue, fn);
}
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) { taosPQSetFn(q->queue, fn); }
void destroyBoundedQueue(BoundedQueue* q) {
if (!q) return;
@ -343,22 +337,12 @@ PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n) {
}
}
PriorityQueueNode* taosBQTop(BoundedQueue* q) {
return taosPQTop(q->queue);
}
PriorityQueueNode* taosBQTop(BoundedQueue* q) { return taosPQTop(q->queue); }
void taosBQBuildHeap(BoundedQueue *q) {
pqBuildHeap(q->queue);
}
void taosBQBuildHeap(BoundedQueue* q) { pqBuildHeap(q->queue); }
size_t taosBQMaxSize(BoundedQueue* q) {
return q->maxSize;
}
size_t taosBQMaxSize(BoundedQueue* q) { return q->maxSize; }
size_t taosBQSize(BoundedQueue* q) {
return taosPQSize(q->queue);
}
size_t taosBQSize(BoundedQueue* q) { return taosPQSize(q->queue); }
void taosBQPop(BoundedQueue* q) {
taosPQPop(q->queue);
}
void taosBQPop(BoundedQueue* q) { taosPQPop(q->queue); }