enh(rpc): add rpc retry
This commit is contained in:
parent
29608b0866
commit
5f24bf5bf9
|
@ -195,7 +195,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
|
||||||
void transDestroyAsyncPool(SAsyncPool* pool) {
|
void transDestroyAsyncPool(SAsyncPool* pool) {
|
||||||
for (int i = 0; i < pool->nAsync; i++) {
|
for (int i = 0; i < pool->nAsync; i++) {
|
||||||
uv_async_t* async = &(pool->asyncs[i]);
|
uv_async_t* async = &(pool->asyncs[i]);
|
||||||
uv_close((uv_handle_t*)async, NULL);
|
// uv_close((uv_handle_t*)async, NULL);
|
||||||
SAsyncItem* item = async->data;
|
SAsyncItem* item = async->data;
|
||||||
taosThreadMutexDestroy(&item->mtx);
|
taosThreadMutexDestroy(&item->mtx);
|
||||||
taosMemoryFree(item);
|
taosMemoryFree(item);
|
||||||
|
|
|
@ -146,7 +146,8 @@ static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister};
|
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
|
||||||
|
uvHandleRegister};
|
||||||
|
|
||||||
static void uvDestroyConn(uv_handle_t* handle);
|
static void uvDestroyConn(uv_handle_t* handle);
|
||||||
|
|
||||||
|
@ -209,12 +210,13 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
}
|
}
|
||||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr),
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
||||||
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
||||||
|
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||||
} else {
|
} else {
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType),
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
|
||||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port),
|
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||||
transMsg.contLen, pHead->noResp);
|
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
|
||||||
// no ref here
|
// no ref here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -354,8 +356,9 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
||||||
|
|
||||||
char* msg = (char*)pHead;
|
char* msg = (char*)pHead;
|
||||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||||
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr),
|
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
||||||
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
||||||
|
ntohs(pConn->locaddr.sin_port));
|
||||||
pHead->msgLen = htonl(len);
|
pHead->msgLen = htonl(len);
|
||||||
|
|
||||||
wb->base = msg;
|
wb->base = msg;
|
||||||
|
@ -685,9 +688,9 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
|
|
||||||
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
tTrace("work thread quit");
|
tTrace("work thread quit");
|
||||||
// uv_walk(thrd->loop, uvWalkCb, NULL);
|
uv_walk(thrd->loop, uvWalkCb, NULL);
|
||||||
uv_loop_close(thrd->loop);
|
// uv_loop_close(thrd->loop);
|
||||||
uv_stop(thrd->loop);
|
// uv_stop(thrd->loop);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -749,9 +752,9 @@ End:
|
||||||
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||||
thrd->quit = true;
|
thrd->quit = true;
|
||||||
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
// uv_walk(thrd->loop, uvWalkCb, NULL);
|
uv_walk(thrd->loop, uvWalkCb, NULL);
|
||||||
uv_loop_close(thrd->loop);
|
// uv_loop_close(thrd->loop);
|
||||||
uv_stop(thrd->loop);
|
// uv_stop(thrd->loop);
|
||||||
} else {
|
} else {
|
||||||
destroyAllConn(thrd);
|
destroyAllConn(thrd);
|
||||||
}
|
}
|
||||||
|
@ -802,7 +805,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
taosThreadJoin(pThrd->thread, NULL);
|
taosThreadJoin(pThrd->thread, NULL);
|
||||||
// MAKE_VALGRIND_HAPPY(pThrd->loop);
|
MAKE_VALGRIND_HAPPY(pThrd->loop);
|
||||||
transDestroyAsyncPool(pThrd->asyncPool);
|
transDestroyAsyncPool(pThrd->asyncPool);
|
||||||
taosMemoryFree(pThrd->loop);
|
taosMemoryFree(pThrd->loop);
|
||||||
taosMemoryFree(pThrd);
|
taosMemoryFree(pThrd);
|
||||||
|
|
Loading…
Reference in New Issue