From f9290cf13f46ac8a0b97650116259d682e554256 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 4 Sep 2024 16:50:52 +0800 Subject: [PATCH] refactor transport --- source/dnode/mnode/impl/src/mndQuery.c | 31 +- source/libs/scheduler/src/schRemote.c | 1 + source/libs/transport/src/transCli.c | 531 ++++++------------------- source/libs/transport/src/transSvr.c | 52 +-- 4 files changed, 154 insertions(+), 461 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index c743aafd13..899439a31a 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -30,9 +30,9 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg) { (void)qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); } -int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) { - int32_t code = -1; - SMnode *pMnode = pMsg->info.node; +int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo *pInfo) { + int32_t code = -1; + SMnode *pMnode = pMsg->info.node; SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb, .pWorkerCb = pInfo->workerCb}; @@ -67,26 +67,25 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) { return code; } - -static FORCE_INLINE void mnodeFreeSBatchRspMsg(void* p) { +static FORCE_INLINE void mnodeFreeSBatchRspMsg(void *p) { if (NULL == p) { return; } - SBatchRspMsg* pRsp = (SBatchRspMsg*)p; + SBatchRspMsg *pRsp = (SBatchRspMsg *)p; rpcFreeCont(pRsp->msg); } int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { - int32_t code = 0; - int32_t rspSize = 0; - SBatchReq batchReq = {0}; - SBatchMsg req = {0}; + int32_t code = 0; + int32_t rspSize = 0; + SBatchReq batchReq = {0}; + SBatchMsg req = {0}; SBatchRspMsg rsp = {0}; - SBatchRsp batchRsp = {0}; - SRpcMsg reqMsg = *pMsg; - void *pRsp = NULL; - SMnode *pMnode = pMsg->info.node; + SBatchRsp batchRsp = {0}; + SRpcMsg reqMsg = *pMsg; + void *pRsp = NULL; + SMnode *pMnode = pMsg->info.node; if ((code = tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) != 0) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -94,7 +93,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { goto _exit; } - int32_t msgNum = taosArrayGetSize(batchReq.pMsgs); + int32_t msgNum = taosArrayGetSize(batchReq.pMsgs); if (msgNum >= MAX_META_MSG_IN_BATCH) { code = TSDB_CODE_INVALID_MSG; mError("too many msgs %d in mnode batch meta req", msgNum); @@ -108,7 +107,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { } for (int32_t i = 0; i < msgNum; ++i) { - SBatchMsg* req = taosArrayGet(batchReq.pMsgs, i); + SBatchMsg *req = taosArrayGet(batchReq.pMsgs, i); reqMsg.msgType = req->msgType; reqMsg.pCont = req->msg; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 9215254f9c..14118f189b 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -982,6 +982,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery SCH_ERR_JRET(code); } trans->pHandle = (void *)refId; + pMsgSendInfo->msgInfo.handle = trans->pHandle; } if (pJob && pTask) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a3cd6eacd0..4a7cf8da42 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -80,6 +80,7 @@ typedef struct SCliConn { HeapNode node; // for heap int8_t inHeap; + int32_t reqRefCnt; uint32_t clientIp; uint32_t serverIp; @@ -172,6 +173,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn); static void cliSendBatch_shareConnCb(uv_write_t* req, int status); void cliSendBatch_shareConn(SCliConn* pConn); int32_t cliSend2(SCliConn* conn); +bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead); // register conn timer static void cliConnTimeout(uv_timer_t* handle); // register timer for read @@ -190,7 +192,7 @@ static void cliAsyncCb(uv_async_t* handle); // static void cliIdleCb(uv_idle_t* handle); // static void cliPrepareCb(uv_prepare_t* handle); -static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); +// static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); static void cliSendBatchCb(uv_write_t* req, int status); SCliBatch* cliGetHeadFromList(SCliBatchList* pList); @@ -202,11 +204,11 @@ static int32_t allocConnRef(SCliConn* conn, bool update); static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp); void cliResetConnTimer(SCliConn* conn); -static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void cliDestroy(uv_handle_t* handle); -static int32_t cliSend(SCliConn* pConn); -static void cliSendBatch(SCliConn* pConn); -static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); +static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); +static void cliDestroy(uv_handle_t* handle); +// static int32_t cliSend(SCliConn* pConn); +// static void cliSendBatch(SCliConn* pConn); +static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void doFreeTimeoutMsg(void* param); static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq); @@ -459,16 +461,6 @@ void cliResetConnTimer(SCliConn* conn) { void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); } -bool cliShouldAddConnToPool(SCliConn* conn) { - SCliThrd* pThrd = conn->hostThrd; - bool empty = transQueueEmpty(&conn->reqs); - if (empty) { - (void)delConnFromHeapCache(pThrd->connHeapCache, conn); - } - - return empty; -} - int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { int32_t code = 0; for (int i = 0; i < transQueueSize(&conn->reqs); i++) { @@ -485,6 +477,7 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { int8_t cliMayRecycleConn(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; if (transQueueSize(&conn->reqs) == 0) { + (void)delConnFromHeapCache(pThrd->connHeapCache, conn); addConnToPool(pThrd->pool, conn); return 1; } @@ -494,12 +487,13 @@ int8_t cliMayRecycleConn(SCliConn* conn) { int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHead) { pResp->contLen = transContLenFromMsg(pHead->msgLen); pResp->pCont = transContFromHead((char*)pHead); - pResp->code = pReq->msg.code; - pResp->msgType = pReq->msg.msgType; + pResp->code = pHead->code; + pResp->msgType = pHead->msgType; pResp->info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL; - pResp->info.traceId = pReq->msg.info.traceId; - pResp->info.hasEpSet = pReq->msg.info.hasEpSet; - pResp->info.cliVer = pReq->msg.info.cliVer; + pResp->info.traceId = pHead->traceId; + pResp->info.hasEpSet = pHead->hasEpSet; + pResp->info.cliVer = htonl(pHead->compatibilityVer); + pResp->info.seqNum = htonl(pHead->seqNum); return 0; } void cliHandleResp2(SCliConn* conn) { @@ -525,17 +519,27 @@ void cliHandleResp2(SCliConn* conn) { return; } + pHead->code = htonl(pHead->code); + pHead->msgLen = htonl(pHead->msgLen); + SCliReq* pReq = NULL; int32_t seq = htonl(pHead->seqNum); code = cliGetReqBySeq(conn, seq, &pReq); if (code != 0) { - tDebug("%s conn %p recv unexpected packet, reason:%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); + if (cliConnRmReleaseReq(conn, pHead)) { + return; + } else { + } + tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq, + tstrerror(code)); // TODO: notify cb + + if (cliMayRecycleConn(conn)) { + return; + } return; } - pHead->code = htonl(pHead->code); - pHead->msgLen = htonl(pHead->msgLen); // TODO handle release req // if (cliRecvReleaseReq(conn, pHead)) { // return; @@ -545,6 +549,9 @@ void cliHandleResp2(SCliConn* conn) { code = cliBuildRespFromCont(pReq, &resp, pHead); if (code != 0) { tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq); + } else { + tDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d", CONN_GET_INST_LABEL(conn), conn, + TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq); } code = cliNotifyCb(conn, pReq, &resp); @@ -1054,6 +1061,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; } + uv_read_stop(conn->stream); conn->seq = 0; int32_t code = allocConnRef(conn, true); @@ -1240,7 +1248,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); - TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception); + // TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception); transQueuePush(&pConn->reqs, pReq); @@ -1348,60 +1356,6 @@ static void cliDestroy(uv_handle_t* handle) { taosMemoryFree(conn); } -// static bool cliHandleNoResp(SCliConn* conn) { -// bool res = false; -// if (!transQueueEmpty(&conn->reqs)) { -// SCliReq* pReq = transQueueGet(&conn->reqs, 0); -// if (REQUEST_NO_RESP(&pReq->msg)) { -// (void)transQueuePop(&conn->reqs); -// destroyReq(pReq); -// res = true; -// } -// if (res == true) { -// if (cliMaySendCachedMsg(conn) == false) { -// SCliThrd* thrd = conn->hostThrd; -// addConnToPool(thrd->pool, conn); -// res = false; -// } else { -// res = true; -// } -// } -// } -// return res; -// } -static void cliSendCb(uv_write_t* req, int status) { - STUB_RAND_NETWORK_ERR(status); - - // SCliConn* pConn = transReqQueueRemove(req); - // if (pConn == NULL) return; - - // SCliReq* pReq = transQueueGet(&pConn->reqs, 0); - // if (pReq != NULL) { - // int64_t cost = taosGetTimestampUs() - pReq->st; - // if (cost > 1000 * 50) { - // tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost); - // } - // } - // if (pReq != NULL && pReq->msg.contLen == 0 && pReq->msg.pCont != 0) { - // rpcFreeCont(pReq->msg.pCont); - // pReq->msg.pCont = 0; - // } - - // if (status == 0) { - // tDebug("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); - // } else { - // if (!uv_is_closing((uv_handle_t*)&pConn->stream)) { - // tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); - // cliHandleExcept(pConn, -1); - // } - // return; - // } - // if (cliHandleNoResp(pConn) == true) { - // tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn); - // return; - // } - // (void)uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); -} static void cliHandleBatch_shareConnExcept(SCliConn* conn) { int32_t code = 0; @@ -1449,8 +1403,6 @@ static int32_t cliShouldSendMsg(SCliConn* conn) { for (int i = 0; i < transQueueSize(&conn->reqs); i++) { SCliReq* pReq = transQueueGet(&conn->reqs, i); if (pReq->sent == 0) { - // pReq->sent = 1; - // pReq->seq = conn->seq; return 1; } } @@ -1463,23 +1415,18 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { cliConnRmReqs(conn); if (status != 0) { - tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); + tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); if (!uv_is_closing((uv_handle_t*)&conn->stream)) { cliHandleBatch_shareConnExcept(conn); } return; } - int ret = uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); + (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); + taosMemoryFree(req); - if (ret != 0) { - tError("%s conn %p failed to start read, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(ret)); - cliHandleBatch_shareConnExcept(conn); - } - - if (cliShouldSendMsg(conn) == 1) { + if (!cliMayRecycleConn(conn)) { cliSendBatch_shareConn(conn); } - taosMemoryFree(req); } void cliSendBatch_shareConn(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; @@ -1489,7 +1436,6 @@ void cliSendBatch_shareConn(SCliConn* pConn) { int32_t totalLen = 0; if (size == 0) { tError("%s conn %p not msg to send", pInst->label, pConn); - ASSERT(0); return; } uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); @@ -1542,108 +1488,24 @@ void cliSendBatch_shareConn(SCliConn* pConn) { pCliMsg->sent = 1; pCliMsg->seq = pConn->seq; + + STraceId* trace = &pCliMsg->msg.info.traceId; + tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq); + } + if (j == 0) { + taosMemoryFree(wb); + return; } uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); req->data = pConn; pConn->shareCnt += 1; - tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, - totalLen); + tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen); uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb); taosMemoryFree(wb); } -void cliSendBatch(SCliConn* pConn) { - int32_t code = 0; - SCliThrd* pThrd = pConn->hostThrd; - STrans* pInst = pThrd->pInst; - SCliBatch* pBatch = pConn->pBatch; - int32_t wLen = pBatch->wLen; - - pBatch->pList->connCnt += 1; - - uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); - if (wb == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); - goto _exception; - } - - int i = 0; - queue* h = NULL; - QUEUE_FOREACH(h, &pBatch->wq) { - SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q); - - SReqCtx* pCtx = pCliMsg->ctx; - - STransMsg* pReq = (STransMsg*)(&pCliMsg->msg); - if (pReq->pCont == 0) { - pReq->pCont = (void*)rpcMallocCont(0); - if (pReq->pCont == NULL) { - code = TSDB_CODE_OUT_OF_BUFFER; - tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); - goto _exception; - } - pReq->contLen = 0; - } - pConn->seq++; - int msgLen = transMsgLenFromCont(pReq->contLen); - STransMsgHead* pHead = transHeadFromCont(pReq->pCont); - - if (pHead->comp == 0) { - pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; - pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0; - pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0; - pHead->msgType = pReq->msgType; - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; - memcpy(pHead->user, pInst->user, strlen(pInst->user)); - pHead->traceId = pReq->info.traceId; - pHead->magicNum = htonl(TRANS_MAGIC_NUM); - pHead->version = TRANS_VER; - pHead->compatibilityVer = htonl(pInst->compatibilityVer); - } - pHead->timestamp = taosHton64(taosGetTimestampUs()); - pHead->seqNum = htonl(pConn->seq); - - if (pHead->comp == 0 && pReq->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { - if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { - msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead); - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - } - } else { - msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); - } - wb[i++] = uv_buf_init((char*)pHead, msgLen); - } - - uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); - if (req == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); - goto _exception; - } - req->data = pConn; - tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn, - pBatch->wLen, pBatch->batchSize); - - code = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); - if (code != 0) { - tDebug("%s conn %p failed to to send batch msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(code)); - goto _exception; - } - - taosMemoryFree(wb); - return; - -_exception: - cliDestroyBatch(pBatch); - taosMemoryFree(wb); - pConn->pBatch = NULL; - return; -} - -// int32_t cliSend2(SCliConn* pConn) {} int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t code = 0; transQueuePush(&pConn->reqs, pCliMsg); @@ -1655,79 +1517,6 @@ int32_t cliSend2(SCliConn* pConn) { cliSendBatch_shareConn(pConn); return 0; } -int32_t cliSend(SCliConn* pConn) { - SCliThrd* pThrd = pConn->hostThrd; - STrans* pInst = pThrd->pInst; - SCliReq* pCliReq = NULL; - int32_t code = cliConnFindToSendMsg(pConn, &pCliReq); - if (code != 0) { - return code; - } - - SReqCtx* pCtx = pCliReq->ctx; - - STransMsg* pReq = (STransMsg*)(&pCliReq->msg); - if (pReq->pCont == 0) { - pReq->pCont = (void*)rpcMallocCont(0); - if (pReq->pCont == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - tDebug("malloc memory: %p", pReq->pCont); - pReq->contLen = 0; - } - - int msgLen = transMsgLenFromCont(pReq->contLen); - STransMsgHead* pHead = transHeadFromCont(pReq->pCont); - - if (pHead->comp == 0) { - pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; - pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0; - pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0; - pHead->msgType = pReq->msgType; - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - pHead->release = REQUEST_RELEASE_HANDLE(pCliReq) ? 1 : 0; - memcpy(pHead->user, pInst->user, strlen(pInst->user)); - pHead->traceId = pReq->info.traceId; - pHead->magicNum = htonl(TRANS_MAGIC_NUM); - pHead->version = TRANS_VER; - pHead->compatibilityVer = htonl(pInst->compatibilityVer); - pHead->seqNum = htonl(pConn->seq++); - } - - pHead->timestamp = taosHton64(taosGetTimestampUs()); - - STraceId* trace = &pReq->info.traceId; - - if (pHead->comp == 0 && pReq->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { - if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { - msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead); - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - } - } else { - msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); - } - - tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, - TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen); - - uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - uv_write_t* aReq = transReqQueuePush(&pConn->wreqQueue); - if (aReq == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); - } - - pCliReq->sent = 1; - int status = uv_write(aReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); - if (status != 0) { - tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), - uv_err_name(status)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _exception); - } - - return TSDB_CODE_RPC_ASYNC_IN_PROCESS; -_exception: - return code; -} static void cliDestroyBatch(SCliBatch* pBatch) { if (pBatch == NULL) return; @@ -1808,83 +1597,6 @@ _exception2: // // taosMemoryFree(conn); return code; } -static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { - if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { - return; - } - - int32_t code = 0; - if (pThrd->quit == true) { - cliDestroyBatch(pBatch); - return; - } - - STrans* pInst = pThrd->pInst; - SCliBatchList* pList = pBatch->pList; - - bool exceed = false; - SCliConn* conn = getConnFromPool(pThrd, pList->dst, &exceed); - - if (conn == NULL && exceed) { - tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pInst->label, pBatch->wLen, - pBatch->batchSize, pInst->connLimitNum); - cliDestroyBatch(pBatch); - return; - } - if (conn == NULL) { - code = cliCreateConn(pThrd, &conn, pList->ip, pList->port); - if (code != 0) { - tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label, - pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(code)); - cliDestroyBatch(pBatch); - return; - } - - conn->pBatch = pBatch; - code = cliDoConn(pThrd, conn); - if (code != 0) { - } - return; - } - - conn->pBatch = pBatch; - cliSendBatch(conn); -} -static void cliSendBatchCb(uv_write_t* req, int status) { - STUB_RAND_NETWORK_ERR(status); - SCliConn* conn = req->data; - SCliThrd* thrd = conn->hostThrd; - SCliBatch* p = conn->pBatch; - conn->pBatch = NULL; - - SCliBatch* nxtBatch = cliGetHeadFromList(p->pList); - p->pList->connCnt -= 1; - - if (status != 0) { - tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, - p->wLen, p->batchSize, uv_err_name(status)); - - if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn, -1); - - cliHandleBatchReq(nxtBatch, thrd); - } else { - tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, - p->batchSize); - if (!uv_is_closing((uv_handle_t*)&conn->stream) && conn->broken == false) { - if (nxtBatch != NULL) { - conn->pBatch = nxtBatch; - cliSendBatch(conn); - } else { - addConnToPool(thrd->pool, conn); - } - } else { - cliDestroyBatch(nxtBatch); - } - } - - cliDestroyBatch(p); - taosMemoryFree(req); -} static void cliHandleFastFail_resp(SCliConn* pConn, int status) { SCliThrd* pThrd = pConn->hostThrd; @@ -1963,13 +1675,8 @@ void cliConnCb(uv_connect_t* req, int status) { cliConnSetSockInfo(pConn); + addConnToHeapCache(pThrd->connHeapCache, pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); - if (pConn->pBatch != NULL) { - return cliSendBatch(pConn); - } - if (pConn->inHeap) { - return cliSendBatch_shareConn(pConn); - } (void)cliSend2(pConn); } @@ -2236,71 +1943,34 @@ static void doFreeTimeoutMsg(void* param) { taosMemoryFree(arg); } -void cliHandleReq__shareConn(SCliThrd* pThrd, SCliReq* pReq) { - int32_t code = 0; - int32_t lino = 0; - STransMsg resp = {0}; - - code = (pThrd->initCb)(pThrd, pReq, NULL); - TAOS_CHECK_GOTO(code, &lino, _exception); - - STraceId* trace = &pReq->msg.info.traceId; - STrans* pInst = pThrd->pInst; - - char addr[TSDB_FQDN_LEN + 64] = {0}; - char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); - int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); - CONN_CONSTRUCT_HASH_KEY(addr, ip, port); - - SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); - if (pConn == NULL) { - tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); - bool ignore = false; - pConn = getConnFromPool(pThrd, addr, &ignore); - if (pConn != NULL) { - addConnToHeapCache(pThrd->connHeapCache, pConn); - transQueuePush(&pConn->reqs, pReq); - return cliSendBatch_shareConn(pConn); - } - } else { - tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); - transQueuePush(&pConn->reqs, pReq); - cliSendBatch_shareConn(pConn); - return; - } - - TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); - - TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception); - - transQueuePush(&pConn->reqs, pReq); - - code = cliDoConn(pThrd, pConn); -_exception: - - resp.code = code; - (void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp); - return; -} - void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { int32_t lino = 0; STransMsg resp = {0}; int32_t code = (pThrd->initCb)(pThrd, pReq, NULL); TAOS_CHECK_GOTO(code, &lino, _exception); + char addr[TSDB_FQDN_LEN + 64] = {0}; + char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); + int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); + CONN_CONSTRUCT_HASH_KEY(addr, ip, port); + STrans* pInst = pThrd->pInst; SCliConn* pConn = NULL; - code = cliGetOrCreateConn(pThrd, pReq, &pConn); - if (code == TSDB_CODE_RPC_MAX_SESSIONS) { - TAOS_CHECK_GOTO(code, &lino, _exception); - } else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { - // do nothing, notiy - return; - } else { - code = cliSendReq(pConn, pReq); + pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); + if (pConn == NULL) { + code = cliGetOrCreateConn(pThrd, pReq, &pConn); + if (code == TSDB_CODE_RPC_MAX_SESSIONS) { + TAOS_CHECK_GOTO(code, &lino, _exception); + } else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { + // do nothing, notiy + return; + } else { + ASSERT(code == 0); + addConnToHeapCache(pThrd->connHeapCache, pConn); + } } + code = cliSendReq(pConn, pReq); tTrace("%s conn %p ready", pInst->label, pConn); return; @@ -2311,14 +1981,7 @@ _exception: return; } -void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { - // STrans* pInst = pThrd->pInst; - // if (pInst->shareConn == 1) { - // return cliHandleReq__shareConn(pThrd, pReq); - // } else { - return cliHandleReq__noShareConn(pThrd, pReq); - //} -} +void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { return cliHandleReq__noShareConn(pThrd, pReq); } static void cliDoReq(queue* wq, SCliThrd* pThrd) { int count = 0; @@ -2489,23 +2152,23 @@ static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) { - cliBuildBatch(pReq, h, pThrd); - continue; - } + // if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) { + // cliBuildBatch(pReq, h, pThrd); + // continue; + // } (*cliAsyncHandle[pReq->type])(pThrd, pReq); count++; } - void** pIter = taosHashIterate(pThrd->batchCache, NULL); - while (pIter != NULL) { - SCliBatchList* batchList = (SCliBatchList*)(*pIter); - SCliBatch* batch = cliGetHeadFromList(batchList); - if (batch != NULL) { - cliHandleBatchReq(batch, pThrd); - } - pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); - } + // void** pIter = taosHashIterate(pThrd->batchCache, NULL); + // while (pIter != NULL) { + // SCliBatchList* batchList = (SCliBatchList*)(*pIter); + // SCliBatch* batch = cliGetHeadFromList(batchList); + // if (batch != NULL) { + // cliHandleBatchReq(batch, pThrd); + // } + // pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); + // } if (count >= 2) { tTrace("cli process batch size:%d", count); @@ -2555,6 +2218,20 @@ void cliConnFreeMsgs(SCliConn* conn) { cmsg->ctx->ahandle = NULL; } } + +bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead) { + if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { + for (int i = 0; i < transQueueSize(&conn->reqs); i++) { + SCliReq* pReq = transQueueGet(&conn->reqs, i); + if (pHead->ahandle == (uint64_t)pReq->ctx->ahandle) { + tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); + transQueueRm(&conn->reqs, i); + return true; + } + } + } + return false; +} bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { uint64_t ahandle = pHead->ahandle; @@ -3778,7 +3455,6 @@ int32_t transSetDefaultAddr(void* pInstRef, const char* ip, const char* fqdn) { pReq->ctx = pCtx; pReq->type = Update; - // pReq->refId = (int64_t)pInstRef; SCliThrd* thrd = ((SCliObj*)pInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64, pInst->label, thrd->pid); @@ -3898,7 +3574,10 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { if (code != 0) { tDebug("failed to get conn from heap cache for key:%s", key); return NULL; + } else { + tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt); } + return pConn; } static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { @@ -3908,7 +3587,9 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { if (code != 0) { return code; } - return transHeapInsert(p, pConn); + code = transHeapInsert(p, pConn); + tDebug("add conn %p to heap cache for key:%s,status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap, + pConn->reqRefCnt); } static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { @@ -3961,7 +3642,9 @@ int32_t transHeapGet(SHeap* heap, SCliConn** p) { } int32_t transHeapInsert(SHeap* heap, SCliConn* p) { // impl later + p->reqRefCnt++; if (p->inHeap == 1) { + tDebug("failed to insert conn %p since already in heap", p); return TSDB_CODE_DUP_KEY; } @@ -3972,8 +3655,18 @@ int32_t transHeapInsert(SHeap* heap, SCliConn* p) { int32_t transHeapDelete(SHeap* heap, SCliConn* p) { // impl later if (p->inHeap == 0) { + tDebug("failed to del conn %p since not in heap", p); return TSDB_CODE_INVALID_PARA; } - heapRemove(heap->heap, &p->node); + p->inHeap = 0; + p->reqRefCnt--; + if (p->reqRefCnt == 0) { + heapRemove(heap->heap, &p->node); + tDebug("delete conn %p delete from heap", p); + } else if (p->reqRefCnt < 0) { + tDebug("conn %p has %d reqs, not delete from heap,assert", p, p->reqRefCnt); + } else { + tDebug("conn %p has %d reqs, not delete from heap", p, p->reqRefCnt); + } return 0; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 6b5c15eae3..27a92fb483 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -662,21 +662,22 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { return TSDB_CODE_INVALID_MSG; } - if (pConn->status == ConnNormal) { - pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); - if (smsg->type == Release) pHead->msgType = 0; - } else { - if (smsg->type == Release) { - pHead->msgType = 0; - pConn->status = ConnNormal; - destroyConnRegArg(pConn); - transUnrefSrvHandle(pConn); - } else { - // set up resp msg type - pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); - } - } + // if (pConn->status == ConnNormal) { + // pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); + // if (smsg->type == Release) pHead->msgType = 0; + // } else { + // if (smsg->type == Release) { + // pHead->msgType = 0; + // pConn->status = ConnNormal; + // destroyConnRegArg(pConn); + // transUnrefSrvHandle(pConn); + // } else { + // // set up resp msg type + // pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); + // } + // } + pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); pHead->msgLen = htonl(pMsg->contLen + sizeof(STransMsgHead)); @@ -835,13 +836,13 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { } static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { - int32_t code = reallocConnRef(pConn); - if (code != 0) { - destroyConn(pConn, true); - return true; - } - tTrace("conn %p received release request", pConn); + // int32_t code = reallocConnRef(pConn); + // if (code != 0) { + // destroyConn(pConn, true); + // return true; + // } + tTrace("conn %p received release request", pConn); STraceId traceId = pHead->traceId; (void)transClearBuffer(&pConn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); @@ -850,7 +851,11 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { } pConn->status = ConnRelease; - STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; + STransMsg tmsg = {.code = 0, + .info.handle = (void*)pConn, + .info.traceId = traceId, + .info.ahandle = (void*)0x9527, + .info.seqNum = htonl(pHead->seqNum)}; SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); srvMsg->msg = tmsg; srvMsg->type = Release; @@ -1590,11 +1595,6 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { int32_t code = 0; SSvrConn* conn = msg->pConn; if (conn->status == ConnAcquire) { - code = reallocConnRef(conn); - if (code != 0) { - destroyConn(conn, true); - return; - } if (!transQueuePush(&conn->srvMsgs, msg)) { return; }