From 2f66f50c0428bd45759fb13f84b4f3a94e11dca4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 28 Apr 2022 14:49:25 +0800 Subject: [PATCH] refactor(rpc): refactor timeout --- source/libs/index/inc/indexUtil.h | 4 ++-- source/libs/transport/src/transCli.c | 1 - source/libs/transport/src/transComm.c | 11 ++++++++++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/source/libs/index/inc/indexUtil.h b/source/libs/index/inc/indexUtil.h index 814d61afd7..f1676ed411 100644 --- a/source/libs/index/inc/indexUtil.h +++ b/source/libs/index/inc/indexUtil.h @@ -68,7 +68,7 @@ extern "C" { */ void iIntersection(SArray *interResults, SArray *finalResult); -/* multi sorted result intersection +/* multi sorted result union * input: [1, 2, 4, 5] * [2, 3, 4, 5] * [1, 4, 5] @@ -76,7 +76,7 @@ void iIntersection(SArray *interResults, SArray *finalResult); */ void iUnion(SArray *interResults, SArray *finalResult); -/* sorted array +/* see example * total: [1, 2, 4, 5, 7, 8] * except: [4, 5] * return: [1, 2, 7, 8] saved in total diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e02bfbafd8..c3b40aa8e0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -892,7 +892,6 @@ static void doDelayTask(void* param) { SCliMsg* pMsg = arg->param1; SCliThrdObj* pThrd = arg->param2; - cliHandleReq(pMsg, pThrd); taosMemoryFree(arg); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 01a20a466a..1bd7d0857e 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -425,10 +425,19 @@ void transDQDestroy(SDelayQueue* queue) { } int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { + uint64_t now = taosGetTimestampMs(); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); task->func = func; task->arg = arg; - task->execTime = taosGetTimestampMs() + timeoutMs; + task->execTime = now + timeoutMs; + + HeapNode* minNode = heapMin(queue->heap); + if (minNode) { + SDelayTask* minTask = container_of(minNode, SDelayTask, node); + if (minTask->execTime < task->execTime) { + timeoutMs = minTask->execTime <= now ? 0 : now - minTask->execTime; + } + } tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs); heapInsert(queue->heap, &task->node);