refactor transport

This commit is contained in:
Yihao Deng 2024-07-03 02:53:54 +00:00
parent cdaa4fa47e
commit 93d391beb1
2 changed files with 62 additions and 80 deletions

View File

@ -398,41 +398,13 @@ void cliResetConnTimer(SCliConn* conn) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
uv_timer_stop(conn->timer);
}
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
}
void cliHandleBatchResp(SCliConn* conn) {
ASSERT(0);
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
cliResetConnTimer(conn);
STransMsgHead* pHead = NULL;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead);
transMsg.code = pHead->code;
transMsg.msgType = pHead->msgType;
transMsg.info.ahandle = NULL;
transMsg.info.traceId = pHead->traceId;
transMsg.info.hasEpSet = pHead->hasEpSet;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = pMsg->ctx;
if (cliAppCb(conn, &transMsg, pMsg) != 0) {
return;
}
}
void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); }
SCliMsg* cliFindMsgBySeqnum(SCliConn* conn, int32_t seqNum) {
SCliMsg* pMsg = NULL;
@ -487,8 +459,6 @@ void cliHandleResp_shareConn(SCliConn* conn) {
if (cliAppCb(conn, &transMsg, pMsg) != 0) {
return;
}
return;
}
void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
@ -1387,7 +1357,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
STrans* pTransInst = pThrd->pTransInst;
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip);
if (ipaddr == 0xffffffff) {
if (ipaddr == (uint32_t)(-1)) {
cliResetConnTimer(conn);
cliHandleFastFail(conn, -1);
return;
@ -1399,6 +1369,7 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
addr.sin_port = (uint16_t)htons(port);
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd == -1) {
tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
@ -1406,6 +1377,7 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
cliHandleFastFail(conn, -1);
return;
}
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
if (ret != 0) {
tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
@ -1426,7 +1398,15 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
cliHandleFastFail(conn, -1);
return;
}
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
if (ret != 0) {
tError("%s conn %p failed to start timer, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliResetConnTimer(conn);
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, -1);
return;
}
return;
}
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
@ -1462,16 +1442,15 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
cliSendBatch(conn);
}
static void cliSendBatchCb(uv_write_t* req, int status) {
SCliConn* conn = req->data;
SCliThrd* thrd = conn->hostThrd;
SCliConn* conn = req->data;
SCliThrd* thrd = conn->hostThrd;
SCliBatch* p = conn->pBatch;
SCliBatchList* pBatchList = p->pList;
SCliBatch* nxtBatch = cliGetHeadFromList(pBatchList);
pBatchList->connCnt -= 1;
conn->pBatch = NULL;
SCliBatch* nxtBatch = cliGetHeadFromList(p->pList);
p->pList->connCnt -= 1;
if (status != 0) {
tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
p->wLen, p->batchSize, uv_err_name(status));
@ -1491,7 +1470,6 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
}
} else {
cliDestroyBatch(nxtBatch);
// conn release by other callback
}
}
@ -2649,23 +2627,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
cliSchedMsgToNextNode(pMsg, pThrd);
return true;
}
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (pMsg == NULL || pMsg->ctx == NULL) {
tTrace("%s conn %p handle resp", pTransInst->label, pConn);
pTransInst->cfp(pTransInst->parent, pResp, NULL);
return 0;
}
STransConnCtx* pCtx = pMsg->ctx;
bool retry = cliGenRetryRule(pConn, pResp, pMsg);
if (retry == true) {
return -1;
}
void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) {
if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
int32_t code = pResp->code;
// return internal code app
@ -2683,6 +2645,24 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
}
}
}
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (pMsg == NULL || pMsg->ctx == NULL) {
tTrace("%s conn %p handle resp", pTransInst->label, pConn);
pTransInst->cfp(pTransInst->parent, pResp, NULL);
return 0;
}
bool retry = cliGenRetryRule(pConn, pResp, pMsg);
if (retry == true) {
return -1;
}
STransConnCtx* pCtx = pMsg->ctx;
cliMayReSetRespCode(pCtx, pResp);
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
@ -2817,7 +2797,13 @@ int transReleaseCliHandle(void* handle) {
}
static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
if (pCtx == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
epsetAssign(&pCtx->epSet, pEpSet);
epsetAssign(&pCtx->origEpSet, pEpSet);
@ -2827,6 +2813,12 @@ static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pRe
if (ctx != NULL) pCtx->appCtx = *ctx;
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
if (cliMsg == NULL) {
taosMemoryFree(pCtx);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
cliMsg->ctx = pCtx;
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
@ -3059,12 +3051,16 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
return TSDB_CODE_RPC_BROKEN_LINK;
}
SCvtAddr cvtAddr = {0};
if (ip != NULL && fqdn != NULL) {
tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
cvtAddr.cvt = true;
if (ip == NULL || fqdn == NULL) {
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_FQDN_ERROR;
}
SCvtAddr cvtAddr = {0};
tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
cvtAddr.cvt = true;
for (int i = 0; i < pTransInst->numOfThreads; i++) {
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->cvtAddr = cvtAddr;

View File

@ -421,15 +421,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
return true;
}
// TODO(dengyihao): time-consuming task throwed into BG Thread
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
// wreq->data = pConn;
// uv_read_stop((uv_stream_t*)pConn->pTcp);
// transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
STransMsg transMsg;
memset(&transMsg, 0, sizeof(transMsg));
STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = pHead->content;
transMsg.msgType = pHead->msgType;
@ -942,14 +934,9 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return;
}
// uv_handle_type pending = uv_pipe_pending_type(pipe);
SSvrConn* pConn = createConn(pThrd);
pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/
// uv_timer_init(pThrd->loop, &pConn->pTimer);
// pConn->pTimer.data = pConn;
pConn->hostThrd = pThrd;
@ -1107,6 +1094,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
STrans* pTransInst = pThrd->pTransInst;
pConn->refId = exh->refId;
QUEUE_INIT(&exh->q);
transRefSrvHandle(pConn);
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId);
@ -1618,5 +1606,3 @@ void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
}
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }