handle rpc retry
This commit is contained in:
parent
7c57b03de7
commit
25a12d96b0
|
@ -136,7 +136,7 @@ void destroyTscObj(void *pObj) {
|
||||||
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
|
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
|
||||||
if (0 == connNum) {
|
if (0 == connNum) {
|
||||||
// TODO
|
// TODO
|
||||||
closeTransporter(pTscObj);
|
// closeTransporter(pTscObj);
|
||||||
}
|
}
|
||||||
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t *)pTscObj->id,
|
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t *)pTscObj->id,
|
||||||
pTscObj->pAppInfo->numOfConns);
|
pTscObj->pAppInfo->numOfConns);
|
||||||
|
|
|
@ -79,6 +79,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
return pRpc;
|
return pRpc;
|
||||||
}
|
}
|
||||||
void rpcClose(void* arg) {
|
void rpcClose(void* arg) {
|
||||||
|
tInfo("start to close rpc");
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)arg;
|
SRpcInfo* pRpc = (SRpcInfo*)arg;
|
||||||
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
|
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
|
||||||
transCloseExHandleMgt(pRpc->refMgt);
|
transCloseExHandleMgt(pRpc->refMgt);
|
||||||
|
|
|
@ -724,14 +724,13 @@ void cliConnCb(uv_connect_t* req, int status) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
|
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
|
pThrd->quit = true;
|
||||||
tDebug("cli work thread %p start to quit", pThrd);
|
tDebug("cli work thread %p start to quit", pThrd);
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
destroyConnPool(pThrd->pool);
|
destroyConnPool(pThrd->pool);
|
||||||
uv_timer_stop(&pThrd->timer);
|
uv_timer_stop(&pThrd->timer);
|
||||||
uv_walk(pThrd->loop, cliWalkCb, NULL);
|
uv_walk(pThrd->loop, cliWalkCb, NULL);
|
||||||
|
|
||||||
pThrd->quit = true;
|
|
||||||
|
|
||||||
// uv_stop(pThrd->loop);
|
// uv_stop(pThrd->loop);
|
||||||
}
|
}
|
||||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
|
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
|
@ -977,7 +976,10 @@ void cliWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int cliRBChoseIdx(STrans* pTransInst) {
|
int cliRBChoseIdx(STrans* pTransInst) {
|
||||||
int64_t index = pTransInst->index;
|
int8_t index = pTransInst->index;
|
||||||
|
if (pTransInst->numOfThreads == 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
if (pTransInst->index++ >= pTransInst->numOfThreads) {
|
if (pTransInst->index++ >= pTransInst->numOfThreads) {
|
||||||
pTransInst->index = 0;
|
pTransInst->index = 0;
|
||||||
}
|
}
|
||||||
|
@ -1120,6 +1122,7 @@ SCliThrd* transGetWorkThrdFromHandle(int64_t handle) {
|
||||||
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
|
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
|
||||||
if (handle == 0) {
|
if (handle == 0) {
|
||||||
int idx = cliRBChoseIdx(trans);
|
int idx = cliRBChoseIdx(trans);
|
||||||
|
if (idx < 0) return NULL;
|
||||||
return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
|
return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
|
||||||
}
|
}
|
||||||
return transGetWorkThrdFromHandle(handle);
|
return transGetWorkThrdFromHandle(handle);
|
||||||
|
|
Loading…
Reference in New Issue