diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 41bc873a3b..9a2439f0cf 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -66,6 +66,15 @@ extern int32_t tMsgDict[]; typedef uint16_t tmsg_t; +static inline bool tmsgIsValid(tmsg_t type) { + if (type < TDMT_DND_MAX_MSG || type < TDMT_MND_MAX_MSG || type < TDMT_VND_MAX_MSG || type < TDMT_SCH_MAX_MSG || + type < TDMT_STREAM_MAX_MSG || type < TDMT_MON_MAX_MSG || type < TDMT_SYNC_MAX_MSG || type < TDMT_VND_STREAM_MSG || + type < TDMT_VND_TMQ_MSG || type < TDMT_VND_TMQ_MAX_MSG) { + return true; + } else { + return false; + } +} static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); @@ -1911,10 +1920,10 @@ typedef struct { } SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg; typedef struct { - int64_t consumerId; - char cgroup[TSDB_CGROUP_LEN]; - char clientId[256]; - SArray* topicNames; // SArray + int64_t consumerId; + char cgroup[TSDB_CGROUP_LEN]; + char clientId[256]; + SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -2691,7 +2700,7 @@ typedef struct { char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int8_t subType; int8_t withMeta; - char* qmsg; //SubPlanToString + char* qmsg; // SubPlanToString int64_t suid; } SMqRebVgReq; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 79c5baf43d..e3e20ee85d 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -159,9 +159,9 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; - int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 5); + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); connLimitNum = TMAX(connLimitNum, 10); - connLimitNum = TMIN(connLimitNum, 500); + connLimitNum = TMIN(connLimitNum, 1000); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 4bb64e5fd6..0d03e1a11a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -41,7 +41,7 @@ bool tsPrintAuth = false; // queue & threads int32_t tsNumOfRpcThreads = 1; -int32_t tsNumOfRpcSessions = 6000; +int32_t tsNumOfRpcSessions = 10000; int32_t tsTimeToGetAvailableConn = 500000; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 4; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0774492dbb..79afe95342 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -692,6 +692,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } list->numOfConn++; } + tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum); return NULL; } @@ -803,7 +804,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; - tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn); transAllocBuffer(pBuf, buf); } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { @@ -877,7 +877,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { static void cliDestroyConn(SCliConn* conn, bool clear) { SCliThrd* pThrd = conn->hostThrd; tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - + conn->broken = true; QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); @@ -1115,19 +1115,18 @@ void cliSend(SCliConn* pConn) { msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); } - if ((pHead->msgType > TDMT_VND_TMQ_MSG && pHead->msgType < TDMT_VND_TMQ_MAX_MSG) || - (pHead->msgType > TDMT_MND_MSG && pHead->msgType < TDMT_MND_MAX_MSG)) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); - int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - if (NULL == 0) { - int localCount = 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count + 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } - } + // if (tmsgIsValid(pHead->msgType)) { + // char buf[128] = {0}; + // sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); + // int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); + // if (NULL == 0) { + // int localCount = 1; + // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + // } else { + // int localCount = *count + 1; + // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + // } + // } tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen); @@ -1262,7 +1261,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) { } else { tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize); - if (!uv_is_closing((uv_handle_t*)&conn->stream)) { + if (!uv_is_closing((uv_handle_t*)&conn->stream) && conn->broken == false) { if (nxtBatch != NULL) { conn->pBatch = nxtBatch; cliSendBatch(conn); @@ -1523,6 +1522,18 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); return; } + if (tmsgIsValid(pMsg->msg.msgType)) { + char buf[128] = {0}; + sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType)); + int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); + if (NULL == 0) { + int localCount = 1; + taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + } else { + int localCount = *count + 1; + taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + } + } char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); @@ -2365,8 +2376,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } - if ((pResp->msgType - 1 > TDMT_VND_TMQ_MSG && pResp->msgType - 1 < TDMT_VND_TMQ_MAX_MSG) || - (pResp->msgType - 1 > TDMT_MND_MSG && pResp->msgType - 1 < TDMT_MND_MAX_MSG)) { + if (tmsgIsValid(pResp->msgType - 1)) { char buf[128] = {0}; sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));