From 4e7e83399f14de8d07b2f9d2f034c6359995d2c1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 27 Apr 2022 23:55:44 +0800 Subject: [PATCH 1/2] refator(rpc): refator rpc retry way --- source/libs/transport/inc/transComm.h | 22 +++++++++ source/libs/transport/src/transComm.c | 69 +++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 37f21e2511..db6b3daf98 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -28,6 +28,7 @@ extern "C" { #include "taoserror.h" #include "tglobal.h" #include "thash.h" +#include "theap.h" #include "tidpool.h" #include "tmd5.h" #include "tmempool.h" @@ -328,10 +329,31 @@ void transQueueClear(STransQueue* queue); */ void transQueueDestroy(STransQueue* queue); +typedef struct SDelayTask { + void (*func)(void* arg); + void* arg; + uint64_t execTime; + HeapNode node; +} SDelayTask; + +typedef struct SDelayQueue { + uv_timer_t* timer; + Heap* heap; + uv_loop_t* loop; + void (*free)(void* arg); +} SDelayQueue; + +int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue); + +void transDestroyDelayQueue(SDelayQueue* queue); + +int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); + /* * init global func */ void transThreadOnce(); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index eb42029090..00816eb709 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -358,4 +358,73 @@ void transQueueDestroy(STransQueue* queue) { transQueueClear(queue); taosArrayDestroy(queue->q); } + +static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { + SDelayTask* arg1 = container_of(a, SDelayTask, node); + SDelayTask* arg2 = container_of(b, SDelayTask, node); + if (arg1->execTime > arg2->execTime) { + return -1; + } else { + return 1; + } +} + +static void transDelayQueueTimeout(uv_timer_t* timer) { + SDelayQueue* queue = timer->data; + HeapNode* node = heapMin(queue->heap); + if (node == NULL) { + // DO NOTHING + } + heapRemove(queue->heap, node); + + SDelayTask* task = container_of(node, SDelayTask, node); + task->func(task->arg); + taosMemoryFree(task); + + node = heapMin(queue->heap); + if (node == NULL) { + return; + } + task = container_of(node, SDelayTask, node); + uint64_t timeout = task->execTime > uv_now(queue->loop) ? task->execTime - uv_now(queue->loop) : 0; + uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0); +} +int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { + uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + uv_timer_init(loop, timer); + + Heap* heap = heapCreate(timeCompare); + + SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue)); + q->heap = heap; + q->timer = timer; + q->loop = loop; + q->timer->data = q; + + *queue = q; + return 0; +} + +void transDestroyDelayQueue(SDelayQueue* queue) { + uv_timer_stop(queue->timer); + taosMemoryFree(queue->timer); + heapDestroy(queue->heap); + taosMemoryFree(queue); +} + +int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { + SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); + + task->func = func; + task->arg = arg; + task->execTime = uv_now(queue->loop) + timeoutMs; + + int size = heapSize(queue->heap); + heapInsert(queue->heap, &task->node); + if (size == 1) { + uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0); + } + + return 0; +} #endif From 9af1206cbdb84a20a83b3944a3856c5840e05cd2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 28 Apr 2022 11:56:00 +0800 Subject: [PATCH 2/2] refactor(rpc): fefactor retry way --- source/libs/transport/inc/transComm.h | 8 ++++ source/libs/transport/src/transCli.c | 38 ++++++++++++++----- source/libs/transport/src/transComm.c | 54 ++++++++++++++++----------- 3 files changed, 70 insertions(+), 30 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index db6b3daf98..aa3c27e537 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -329,6 +329,14 @@ void transQueueClear(STransQueue* queue); */ void transQueueDestroy(STransQueue* queue); +/* + * delay queue based on uv loop and uv timer, and only used in retry + */ +typedef struct STaskArg { + void* param1; + void* param2; +} STaskArg; + typedef struct SDelayTask { void (*func)(void* arg); void* arg; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 20406763de..842093c579 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -60,10 +60,10 @@ typedef struct SCliThrdObj { // msg queue queue msg; TdThreadMutex msgMtx; - - uint64_t nextTimeout; // next timeout - void* pTransInst; // - bool quit; + SDelayQueue* delayQueue; + uint64_t nextTimeout; // next timeout + void* pTransInst; // + bool quit; } SCliThrdObj; typedef struct SCliObj { @@ -838,12 +838,13 @@ static SCliThrdObj* createThrdObj() { uv_loop_init(pThrd->loop); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); - uv_timer_init(pThrd->loop, &pThrd->timer); pThrd->timer.data = pThrd; pThrd->pool = createConnPool(4); + transCreateDelayQueue(pThrd->loop, &pThrd->delayQueue); + pThrd->quit = false; return pThrd; } @@ -851,12 +852,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { if (pThrd == NULL) { return; } + taosThreadJoin(pThrd->thread, NULL); CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); transDestroyAsyncPool(pThrd->asyncPool); - uv_timer_stop(&pThrd->timer); + transDestroyDelayQueue(pThrd->delayQueue); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } @@ -885,6 +887,16 @@ int cliRBChoseIdx(STrans* pTransInst) { } return index % pTransInst->numOfThreads; } +static void doDelayTask(void* param) { + STaskArg* arg = param; + + SCliMsg* pMsg = arg->param1; + SCliThrdObj* pThrd = arg->param2; + + cliHandleReq(pMsg, pThrd); + + taosMemoryFree(arg); +} int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SCliThrdObj* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -908,7 +920,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pCtx->retryCount < pEpSet->numOfEps) { pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; - cliHandleReq(pMsg, pThrd); + + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = pMsg; + arg->param2 = pThrd; + transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); + cliDestroy((uv_handle_t*)pConn->stream); return -1; } @@ -920,8 +937,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg); pCtx->epSet = emsg.epSet; } - cliHandleReq(pMsg, pThrd); - // release pConn + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = pMsg; + arg->param2 = pThrd; + + transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); addConnToPool(pThrd, pConn); return -1; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 00816eb709..4d049089ad 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -363,7 +363,7 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { SDelayTask* arg1 = container_of(a, SDelayTask, node); SDelayTask* arg2 = container_of(b, SDelayTask, node); if (arg1->execTime > arg2->execTime) { - return -1; + return 0; } else { return 1; } @@ -371,23 +371,25 @@ static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { static void transDelayQueueTimeout(uv_timer_t* timer) { SDelayQueue* queue = timer->data; - HeapNode* node = heapMin(queue->heap); - if (node == NULL) { - // DO NOTHING + tTrace("timer %p timeout", timer); + uint64_t timeout = 0; + do { + HeapNode* minNode = heapMin(queue->heap); + if (minNode == NULL) break; + SDelayTask* task = container_of(minNode, SDelayTask, node); + if (task->execTime <= taosGetTimestampMs()) { + heapRemove(queue->heap, minNode); + task->func(task->arg); + taosMemoryFree(task); + timeout = 0; + } else { + timeout = task->execTime - taosGetTimestampMs(); + break; + } + } while (1); + if (timeout != 0) { + uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0); } - heapRemove(queue->heap, node); - - SDelayTask* task = container_of(node, SDelayTask, node); - task->func(task->arg); - taosMemoryFree(task); - - node = heapMin(queue->heap); - if (node == NULL) { - return; - } - task = container_of(node, SDelayTask, node); - uint64_t timeout = task->execTime > uv_now(queue->loop) ? task->execTime - uv_now(queue->loop) : 0; - uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0); } int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); @@ -406,8 +408,18 @@ int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { } void transDestroyDelayQueue(SDelayQueue* queue) { - uv_timer_stop(queue->timer); taosMemoryFree(queue->timer); + + while (heapSize(queue->heap) > 0) { + HeapNode* minNode = heapMin(queue->heap); + if (minNode == NULL) { + return; + } + heapRemove(queue->heap, minNode); + + SDelayTask* task = container_of(minNode, SDelayTask, node); + taosMemoryFree(task); + } heapDestroy(queue->heap); taosMemoryFree(queue); } @@ -417,11 +429,11 @@ int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* task->func = func; task->arg = arg; - task->execTime = uv_now(queue->loop) + timeoutMs; + task->execTime = taosGetTimestampMs() + timeoutMs; - int size = heapSize(queue->heap); + tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs); heapInsert(queue->heap, &task->node); - if (size == 1) { + if (heapSize(queue->heap) == 1) { uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0); }