From 93d391beb1edcf4032c8ec75d3aea80efd8e5441 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 3 Jul 2024 02:53:54 +0000 Subject: [PATCH] refactor transport --- source/libs/transport/src/transCli.c | 124 +++++++++++++-------------- source/libs/transport/src/transSvr.c | 18 +--- 2 files changed, 62 insertions(+), 80 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index adb6223a0f..aa04bc336f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -398,41 +398,13 @@ void cliResetConnTimer(SCliConn* conn) { tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn); uv_timer_stop(conn->timer); } + taosArrayPush(pThrd->timerList, &conn->timer); conn->timer->data = NULL; conn->timer = NULL; } } -void cliHandleBatchResp(SCliConn* conn) { - ASSERT(0); - SCliThrd* pThrd = conn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; - cliResetConnTimer(conn); - - STransMsgHead* pHead = NULL; - int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead); - if (transDecompressMsg((char**)&pHead, msgLen) < 0) { - tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); - } - pHead->code = htonl(pHead->code); - pHead->msgLen = htonl(pHead->msgLen); - - STransMsg transMsg = {0}; - transMsg.contLen = transContLenFromMsg(pHead->msgLen); - transMsg.pCont = transContFromHead((char*)pHead); - transMsg.code = pHead->code; - transMsg.msgType = pHead->msgType; - transMsg.info.ahandle = NULL; - transMsg.info.traceId = pHead->traceId; - transMsg.info.hasEpSet = pHead->hasEpSet; - transMsg.info.cliVer = htonl(pHead->compatibilityVer); - - SCliMsg* pMsg = NULL; - STransConnCtx* pCtx = pMsg->ctx; - if (cliAppCb(conn, &transMsg, pMsg) != 0) { - return; - } -} +void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } SCliMsg* cliFindMsgBySeqnum(SCliConn* conn, int32_t seqNum) { SCliMsg* pMsg = NULL; @@ -487,8 +459,6 @@ void cliHandleResp_shareConn(SCliConn* conn) { if (cliAppCb(conn, &transMsg, pMsg) != 0) { return; } - - return; } void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; @@ -1387,7 +1357,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) { static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { STrans* pTransInst = pThrd->pTransInst; uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip); - if (ipaddr == 0xffffffff) { + if (ipaddr == (uint32_t)(-1)) { cliResetConnTimer(conn); cliHandleFastFail(conn, -1); return; @@ -1399,6 +1369,7 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { addr.sin_port = (uint16_t)htons(port); tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); + int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1406,6 +1377,7 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { cliHandleFastFail(conn, -1); return; } + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); @@ -1426,7 +1398,15 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { cliHandleFastFail(conn, -1); return; } - uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); + + ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); + if (ret != 0) { + tError("%s conn %p failed to start timer, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliResetConnTimer(conn); + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); + cliHandleFastFail(conn, -1); + return; + } return; } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { @@ -1462,16 +1442,15 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { cliSendBatch(conn); } static void cliSendBatchCb(uv_write_t* req, int status) { - SCliConn* conn = req->data; - SCliThrd* thrd = conn->hostThrd; + SCliConn* conn = req->data; + SCliThrd* thrd = conn->hostThrd; + SCliBatch* p = conn->pBatch; - - SCliBatchList* pBatchList = p->pList; - SCliBatch* nxtBatch = cliGetHeadFromList(pBatchList); - pBatchList->connCnt -= 1; - conn->pBatch = NULL; + SCliBatch* nxtBatch = cliGetHeadFromList(p->pList); + p->pList->connCnt -= 1; + if (status != 0) { tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize, uv_err_name(status)); @@ -1491,7 +1470,6 @@ static void cliSendBatchCb(uv_write_t* req, int status) { } } else { cliDestroyBatch(nxtBatch); - // conn release by other callback } } @@ -2649,23 +2627,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { cliSchedMsgToNextNode(pMsg, pThrd); return true; } -int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { - SCliThrd* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; - - if (pMsg == NULL || pMsg->ctx == NULL) { - tTrace("%s conn %p handle resp", pTransInst->label, pConn); - pTransInst->cfp(pTransInst->parent, pResp, NULL); - return 0; - } - - STransConnCtx* pCtx = pMsg->ctx; - - bool retry = cliGenRetryRule(pConn, pResp, pMsg); - if (retry == true) { - return -1; - } - +void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) { if (pCtx->retryCode != TSDB_CODE_SUCCESS) { int32_t code = pResp->code; // return internal code app @@ -2683,6 +2645,24 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK; } } +} +int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + + if (pMsg == NULL || pMsg->ctx == NULL) { + tTrace("%s conn %p handle resp", pTransInst->label, pConn); + pTransInst->cfp(pTransInst->parent, pResp, NULL); + return 0; + } + + bool retry = cliGenRetryRule(pConn, pResp, pMsg); + if (retry == true) { + return -1; + } + + STransConnCtx* pCtx = pMsg->ctx; + cliMayReSetRespCode(pCtx, pResp); STraceId* trace = &pResp->info.traceId; bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); @@ -2817,7 +2797,13 @@ int transReleaseCliHandle(void* handle) { } static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); + STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); @@ -2827,6 +2813,12 @@ static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pRe if (ctx != NULL) pCtx->appCtx = *ctx; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); @@ -3059,12 +3051,16 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { return TSDB_CODE_RPC_BROKEN_LINK; } - SCvtAddr cvtAddr = {0}; - if (ip != NULL && fqdn != NULL) { - tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip)); - tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn)); - cvtAddr.cvt = true; + if (ip == NULL || fqdn == NULL) { + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_FQDN_ERROR; } + + SCvtAddr cvtAddr = {0}; + tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip)); + tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn)); + cvtAddr.cvt = true; + for (int i = 0; i < pTransInst->numOfThreads; i++) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->cvtAddr = cvtAddr; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 57ea89b82a..907f132d32 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -421,15 +421,7 @@ static bool uvHandleReq(SSvrConn* pConn) { return true; } - // TODO(dengyihao): time-consuming task throwed into BG Thread - // uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t)); - // wreq->data = pConn; - // uv_read_stop((uv_stream_t*)pConn->pTcp); - // transRefSrvHandle(pConn); - // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - - STransMsg transMsg; - memset(&transMsg, 0, sizeof(transMsg)); + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = pHead->content; transMsg.msgType = pHead->msgType; @@ -942,14 +934,9 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { return; } - // uv_handle_type pending = uv_pipe_pending_type(pipe); - SSvrConn* pConn = createConn(pThrd); pConn->pTransInst = pThrd->pTransInst; - /* init conn timer*/ - // uv_timer_init(pThrd->loop, &pConn->pTimer); - // pConn->pTimer.data = pConn; pConn->hostThrd = pThrd; @@ -1107,6 +1094,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; + QUEUE_INIT(&exh->q); transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId); @@ -1618,5 +1606,3 @@ void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { } transReleaseExHandle(transGetInstMgt(), (int64_t)thandle); } - -int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }