diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7bda574ff6..050069d6ea 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -107,6 +107,10 @@ typedef struct SCliConn { int32_t readerStart; queue wq; // uv_write_t queue + + queue batchSendq; + int8_t inThreadSendq; + } SCliConn; typedef struct { @@ -126,6 +130,7 @@ typedef struct SCliReq { int64_t seq; int32_t sent; //(0: no send, 1: alread sent) STransMsg msg; + int8_t inRetry; } SCliReq; @@ -164,6 +169,7 @@ typedef struct SCliThrd { SHashObj* pIdConnTable; // SArray* pQIdBuf; // tmp buf to avoid alloc buf; + queue batchSendSet; } SCliThrd; typedef struct SCliObj { @@ -192,7 +198,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn); static void cliBatchSendCb(uv_write_t* req, int status); void cliBatchSendImpl(SCliConn* pConn); -static int32_t cliBatchSend(SCliConn* conn); +static int32_t cliBatchSend(SCliConn* conn, int8_t direct); void cliConnCheckTimoutMsg(SCliConn* conn); bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead); // register conn timer @@ -1039,6 +1045,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int TAOS_CHECK_GOTO(initWQ(&conn->wq), NULL, _failed); + QUEUE_INIT(&conn->batchSendq); + conn->stream->data = conn; conn->connReq.data = conn; @@ -1278,7 +1286,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) { } if (!cliMayRecycleConn(conn)) { - code = cliBatchSend(conn); + code = cliBatchSend(conn, 1); if (code != 0) { tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); TAOS_UNUSED(transUnrefCliHandle(conn)); @@ -1307,7 +1315,7 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg pConn->userInited = 1; return true; } -int32_t cliBatchSend(SCliConn* pConn) { +int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { int32_t code = 0; SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; @@ -1319,6 +1327,17 @@ int32_t cliBatchSend(SCliConn* pConn) { if (pConn->connnected != 1) { return 0; } + + if (!direct) { + if (pConn->inThreadSendq) { + return 0; + } + QUEUE_PUSH(&pThrd->batchSendSet, &pConn->batchSendq); + pConn->inThreadSendq = 1; + tDebug("%s conn %p batch send later", pInst->label, pConn); + return 0; + } + int32_t size = transQueueSize(&pConn->reqsToSend); int32_t totalLen = 0; @@ -1420,7 +1439,20 @@ int32_t cliBatchSend(SCliConn* pConn) { int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { transQueuePush(&pConn->reqsToSend, &pCliMsg->q); - return cliBatchSend(pConn); + return cliBatchSend(pConn, pCliMsg->inRetry); +} +int32_t cliSendReqPrepare(SCliConn* pConn, SCliReq* pCliMsg) { + transQueuePush(&pConn->reqsToSend, &pCliMsg->q); + + if (pConn->broken) { + return 0; + } + + if (pConn->connnected != 1) { + return 0; + } + // return cliBatchSend(pConn); + return 0; } static void cliDestroyBatch(SCliBatch* pBatch) { @@ -1577,7 +1609,7 @@ void cliConnCb(uv_connect_t* req, int status) { } tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); - code = cliBatchSend(pConn); + code = cliBatchSend(pConn, 1); if (code != 0) { tDebug("%s conn %p failed to get sock info since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); TAOS_UNUSED(transUnrefCliHandle(pConn)); @@ -1906,13 +1938,13 @@ void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { return cliHandleBatchReq(pTh static void cliDoReq(queue* wq, SCliThrd* pThrd) { int count = 0; + QUEUE_INIT(&pThrd->batchSendSet); while (!QUEUE_IS_EMPTY(wq)) { queue* h = QUEUE_HEAD(wq); QUEUE_REMOVE(h); SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - if (pReq->type == Quit) { pThrd->stopMsg = pReq; continue; @@ -1920,6 +1952,17 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) { (*cliAsyncHandle[pReq->type])(pThrd, pReq); count++; } + + while (!QUEUE_IS_EMPTY(&pThrd->batchSendSet)) { + queue* el = QUEUE_HEAD(&pThrd->batchSendSet); + QUEUE_REMOVE(el); + + SCliConn* conn = QUEUE_DATA(el, SCliConn, batchSendq); + conn->inThreadSendq = 0; + QUEUE_INIT(&conn->batchSendq); + cliBatchSend(conn, 1); + } + QUEUE_INIT(&pThrd->batchSendSet); if (count >= 2) { tTrace("cli process batch size:%d", count); } @@ -2467,6 +2510,11 @@ FORCE_INLINE int cliRBChoseIdx(STrans* pInst) { } static FORCE_INLINE void doDelayTask(void* param) { STaskArg* arg = param; + + if (arg && arg->param1) { + SCliReq* pReq = arg->param1; + pReq->inRetry = 1; + } cliHandleReq((SCliThrd*)arg->param2, (SCliReq*)arg->param1); taosMemoryFree(arg); }