fix: fix rpc quit problem

This commit is contained in:
yihaoDeng 2022-06-28 19:19:40 +08:00
parent 66e8e06861
commit bb95a3427e
2 changed files with 14 additions and 3 deletions

View File

@ -83,6 +83,7 @@ void rpcClose(void* arg) {
SRpcInfo* pRpc = (SRpcInfo*)arg; SRpcInfo* pRpc = (SRpcInfo*)arg;
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
taosMemoryFree(pRpc); taosMemoryFree(pRpc);
tInfo("finish to close rpc");
return; return;
} }

View File

@ -316,8 +316,9 @@ void cliHandleResp(SCliConn* conn) {
if (CONN_NO_PERSIST_BY_APP(conn)) { if (CONN_NO_PERSIST_BY_APP(conn)) {
pMsg = transQueuePop(&conn->cliMsgs); pMsg = transQueuePop(&conn->cliMsgs);
pCtx = pMsg->ctx;
transMsg.info.ahandle = pCtx->ahandle; pCtx = pMsg ? pMsg->ctx : NULL;
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
} else { } else {
uint64_t ahandle = (uint64_t)pHead->ahandle; uint64_t ahandle = (uint64_t)pHead->ahandle;
@ -501,6 +502,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
static void allocConnRef(SCliConn* conn, bool update) { static void allocConnRef(SCliConn* conn, bool update) {
if (update) { if (update) {
transRemoveExHandle(conn->refId); transRemoveExHandle(conn->refId);
conn->refId = -1;
} }
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn; exh->handle = conn;
@ -600,6 +602,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
transRemoveExHandle(conn->refId); transRemoveExHandle(conn->refId);
conn->refId = -1;
if (clear) { if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) { if (!uv_is_closing((uv_handle_t*)conn->stream)) {
uv_close((uv_handle_t*)conn->stream, cliDestroy); uv_close((uv_handle_t*)conn->stream, cliDestroy);
@ -609,8 +613,11 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
} }
} }
static void cliDestroy(uv_handle_t* handle) { static void cliDestroy(uv_handle_t* handle) {
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) return;
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
transRemoveExHandle(conn->refId);
taosMemoryFree(conn->ip); taosMemoryFree(conn->ip);
conn->stream->data = NULL;
taosMemoryFree(conn->stream); taosMemoryFree(conn->stream);
transCtxCleanup(&conn->ctx); transCtxCleanup(&conn->ctx);
transQueueDestroy(&conn->cliMsgs); transQueueDestroy(&conn->cliMsgs);
@ -968,7 +975,7 @@ void cliSendQuit(SCliThrd* thrd) {
} }
void cliWalkCb(uv_handle_t* handle, void* arg) { void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
uv_close(handle, NULL); uv_close(handle, cliDestroy);
} }
} }
@ -1106,8 +1113,11 @@ SCliThrd* transGetWorkThrdFromHandle(int64_t handle) {
SCliThrd* pThrd = NULL; SCliThrd* pThrd = NULL;
SExHandle* exh = transAcquireExHandle(handle); SExHandle* exh = transAcquireExHandle(handle);
if (exh == NULL) { if (exh == NULL) {
tTrace("no, no %" PRId64 "", handle);
return NULL; return NULL;
} }
tTrace("YY %" PRId64 "", handle);
pThrd = exh->pThrd; pThrd = exh->pThrd;
transReleaseExHandle(handle); transReleaseExHandle(handle);
return pThrd; return pThrd;