opt transport

This commit is contained in:
yihaoDeng 2024-09-08 06:58:44 +08:00
parent 5ae6b83d0e
commit 488cccd10e
2 changed files with 53 additions and 90 deletions

View File

@ -1335,6 +1335,16 @@ static void cliDestroy(uv_handle_t* handle) {
taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn->stream);
void* pIter = taosHashIterate(conn->pQTable, NULL);
while (pIter) {
int64_t qid = *(int64_t*)pIter;
(void)taosHashRemove(pThrd->pIdConnTable, &qid, sizeof(qid));
pIter = taosHashIterate(conn->pQTable, pIter);
tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, qid);
}
cliDestroyConnMsgs(conn, true);
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
@ -2278,51 +2288,6 @@ void cliConnFreeMsgs(SCliConn* conn) {
}
}
// bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
// if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
// for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
// SCliReq* pReq = transQueueGet(&conn->reqs, i);
// if (pHead->ahandle == (uint64_t)pReq->ctx->ahandle) {
// tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn,
// conn->refId); transQueueRm(&conn->reqs, i); return true;
// }
// }
// }
// return false;
// }
// bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
// if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
// uint64_t ahandle = pHead->ahandle;
// SCliReq* pReq = NULL;
// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
// tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn,
// conn->refId);
// (void)transClearBuffer(&conn->readBuf);
// transFreeMsg(transContFromHead((char*)pHead));
// for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->reqs); i++) {
// SCliReq* pReq = transQueueGet(&conn->reqs, i);
// if (pReq->type == Release) {
// ASSERTS(pReq == NULL, "trans-cli recv invaid release-req");
// tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn,
// conn->refId);
// cliDestroyConn(conn, true);
// return true;
// }
// }
// cliConnFreeMsgs(conn);
// tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
// destroyReq(pReq);
// addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
// return true;
// }
// return false;
// }
static FORCE_INLINE void destroyReq(void* arg) {
SCliReq* pReq = arg;
if (pReq == NULL) {
@ -2330,7 +2295,7 @@ static FORCE_INLINE void destroyReq(void* arg) {
}
tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
destroyReqCtx(pReq->ctx);
if (pReq->ctx) destroyReqCtx(pReq->ctx);
transFreeMsg(pReq->msg.pCont);
taosMemoryFree(pReq);
}
@ -3562,11 +3527,11 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) {
if (pCli == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
}
pCli->type = FreeById;
pCli->type = Normal;
tDebug("release conn id %" PRId64 "", transpointId);
STransMsg msg = {.info.handle = (void*)transpointId};
STransMsg msg = {.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = (void*)transpointId};
msg.info.qId = transpointId;
pCli->msg = msg;

View File

@ -146,8 +146,6 @@ static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status);
static void uvPrepareCb(uv_prepare_t* handle);
static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead);
/*
* time-consuming task throwed into BG work thread
*/
@ -429,12 +427,12 @@ static int8_t uvValidConn(SSvrConn* pConn) {
return forbiddenIp;
}
static int32_t uvHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
int32_t code = 0;
STrans* pInst = pConn->pInst;
if (pHead->msgType == TDMT_SCH_TASK_RELEASE) {
int64_t qId = taosHton64(pHead->qid);
void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId));
int64_t qId = taosHton64(pHead->qid);
if (pHead->msgType == TDMT_SCH_TASK_RELEASE && qId > 0) {
void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId));
if (p == NULL) {
code = TSDB_CODE_RPC_NO_STATE;
tTrace("conn %p recv release, and releady release by server qid%ld", pConn, qId);
@ -498,7 +496,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
}
}
if (uvHandleReleaseReq(pConn, pHead)) {
if (uvMayHandleReleaseReq(pConn, pHead)) {
return true;
}
@ -770,7 +768,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
(void)uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb);
taosMemoryFree(pBuf);
}
int32_t uvConnMayHandlsReleaseMsg(SSvrMsg* pMsg) {
int32_t uvMayHandleReleaseResp(SSvrMsg* pMsg) {
SSvrConn* pConn = pMsg->pConn;
int64_t qid = pMsg->msg.info.qId;
if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) {
@ -788,7 +786,7 @@ int32_t uvConnMayHandlsReleaseMsg(SSvrMsg* pMsg) {
static void uvStartSendResp(SSvrMsg* smsg) {
// impl
SSvrConn* pConn = smsg->pConn;
if (uvConnMayHandlsReleaseMsg(smsg) == TSDB_CODE_RPC_NO_STATE) {
if (uvMayHandleReleaseResp(smsg) == TSDB_CODE_RPC_NO_STATE) {
destroySmsg(smsg);
return;
}
@ -888,40 +886,40 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
uv_close((uv_handle_t*)req->handle, uvDestroyConn);
taosMemoryFree(req);
}
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
tTrace("conn %p received release request", pConn);
STraceId traceId = pHead->traceId;
(void)transClearBuffer(&pConn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
if (pConn->status != ConnAcquire) {
return true;
}
pConn->status = ConnRelease;
// static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
// if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
// tTrace("conn %p received release request", pConn);
// STraceId traceId = pHead->traceId;
// (void)transClearBuffer(&pConn->readBuf);
// transFreeMsg(transContFromHead((char*)pHead));
// if (pConn->status != ConnAcquire) {
// return true;
// }
// pConn->status = ConnRelease;
STransMsg tmsg = {.code = 0,
.info.handle = (void*)pConn,
.info.traceId = traceId,
.info.ahandle = (void*)0x9527,
.info.seqNum = htonl(pHead->seqNum)};
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
srvMsg->msg = tmsg;
srvMsg->type = Release;
srvMsg->pConn = pConn;
if (!transQueuePush(&pConn->srvMsgs, srvMsg)) {
return true;
}
if (pConn->regArg.init) {
tTrace("conn %p release, notify server app", pConn);
STrans* pInst = pConn->pInst;
(*pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL);
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
}
uvStartSendRespImpl(srvMsg);
return true;
}
return false;
}
// STransMsg tmsg = {.code = 0,
// .info.handle = (void*)pConn,
// .info.traceId = traceId,
// .info.ahandle = (void*)0x9527,
// .info.seqNum = htonl(pHead->seqNum)};
// SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
// srvMsg->msg = tmsg;
// srvMsg->type = Release;
// srvMsg->pConn = pConn;
// if (!transQueuePush(&pConn->srvMsgs, srvMsg)) {
// return true;
// }
// if (pConn->regArg.init) {
// tTrace("conn %p release, notify server app", pConn);
// STrans* pInst = pConn->pInst;
// (*pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL);
// memset(&pConn->regArg, 0, sizeof(pConn->regArg));
// }
// uvStartSendRespImpl(srvMsg);
// return true;
// }
// return false;
// }
static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task
// only auth conn currently, add more func later