diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 826b572018..b740968007 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -445,8 +445,9 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead int64_t qId = taosHton64(pHead->qid); STraceId* trace = &pHead->traceId; int32_t seqNum = htonl(pHead->seqNum); - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%d, qid:%ld", CONN_GET_INST_LABEL(conn), - conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum, qId); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%d, qid:%" PRId64 "", + CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum, + qId); STransCtx* p = taosHashGet(conn->pQTable, &qId, sizeof(qId)); transCtxCleanup(p); @@ -469,6 +470,9 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1); transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1); + transReleaseExHandle(transGetRefMgt(), qId); + transRemoveExHandle(transGetRefMgt(), qId); + while (!QUEUE_IS_EMPTY(&set)) { queue* el = QUEUE_HEAD(&set); QUEUE_REMOVE(el); @@ -496,8 +500,9 @@ int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, ST } STraceId* trace = &pHead->traceId; pResp->info.ahandle = transCtxDumpVal(pCtx, pHead->msgType); - tGDebug("%s conn %p %s received from %s, local info:%s, qid:%ld, create ahandle %p by %s", CONN_GET_INST_LABEL(conn), - conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, qId, pResp->info.ahandle, TMSG_INFO(pHead->msgType)); + tGDebug("%s conn %p %s received from %s, local info:%s, qid:%" PRId64 ", create ahandle %p by %s", + CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, qId, pResp->info.ahandle, + TMSG_INFO(pHead->msgType)); return 0; } @@ -553,8 +558,8 @@ void cliHandleResp(SCliConn* conn) { code = cliNotifyCb(conn, NULL, &resp); return; } else { - tDebug("%s conn %p recv unexpected packet, seqNum:%d,qId:%ld reason:%s", CONN_GET_INST_LABEL(conn), conn, seq, - qId, tstrerror(code)); + tDebug("%s conn %p recv unexpected packet, seqNum:%d,qid:%" PRId64 " reason:%s", CONN_GET_INST_LABEL(conn), conn, + seq, qId, tstrerror(code)); } if (code != 0) { tDebug("%s conn %p recv unexpected packet, seqNum:%d, qId:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq, @@ -572,8 +577,8 @@ void cliHandleResp(SCliConn* conn) { if (code != 0) { tGDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq); } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%ld", CONN_GET_INST_LABEL(conn), conn, - TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%" PRId64 "", CONN_GET_INST_LABEL(conn), + conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId); } code = cliNotifyCb(conn, pReq, &resp); @@ -1207,7 +1212,7 @@ int32_t cliBatchSend(SCliConn* pConn) { pCliMsg->seq = pConn->seq; STraceId* trace = &pCliMsg->msg.info.traceId; - tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn, + tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); } @@ -1548,17 +1553,17 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) { SReqCtx* pCtx = pReq->ctx; SCliThrd* pThrd = pConn->hostThrd; if (pCtx == NULL) { - tDebug("%s conn %p not need to update statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p not need to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); return 0; } STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); if (pUserCtx == NULL) { code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx)); - tDebug("%s conn %p succ to add statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p succ to add statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } else { transCtxMerge(pUserCtx, &pCtx->userCtx); - tDebug("%s conn %s succ to update statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p succ to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } return 0; } @@ -1575,11 +1580,11 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { if (pReq->ctx == NULL) { return TSDB_CODE_RPC_STATE_DROPED; } - tDebug("%s conn %p failed to get statue, qid:%ld", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p failed to get statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } else { *pConn = pState->conn; - tDebug("%s conn %p succ to get conn of statue, qid:%ld", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p succ to get conn of statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } return 0; } @@ -1594,9 +1599,9 @@ int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* SReqState state = {.conn = pConn, .arg = NULL}; code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state)); if (code != 0) { - tDebug("%s conn %p failed to statue, qid:%ld", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p failed to statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } else { - tDebug("%s conn %p succ to add statue, qid:%ld (1)", transLabel(pThrd->pInst), pConn, qid); + tDebug("%s conn %p succ to add statue, qid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid); } (void)cliHandleState_mayUpdateStateCtx(pConn, pReq); @@ -1615,7 +1620,7 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) { (void)cliHandleState_mayUpdateStateCtx(pConn, pReq); } else if (code == TSDB_CODE_RPC_STATE_DROPED) { STraceId* trace = &pReq->msg.info.traceId; - tWarn("%s failed to get statue, qid:%ld", pInst->label, pReq->msg.info.qId); + tWarn("%s failed to get statue, qid:%" PRId64 "", pInst->label, pReq->msg.info.qId); destroyReq(pReq); return; } @@ -2603,7 +2608,7 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t if (exh == NULL) { return NULL; } else { - tDebug("%s conn %p got", trans->label, exh->handle); + tDebug("onn %p got", exh->handle); } taosWLockLatch(&exh->latch); if (exh->pThrd == NULL && trans != NULL) { @@ -3066,7 +3071,7 @@ int32_t transAllocHandle(int64_t* refId) { QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); - tDebug("trans alloc qid:%ld", exh->refId); + tDebug("trans alloc qid:%" PRId64 ", malloc:%p", exh->refId, exh); *refId = exh->refId; return 0; } @@ -3098,7 +3103,7 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) { pCli->msg = msg; STraceId* trace = &pCli->msg.info.traceId; - tGDebug("%s start to free conn qid:%ld", pInst->label, transpointId); + tGDebug("%s start to free conn qid:%" PRId64 "", pInst->label, transpointId); code = transAsyncSend(pThrd->asyncPool, &pCli->q); if (code != 0) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index a02f26d4c6..929f645e7e 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -779,9 +779,7 @@ void transDestroyExHandle(void* handle) { return; } SExHandle* eh = handle; - if (!QUEUE_IS_EMPTY(&eh->q)) { - tDebug("handle %p mem leak", handle); - } + tDebug("trans destroy qid:%" PRId64 ", memory %p", eh->refId, handle); taosMemoryFree(handle); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index df97061a88..1e9162de9a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -106,6 +106,8 @@ typedef struct SWorkThrd { int8_t enableIpWhiteList; int32_t connRefMgt; + + int8_t inited; } SWorkThrd; typedef struct SServerObj { @@ -391,24 +393,27 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* if (pConn->status == ConnNormal && pHead->noResp == 0) { if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%d, qid:%ld", + tGDebug( + "%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%d, qid:%" PRId64 + "", + transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); + } else { + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%d, qid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); - } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%d, qid:%ld", transLabel(pInst), - pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost, - pTransMsg->info.seqNum, pTransMsg->info.qId); } } else { if (cost >= EXCEPTION_LIMIT_US) { tGDebug( "%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, " - "seqNum:%d, qid:%ld", + "seqNum:%d, qid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); } else { tGDebug( - "%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%d, qid:%ld", + "%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%d, " + "qid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); } @@ -438,14 +443,14 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId)); if (p == NULL) { code = TSDB_CODE_RPC_NO_STATE; - tTrace("conn %p recv release, and releady release by server qid:%ld", pConn, qId); + tTrace("conn %p recv release, and releady release by server qid:%" PRId64 "", pConn, qId); } else { SSvrRegArg* arg = p; (pInst->cfp)(pInst->parent, &(arg->msg), NULL); - tTrace("conn %p recv release, notify server app, qid:%ld", pConn, qId); + tTrace("conn %p recv release, notify server app, qid:%" PRId64 "", pConn, qId); (void)taosHashRemove(pConn->pQTable, &qId, sizeof(qId)); - tTrace("conn %p clear state,qid:%ld", pConn, qId); + tTrace("conn %p clear state,qid:%" PRId64 "", pConn, qId); } STransMsg tmsg = {.code = code, @@ -572,8 +577,6 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.info.qId = taosHton64(pHead->qid); transMsg.info.msgType = pHead->msgType; - // uvMaySetConnAcquired(pConn, pHead); - uvPerfLog_receive(pConn, pHead, &transMsg); // set up conn info @@ -668,7 +671,7 @@ void uvOnSendCb(uv_write_t* req, int status) { SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q); STraceId* trace = &smsg->msg.info.traceId; - tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn, + tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%" PRId64 "", transLabel(conn->pInst), conn, smsg->msg.info.seqNum, smsg->msg.info.qId); destroySmsg(smsg); } @@ -679,7 +682,7 @@ void uvOnSendCb(uv_write_t* req, int status) { SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q); STraceId* trace = &smsg->msg.info.traceId; - tGDebug("%s conn %p failed to send, seqNum:%d, qid:%ld, reason:%s", transLabel(conn->pInst), conn, + tGDebug("%s conn %p failed to send, seqNum:%d, qid:%" PRId64 ", reason:%s", transLabel(conn->pInst), conn, smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status)); destroySmsg(smsg); } @@ -750,7 +753,7 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { } STraceId* trace = &pMsg->info.traceId; - tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%d, qid:%ld", transLabel(pInst), pConn, + tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%d, qid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len, pMsg->info.seqNum, pMsg->info.qId); wb->base = (char*)pHead; @@ -840,7 +843,7 @@ int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) { if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) { SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); if (p == NULL) { - tError("%s conn %p already release qid:%ld", transLabel(pConn->pInst), pConn, qid); + tError("%s conn %p already release qid:%" PRId64 "", transLabel(pConn->pInst), pConn, qid); return TSDB_CODE_RPC_NO_STATE; } else { transFreeMsg(p->msg.pCont); @@ -1024,7 +1027,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { // TODO(log other failure reason) tWarn("failed to create connect:%p, reason: %s", q, uv_err_name(nread)); taosMemoryFree(buf->base); - // uv_close((uv_handle_t*)q, NULL); + uv_close((uv_handle_t*)q, NULL); return; } // free memory allocated by @@ -1338,7 +1341,7 @@ void uvConnDestroyAllState(SSvrConn* p) { SSvrRegArg* arg = pIter; int64_t* qid = taosHashGetKey(pIter, NULL); (pInst->cfp)(pInst->parent, &(arg->msg), NULL); - tTrace("conn %p broken, notify server app, qid%ld", p, *qid); + tTrace("conn %p broken, notify server app, qid:%" PRId64 "", p, *qid); pIter = taosHashIterate(pQTable, pIter); } @@ -1524,7 +1527,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, code = TSDB_CODE_OUT_OF_MEMORY; goto End; } - srv->pThreadObj[i] = thrd; thrd->pInst = pInit; thrd->quit = false; @@ -1571,7 +1573,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, thrd->pipe = &(srv->pipe[i][1]); // init read thrd->fd = fds[0]; + srv->pThreadObj[i] = thrd; + thrd->inited = 1; if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) { goto End; } @@ -1715,6 +1719,7 @@ void destroyWorkThrdObj(SWorkThrd* pThrd) { } transAsyncPoolDestroy(pThrd->asyncPool); uvWhiteListDestroy(pThrd->pWhiteList); + taosCloseRef(pThrd->connRefMgt); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } @@ -1722,15 +1727,13 @@ void destroyWorkThrd(SWorkThrd* pThrd) { if (pThrd == NULL) { return; } - (void)taosThreadJoin(pThrd->thread, NULL); - SRV_RELEASE_UV(pThrd->loop); - TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrRespMsg, destroySmsgWrapper, NULL); - transAsyncPoolDestroy(pThrd->asyncPool); - - uvWhiteListDestroy(pThrd->pWhiteList); - - taosMemoryFree(pThrd->loop); - taosMemoryFree(pThrd); + if (pThrd->inited) { + sendQuitToWorkThrd(pThrd); + (void)taosThreadJoin(pThrd->thread, NULL); + SRV_RELEASE_UV(pThrd->loop); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrRespMsg, destroySmsgWrapper, NULL); + } + destroyWorkThrdObj(pThrd); } void sendQuitToWorkThrd(SWorkThrd* pThrd) { SSvrRespMsg* msg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));