refator(rpc): refator rpc retry way
This commit is contained in:
parent
707be3a826
commit
4e7e83399f
|
@ -28,6 +28,7 @@ extern "C" {
|
|||
#include "taoserror.h"
|
||||
#include "tglobal.h"
|
||||
#include "thash.h"
|
||||
#include "theap.h"
|
||||
#include "tidpool.h"
|
||||
#include "tmd5.h"
|
||||
#include "tmempool.h"
|
||||
|
@ -328,10 +329,31 @@ void transQueueClear(STransQueue* queue);
|
|||
*/
|
||||
void transQueueDestroy(STransQueue* queue);
|
||||
|
||||
typedef struct SDelayTask {
|
||||
void (*func)(void* arg);
|
||||
void* arg;
|
||||
uint64_t execTime;
|
||||
HeapNode node;
|
||||
} SDelayTask;
|
||||
|
||||
typedef struct SDelayQueue {
|
||||
uv_timer_t* timer;
|
||||
Heap* heap;
|
||||
uv_loop_t* loop;
|
||||
void (*free)(void* arg);
|
||||
} SDelayQueue;
|
||||
|
||||
int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue);
|
||||
|
||||
void transDestroyDelayQueue(SDelayQueue* queue);
|
||||
|
||||
int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
||||
|
||||
/*
|
||||
* init global func
|
||||
*/
|
||||
void transThreadOnce();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -358,4 +358,73 @@ void transQueueDestroy(STransQueue* queue) {
|
|||
transQueueClear(queue);
|
||||
taosArrayDestroy(queue->q);
|
||||
}
|
||||
|
||||
static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
|
||||
SDelayTask* arg1 = container_of(a, SDelayTask, node);
|
||||
SDelayTask* arg2 = container_of(b, SDelayTask, node);
|
||||
if (arg1->execTime > arg2->execTime) {
|
||||
return -1;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
static void transDelayQueueTimeout(uv_timer_t* timer) {
|
||||
SDelayQueue* queue = timer->data;
|
||||
HeapNode* node = heapMin(queue->heap);
|
||||
if (node == NULL) {
|
||||
// DO NOTHING
|
||||
}
|
||||
heapRemove(queue->heap, node);
|
||||
|
||||
SDelayTask* task = container_of(node, SDelayTask, node);
|
||||
task->func(task->arg);
|
||||
taosMemoryFree(task);
|
||||
|
||||
node = heapMin(queue->heap);
|
||||
if (node == NULL) {
|
||||
return;
|
||||
}
|
||||
task = container_of(node, SDelayTask, node);
|
||||
uint64_t timeout = task->execTime > uv_now(queue->loop) ? task->execTime - uv_now(queue->loop) : 0;
|
||||
uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0);
|
||||
}
|
||||
int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) {
|
||||
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
||||
uv_timer_init(loop, timer);
|
||||
|
||||
Heap* heap = heapCreate(timeCompare);
|
||||
|
||||
SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue));
|
||||
q->heap = heap;
|
||||
q->timer = timer;
|
||||
q->loop = loop;
|
||||
q->timer->data = q;
|
||||
|
||||
*queue = q;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void transDestroyDelayQueue(SDelayQueue* queue) {
|
||||
uv_timer_stop(queue->timer);
|
||||
taosMemoryFree(queue->timer);
|
||||
heapDestroy(queue->heap);
|
||||
taosMemoryFree(queue);
|
||||
}
|
||||
|
||||
int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
|
||||
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
|
||||
|
||||
task->func = func;
|
||||
task->arg = arg;
|
||||
task->execTime = uv_now(queue->loop) + timeoutMs;
|
||||
|
||||
int size = heapSize(queue->heap);
|
||||
heapInsert(queue->heap, &task->node);
|
||||
if (size == 1) {
|
||||
uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue