Merge branch '3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-15 16:40:31 +08:00
parent 3ea509d3c7
commit 832358bd5e
3 changed files with 57 additions and 51 deletions

View File

@ -445,8 +445,9 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
int64_t qId = taosHton64(pHead->qid);
STraceId* trace = &pHead->traceId;
int32_t seqNum = htonl(pHead->seqNum);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%d, qid:%ld", CONN_GET_INST_LABEL(conn),
conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum, qId);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%d, qid:%" PRId64 "",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum,
qId);
STransCtx* p = taosHashGet(conn->pQTable, &qId, sizeof(qId));
transCtxCleanup(p);
@ -469,6 +470,9 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1);
transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1);
transReleaseExHandle(transGetRefMgt(), qId);
transRemoveExHandle(transGetRefMgt(), qId);
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
QUEUE_REMOVE(el);
@ -496,8 +500,9 @@ int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, ST
}
STraceId* trace = &pHead->traceId;
pResp->info.ahandle = transCtxDumpVal(pCtx, pHead->msgType);
tGDebug("%s conn %p %s received from %s, local info:%s, qid:%ld, create ahandle %p by %s", CONN_GET_INST_LABEL(conn),
conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, qId, pResp->info.ahandle, TMSG_INFO(pHead->msgType));
tGDebug("%s conn %p %s received from %s, local info:%s, qid:%" PRId64 ", create ahandle %p by %s",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, qId, pResp->info.ahandle,
TMSG_INFO(pHead->msgType));
return 0;
}
@ -553,8 +558,8 @@ void cliHandleResp(SCliConn* conn) {
code = cliNotifyCb(conn, NULL, &resp);
return;
} else {
tDebug("%s conn %p recv unexpected packet, seqNum:%d,qId:%ld reason:%s", CONN_GET_INST_LABEL(conn), conn, seq,
qId, tstrerror(code));
tDebug("%s conn %p recv unexpected packet, seqNum:%d,qid:%" PRId64 " reason:%s", CONN_GET_INST_LABEL(conn), conn,
seq, qId, tstrerror(code));
}
if (code != 0) {
tDebug("%s conn %p recv unexpected packet, seqNum:%d, qId:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq,
@ -572,8 +577,8 @@ void cliHandleResp(SCliConn* conn) {
if (code != 0) {
tGDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%ld", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d, qid:%" PRId64 "", CONN_GET_INST_LABEL(conn),
conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId);
}
code = cliNotifyCb(conn, pReq, &resp);
@ -1207,7 +1212,7 @@ int32_t cliBatchSend(SCliConn* pConn) {
pCliMsg->seq = pConn->seq;
STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn,
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
}
@ -1548,17 +1553,17 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) {
SReqCtx* pCtx = pReq->ctx;
SCliThrd* pThrd = pConn->hostThrd;
if (pCtx == NULL) {
tDebug("%s conn %p not need to update statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn %p not need to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
return 0;
}
STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (pUserCtx == NULL) {
code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx));
tDebug("%s conn %p succ to add statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn %p succ to add statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
} else {
transCtxMerge(pUserCtx, &pCtx->userCtx);
tDebug("%s conn %s succ to update statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn %p succ to update statue ctx, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
}
return 0;
}
@ -1575,11 +1580,11 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
if (pReq->ctx == NULL) {
return TSDB_CODE_RPC_STATE_DROPED;
}
tDebug("%s conn %p failed to get statue, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn %p failed to get statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} else {
*pConn = pState->conn;
tDebug("%s conn %p succ to get conn of statue, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn %p succ to get conn of statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
}
return 0;
}
@ -1594,9 +1599,9 @@ int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn*
SReqState state = {.conn = pConn, .arg = NULL};
code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state));
if (code != 0) {
tDebug("%s conn %p failed to statue, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn %p failed to statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
} else {
tDebug("%s conn %p succ to add statue, qid:%ld (1)", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn %p succ to add statue, qid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid);
}
(void)cliHandleState_mayUpdateStateCtx(pConn, pReq);
@ -1615,7 +1620,7 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
(void)cliHandleState_mayUpdateStateCtx(pConn, pReq);
} else if (code == TSDB_CODE_RPC_STATE_DROPED) {
STraceId* trace = &pReq->msg.info.traceId;
tWarn("%s failed to get statue, qid:%ld", pInst->label, pReq->msg.info.qId);
tWarn("%s failed to get statue, qid:%" PRId64 "", pInst->label, pReq->msg.info.qId);
destroyReq(pReq);
return;
}
@ -2603,7 +2608,7 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t
if (exh == NULL) {
return NULL;
} else {
tDebug("%s conn %p got", trans->label, exh->handle);
tDebug("onn %p got", exh->handle);
}
taosWLockLatch(&exh->latch);
if (exh->pThrd == NULL && trans != NULL) {
@ -3066,7 +3071,7 @@ int32_t transAllocHandle(int64_t* refId) {
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);
tDebug("trans alloc qid:%ld", exh->refId);
tDebug("trans alloc qid:%" PRId64 ", malloc:%p", exh->refId, exh);
*refId = exh->refId;
return 0;
}
@ -3098,7 +3103,7 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) {
pCli->msg = msg;
STraceId* trace = &pCli->msg.info.traceId;
tGDebug("%s start to free conn qid:%ld", pInst->label, transpointId);
tGDebug("%s start to free conn qid:%" PRId64 "", pInst->label, transpointId);
code = transAsyncSend(pThrd->asyncPool, &pCli->q);
if (code != 0) {

View File

@ -779,9 +779,7 @@ void transDestroyExHandle(void* handle) {
return;
}
SExHandle* eh = handle;
if (!QUEUE_IS_EMPTY(&eh->q)) {
tDebug("handle %p mem leak", handle);
}
tDebug("trans destroy qid:%" PRId64 ", memory %p", eh->refId, handle);
taosMemoryFree(handle);
}

View File

@ -106,6 +106,8 @@ typedef struct SWorkThrd {
int8_t enableIpWhiteList;
int32_t connRefMgt;
int8_t inited;
} SWorkThrd;
typedef struct SServerObj {
@ -391,24 +393,27 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg*
if (pConn->status == ConnNormal && pHead->noResp == 0) {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%d, qid:%ld",
tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%d, qid:%" PRId64
"",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%d, qid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%d, qid:%ld", transLabel(pInst),
pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost,
pTransMsg->info.seqNum, pTransMsg->info.qId);
}
} else {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, "
"seqNum:%d, qid:%ld",
"seqNum:%d, qid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
} else {
tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%d, qid:%ld",
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%d, "
"qid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
}
@ -438,14 +443,14 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId));
if (p == NULL) {
code = TSDB_CODE_RPC_NO_STATE;
tTrace("conn %p recv release, and releady release by server qid:%ld", pConn, qId);
tTrace("conn %p recv release, and releady release by server qid:%" PRId64 "", pConn, qId);
} else {
SSvrRegArg* arg = p;
(pInst->cfp)(pInst->parent, &(arg->msg), NULL);
tTrace("conn %p recv release, notify server app, qid:%ld", pConn, qId);
tTrace("conn %p recv release, notify server app, qid:%" PRId64 "", pConn, qId);
(void)taosHashRemove(pConn->pQTable, &qId, sizeof(qId));
tTrace("conn %p clear state,qid:%ld", pConn, qId);
tTrace("conn %p clear state,qid:%" PRId64 "", pConn, qId);
}
STransMsg tmsg = {.code = code,
@ -572,8 +577,6 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.qId = taosHton64(pHead->qid);
transMsg.info.msgType = pHead->msgType;
// uvMaySetConnAcquired(pConn, pHead);
uvPerfLog_receive(pConn, pHead, &transMsg);
// set up conn info
@ -668,7 +671,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn,
tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%" PRId64 "", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId);
destroySmsg(smsg);
}
@ -679,7 +682,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p failed to send, seqNum:%d, qid:%ld, reason:%s", transLabel(conn->pInst), conn,
tGDebug("%s conn %p failed to send, seqNum:%d, qid:%" PRId64 ", reason:%s", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status));
destroySmsg(smsg);
}
@ -750,7 +753,7 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
}
STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%d, qid:%ld", transLabel(pInst), pConn,
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%d, qid:%" PRId64 "", transLabel(pInst), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len, pMsg->info.seqNum, pMsg->info.qId);
wb->base = (char*)pHead;
@ -840,7 +843,7 @@ int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) {
if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) {
SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (p == NULL) {
tError("%s conn %p already release qid:%ld", transLabel(pConn->pInst), pConn, qid);
tError("%s conn %p already release qid:%" PRId64 "", transLabel(pConn->pInst), pConn, qid);
return TSDB_CODE_RPC_NO_STATE;
} else {
transFreeMsg(p->msg.pCont);
@ -1024,7 +1027,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
// TODO(log other failure reason)
tWarn("failed to create connect:%p, reason: %s", q, uv_err_name(nread));
taosMemoryFree(buf->base);
// uv_close((uv_handle_t*)q, NULL);
uv_close((uv_handle_t*)q, NULL);
return;
}
// free memory allocated by
@ -1338,7 +1341,7 @@ void uvConnDestroyAllState(SSvrConn* p) {
SSvrRegArg* arg = pIter;
int64_t* qid = taosHashGetKey(pIter, NULL);
(pInst->cfp)(pInst->parent, &(arg->msg), NULL);
tTrace("conn %p broken, notify server app, qid%ld", p, *qid);
tTrace("conn %p broken, notify server app, qid:%" PRId64 "", p, *qid);
pIter = taosHashIterate(pQTable, pIter);
}
@ -1524,7 +1527,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
code = TSDB_CODE_OUT_OF_MEMORY;
goto End;
}
srv->pThreadObj[i] = thrd;
thrd->pInst = pInit;
thrd->quit = false;
@ -1571,7 +1573,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
thrd->pipe = &(srv->pipe[i][1]); // init read
thrd->fd = fds[0];
srv->pThreadObj[i] = thrd;
thrd->inited = 1;
if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) {
goto End;
}
@ -1715,6 +1719,7 @@ void destroyWorkThrdObj(SWorkThrd* pThrd) {
}
transAsyncPoolDestroy(pThrd->asyncPool);
uvWhiteListDestroy(pThrd->pWhiteList);
taosCloseRef(pThrd->connRefMgt);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
}
@ -1722,15 +1727,13 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
if (pThrd == NULL) {
return;
}
(void)taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrRespMsg, destroySmsgWrapper, NULL);
transAsyncPoolDestroy(pThrd->asyncPool);
uvWhiteListDestroy(pThrd->pWhiteList);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
if (pThrd->inited) {
sendQuitToWorkThrd(pThrd);
(void)taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrRespMsg, destroySmsgWrapper, NULL);
}
destroyWorkThrdObj(pThrd);
}
void sendQuitToWorkThrd(SWorkThrd* pThrd) {
SSvrRespMsg* msg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));