diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 1fdf7db60a..2f6b23a594 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -128,12 +128,12 @@ typedef struct SRpcInit { int32_t connLimitLock; int32_t timeToGetConn; int8_t supportBatch; // 0: no batch, 1. batch - int32_t batchSize; + int32_t shareConnLimit; int8_t shareConn; // 0: no share, 1. share int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait int8_t startReadTimer; - - void *parent; + + void *parent; } SRpcInit; typedef struct { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 3b755c2921..2ced1a6ac5 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -370,6 +370,7 @@ int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 1000); rpcInit.connLimitNum = connLimitNum; + rpcInit.shareConnLimit = 8; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; int32_t code = taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dd06be8b0b..b0e36a153c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -404,7 +404,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; - rpcInit.batchSize = 8 * 1024; + rpcInit.shareConnLimit = 16; rpcInit.shareConn = 1; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.notWaitAvaliableConn = 0; @@ -454,7 +454,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) { rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; - rpcInit.batchSize = 8 * 1024; + rpcInit.shareConnLimit = 32; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.startReadTimer = 1; @@ -503,7 +503,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) { rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; - rpcInit.batchSize = 8 * 1024; + rpcInit.shareConnLimit = 64; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.startReadTimer = 1; @@ -560,6 +560,7 @@ int32_t dmInitServer(SDnode *pDnode) { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.parent = pDnode; rpcInit.compressSize = tsCompressMsgSize; + rpcInit.shareConnLimit = 16; if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) { dError("failed to convert version string:%s to int", version); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 8e10357f07..cf0ccd9fb2 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -70,7 +70,7 @@ typedef struct { int32_t connLimitNum; int8_t connLimitLock; // 0: no lock. 1. lock int8_t supportBatch; // 0: no batch, 1: support batch - int32_t batchSize; + int32_t shareConnLimit; int8_t optBatchFetch; int32_t timeToGetConn; int index; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index a9ca9d47ce..862f3d0adb 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -79,7 +79,10 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->connLimitLock = pInit->connLimitLock; pRpc->supportBatch = pInit->supportBatch; - pRpc->batchSize = pInit->batchSize; + pRpc->shareConnLimit = pInit->shareConnLimit; + if (pRpc->shareConnLimit <= 0) { + pRpc->shareConnLimit = BUFFER_LIMIT; + } pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4599a2d6f3..9c2ece635f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -48,7 +48,7 @@ typedef struct { queue wq; queue listq; int32_t wLen; - int32_t batchSize; // + int32_t shareConnLimit; // int32_t batch; SCliBatchList* pList; } SCliBatch; @@ -961,6 +961,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int int32_t code = 0; int32_t lino = 0; + STrans* pInst = pThrd->pInst; SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); if (conn == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _failed); @@ -971,6 +972,9 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int conn->dstAddr = taosStrdup(addr); conn->ipStr = taosStrdup(ip); conn->port = port; + if (conn->dstAddr == NULL || conn->ipStr == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _failed); + } conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -1006,8 +1010,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int TAOS_CHECK_GOTO(code, NULL, _failed); } - conn->bufSize = BUFFER_LIMIT; - conn->buf = (uv_buf_t*)taosMemoryCalloc(1, BUFFER_LIMIT * sizeof(uv_buf_t)); + conn->bufSize = pInst->shareConnLimit; + conn->buf = (uv_buf_t*)taosMemoryCalloc(1, pInst->shareConnLimit * sizeof(uv_buf_t)); if (conn->buf == NULL) { TAOS_CHECK_GOTO(terrno, NULL, _failed); } @@ -1870,7 +1874,7 @@ static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) { return; } - pBatchList->batchLenLimit = pInst->batchSize; + pBatchList->batchLenLimit = pInst->shareConnLimit; SCliBatch* pBatch = NULL; code = createBatch(&pBatch, pBatchList, pReq); @@ -1895,9 +1899,9 @@ static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) { } else { queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); - if ((pBatch->batchSize + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) { + if ((pBatch->shareConnLimit + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) { QUEUE_PUSH(&pBatch->wq, h); - pBatch->batchSize += pReq->msg.contLen; + pBatch->shareConnLimit += pReq->msg.contLen; pBatch->wLen += 1; } else { SCliBatch* tBatch = NULL; @@ -1962,7 +1966,7 @@ static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* p QUEUE_PUSH(&pBatch->wq, &pReq->q); pBatch->wLen += 1; - pBatch->batchSize = pReq->msg.contLen; + pBatch->shareConnLimit = pReq->msg.contLen; pBatch->pList = pList; QUEUE_PUSH(&pList->wq, &pBatch->listq); @@ -3332,12 +3336,12 @@ static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHea return code; } -static FORCE_INLINE int8_t shouldSWitchToOtherConn(int32_t reqNum, int32_t sentNum, int32_t stateNum) { +static FORCE_INLINE int8_t shouldSWitchToOtherConn(STrans* pInst, int32_t reqNum, int32_t sentNum, int32_t stateNum) { int32_t total = reqNum + sentNum; - if (stateNum >= STATE_BUFFER_LIMIT) { + if (stateNum >= pInst->shareConnLimit) { return 1; } - if (total >= BUFFER_LIMIT) { + if (total >= pInst->shareConnLimit) { return 1; } @@ -3353,11 +3357,13 @@ static FORCE_INLINE bool filterToDebug(void* e, void* arg) { static FORCE_INLINE void logConnMissHit(SCliConn* pConn) { // queue set; // QUEUE_INIT(&set); + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; pConn->heapMissHit++; tDebug("conn %p has %d reqs, %d sentout and %d status in process, total limit:%d, switch to other conn", pConn, transQueueSize(&pConn->reqsToSend), transQueueSize(&pConn->reqsSentOut), taosHashGetSize(pConn->pQTable), - BUFFER_LIMIT); - // if (transQueueSize(&pConn->reqsSentOut) >= BUFFER_LIMIT) { + pInst->shareConnLimit); + // if (transQueueSize(&pConn->reqsSentOut) >= pInst->shareConnLimit) { // transQueueRemoveByFilter(&pConn->reqsSentOut, filterToDebug, NULL, &set, 1); // } } @@ -3376,11 +3382,12 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { return NULL; } else { tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt); - int32_t reqsNum = transQueueSize(&pConn->reqsToSend); - int32_t reqsSentOut = transQueueSize(&pConn->reqsSentOut); - int32_t stateNum = taosHashGetSize(pConn->pQTable); - - if (shouldSWitchToOtherConn(reqsNum, reqsSentOut, stateNum)) { + int32_t reqsNum = transQueueSize(&pConn->reqsToSend); + int32_t reqsSentOut = transQueueSize(&pConn->reqsSentOut); + int32_t stateNum = taosHashGetSize(pConn->pQTable); + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + if (shouldSWitchToOtherConn(pInst, reqsNum, reqsSentOut, stateNum)) { logConnMissHit(pConn); return NULL; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 69e4bdd485..9f9ce26e59 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1293,7 +1293,6 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { TAOS_CHECK_GOTO(code, &lino, _end); } - // memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; @@ -1320,7 +1319,6 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { pConn->refId = exh->refId; QUEUE_INIT(&exh->q); - transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId); pConn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); @@ -1337,8 +1335,11 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); } - pConn->bufSize = BUFFER_LIMIT; - pConn->buf = taosMemoryCalloc(1, sizeof(uv_buf_t)); + pConn->bufSize = pInst->shareConnLimit; + pConn->buf = taosMemoryCalloc(1, pInst->shareConnLimit * sizeof(uv_buf_t)); + if (pConn->buf == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); + } code = uv_tcp_init(pThrd->loop, pConn->pTcp); if (code != 0) { @@ -1351,6 +1352,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { pConn->pInst = pThrd->pInst; pConn->hostThrd = pThrd; + transRefSrvHandle(pConn); return pConn; _end: if (pConn) { diff --git a/source/libs/transport/test/cliBench.c b/source/libs/transport/test/cliBench.c index 134c911401..e73c209d55 100644 --- a/source/libs/transport/test/cliBench.c +++ b/source/libs/transport/test/cliBench.c @@ -117,7 +117,7 @@ int main(int argc, char *argv[]) { rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connLimitNum = 10; rpcInit.connLimitLock = 1; - rpcInit.batchSize = 16 * 1024; + rpcInit.shareConnLimit = 16 * 1024; rpcInit.supportBatch = 1; rpcDebugFlag = 135;