Merge pull request #11971 from taosdata/feature/refator_retry
refactor(rpc): refactor rpc retry way
This commit is contained in:
commit
33cc6c28ba
|
@ -28,6 +28,7 @@ extern "C" {
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
|
#include "theap.h"
|
||||||
#include "tidpool.h"
|
#include "tidpool.h"
|
||||||
#include "tmd5.h"
|
#include "tmd5.h"
|
||||||
#include "tmempool.h"
|
#include "tmempool.h"
|
||||||
|
@ -328,10 +329,39 @@ void transQueueClear(STransQueue* queue);
|
||||||
*/
|
*/
|
||||||
void transQueueDestroy(STransQueue* queue);
|
void transQueueDestroy(STransQueue* queue);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* delay queue based on uv loop and uv timer, and only used in retry
|
||||||
|
*/
|
||||||
|
typedef struct STaskArg {
|
||||||
|
void* param1;
|
||||||
|
void* param2;
|
||||||
|
} STaskArg;
|
||||||
|
|
||||||
|
typedef struct SDelayTask {
|
||||||
|
void (*func)(void* arg);
|
||||||
|
void* arg;
|
||||||
|
uint64_t execTime;
|
||||||
|
HeapNode node;
|
||||||
|
} SDelayTask;
|
||||||
|
|
||||||
|
typedef struct SDelayQueue {
|
||||||
|
uv_timer_t* timer;
|
||||||
|
Heap* heap;
|
||||||
|
uv_loop_t* loop;
|
||||||
|
void (*free)(void* arg);
|
||||||
|
} SDelayQueue;
|
||||||
|
|
||||||
|
int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue);
|
||||||
|
|
||||||
|
void transDestroyDelayQueue(SDelayQueue* queue);
|
||||||
|
|
||||||
|
int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* init global func
|
* init global func
|
||||||
*/
|
*/
|
||||||
void transThreadOnce();
|
void transThreadOnce();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -60,7 +60,7 @@ typedef struct SCliThrdObj {
|
||||||
// msg queue
|
// msg queue
|
||||||
queue msg;
|
queue msg;
|
||||||
TdThreadMutex msgMtx;
|
TdThreadMutex msgMtx;
|
||||||
|
SDelayQueue* delayQueue;
|
||||||
uint64_t nextTimeout; // next timeout
|
uint64_t nextTimeout; // next timeout
|
||||||
void* pTransInst; //
|
void* pTransInst; //
|
||||||
bool quit;
|
bool quit;
|
||||||
|
@ -837,12 +837,13 @@ static SCliThrdObj* createThrdObj() {
|
||||||
uv_loop_init(pThrd->loop);
|
uv_loop_init(pThrd->loop);
|
||||||
|
|
||||||
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb);
|
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb);
|
||||||
|
|
||||||
uv_timer_init(pThrd->loop, &pThrd->timer);
|
uv_timer_init(pThrd->loop, &pThrd->timer);
|
||||||
pThrd->timer.data = pThrd;
|
pThrd->timer.data = pThrd;
|
||||||
|
|
||||||
pThrd->pool = createConnPool(4);
|
pThrd->pool = createConnPool(4);
|
||||||
|
|
||||||
|
transCreateDelayQueue(pThrd->loop, &pThrd->delayQueue);
|
||||||
|
|
||||||
pThrd->quit = false;
|
pThrd->quit = false;
|
||||||
return pThrd;
|
return pThrd;
|
||||||
}
|
}
|
||||||
|
@ -850,12 +851,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
|
||||||
if (pThrd == NULL) {
|
if (pThrd == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadJoin(pThrd->thread, NULL);
|
taosThreadJoin(pThrd->thread, NULL);
|
||||||
CLI_RELEASE_UV(pThrd->loop);
|
CLI_RELEASE_UV(pThrd->loop);
|
||||||
taosThreadMutexDestroy(&pThrd->msgMtx);
|
taosThreadMutexDestroy(&pThrd->msgMtx);
|
||||||
transDestroyAsyncPool(pThrd->asyncPool);
|
transDestroyAsyncPool(pThrd->asyncPool);
|
||||||
|
|
||||||
uv_timer_stop(&pThrd->timer);
|
transDestroyDelayQueue(pThrd->delayQueue);
|
||||||
taosMemoryFree(pThrd->loop);
|
taosMemoryFree(pThrd->loop);
|
||||||
taosMemoryFree(pThrd);
|
taosMemoryFree(pThrd);
|
||||||
}
|
}
|
||||||
|
@ -884,6 +886,16 @@ int cliRBChoseIdx(STrans* pTransInst) {
|
||||||
}
|
}
|
||||||
return index % pTransInst->numOfThreads;
|
return index % pTransInst->numOfThreads;
|
||||||
}
|
}
|
||||||
|
static void doDelayTask(void* param) {
|
||||||
|
STaskArg* arg = param;
|
||||||
|
|
||||||
|
SCliMsg* pMsg = arg->param1;
|
||||||
|
SCliThrdObj* pThrd = arg->param2;
|
||||||
|
|
||||||
|
cliHandleReq(pMsg, pThrd);
|
||||||
|
|
||||||
|
taosMemoryFree(arg);
|
||||||
|
}
|
||||||
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
SCliThrdObj* pThrd = pConn->hostThrd;
|
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
@ -907,7 +919,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
if (pCtx->retryCount < pEpSet->numOfEps) {
|
if (pCtx->retryCount < pEpSet->numOfEps) {
|
||||||
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
|
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
|
||||||
cliHandleReq(pMsg, pThrd);
|
|
||||||
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
|
arg->param1 = pMsg;
|
||||||
|
arg->param2 = pThrd;
|
||||||
|
transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||||
|
|
||||||
cliDestroy((uv_handle_t*)pConn->stream);
|
cliDestroy((uv_handle_t*)pConn->stream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -919,8 +936,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
|
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
|
||||||
pCtx->epSet = emsg.epSet;
|
pCtx->epSet = emsg.epSet;
|
||||||
}
|
}
|
||||||
cliHandleReq(pMsg, pThrd);
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
// release pConn
|
arg->param1 = pMsg;
|
||||||
|
arg->param2 = pThrd;
|
||||||
|
|
||||||
|
transPutTaskToDelayQueue(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||||
addConnToPool(pThrd, pConn);
|
addConnToPool(pThrd, pConn);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -358,4 +358,85 @@ void transQueueDestroy(STransQueue* queue) {
|
||||||
transQueueClear(queue);
|
transQueueClear(queue);
|
||||||
taosArrayDestroy(queue->q);
|
taosArrayDestroy(queue->q);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
|
||||||
|
SDelayTask* arg1 = container_of(a, SDelayTask, node);
|
||||||
|
SDelayTask* arg2 = container_of(b, SDelayTask, node);
|
||||||
|
if (arg1->execTime > arg2->execTime) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void transDelayQueueTimeout(uv_timer_t* timer) {
|
||||||
|
SDelayQueue* queue = timer->data;
|
||||||
|
tTrace("timer %p timeout", timer);
|
||||||
|
uint64_t timeout = 0;
|
||||||
|
do {
|
||||||
|
HeapNode* minNode = heapMin(queue->heap);
|
||||||
|
if (minNode == NULL) break;
|
||||||
|
SDelayTask* task = container_of(minNode, SDelayTask, node);
|
||||||
|
if (task->execTime <= taosGetTimestampMs()) {
|
||||||
|
heapRemove(queue->heap, minNode);
|
||||||
|
task->func(task->arg);
|
||||||
|
taosMemoryFree(task);
|
||||||
|
timeout = 0;
|
||||||
|
} else {
|
||||||
|
timeout = task->execTime - taosGetTimestampMs();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} while (1);
|
||||||
|
if (timeout != 0) {
|
||||||
|
uv_timer_start(queue->timer, transDelayQueueTimeout, timeout, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int transCreateDelayQueue(uv_loop_t* loop, SDelayQueue** queue) {
|
||||||
|
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
||||||
|
uv_timer_init(loop, timer);
|
||||||
|
|
||||||
|
Heap* heap = heapCreate(timeCompare);
|
||||||
|
|
||||||
|
SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue));
|
||||||
|
q->heap = heap;
|
||||||
|
q->timer = timer;
|
||||||
|
q->loop = loop;
|
||||||
|
q->timer->data = q;
|
||||||
|
|
||||||
|
*queue = q;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void transDestroyDelayQueue(SDelayQueue* queue) {
|
||||||
|
taosMemoryFree(queue->timer);
|
||||||
|
|
||||||
|
while (heapSize(queue->heap) > 0) {
|
||||||
|
HeapNode* minNode = heapMin(queue->heap);
|
||||||
|
if (minNode == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
heapRemove(queue->heap, minNode);
|
||||||
|
|
||||||
|
SDelayTask* task = container_of(minNode, SDelayTask, node);
|
||||||
|
taosMemoryFree(task);
|
||||||
|
}
|
||||||
|
heapDestroy(queue->heap);
|
||||||
|
taosMemoryFree(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
int transPutTaskToDelayQueue(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
|
||||||
|
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
|
||||||
|
|
||||||
|
task->func = func;
|
||||||
|
task->arg = arg;
|
||||||
|
task->execTime = taosGetTimestampMs() + timeoutMs;
|
||||||
|
|
||||||
|
tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
|
||||||
|
heapInsert(queue->heap, &task->node);
|
||||||
|
if (heapSize(queue->heap) == 1) {
|
||||||
|
uv_timer_start(queue->timer, transDelayQueueTimeout, timeoutMs, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue