diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 37f21e2511..aa3c27e537 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -28,6 +28,7 @@ extern "C" { #include "taoserror.h" #include "tglobal.h" #include "thash.h" +#include "theap.h" #include "tidpool.h" #include "tmd5.h" #include "tmempool.h" @@ -328,10 +329,39 @@ void transQueueClear(STransQueue* queue); */ void transQueueDestroy(STransQueue* queue); +/* + * delay queue based on uv loop and uv timer, and only used in retry + */ +typedef struct STaskArg { + void* param1; + void* param2; +} STaskArg; + +typedef struct SDelayTask { + void (*func)(void* arg); + void* arg; + uint64_t execTime; + HeapNode node; +} SDelayTask; + +typedef struct SDelayQueue { + uv_timer_t* timer; + Heap* heap; + uv_loop_t* loop; + void (*free)(void* arg); +} SDelayQueue; + +int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue); + +void transDestroyDelayQueue(SDelayQueue* queue); + +int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); + /* * init global func */ void transThreadOnce(); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d59db92799..7af466c0a7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -60,10 +60,10 @@ typedef struct SCliThrdObj { // msg queue queue msg; TdThreadMutex msgMtx; - - uint64_t nextTimeout; // next timeout - void* pTransInst; // - bool quit; + SDelayQueue* delayQueue; + uint64_t nextTimeout; // next timeout + void* pTransInst; // + bool quit; } SCliThrdObj; typedef struct SCliObj { @@ -837,12 +837,13 @@ static SCliThrdObj* createThrdObj() { uv_loop_init(pThrd->loop); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); - uv_timer_init(pThrd->loop, &pThrd->timer); pThrd->timer.data = pThrd; pThrd->pool = createConnPool(4); + transCreateDelayQueue(pThrd->loop, &pThrd->delayQueue); + pThrd->quit = false; return pThrd; } @@ -850,12 +851,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { if (pThrd == NULL) { return; } + taosThreadJoin(pThrd->thread, NULL); CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); transDestroyAsyncPool(pThrd->asyncPool); - uv_timer_stop(&pThrd->timer); + transDestroyDelayQueue(pThrd->delayQueue); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } @@ -884,6 +886,16 @@ int cliRBChoseIdx(STrans* pTransInst) { } return index % pTransInst->numOfThreads; } +static void doDelayTask(void* param) { + STaskArg* arg = param; + + SCliMsg* pMsg = arg->param1; + SCliThrdObj* pThrd = arg->param2; + + cliHandleReq(pMsg, pThrd); + + taosMemoryFree(arg); +} int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SCliThrdObj* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -907,7 +919,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pCtx->retryCount < pEpSet->numOfEps) { pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; - cliHandleReq(pMsg, pThrd); + + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = pMsg; + arg->param2 = pThrd; + transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); + cliDestroy((uv_handle_t*)pConn->stream); return -1; } @@ -919,8 +936,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg); pCtx->epSet = emsg.epSet; } - cliHandleReq(pMsg, pThrd); - // release pConn + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = pMsg; + arg->param2 = pThrd; + + transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); addConnToPool(pThrd, pConn); return -1; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index eb42029090..4d049089ad 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -358,4 +358,85 @@ void transQueueDestroy(STransQueue* queue) { transQueueClear(queue); taosArrayDestroy(queue->q); } + +static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { + SDelayTask* arg1 = container_of(a, SDelayTask, node); + SDelayTask* arg2 = container_of(b, SDelayTask, node); + if (arg1->execTime > arg2->execTime) { + return 0; + } else { + return 1; + } +} + +static void transDelayQueueTimeout(uv_timer_t* timer) { + SDelayQueue* queue = timer->data; + tTrace("timer %p timeout", timer); + uint64_t timeout = 0; + do { + HeapNode* minNode = heapMin(queue->heap); + if (minNode == NULL) break; + SDelayTask* task = container_of(minNode, SDelayTask, node); + if (task->execTime <= taosGetTimestampMs()) { + heapRemove(queue->heap, minNode); + task->func(task->arg); + taosMemoryFree(task); + timeout = 0; + } else { + timeout = task->execTime - taosGetTimestampMs(); + break; + } + } while (1); + if (timeout != 0) { + uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0); + } +} +int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { + uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + uv_timer_init(loop, timer); + + Heap* heap = heapCreate(timeCompare); + + SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue)); + q->heap = heap; + q->timer = timer; + q->loop = loop; + q->timer->data = q; + + *queue = q; + return 0; +} + +void transDestroyDelayQueue(SDelayQueue* queue) { + taosMemoryFree(queue->timer); + + while (heapSize(queue->heap) > 0) { + HeapNode* minNode = heapMin(queue->heap); + if (minNode == NULL) { + return; + } + heapRemove(queue->heap, minNode); + + SDelayTask* task = container_of(minNode, SDelayTask, node); + taosMemoryFree(task); + } + heapDestroy(queue->heap); + taosMemoryFree(queue); +} + +int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { + SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); + + task->func = func; + task->arg = arg; + task->execTime = taosGetTimestampMs() + timeoutMs; + + tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs); + heapInsert(queue->heap, &task->node); + if (heapSize(queue->heap) == 1) { + uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0); + } + + return 0; +} #endif