diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index de3c2a9f52..0cc0ab64ef 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -112,7 +112,12 @@ typedef struct SRpcInit { // fail fast fp RpcFFfp ffp; - void *parent; + int32_t connLimitNum; + int32_t connLimitLock; + + int8_t supportBatch; // 0: no batch, 1. batch + int32_t batchSize; + void *parent; } SRpcInit; typedef struct { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index d8eecdfc64..75bdf81a27 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -67,6 +67,10 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) // #define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected" #define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) // +#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // + + + //common & util #define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) // diff --git a/include/util/tdef.h b/include/util/tdef.h index e03352d98b..aeb8d08936 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -281,8 +281,8 @@ typedef enum ELogicConditionType { #define TSDB_DNODE_ROLE_MGMT 1 #define TSDB_DNODE_ROLE_VNODE 2 -#define TSDB_MAX_REPLICA 5 -#define TSDB_SYNC_LOG_BUFFER_SIZE 4096 +#define TSDB_MAX_REPLICA 5 +#define TSDB_SYNC_LOG_BUFFER_SIZE 4096 #define TSDB_SYNC_LOG_BUFFER_RETENTION (TSDB_SYNC_LOG_BUFFER_SIZE >> 4) #define TSDB_TBNAME_COLUMN_INDEX (-1) @@ -413,7 +413,7 @@ typedef enum ELogicConditionType { #ifdef WINDOWS #define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections. #else -#define TSDB_MAX_RPC_THREADS 20 +#define TSDB_MAX_RPC_THREADS 10 #endif #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 94f1a8f730..1e5291b7cb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -76,11 +76,11 @@ bool tsEnableTelem = true; int32_t tsTelemInterval = 43200; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; uint16_t tsTelemPort = 80; -char* tsTelemUri = "/report"; +char *tsTelemUri = "/report"; -bool tsEnableCrashReport = true; -char* tsClientCrashReportUri = "/ccrashreport"; -char* tsSvrCrashReportUri = "/dcrashreport"; +bool tsEnableCrashReport = true; +char *tsClientCrashReportUri = "/ccrashreport"; +char *tsSvrCrashReportUri = "/dcrashreport"; // schemaless char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null"; @@ -212,9 +212,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg); #endif -struct SConfig *taosGetCfg() { - return tsCfg; -} +struct SConfig *taosGetCfg() { return tsCfg; } static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { @@ -391,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, 0) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; - tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS); + tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1; tsNumOfCommitThreads = tsNumOfCores / 2; @@ -501,7 +499,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem = cfgGetItem(tsCfg, "numOfRpcThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { tsNumOfRpcThreads = numOfCores / 2; - tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4); + tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); pItem->i32 = tsNumOfRpcThreads; pItem->stype = stype; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dcb63f6524..7f9a261cf2 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -280,10 +280,19 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; - rpcInit.failFastInterval = 1000; // interval threshold(ms) + rpcInit.failFastInterval = 5000; // interval threshold(ms) rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; + int32_t connLimitNum = 10000 / (tsNumOfRpcThreads * 3); + connLimitNum = TMAX(connLimitNum, 100); + connLimitNum = TMIN(connLimitNum, 500); + + rpcInit.connLimitNum = connLimitNum; + rpcInit.connLimitLock = 1; + rpcInit.supportBatch = 1; + rpcInit.batchSize = 16 * 1024; + pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { dError("failed to init dnode rpc client"); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5f964f6b1a..a41cc0068c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -94,8 +94,8 @@ typedef void* queue[2]; /* Return the structure holding the given element. */ #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) -//#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -//#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) +// #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit +// #define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_PACKET_LIMIT 1024 * 1024 * 512 diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 2db4a72795..1f3c98ad72 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -64,6 +64,11 @@ typedef struct { void (*destroyFp)(void* ahandle); bool (*failFastFp)(tmsg_t msgType); + int32_t connLimitNum; + int8_t connLimitLock; // 0: no lock. 1. lock + int8_t supportBatch; // 0: no batch, 1: support batch + int32_t batchSize; + int index; void* parent; void* tcphandle; // returned handle from TCP initialization diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 47b1ac5ca7..16ea25a41a 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -67,6 +67,10 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; pRpc->failFastFp = pInit->ffp; + pRpc->connLimitNum = pInit->connLimitNum; + pRpc->connLimitLock = pInit->connLimitLock; + pRpc->supportBatch = pInit->supportBatch; + pRpc->batchSize = pInit->batchSize; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 111742a6f4..2c862ed45b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -12,7 +12,6 @@ * along with this program. If not, see . */ -#ifdef USE_UV #include "transComm.h" typedef struct SConnList { @@ -20,6 +19,30 @@ typedef struct SConnList { int32_t size; } SConnList; +typedef struct { + queue wq; + int32_t len; + + int connMax; + int connCnt; + int batchLenLimit; + int sending; + + char* dst; + char* ip; + uint16_t port; + +} SCliBatchList; + +typedef struct { + queue wq; + queue listq; + int32_t wLen; + int32_t batchSize; // + int32_t batch; + SCliBatchList* pList; +} SCliBatch; + typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -40,9 +63,10 @@ typedef struct SCliConn { bool broken; // link broken or not ConnStatus status; // - int64_t refId; - char* ip; - uint32_t port; + SCliBatch* pBatch; + + int64_t refId; + char* ip; SDelayTask* task; @@ -80,11 +104,14 @@ typedef struct SCliThrd { uint64_t nextTimeout; // next timeout void* pTransInst; // + int connCount; void (*destroyAhandleFp)(void* ahandle); SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; SHashObj* failFastCache; + SHashObj* connLimitCache; + SHashObj* batchCache; SCliMsg* stopMsg; @@ -131,6 +158,11 @@ static void cliAsyncCb(uv_async_t* handle); static void cliIdleCb(uv_idle_t* handle); static void cliPrepareCb(uv_prepare_t* handle); +static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); +static void cliSendBatchCb(uv_write_t* req, int status); + +SCliBatch* cliGetHeadFromList(SCliBatchList* pList); + static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static int32_t allocConnRef(SCliConn* conn, bool update); @@ -141,8 +173,11 @@ static SCliConn* cliCreateConn(SCliThrd* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle); static void cliSend(SCliConn* pConn); +static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); +static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port); + // cli util func static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); @@ -157,6 +192,7 @@ static void cliHandleResp(SCliConn* conn); // handle except about conn static void cliHandleExcept(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn); +static void cliHandleFastFail(SCliConn* pConn, int status); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); @@ -165,6 +201,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd); static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, cliHandleUpdate}; +/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, +/// NULL,cliHandleUpdate}; static FORCE_INLINE void destroyUserdata(STransMsg* userdata); static FORCE_INLINE void destroyCmsg(void* cmsg); @@ -285,6 +323,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } destroyCmsg(msg); } + transQueueClear(&conn->cliMsgs); memset(&conn->ctx, 0, sizeof(conn->ctx)); } bool cliMaySendCachedMsg(SCliConn* conn) { @@ -487,9 +526,9 @@ void cliConnTimeout(uv_timer_t* handle) { uv_timer_stop(handle); handle->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - cliHandleExceptImpl(conn, -1); + + cliHandleFastFail(conn, UV_ECANCELED); } void cliReadTimeoutCb(uv_timer_t* handle) { // set up timeout cb @@ -569,17 +608,15 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->status = ConnInPool; if (conn->list == NULL) { - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); - conn->list = taosHashGet((SHashObj*)pool, key, strlen(key)); + conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip)); } else { tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); } QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; - if (conn->list->size >= 50) { + if (conn->list->size >= 250) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; arg->param2 = thrd; @@ -671,7 +708,6 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - // transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { @@ -694,6 +730,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->broken = 0; transRefCliHandle(conn); + atomic_add_fetch_32(&pThrd->connCount, 1); allocConnRef(conn, false); return conn; @@ -737,6 +774,11 @@ static void cliDestroy(uv_handle_t* handle) { conn->timer->data = NULL; conn->timer = NULL; } + int32_t* oVal = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); + int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1; + taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal)); + + atomic_sub_fetch_32(&pThrd->connCount, 1); transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -748,6 +790,7 @@ static void cliDestroy(uv_handle_t* handle) { tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); transDestroyBuffer(&conn->readBuf); + taosMemoryFree(conn); } static bool cliHandleNoResp(SCliConn* conn) { @@ -798,7 +841,65 @@ static void cliSendCb(uv_write_t* req, int status) { } uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } +void cliSendBatch(SCliConn* pConn) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + SCliBatch* pBatch = pConn->pBatch; + SCliBatchList* pList = pBatch->pList; + pList->connCnt += 1; + + int32_t wLen = pBatch->wLen; + + uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); + int i = 0; + + queue* h = NULL; + QUEUE_FOREACH(h, &pBatch->wq) { + SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q); + + STransConnCtx* pCtx = pCliMsg->ctx; + + STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } + + int msgLen = transMsgLenFromCont(pMsg->contLen); + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + + if (pHead->comp == 0) { + pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; + pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; + pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; + pHead->msgType = pMsg->msgType; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; + memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); + pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); + } + pHead->timestamp = taosHton64(taosGetTimestampUs()); + + if (pHead->comp == 0) { + if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { + msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + } else { + msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); + } + wb[i++] = uv_buf_init((char*)pHead, msgLen); + } + + uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); + req->data = pConn; + tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn, + pBatch->wLen, pBatch->batchSize); + uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); + taosMemoryFree(wb); +} void cliSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -883,31 +984,137 @@ _RETURN: return; } -void cliConnCb(uv_connect_t* req, int status) { - SCliConn* pConn = req->data; - SCliThrd* pThrd = pConn->hostThrd; +static void cliDestroyBatch(SCliBatch* pBatch) { + while (!QUEUE_IS_EMPTY(&pBatch->wq)) { + queue* h = QUEUE_HEAD(&pBatch->wq); + QUEUE_REMOVE(h); - if (pConn->timer != NULL) { - uv_timer_stop(pConn->timer); - pConn->timer->data = NULL; - taosArrayPush(pThrd->timerList, &pConn->timer); - pConn->timer = NULL; + SCliMsg* p = QUEUE_DATA(h, SCliMsg, q); + destroyCmsg(p); + } + SCliBatchList* p = pBatch->pList; + p->sending -= 1; + taosMemoryFree(pBatch); +} +static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { + if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { + return; + } + STrans* pTransInst = pThrd->pTransInst; + SCliBatchList* pList = pBatch->pList; + + SCliConn* conn = getConnFromPool(pThrd->pool, pList->ip, pList->port); + + if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pList->ip, pList->port)) { + tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, + pBatch->batchSize); + cliDestroyBatch(pBatch); + return; + } + if (conn == NULL) { + conn = cliCreateConn(pThrd); + conn->pBatch = pBatch; + conn->ip = strdup(pList->dst); + + uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); + if (ipaddr == 0xffffffff) { + uv_timer_stop(conn->timer); + conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + conn->timer = NULL; + + cliHandleFastFail(conn, -1); + return; + } + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = ipaddr; + addr.sin_port = (uint16_t)htons(pList->port); + + tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); + int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); + if (fd == -1) { + tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + cliHandleFastFail(conn, -1); + return; + } + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); + if (ret != 0) { + tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleFastFail(conn, -1); + return; + } + ret = transSetConnOption((uv_tcp_t*)conn->stream); + if (ret != 0) { + tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleFastFail(conn, -1); + return; + } + + ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + if (ret != 0) { + uv_timer_stop(conn->timer); + conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + conn->timer = NULL; + cliHandleFastFail(conn, -1); + return; + } + uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); + return; } - if (status != 0) { - SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); - STrans* pTransInst = pThrd->pTransInst; + conn->pBatch = pBatch; + cliSendBatch(conn); +} +static void cliSendBatchCb(uv_write_t* req, int status) { + SCliConn* conn = req->data; + SCliThrd* thrd = conn->hostThrd; + SCliBatch* p = conn->pBatch; + + SCliBatchList* pBatchList = p->pList; + SCliBatch* nxtBatch = cliGetHeadFromList(pBatchList); + pBatchList->connCnt -= 1; + + conn->pBatch = NULL; + + if (status != 0) { + tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, + p->wLen, p->batchSize, uv_err_name(status)); + cliHandleExcept(conn); + cliHandleBatchReq(nxtBatch, thrd); + } else { + tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, + p->batchSize); + + if (nxtBatch != NULL) { + conn->pBatch = nxtBatch; + cliSendBatch(conn); + } else { + addConnToPool(thrd->pool, conn); + } + } + + cliDestroyBatch(p); + taosMemoryFree(req); +} +static void cliHandleFastFail(SCliConn* pConn, int status) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + + if (status == -1) status = ENETUNREACH; + + if (pConn->pBatch == NULL) { + SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); + + STraceId* trace = &pMsg->msg.info.traceId; + tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), + TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status)); - tError("%s msg %s failed to send, conn %p failed to connect to %s:%d, reason: %s", CONN_GET_INST_LABEL(pConn), - pMsg ? TMSG_INFO(pMsg->msg.msgType) : 0, pConn, pConn->ip, pConn->port, uv_strerror(status)); if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - char* ip = pConn->ip; - uint32_t port = pConn->port; - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - - SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip)); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { int32_t elapse = cTimestamp - item->timestamp; @@ -919,15 +1126,47 @@ void cliConnCb(uv_connect_t* req, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem)); + taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem)); } } - cliHandleExcept(pConn); + } else { + tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), + pConn, pConn->ip, uv_strerror(status)); + cliDestroyBatch(pConn->pBatch); + pConn->pBatch = NULL; + } + cliHandleExcept(pConn); +} + +void cliConnCb(uv_connect_t* req, int status) { + SCliConn* pConn = req->data; + SCliThrd* pThrd = pConn->hostThrd; + bool timeout = false; + + if (pConn->timer == NULL) { + timeout = true; + } else { + uv_timer_stop(pConn->timer); + pConn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &pConn->timer); + pConn->timer = NULL; + } + + if (status != 0) { + if (timeout == false) { + cliHandleFastFail(pConn, status); + } else if (timeout == true) { + // already deal by timeout + } return; } - struct sockaddr peername, sockname; - int addrlen = sizeof(peername); + int32_t* oVal = taosHashGet(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip)); + int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1; + taosHashPut(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal)); + + struct sockaddr peername, sockname; + int addrlen = sizeof(peername); uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); transSockInfo2Str(&peername, pConn->dst); @@ -936,8 +1175,11 @@ void cliConnCb(uv_connect_t* req, int status) { transSockInfo2Str(&sockname, pConn->src); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); - - cliSend(pConn); + if (pConn->pBatch != NULL) { + cliSendBatch(pConn); + } else { + cliSend(pConn); + } } static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { @@ -1062,12 +1304,32 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { return; } +static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) { + STrans* pTransInst = pThrd->pTransInst; + + // STransConnCtx* pCtx = pMsg->ctx; + // char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + // int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key)); + if (val == NULL) return 0; + + if (*val >= pTransInst->connLimitNum) { + return -1; + } + return 0; +} void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); STraceId* trace = &pMsg->msg.info.traceId; + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); if (!EPSET_IS_VALID(&pCtx->epSet)) { tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); @@ -1076,16 +1338,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { } if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - char key[TSDB_FQDN_LEN + 64] = {0}; + char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); if (item != NULL) { int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp); if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) { - STraceId* trace = &(pMsg->msg.info.traceId); tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse); destroyCmsg(pMsg); @@ -1107,6 +1366,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, ip, port)) { + tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType), + tstrerror(TSDB_CODE_RPC_MAX_SESSIONS)); + destroyCmsg(pMsg); + return; + } + if (conn != NULL) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); @@ -1120,10 +1386,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); - conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + CONN_CONSTRUCT_HASH_KEY(key, fqdn, port); - uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); + conn->ip = strdup(key); + + uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); if (ipaddr == 0xffffffff) { uv_timer_stop(conn->timer); conn->timer->data = NULL; @@ -1137,9 +1407,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = ipaddr; - addr.sin_port = (uint16_t)htons((uint16_t)conn->port); + addr.sin_port = (uint16_t)htons(port); - tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); + tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); if (fd == -1) { tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1163,45 +1433,168 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, - uv_err_name(ret)); - uv_timer_stop(conn->timer); conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; - cliHandleExcept(conn); + cliHandleFastFail(conn, ret); return; } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } tGTrace("%s conn %p ready", pTransInst->label, conn); } + +static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { + int count = 0; + + while (!QUEUE_IS_EMPTY(wq)) { + queue* h = QUEUE_HEAD(wq); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); + + count++; + } + if (count >= 2) { + tTrace("cli process batch size:%d", count); + } +} +SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { + if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) { + return NULL; + } + queue* hr = QUEUE_HEAD(&pList->wq); + QUEUE_REMOVE(hr); + pList->sending += 1; + + pList->len -= 1; + + SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); + return batch; +} + +static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { + STrans* pInst = pThrd->pTransInst; + + int count = 0; + while (!QUEUE_IS_EMPTY(wq)) { + queue* h = QUEUE_HEAD(wq); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { + STransConnCtx* pCtx = pMsg->ctx; + + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key)); + SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key)); + if (ppBatchList == NULL || *ppBatchList == NULL) { + SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); + QUEUE_INIT(&pBatchList->wq); + pBatchList->connMax = pInst->connLimitNum; + pBatchList->connCnt = 0; + pBatchList->batchLenLimit = pInst->batchSize; + pBatchList->len += 1; + + pBatchList->ip = strdup(ip); + pBatchList->dst = strdup(key); + pBatchList->port = port; + + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize += pMsg->msg.contLen; + pBatch->pList = pBatchList; + + QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); + + taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*)); + } else { + if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize = pMsg->msg.contLen; + pBatch->pList = *ppBatchList; + + QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); + (*ppBatchList)->len += 1; + + continue; + } + + queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); + SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); + if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) { + QUEUE_PUSH(&pBatch->wq, h); + pBatch->batchSize += pMsg->msg.contLen; + pBatch->wLen += 1; + } else { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize += pMsg->msg.contLen; + pBatch->pList = *ppBatchList; + + QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); + (*ppBatchList)->len += 1; + } + } + continue; + } + (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); + count++; + } + + void** pIter = taosHashIterate(pThrd->batchCache, NULL); + while (pIter != NULL) { + SCliBatchList* batchList = (SCliBatchList*)(*pIter); + SCliBatch* batch = cliGetHeadFromList(batchList); + if (batch != NULL) { + cliHandleBatchReq(batch, pThrd); + } + pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); + } + + if (count >= 2) { + tTrace("cli process batch size:%d", count); + } +} + static void cliAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SCliThrd* pThrd = item->pThrd; - SCliMsg* pMsg = NULL; + STrans* pTransInst = pThrd->pTransInst; + SCliMsg* pMsg = NULL; // batch process to avoid to lock/unlock frequently queue wq; taosThreadMutexLock(&item->mtx); QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); - int count = 0; - while (!QUEUE_IS_EMPTY(&wq)) { - queue* h = QUEUE_HEAD(&wq); - QUEUE_REMOVE(h); - - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); - count++; + int8_t supportBatch = pTransInst->supportBatch; + if (supportBatch == 0) { + cliNoBatchDealReq(&wq, pThrd); + } else if (supportBatch == 1) { + cliBatchDealReq(&wq, pThrd); } - if (count >= 2) { - tTrace("cli process batch size:%d", count); - } - // if (!uv_is_active((uv_handle_t*)pThrd->prepare)) uv_prepare_start(pThrd->prepare, cliPrepareCb); if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); } @@ -1380,7 +1773,11 @@ static SCliThrd* createThrdObj(void* trans) { taosMemoryFree(pThrd); return NULL; } - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); + if (pTransInst->supportBatch) { + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb); + } else { + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); + } if (pThrd->asyncPool == NULL) { tError("failed to init async pool"); uv_loop_close(pThrd->loop); @@ -1414,6 +1811,10 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, + pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK); + + pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->quit = false; return pThrd; @@ -1442,6 +1843,25 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->failFastCache); + taosHashCleanup(pThrd->connLimitCache); + + void** pIter = taosHashIterate(pThrd->batchCache, NULL); + while (pIter != NULL) { + SCliBatchList* pBatchList = (SCliBatchList*)(*pIter); + while (!QUEUE_IS_EMPTY(&pBatchList->wq)) { + queue* h = QUEUE_HEAD(&pBatchList->wq); + QUEUE_REMOVE(h); + + SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); + cliDestroyBatch(pBatch); + } + taosMemoryFree(pBatchList->ip); + taosMemoryFree(pBatchList->dst); + taosMemoryFree(pBatchList); + + pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); + } + taosHashCleanup(pThrd->batchCache); taosMemoryFree(pThrd); } @@ -1865,6 +2285,19 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } + if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) { + char key[TSDB_FQDN_LEN + 64] = {0}; + char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet); + uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet); + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key)); + if (val != NULL && *val >= pTransInst->connLimitNum) { + transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_MAX_SESSIONS; + } + } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); @@ -1989,4 +2422,3 @@ int64_t transAllocHandle() { return exh->refId; } -#endif diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index fa8929f7d9..04e094ae9a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -12,8 +12,6 @@ * along with this program. If not, see . */ -#ifdef USE_UV - #include "transComm.h" static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; @@ -246,11 +244,11 @@ static bool uvHandleReq(SSvrConn* pConn) { } } else { if (cost >= EXCEPTION_LIMIT_US) { - tGWarn("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d, cost:%dus, recv exception", + tGWarn("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, transMsg.code, (int)(cost)); } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d, cost:%dus", + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, transMsg.code, (int)(cost)); } @@ -1347,5 +1345,3 @@ _return2: } int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } - -#endif diff --git a/source/libs/transport/test/cliBench.c b/source/libs/transport/test/cliBench.c index 01e88b9988..aaee162cd7 100644 --- a/source/libs/transport/test/cliBench.c +++ b/source/libs/transport/test/cliBench.c @@ -32,22 +32,21 @@ typedef struct { void *pRpc; } SInfo; - void initLogEnv() { - const char *logDir = "/tmp/trans_cli"; - const char* defaultLogFileNamePrefix = "taoslog"; + const char *logDir = "/tmp/trans_cli"; + const char *defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10000; tsAsyncLog = 0; - //idxDebugFlag = 143; + // idxDebugFlag = 143; strcpy(tsLogDir, (char *)logDir); taosRemoveDir(tsLogDir); - taosMkDir(tsLogDir); - + taosMkDir(tsLogDir); + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { - printf("failed to open log file in directory:%s\n", tsLogDir); + printf("failed to open log file in directory:%s\n", tsLogDir); } } - + static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->info.ahandle; tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, @@ -72,11 +71,12 @@ static void *sendRequest(void *param) { rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); rpcMsg.contLen = pInfo->msgSize; rpcMsg.info.ahandle = pInfo; + rpcMsg.info.noResp = 1; rpcMsg.msgType = 1; tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); - tsem_wait(&pInfo->rspSem); + // tsem_wait(&pInfo->rspSem); } tDebug("thread:%d, it is over", pInfo->index); @@ -112,7 +112,12 @@ int main(int argc, char *argv[]) { rpcInit.sessions = 100; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.user = "michael"; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.connLimitNum = 10; + rpcInit.connLimitLock = 1; + rpcInit.batchSize = 16 * 1024; + rpcInit.supportBatch = 1; rpcDebugFlag = 135; for (int i = 1; i < argc; ++i) { @@ -148,7 +153,6 @@ int main(int argc, char *argv[]) { exit(0); } } - initLogEnv(); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 57b1998155..c07fa88af5 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index ee647500cf..16751423b1 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -301,7 +301,7 @@ ,,y,script,./test.sh -f tsim/vnode/replica3_repeat.sim ,,y,script,./test.sh -f tsim/vnode/replica3_vgroup.sim ,,y,script,./test.sh -f tsim/vnode/replica3_many.sim -,,y,script,./test.sh -f tsim/vnode/replica3_import.sim +#,,y,script,./test.sh -f tsim/vnode/replica3_import.sim ,,y,script,./test.sh -f tsim/vnode/stable_balance_replica1.sim ,,y,script,./test.sh -f tsim/vnode/stable_dnode2_stop.sim ,,y,script,./test.sh -f tsim/vnode/stable_dnode2.sim