refactor(rpc): refactor timeout
This commit is contained in:
parent
b83963ab1d
commit
2f66f50c04
|
@ -68,7 +68,7 @@ extern "C" {
|
||||||
*/
|
*/
|
||||||
void iIntersection(SArray *interResults, SArray *finalResult);
|
void iIntersection(SArray *interResults, SArray *finalResult);
|
||||||
|
|
||||||
/* multi sorted result intersection
|
/* multi sorted result union
|
||||||
* input: [1, 2, 4, 5]
|
* input: [1, 2, 4, 5]
|
||||||
* [2, 3, 4, 5]
|
* [2, 3, 4, 5]
|
||||||
* [1, 4, 5]
|
* [1, 4, 5]
|
||||||
|
@ -76,7 +76,7 @@ void iIntersection(SArray *interResults, SArray *finalResult);
|
||||||
*/
|
*/
|
||||||
void iUnion(SArray *interResults, SArray *finalResult);
|
void iUnion(SArray *interResults, SArray *finalResult);
|
||||||
|
|
||||||
/* sorted array
|
/* see example
|
||||||
* total: [1, 2, 4, 5, 7, 8]
|
* total: [1, 2, 4, 5, 7, 8]
|
||||||
* except: [4, 5]
|
* except: [4, 5]
|
||||||
* return: [1, 2, 7, 8] saved in total
|
* return: [1, 2, 7, 8] saved in total
|
||||||
|
|
|
@ -892,7 +892,6 @@ static void doDelayTask(void* param) {
|
||||||
|
|
||||||
SCliMsg* pMsg = arg->param1;
|
SCliMsg* pMsg = arg->param1;
|
||||||
SCliThrdObj* pThrd = arg->param2;
|
SCliThrdObj* pThrd = arg->param2;
|
||||||
|
|
||||||
cliHandleReq(pMsg, pThrd);
|
cliHandleReq(pMsg, pThrd);
|
||||||
|
|
||||||
taosMemoryFree(arg);
|
taosMemoryFree(arg);
|
||||||
|
|
|
@ -425,10 +425,19 @@ void transDQDestroy(SDelayQueue* queue) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
|
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
|
||||||
|
uint64_t now = taosGetTimestampMs();
|
||||||
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
|
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
|
||||||
task->func = func;
|
task->func = func;
|
||||||
task->arg = arg;
|
task->arg = arg;
|
||||||
task->execTime = taosGetTimestampMs() + timeoutMs;
|
task->execTime = now + timeoutMs;
|
||||||
|
|
||||||
|
HeapNode* minNode = heapMin(queue->heap);
|
||||||
|
if (minNode) {
|
||||||
|
SDelayTask* minTask = container_of(minNode, SDelayTask, node);
|
||||||
|
if (minTask->execTime < task->execTime) {
|
||||||
|
timeoutMs = minTask->execTime <= now ? 0 : now - minTask->execTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
|
tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
|
||||||
heapInsert(queue->heap, &task->node);
|
heapInsert(queue->heap, &task->node);
|
||||||
|
|
Loading…
Reference in New Issue