refactor transport

This commit is contained in:
Yihao Deng 2024-08-29 03:24:02 +00:00
parent a45b9b2d6f
commit 7d81d4f18e
1 changed files with 50 additions and 25 deletions

View File

@ -535,6 +535,14 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
} }
return TSDB_CODE_OUT_OF_RANGE; return TSDB_CODE_OUT_OF_RANGE;
} }
int32_t cliMayRecycleConn(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
if (transQueueSize(&conn->reqs) == 0) {
addConnToPool(pThrd->pool, conn);
}
return 0;
}
void cliHandleResp2(SCliConn* conn) { void cliHandleResp2(SCliConn* conn) {
int32_t code = 0; int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
@ -560,11 +568,18 @@ void cliHandleResp2(SCliConn* conn) {
if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) { if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
// TODO: notify cb // TODO: notify cb
return;
} }
SCliReq* pReq = NULL; SCliReq* pReq = NULL;
int32_t seq = pHead->seqNum; int32_t seq = pHead->seqNum;
code = cliGetReqBySeq(conn, seq, &pReq); code = cliGetReqBySeq(conn, seq, &pReq);
if (code != 0) {
tDebug("%s conn %p recv unexpected packet, reason:%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
// TODO: notify cb
return;
}
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
// TODO handle release req // TODO handle release req
@ -572,20 +587,28 @@ void cliHandleResp2(SCliConn* conn) {
// return; // return;
// } // }
STransMsg transMsg = {0}; STransMsg resp = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen); resp.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead); resp.pCont = transContFromHead((char*)pHead);
transMsg.code = pHead->code; resp.code = pHead->code;
transMsg.msgType = pHead->msgType; resp.msgType = pHead->msgType;
transMsg.info.ahandle = NULL; resp.info.ahandle = NULL;
transMsg.info.traceId = pHead->traceId; resp.info.traceId = pHead->traceId;
transMsg.info.hasEpSet = pHead->hasEpSet; resp.info.hasEpSet = pHead->hasEpSet;
transMsg.info.cliVer = htonl(pHead->compatibilityVer); resp.info.cliVer = htonl(pHead->compatibilityVer);
resp.info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL;
if (code != 0) { if (code != 0) {
tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq); tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq);
} }
code = cliNotifyCb(conn, pReq, &resp);
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
STraceId* trace = &resp.info.traceId;
tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn));
// retry, notify
} else {
}
(void)cliMayRecycleConn(conn);
return; return;
} }
void cliHandleResp(SCliConn* conn) { void cliHandleResp(SCliConn* conn) {
@ -594,9 +617,9 @@ void cliHandleResp(SCliConn* conn) {
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
cliResetConnTimer(conn); cliResetConnTimer(conn);
if (pInst->shareConn) { // if (pInst->shareConn) {
return cliHandleResp_shareConn(conn); // return cliHandleResp_shareConn(conn);
} // }
STransMsgHead* pHead = NULL; STransMsgHead* pHead = NULL;
@ -2715,6 +2738,9 @@ int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
int32_t notfiyCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { int32_t notfiyCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
// impl later // impl later
SCliThrd* pThrd = thrd;
STrans* pInst = pThrd->pInst;
return 0; return 0;
} }
@ -3176,7 +3202,7 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false); noDelay = cliResetEpset(pCtx, pResp, false);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
transUnrefCliHandle(pConn); // transUnrefCliHandle(pConn);
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
@ -3184,16 +3210,16 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true); noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
addConnToPool(pThrd->pool, pConn); // addConnToPool(pThrd->pool, pConn);
} else if (code == TSDB_CODE_SYN_RESTORING) { } else if (code == TSDB_CODE_SYN_RESTORING) {
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true); noDelay = cliResetEpset(pCtx, pResp, true);
addConnToPool(pThrd->pool, pConn); // addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
} else { } else {
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false); noDelay = cliResetEpset(pCtx, pResp, false);
addConnToPool(pThrd->pool, pConn); // addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
} }
if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) {
@ -3283,15 +3309,14 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
if (pReq == NULL || pReq->ctx == NULL) { // if (pReq == NULL || pReq->ctx == NULL) {
tTrace("%s conn %p handle resp", pInst->label, pConn); // tTrace("%s conn %p handle resp", pInst->label, pConn);
pInst->cfp(pInst->parent, pResp, NULL); // pInst->cfp(pInst->parent, pResp, NULL);
return 0; // return 0;
} // }
bool retry = cliMayRetry(pConn, pReq, pResp); if (cliMayRetry(pConn, pReq, pResp)) {
if (retry == true) { return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
return -1;
} }
cliMayResetRespCode(pReq, pResp); cliMayResetRespCode(pReq, pResp);