From 4e7e83399f14de8d07b2f9d2f034c6359995d2c1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 27 Apr 2022 23:55:44 +0800 Subject: [PATCH] refator(rpc): refator rpc retry way --- source/libs/transport/inc/transComm.h | 22 +++++++++ source/libs/transport/src/transComm.c | 69 +++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 37f21e2511..db6b3daf98 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -28,6 +28,7 @@ extern "C" { #include "taoserror.h" #include "tglobal.h" #include "thash.h" +#include "theap.h" #include "tidpool.h" #include "tmd5.h" #include "tmempool.h" @@ -328,10 +329,31 @@ void transQueueClear(STransQueue* queue); */ void transQueueDestroy(STransQueue* queue); +typedef struct SDelayTask { + void (*func)(void* arg); + void* arg; + uint64_t execTime; + HeapNode node; +} SDelayTask; + +typedef struct SDelayQueue { + uv_timer_t* timer; + Heap* heap; + uv_loop_t* loop; + void (*free)(void* arg); +} SDelayQueue; + +int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue); + +void transDestroyDelayQueue(SDelayQueue* queue); + +int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); + /* * init global func */ void transThreadOnce(); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index eb42029090..00816eb709 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -358,4 +358,73 @@ void transQueueDestroy(STransQueue* queue) { transQueueClear(queue); taosArrayDestroy(queue->q); } + +static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { + SDelayTask* arg1 = container_of(a, SDelayTask, node); + SDelayTask* arg2 = container_of(b, SDelayTask, node); + if (arg1->execTime > arg2->execTime) { + return -1; + } else { + return 1; + } +} + +static void transDelayQueueTimeout(uv_timer_t* timer) { + SDelayQueue* queue = timer->data; + HeapNode* node = heapMin(queue->heap); + if (node == NULL) { + // DO NOTHING + } + heapRemove(queue->heap, node); + + SDelayTask* task = container_of(node, SDelayTask, node); + task->func(task->arg); + taosMemoryFree(task); + + node = heapMin(queue->heap); + if (node == NULL) { + return; + } + task = container_of(node, SDelayTask, node); + uint64_t timeout = task->execTime > uv_now(queue->loop) ? task->execTime - uv_now(queue->loop) : 0; + uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0); +} +int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) { + uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + uv_timer_init(loop, timer); + + Heap* heap = heapCreate(timeCompare); + + SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue)); + q->heap = heap; + q->timer = timer; + q->loop = loop; + q->timer->data = q; + + *queue = q; + return 0; +} + +void transDestroyDelayQueue(SDelayQueue* queue) { + uv_timer_stop(queue->timer); + taosMemoryFree(queue->timer); + heapDestroy(queue->heap); + taosMemoryFree(queue); +} + +int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { + SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); + + task->func = func; + task->arg = arg; + task->execTime = uv_now(queue->loop) + timeoutMs; + + int size = heapSize(queue->heap); + heapInsert(queue->heap, &task->node); + if (size == 1) { + uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0); + } + + return 0; +} #endif