From bf8b36901a0be5602481e22f1c148e9ceee629ec Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 4 Mar 2023 23:37:50 +0800 Subject: [PATCH 1/7] add crash --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0774492dbb..7d648d69ac 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -877,7 +877,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { static void cliDestroyConn(SCliConn* conn, bool clear) { SCliThrd* pThrd = conn->hostThrd; tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - + conn->broken = true; QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); @@ -1262,7 +1262,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) { } else { tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize); - if (!uv_is_closing((uv_handle_t*)&conn->stream)) { + if (!uv_is_closing((uv_handle_t*)&conn->stream) && conn->broken == false) { if (nxtBatch != NULL) { conn->pBatch = nxtBatch; cliSendBatch(conn); From fbad444981627d6079210c4b17ae433f9d4c1c92 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 5 Mar 2023 10:52:16 +0800 Subject: [PATCH 2/7] change parameter --- source/client/src/clientEnv.c | 4 ++-- source/common/src/tglobal.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 79c5baf43d..e3e20ee85d 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -159,9 +159,9 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; - int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 5); + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); connLimitNum = TMAX(connLimitNum, 10); - connLimitNum = TMIN(connLimitNum, 500); + connLimitNum = TMIN(connLimitNum, 1000); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 4bb64e5fd6..0d03e1a11a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -41,7 +41,7 @@ bool tsPrintAuth = false; // queue & threads int32_t tsNumOfRpcThreads = 1; -int32_t tsNumOfRpcSessions = 6000; +int32_t tsNumOfRpcSessions = 10000; int32_t tsTimeToGetAvailableConn = 500000; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 4; From 167f4de9037241e57aea6b7243baed3b23cbb78b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 5 Mar 2023 11:06:55 +0800 Subject: [PATCH 3/7] fix debug info --- source/libs/transport/src/transCli.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7d648d69ac..15fb20856c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1116,7 +1116,8 @@ void cliSend(SCliConn* pConn) { } if ((pHead->msgType > TDMT_VND_TMQ_MSG && pHead->msgType < TDMT_VND_TMQ_MAX_MSG) || - (pHead->msgType > TDMT_MND_MSG && pHead->msgType < TDMT_MND_MAX_MSG)) { + (pHead->msgType > TDMT_MND_MSG && pHead->msgType < TDMT_MND_MAX_MSG) || pHead->msgType == TDMT_VND_SUBMIT || + pHead->msgType == TDMT_MND_HEARTBEAT) { char buf[128] = {0}; sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); @@ -2366,7 +2367,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } if ((pResp->msgType - 1 > TDMT_VND_TMQ_MSG && pResp->msgType - 1 < TDMT_VND_TMQ_MAX_MSG) || - (pResp->msgType - 1 > TDMT_MND_MSG && pResp->msgType - 1 < TDMT_MND_MAX_MSG)) { + (pResp->msgType - 1 > TDMT_MND_MSG && pResp->msgType - 1 < TDMT_MND_MAX_MSG) || + pResp->msgType - 1 == TDMT_VND_SUBMIT_RSP || pResp->msgType - 1 == TDMT_MND_HEARTBEAT_RSP) { char buf[128] = {0}; sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); From 7986e1fd49edd9eac9c9676a917bb469ad7040ae Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 5 Mar 2023 11:08:11 +0800 Subject: [PATCH 4/7] fix debug info --- source/libs/transport/src/transCli.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 15fb20856c..99c9285018 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -692,6 +692,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } list->numOfConn++; } + tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum); return NULL; } From e6ec795b4d2c7ba10ed2ea2de1edf9bb74a2175e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 5 Mar 2023 11:36:01 +0800 Subject: [PATCH 5/7] fix debug info --- include/common/tmsg.h | 19 ++++++++++++++----- source/libs/transport/src/transCli.c | 8 ++------ 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 41bc873a3b..9a2439f0cf 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -66,6 +66,15 @@ extern int32_t tMsgDict[]; typedef uint16_t tmsg_t; +static inline bool tmsgIsValid(tmsg_t type) { + if (type < TDMT_DND_MAX_MSG || type < TDMT_MND_MAX_MSG || type < TDMT_VND_MAX_MSG || type < TDMT_SCH_MAX_MSG || + type < TDMT_STREAM_MAX_MSG || type < TDMT_MON_MAX_MSG || type < TDMT_SYNC_MAX_MSG || type < TDMT_VND_STREAM_MSG || + type < TDMT_VND_TMQ_MSG || type < TDMT_VND_TMQ_MAX_MSG) { + return true; + } else { + return false; + } +} static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); @@ -1911,10 +1920,10 @@ typedef struct { } SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg; typedef struct { - int64_t consumerId; - char cgroup[TSDB_CGROUP_LEN]; - char clientId[256]; - SArray* topicNames; // SArray + int64_t consumerId; + char cgroup[TSDB_CGROUP_LEN]; + char clientId[256]; + SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -2691,7 +2700,7 @@ typedef struct { char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int8_t subType; int8_t withMeta; - char* qmsg; //SubPlanToString + char* qmsg; // SubPlanToString int64_t suid; } SMqRebVgReq; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 99c9285018..c4f0b63ffc 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1116,9 +1116,7 @@ void cliSend(SCliConn* pConn) { msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); } - if ((pHead->msgType > TDMT_VND_TMQ_MSG && pHead->msgType < TDMT_VND_TMQ_MAX_MSG) || - (pHead->msgType > TDMT_MND_MSG && pHead->msgType < TDMT_MND_MAX_MSG) || pHead->msgType == TDMT_VND_SUBMIT || - pHead->msgType == TDMT_MND_HEARTBEAT) { + if (tmsgIsValid(pHead->msgType)) { char buf[128] = {0}; sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); @@ -2367,9 +2365,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } - if ((pResp->msgType - 1 > TDMT_VND_TMQ_MSG && pResp->msgType - 1 < TDMT_VND_TMQ_MAX_MSG) || - (pResp->msgType - 1 > TDMT_MND_MSG && pResp->msgType - 1 < TDMT_MND_MAX_MSG) || - pResp->msgType - 1 == TDMT_VND_SUBMIT_RSP || pResp->msgType - 1 == TDMT_MND_HEARTBEAT_RSP) { + if (tmsgIsValid(pResp->msgType - 1)) { char buf[128] = {0}; sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); From ee0bf9c501afb8421152dd41e94c02e9da1d6723 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 5 Mar 2023 12:42:56 +0800 Subject: [PATCH 6/7] fix debug info --- source/libs/transport/src/transCli.c | 36 ++++++++++++++++++---------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c4f0b63ffc..b97597dc65 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1116,18 +1116,18 @@ void cliSend(SCliConn* pConn) { msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); } - if (tmsgIsValid(pHead->msgType)) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); - int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - if (NULL == 0) { - int localCount = 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count + 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } - } + // if (tmsgIsValid(pHead->msgType)) { + // char buf[128] = {0}; + // sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); + // int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); + // if (NULL == 0) { + // int localCount = 1; + // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + // } else { + // int localCount = *count + 1; + // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + // } + // } tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen); @@ -1523,6 +1523,18 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); return; } + if (tmsgIsValid(pMsg->msg.msgType)) { + char buf[128] = {0}; + sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType)); + int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); + if (NULL == 0) { + int localCount = 1; + taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + } else { + int localCount = *count + 1; + taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + } + } char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); From 42ba45e60d2aa7fa284ce5f2c5dd4c5718fbf0ab Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 5 Mar 2023 13:25:00 +0800 Subject: [PATCH 7/7] fix debug info --- source/libs/transport/src/transCli.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b97597dc65..79afe95342 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -804,7 +804,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; - tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn); transAllocBuffer(pBuf, buf); } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {