opt transport

This commit is contained in:
yihaoDeng 2024-09-07 20:08:48 +08:00
parent 94891d5bff
commit 5ae6b83d0e
2 changed files with 91 additions and 61 deletions

View File

@ -209,7 +209,7 @@ static void cliSendBatchCb(uv_write_t* req, int status);
SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
// static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
static int32_t allocConnRef(SCliConn* conn, bool update);
@ -508,12 +508,15 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe
pResp->info.hasEpSet = pHead->hasEpSet;
pResp->info.cliVer = htonl(pHead->compatibilityVer);
pResp->info.seqNum = htonl(pHead->seqNum);
int64_t qid = taosHton64(pHead->qid);
pResp->info.handle = (void*)qid;
return 0;
}
int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
if (pHead->msgType == TDMT_SCH_TASK_RELEASE) {
if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) {
int64_t qId = taosHton64(pHead->qid);
code = taosHashRemove(conn->pQTable, &qId, sizeof(qId));
if (code != 0) {
@ -524,7 +527,8 @@ int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) {
if (code != 0) {
tDebug("%s conn %p failed to release req %ld from thrd ", CONN_GET_INST_LABEL(conn), conn, qId);
}
tDebug("%s conn %p release req %ld", CONN_GET_INST_LABEL(conn), conn, qId);
STraceId* trace = &pHead->traceId;
tGDebug("%s conn %p receive release req, qid:%ld", CONN_GET_INST_LABEL(conn), conn, qId);
for (int32_t i = 0; i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReqs = transQueueGet(&conn->reqs, i);
@ -1957,9 +1961,11 @@ int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid));
if (pState == NULL) {
tDebug("failed to get statue, qid:%ld", qid);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} else {
*pConn = pState->conn;
tDebug("succ to get conn of statue, qid:%ld", qid);
}
return code;
}
@ -1972,10 +1978,24 @@ int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) {
}
SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid));
if (pState != 0) {
tDebug("succ to get conn %p of statue, qid:%ld", pConn, qid);
ASSERT(0);
}
SReqState state = {.conn = pConn, .arg = NULL};
code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state));
if (code != 0) {
tDebug("failed to add conn %p of statue, qid:%ld", pConn, qid);
} else {
tDebug("succ to add conn %p of statue, qid:%ld (1)", pConn, qid);
}
int32_t dummy = 0;
code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &dummy, sizeof(dummy));
if (code != 0) {
tDebug("failed to add conn %p of statue, qid:%ld", pConn, qid);
} else {
tDebug("succ to add conn %p of statue, qid:%ld(2)", pConn, qid);
}
return code;
}
void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
@ -2007,8 +2027,8 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
addConnToHeapCache(pThrd->connHeapCache, pConn);
}
}
code = cliMayUpdateState(pThrd, pReq, pConn);
}
code = cliMayUpdateState(pThrd, pReq, pConn);
code = cliSendReq(pConn, pReq);
tTrace("%s conn %p ready", pInst->label, pConn);
@ -2258,51 +2278,50 @@ void cliConnFreeMsgs(SCliConn* conn) {
}
}
bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReq = transQueueGet(&conn->reqs, i);
if (pHead->ahandle == (uint64_t)pReq->ctx->ahandle) {
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
transQueueRm(&conn->reqs, i);
return true;
}
}
}
return false;
}
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
uint64_t ahandle = pHead->ahandle;
SCliReq* pReq = NULL;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn,
conn->refId);
// bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
// if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
// for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
// SCliReq* pReq = transQueueGet(&conn->reqs, i);
// if (pHead->ahandle == (uint64_t)pReq->ctx->ahandle) {
// tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn,
// conn->refId); transQueueRm(&conn->reqs, i); return true;
// }
// }
// }
// return false;
// }
// bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
// if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
// uint64_t ahandle = pHead->ahandle;
// SCliReq* pReq = NULL;
// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
// tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn,
// conn->refId);
(void)transClearBuffer(&conn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
// (void)transClearBuffer(&conn->readBuf);
// transFreeMsg(transContFromHead((char*)pHead));
for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReq = transQueueGet(&conn->reqs, i);
if (pReq->type == Release) {
ASSERTS(pReq == NULL, "trans-cli recv invaid release-req");
tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn,
conn->refId);
cliDestroyConn(conn, true);
return true;
}
}
// for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->reqs); i++) {
// SCliReq* pReq = transQueueGet(&conn->reqs, i);
// if (pReq->type == Release) {
// ASSERTS(pReq == NULL, "trans-cli recv invaid release-req");
// tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn,
// conn->refId);
// cliDestroyConn(conn, true);
// return true;
// }
// }
cliConnFreeMsgs(conn);
// cliConnFreeMsgs(conn);
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
destroyReq(pReq);
// tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
// destroyReq(pReq);
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
return true;
}
return false;
}
// addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
// return true;
// }
// return false;
// }
static FORCE_INLINE void destroyReq(void* arg) {
SCliReq* pReq = arg;
@ -3086,6 +3105,7 @@ int32_t transReleaseCliHandle(void* handle) {
.info.handle = handle,
.info.ahandle = (void*)0x9527,
.info.qId = (int64_t)handle};
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx));
@ -3518,7 +3538,7 @@ int32_t transAllocHandle(int64_t* refId) {
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);
tDebug("pre alloc refId %" PRId64 "", exh->refId);
tDebug("alloc qid:%ld", exh->refId);
*refId = exh->refId;
return 0;
}

View File

@ -390,21 +390,26 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg*
if (pConn->status == ConnNormal && pHead->noResp == 0) {
// transRefSrvHandle(pConn);
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pInst),
pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%d, qid:%ld",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pInst), pConn,
TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%d, qid:%ld", transLabel(pInst),
pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost,
pTransMsg->info.seqNum, pTransMsg->info.qId);
}
} else {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost));
tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, "
"seqNum:%d, qid:%ld",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", transLabel(pInst),
pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, pHead->noResp,
pTransMsg->code, (int)(cost));
tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%d, qid:%ld",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
}
}
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pInst), pTransMsg->info.handle, pConn,
@ -502,7 +507,6 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.pCont = pHead->content;
transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code;
transMsg.info.qId = taosHton64(pHead->qid);
if (transMsg.info.qId > 0) {
// int32_t code = taosHashPut(pConn->pQTable, &transMsg.info.qId, sizeof(int64_t), &transMsg, sizeof(STransMsg));
@ -531,6 +535,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.forbiddenIp = forbiddenIp;
transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0;
transMsg.info.seqNum = htonl(pHead->seqNum);
transMsg.info.qId = taosHton64(pHead->qid);
// uvMaySetConnAcquired(pConn, pHead);
@ -767,8 +772,8 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
}
int32_t uvConnMayHandlsReleaseMsg(SSvrMsg* pMsg) {
SSvrConn* pConn = pMsg->pConn;
if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE) {
int64_t qid = pMsg->msg.info.qId;
int64_t qid = pMsg->msg.info.qId;
if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) {
SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (p == NULL) {
tError("%s conn %p already release qid %ld", transLabel(pConn->pInst), pConn, qid);
@ -1736,8 +1741,13 @@ int32_t transReleaseSrvHandle(void* handle) {
SWorkThrd* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd);
STransMsg tmsg = {
.msgType = TDMT_SCH_TASK_RELEASE, .code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE,
.code = 0,
.info.handle = exh,
.info.ahandle = NULL,
.info.refId = refId,
.info.qId = qId,
.info.traceId = info->traceId};
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
if (m == NULL) {
@ -1748,7 +1758,7 @@ int32_t transReleaseSrvHandle(void* handle) {
m->msg = tmsg;
m->type = Normal;
tDebug("%s conn %p start to %p, qId:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType),
tDebug("%s conn %p start to send %s, qid:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType),
qId);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);