diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b7bc428901..0932241abf 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -30,6 +30,7 @@ typedef struct SCliConn { void* hostThrd; SConnBuffer readBuf; void* data; + SArray* cliMsgs; queue conn; uint64_t expireTime; int hThrdIdx; @@ -106,6 +107,7 @@ static void cliAsyncCb(uv_async_t* handle); static SCliConn* cliCreateConn(SCliThrdObj* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle); +static void cliSend(SCliConn* pConn); // process data read from server, add decompress etc later static void cliHandleResp(SCliConn* conn); @@ -158,6 +160,14 @@ static void destroyThrdObj(SCliThrdObj* pThrd); static void* cliWorkThread(void* arg); +bool cliMayContinueSendMsg(SCliConn* conn) { + if (taosArrayGetSize(conn->cliMsgs) > 0) { + cliSend(conn); + return true; + } else { + return false; + } +} void cliHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -173,18 +183,18 @@ void cliHandleResp(SCliConn* conn) { transMsg.msgType = pHead->msgType; transMsg.ahandle = NULL; - SCliMsg* pMsg = conn->data; + SCliMsg* pMsg = NULL; + if (taosArrayGetSize(conn->cliMsgs) > 0) { + pMsg = taosArrayGetP(conn->cliMsgs, 0); + taosArrayRemove(conn->cliMsgs, 0); + } + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } - // if (rpcMsg.ahandle == NULL) { - // tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn); - // return; - //} - // buf's mem alread translated to transMsg.pCont transClearBuffer(&conn->readBuf); @@ -214,12 +224,15 @@ void cliHandleResp(SCliConn* conn) { memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); tsem_post(pCtx->pSem); } + destroyCmsg(pMsg); + + if (cliMayContinueSendMsg(conn) == true) { + return; + } if (CONN_NO_PERSIST_BY_APP(conn)) { addConnToPool(pThrd->pool, conn); } - destroyCmsg(conn->data); - conn->data = NULL; uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); // start thread's timer of conn pool if not active @@ -229,7 +242,7 @@ void cliHandleResp(SCliConn* conn) { } void cliHandleExcept(SCliConn* pConn) { - if (pConn->data == NULL) { + if (taosArrayGetSize(pConn->cliMsgs) == 0) { if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { transUnrefCliHandle(pConn); return; @@ -238,32 +251,38 @@ void cliHandleExcept(SCliConn* pConn) { SCliThrdObj* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - SCliMsg* pMsg = pConn->data; - STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; + do { + SCliMsg* pMsg = NULL; + if (taosArrayGetSize(pConn->cliMsgs) > 0) { + pMsg = taosArrayGetP(pConn->cliMsgs, 0); + taosArrayRemove(pConn->cliMsgs, 0); + } - STransMsg transMsg = {0}; - transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; - transMsg.ahandle = NULL; + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; - if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { - transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; - } else { - transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; - } + STransMsg transMsg = {0}; + transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; + transMsg.ahandle = NULL; - if (pCtx == NULL || pCtx->pSem == NULL) { - tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); - (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); - } else { - tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn); - memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); - tsem_post(pCtx->pSem); - } - destroyCmsg(pConn->data); - pConn->data = NULL; + if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { + transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; + } else { + transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + } + + if (pCtx == NULL || pCtx->pSem == NULL) { + tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); + (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + } else { + tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn); + memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); + tsem_post(pCtx->pSem); + } + destroyCmsg(pMsg); + tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); + } while (taosArrayGetSize(pConn->cliMsgs) > 0); - tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); transUnrefCliHandle(pConn); } @@ -398,6 +417,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { conn->writeReq.data = conn; conn->connReq.data = conn; + conn->cliMsgs = taosArrayInit(2, sizeof(void*)); QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; @@ -417,6 +437,7 @@ static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->ip); free(conn->stream); + taosArrayDestroy(conn->cliMsgs); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); } @@ -426,11 +447,6 @@ static void cliSendCb(uv_write_t* req, int status) { if (status == 0) { tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); - SCliMsg* pMsg = pConn->data; - if (pMsg == NULL) { - return; - } - destroyUserdata(&pMsg->msg); } else { tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); cliHandleExcept(pConn); @@ -442,7 +458,8 @@ static void cliSendCb(uv_write_t* req, int status) { void cliSend(SCliConn* pConn) { CONN_HANDLE_BROKEN(pConn); - SCliMsg* pCliMsg = pConn->data; + assert(taosArrayGetSize(pConn->cliMsgs) > 0); + SCliMsg* pCliMsg = taosArrayGetP(pConn->cliMsgs, 0); STransConnCtx* pCtx = pCliMsg->ctx; SCliThrdObj* pThrd = pConn->hostThrd; @@ -480,6 +497,7 @@ void cliSend(SCliConn* pConn) { TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + pConn->writeReq.data = pConn; uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); return; @@ -502,8 +520,8 @@ void cliConnCb(uv_connect_t* req, int status) { uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); - assert(pConn->stream == req->handle); + cliSend(pConn); } @@ -521,8 +539,11 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = pMsg->msg.handle; tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn); - destroyCmsg(pMsg); - conn->data = NULL; + while (taosArrayGetSize(conn->cliMsgs) > 0) { + SCliMsg* pMsg = taosArrayGetP(conn->cliMsgs, 0); + destroyCmsg(pMsg); + taosArrayRemove(conn->cliMsgs, 0); + } transDestroyBuffer(&conn->readBuf); if (conn->persist && T_REF_VAL_GET(conn) >= 2) { @@ -561,14 +582,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { - conn->data = pMsg; + taosArrayPush(conn->cliMsgs, &pMsg); conn->hThrdIdx = pCtx->hThrdIdx; - transDestroyBuffer(&conn->readBuf); cliSend(conn); } else { conn = cliCreateConn(pThrd); - conn->data = pMsg; + taosArrayPush(conn->cliMsgs, &pMsg); + conn->hThrdIdx = pCtx->hThrdIdx; conn->ip = strdup(pMsg->ctx->ip); conn->port = pMsg->ctx->port;