diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c23f7b781f..448dd7796f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -428,7 +428,7 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { return false; } -int32_t cliGetTimerFrom(SCliThrd* pThrd, SCliConn* pConn) { +int32_t cliGetConnTimer(SCliThrd* pThrd, SCliConn* pConn) { uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); @@ -482,7 +482,7 @@ bool cliShouldAddConnToPool(SCliConn* conn) { void cliHandleResp_shareConn(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - cliResetConnTimer(conn); + // cliResetConnTimer(conn); STransMsgHead* pHead = NULL; int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 1); @@ -498,6 +498,80 @@ void cliHandleResp_shareConn(SCliConn* conn) { pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + STransMsg resp = {0}; + resp.contLen = transContLenFromMsg(pHead->msgLen); + resp.pCont = transContFromHead((char*)pHead); + resp.code = pHead->code; + resp.msgType = pHead->msgType; + resp.info.ahandle = NULL; + resp.info.traceId = pHead->traceId; + resp.info.hasEpSet = pHead->hasEpSet; + resp.info.cliVer = htonl(pHead->compatibilityVer); + + SCliReq* pReq = cliFindReqBySeq(conn, pHead->seqNum); + pReq->seq = 0; + + SReqCtx* pCtx = pReq->ctx; + resp.info.ahandle = pCtx ? pCtx->ahandle : NULL; + STraceId* trace = &resp.info.traceId; + + int32_t ret = cliNotifyCb(conn, pReq, &resp); + if (ret != 0) { + return; + } else { + destroyReq(pReq); + } +} + +int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { + int32_t code = 0; + for (int i = 0; i < transQueueSize(&conn->reqs); i++) { + SCliReq* p = transQueueGet(&conn->reqs, i); + if (p->seq == seq) { + transQueueRm(&conn->reqs, i); + *pReq = p; + return 0; + } + } + return TSDB_CODE_OUT_OF_RANGE; +} +void cliHandleResp2(SCliConn* conn) { + int32_t code = 0; + SCliThrd* pThrd = conn->hostThrd; + STrans* pInst = pThrd->pInst; + + cliResetConnTimer(conn); + if (pInst->shareConn) { + return cliHandleResp_shareConn(conn); + } + + STransMsgHead* pHead = NULL; + + // int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1; + int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0); + if (msgLen < 0) { + taosMemoryFree(pHead); + tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + // TODO: notify cb + pThrd->notifyExceptCb(pThrd, NULL, NULL); + return; + } + + if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) { + tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); + // TODO: notify cb + } + + SCliReq* pReq = NULL; + int32_t seq = pHead->seqNum; + code = cliGetReqBySeq(conn, seq, &pReq); + pHead->code = htonl(pHead->code); + pHead->msgLen = htonl(pHead->msgLen); + // TODO handle release req + // if (cliRecvReleaseReq(conn, pHead)) { + // return; + // } + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -508,29 +582,21 @@ void cliHandleResp_shareConn(SCliConn* conn) { transMsg.info.hasEpSet = pHead->hasEpSet; transMsg.info.cliVer = htonl(pHead->compatibilityVer); - SCliReq* pReq = cliFindReqBySeq(conn, pHead->seqNum); - pReq->seq = 0; - - SReqCtx* pCtx = pReq->ctx; - transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; - STraceId* trace = &transMsg.info.traceId; - - int32_t ret = cliNotifyCb(conn, pReq, &transMsg); - if (ret != 0) { - return; - } else { - destroyReq(pReq); + if (code != 0) { + tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq); } + + return; } void cliHandleResp(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; + cliResetConnTimer(conn); if (pInst->shareConn) { return cliHandleResp_shareConn(conn); } - cliResetConnTimer(conn); STransMsgHead* pHead = NULL; @@ -540,6 +606,7 @@ void cliHandleResp(SCliConn* conn) { taosMemoryFree(pHead); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); // TODO: notify cb + pThrd->notifyExceptCb(pThrd, NULL, NULL); return; } @@ -1267,7 +1334,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed); - TAOS_CHECK_GOTO(cliGetTimerFrom(pThrd, conn), &lino, _failed); + TAOS_CHECK_GOTO(cliGetConnTimer(pThrd, conn), &lino, _failed); // read/write stream handle conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); @@ -1379,25 +1446,25 @@ static void cliSendCb(uv_write_t* req, int status) { } static void cliHandleBatch_shareConnExcept(SCliConn* conn) { - int32_t code = -1; + int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; while (!transQueueEmpty(&conn->reqs)) { SCliReq* pReq = transQueuePop(&conn->reqs); - ASSERT(pReq->type != Release); - ASSERT(REQUEST_NO_RESP(&pReq->msg) == 0); + // ASSERT(pReq->type != Release); + // ASSERT(REQUEST_NO_RESP(&pReq->msg) == 0); SReqCtx* pCtx = pReq ? pReq->ctx : NULL; - STransMsg transMsg = {0}; - transMsg.code = code == -1 ? (conn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; - transMsg.msgType = pReq ? pReq->msg.msgType + 1 : 0; - transMsg.info.ahandle = NULL; - transMsg.info.cliVer = pInst->compatibilityVer; - transMsg.info.ahandle = pCtx->ahandle; + STransMsg resp = {0}; + resp.code = code == -1 ? (conn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; + resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; + resp.info.ahandle = NULL; + resp.info.cliVer = pInst->compatibilityVer; + resp.info.ahandle = pCtx->ahandle; pReq->seq = 0; - code = cliNotifyCb(conn, pReq, &transMsg); + code = cliNotifyCb(conn, pReq, &resp); if (code != 0) { continue; } else { @@ -1484,6 +1551,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { pCliMsg->sent = 1; pCliMsg->seq = pHead->seqNum; } + uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); req->data = pConn; pConn->shareCnt += 1; @@ -1784,7 +1852,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { conn->pBatch = pBatch; code = cliDoConn(pThrd, conn); if (code != 0) { - } return; }