From 51f3ff3207b1c87c4f9613604661bc4d26df1c37 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 31 Aug 2024 09:00:03 +0800 Subject: [PATCH] fix unit test --- source/libs/transport/src/transCli.c | 114 ++++++++------------------- 1 file changed, 34 insertions(+), 80 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 6afb1fac00..ceba448613 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -454,6 +454,7 @@ void cliResetConnTimer(SCliConn* conn) { conn->timer = NULL; } } + void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } SCliReq* cliFindReqBySeq(SCliConn* conn, int32_t seq) { @@ -479,49 +480,6 @@ bool cliShouldAddConnToPool(SCliConn* conn) { return empty; } -void cliHandleResp_shareConn(SCliConn* conn) { - SCliThrd* pThrd = conn->hostThrd; - STrans* pInst = pThrd->pInst; - // cliResetConnTimer(conn); - - STransMsgHead* pHead = NULL; - int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 1); - - if (msgLen <= 0) { - taosMemoryFree(pHead); - tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); - return; - } - if (transDecompressMsg((char**)&pHead, msgLen) < 0) { - tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); - } - pHead->code = htonl(pHead->code); - pHead->msgLen = htonl(pHead->msgLen); - - STransMsg resp = {0}; - resp.contLen = transContLenFromMsg(pHead->msgLen); - resp.pCont = transContFromHead((char*)pHead); - resp.code = pHead->code; - resp.msgType = pHead->msgType; - resp.info.ahandle = NULL; - resp.info.traceId = pHead->traceId; - resp.info.hasEpSet = pHead->hasEpSet; - resp.info.cliVer = htonl(pHead->compatibilityVer); - - SCliReq* pReq = cliFindReqBySeq(conn, pHead->seqNum); - pReq->seq = 0; - - SReqCtx* pCtx = pReq->ctx; - resp.info.ahandle = pCtx ? pCtx->ahandle : NULL; - STraceId* trace = &resp.info.traceId; - - int32_t ret = cliNotifyCb(conn, pReq, &resp); - if (ret != 0) { - return; - } else { - destroyReq(pReq); - } -} int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { int32_t code = 0; @@ -536,27 +494,36 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { return TSDB_CODE_OUT_OF_RANGE; } -int32_t cliMayRecycleConn(SCliConn* conn) { +int8_t cliMayRecycleConn(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; if (transQueueSize(&conn->reqs) == 0) { addConnToPool(pThrd->pool, conn); + return 1; } return 0; } + +int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHead) { + pResp->contLen = transContLenFromMsg(pHead->msgLen); + pResp->pCont = transContFromHead((char*)pHead); + pResp->code = pReq->msg.code; + pResp->msgType = pReq->msg.msgType; + pResp->info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL; + pResp->info.traceId = pReq->msg.info.traceId; + pResp->info.hasEpSet = pReq->msg.info.hasEpSet; + pResp->info.cliVer = pReq->msg.info.cliVer; + return 0; +} void cliHandleResp2(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; cliResetConnTimer(conn); - if (pInst->shareConn) { - return cliHandleResp_shareConn(conn); - } - - STransMsgHead* pHead = NULL; // int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1; - int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0); + STransMsgHead* pHead = NULL; + int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0); if (msgLen < 0) { taosMemoryFree(pHead); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); @@ -588,38 +555,31 @@ void cliHandleResp2(SCliConn* conn) { // } STransMsg resp = {0}; - resp.contLen = transContLenFromMsg(pHead->msgLen); - resp.pCont = transContFromHead((char*)pHead); - resp.code = pHead->code; - resp.msgType = pHead->msgType; - resp.info.ahandle = NULL; - resp.info.traceId = pHead->traceId; - resp.info.hasEpSet = pHead->hasEpSet; - resp.info.cliVer = htonl(pHead->compatibilityVer); - resp.info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL; - + code = cliBuildRespFromCont(pReq, &resp, pHead); if (code != 0) { tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq); } + code = cliNotifyCb(conn, pReq, &resp); if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { STraceId* trace = &resp.info.traceId; tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn)); // retry, notify } else { + destroyReq(pReq); } - (void)cliMayRecycleConn(conn); - return; + if (cliMayRecycleConn(conn)) { + return; + } + (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } + void cliHandleResp(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; cliResetConnTimer(conn); - // if (pInst->shareConn) { - // return cliHandleResp_shareConn(conn); - // } STransMsgHead* pHead = NULL; @@ -1686,7 +1646,6 @@ int32_t cliSend(SCliConn* pConn) { STrans* pInst = pThrd->pInst; SCliReq* pCliReq = NULL; int32_t code = cliConnFindToSendMsg(pConn, &pCliReq); - if (code != 0) { return code; } @@ -1699,7 +1658,6 @@ int32_t cliSend(SCliConn* pConn) { if (pReq->pCont == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - tDebug("malloc memory: %p", pReq->pCont); pReq->contLen = 0; } @@ -1719,12 +1677,10 @@ int32_t cliSend(SCliConn* pConn) { pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->version = TRANS_VER; pHead->compatibilityVer = htonl(pInst->compatibilityVer); + pHead->seqNum = pConn->seq++; } - pHead->timestamp = taosHton64(taosGetTimestampUs()); - if (pHead->persist == 1) { - CONN_SET_PERSIST_BY_APP(pConn); - } + pHead->timestamp = taosHton64(taosGetTimestampUs()); STraceId* trace = &pReq->info.traceId; @@ -1741,24 +1697,22 @@ int32_t cliSend(SCliConn* pConn) { TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); - if (req == NULL) { - tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - cliHandleExcept(pConn, -1); - return TSDB_CODE_OUT_OF_MEMORY; + uv_write_t* aReq = transReqQueuePush(&pConn->wreqQueue); + if (aReq == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); } pCliReq->sent = 1; - int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); + int status = uv_write(aReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); if (status != 0) { tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), uv_err_name(status)); - cliHandleExcept(pConn, -1); - return TSDB_CODE_THIRDPARTY_ERROR; + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _exception); } return TSDB_CODE_RPC_ASYNC_IN_PROCESS; +_exception: + return code; } static void cliDestroyBatch(SCliBatch* pBatch) {