From 6fe2a9b10dff26679ebc6833ecc0d97c15c5fb28 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 4 Mar 2023 18:09:40 +0800 Subject: [PATCH 1/4] add debug --- source/libs/transport/src/transCli.c | 60 +++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 274d153adc..a2516f6ff7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -13,7 +13,6 @@ */ #include "transComm.h" -#include "tutil.h" typedef struct { int32_t numOfConn; @@ -121,6 +120,9 @@ typedef struct SCliThrd { SCliMsg* stopMsg; bool quit; + + int newConnCount; + SHashObj* msgCount; } SCliThrd; typedef struct SCliObj { @@ -423,6 +425,21 @@ void cliHandleResp(SCliConn* conn) { tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } + // if (TMSG_INFO(pHead->msgType - 1) != 0) { + // char buf[128] = {0}; + // sprintf(buf, "%s", TMSG_INFO(pHead->msgType - 1)); + // int* count = taosHashGet(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1))); + // if (NULL == 0) { + // int localCount = 1; + // taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount, + // sizeof(localCount)); + // } else { + // int localCount = *count - 1; + // taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount, + // sizeof(localCount)); + // } + // } + STraceId* trace = &transMsg.info.traceId; tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code)); @@ -1098,6 +1115,19 @@ void cliSend(SCliConn* pConn) { msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); } + if (TMSG_INFO(pHead->msgType) != 0) { + char buf[128] = {0}; + sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); + int* count = taosHashGet(pThrd->msgCount, buf, strlen(buf)); + if (NULL == 0) { + int localCount = 1; + taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); + } else { + int localCount = *count + 1; + taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); + } + } + tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen); @@ -1173,6 +1203,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(pList->port); tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); + pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1546,6 +1577,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(port); tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip); + pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); if (fd == -1) { tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1735,6 +1767,17 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); + void* pIter = taosHashIterate(pThrd->msgCount, NULL); + while (pIter != NULL) { + int* count = pIter; + size_t len = 0; + char* key = taosHashGetKey(pIter, &len); + tDebug("key: %s count: %d", key, *count); + + pIter = taosHashIterate(pThrd->msgCount, pIter); + } + tDebug("conn count: %d", pThrd->newConnCount); + int8_t supportBatch = pTransInst->supportBatch; if (supportBatch == 0) { cliNoBatchDealReq(&wq, pThrd); @@ -1969,6 +2012,9 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->quit = false; + + pThrd->newConnCount = 0; + pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); return pThrd; } static void destroyThrdObj(SCliThrd* pThrd) { @@ -2316,6 +2362,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } + if ((pResp->msgType - 1) > 0) { + char buf[128] = {0}; + sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); + int* count = taosHashGet(pThrd->msgCount, buf, strlen(buf)); + if (NULL == 0) { + int localCount = 0; + taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); + } else { + int localCount = *count - 1; + taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); + } + } if (pCtx->pSem != NULL) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pRsp == NULL) { From 8f63de5a0ad4be615687544ea8cd5ca173d36b37 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 4 Mar 2023 18:30:53 +0800 Subject: [PATCH 2/4] add debug --- source/libs/transport/src/transCli.c | 66 ++++++++++++++-------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a2516f6ff7..9ed7953e90 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1115,18 +1115,18 @@ void cliSend(SCliConn* pConn) { msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); } - if (TMSG_INFO(pHead->msgType) != 0) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); - int* count = taosHashGet(pThrd->msgCount, buf, strlen(buf)); - if (NULL == 0) { - int localCount = 1; - taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count + 1; - taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); - } - } + // if (pHead->msgType != 0) { + // char buf[128] = {0}; + // sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); + // int* count = taosHashGet(pThrd->msgCount, buf, strlen(buf)); + // if (NULL == 0) { + // int localCount = 1; + // taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); + //} else { + // int localCount = *count + 1; + // taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); + //} + //} tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen); @@ -1767,15 +1767,15 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); - void* pIter = taosHashIterate(pThrd->msgCount, NULL); - while (pIter != NULL) { - int* count = pIter; - size_t len = 0; - char* key = taosHashGetKey(pIter, &len); - tDebug("key: %s count: %d", key, *count); + // void* pIter = taosHashIterate(pThrd->msgCount, NULL); + // while (pIter != NULL) { + // int* count = pIter; + // size_t len = 0; + // char* key = taosHashGetKey(pIter, &len); + // tDebug("key: %s count: %d", key, *count); - pIter = taosHashIterate(pThrd->msgCount, pIter); - } + // pIter = taosHashIterate(pThrd->msgCount, pIter); + //} tDebug("conn count: %d", pThrd->newConnCount); int8_t supportBatch = pTransInst->supportBatch; @@ -2014,7 +2014,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->quit = false; pThrd->newConnCount = 0; - pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); return pThrd; } static void destroyThrdObj(SCliThrd* pThrd) { @@ -2362,18 +2362,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } - if ((pResp->msgType - 1) > 0) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); - int* count = taosHashGet(pThrd->msgCount, buf, strlen(buf)); - if (NULL == 0) { - int localCount = 0; - taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count - 1; - taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); - } - } + // if ((pResp->msgType - 1) > 0) { + // char buf[128] = {0}; + // sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); + // int* count = taosHashGet(pThrd->msgCount, buf, strlen(buf)); + // if (NULL == 0) { + // int localCount = 0; + // taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); + //} else { + // int localCount = *count - 1; + // taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); + //} + //} if (pCtx->pSem != NULL) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pRsp == NULL) { From b4e8130633710f11e261d73959409c6e0c154e6b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 4 Mar 2023 18:48:37 +0800 Subject: [PATCH 3/4] add debug --- source/libs/transport/src/transCli.c | 68 ++++++++++++++-------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9ed7953e90..e7da5b1c69 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1115,18 +1115,18 @@ void cliSend(SCliConn* pConn) { msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); } - // if (pHead->msgType != 0) { - // char buf[128] = {0}; - // sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); - // int* count = taosHashGet(pThrd->msgCount, buf, strlen(buf)); - // if (NULL == 0) { - // int localCount = 1; - // taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); - //} else { - // int localCount = *count + 1; - // taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); - //} - //} + if (pHead->msgType != 0) { + char buf[128] = {0}; + sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); + int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); + if (NULL == 0) { + int localCount = 1; + taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + } else { + int localCount = *count + 1; + taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + } + } tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen); @@ -1767,16 +1767,18 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); - // void* pIter = taosHashIterate(pThrd->msgCount, NULL); - // while (pIter != NULL) { - // int* count = pIter; - // size_t len = 0; - // char* key = taosHashGetKey(pIter, &len); - // tDebug("key: %s count: %d", key, *count); + void* pIter = taosHashIterate(pThrd->msgCount, NULL); + while (pIter != NULL) { + int* count = pIter; + size_t len = 0; + char* key = taosHashGetKey(pIter, &len); + if (*count != 0) { + tDebug("key: %s count: %d", key, *count); + } - // pIter = taosHashIterate(pThrd->msgCount, pIter); - //} - tDebug("conn count: %d", pThrd->newConnCount); + pIter = taosHashIterate(pThrd->msgCount, pIter); + } + tDebug("all conn count: %d", pThrd->newConnCount); int8_t supportBatch = pTransInst->supportBatch; if (supportBatch == 0) { @@ -2362,18 +2364,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } - // if ((pResp->msgType - 1) > 0) { - // char buf[128] = {0}; - // sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); - // int* count = taosHashGet(pThrd->msgCount, buf, strlen(buf)); - // if (NULL == 0) { - // int localCount = 0; - // taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); - //} else { - // int localCount = *count - 1; - // taosHashPut(pThrd->msgCount, buf, strlen(buf), &localCount, sizeof(localCount)); - //} - //} + if ((pResp->msgType - 1) > 0) { + char buf[128] = {0}; + sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); + int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); + if (NULL == 0) { + int localCount = 0; + taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + } else { + int localCount = *count - 1; + taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + } + } if (pCtx->pSem != NULL) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pRsp == NULL) { From 566d233c1cbb14b8acc5fdea98f3f1edbebf75af Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 4 Mar 2023 19:28:03 +0800 Subject: [PATCH 4/4] add debug --- source/libs/transport/src/transCli.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e7da5b1c69..0774492dbb 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1115,7 +1115,8 @@ void cliSend(SCliConn* pConn) { msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); } - if (pHead->msgType != 0) { + if ((pHead->msgType > TDMT_VND_TMQ_MSG && pHead->msgType < TDMT_VND_TMQ_MAX_MSG) || + (pHead->msgType > TDMT_MND_MSG && pHead->msgType < TDMT_MND_MAX_MSG)) { char buf[128] = {0}; sprintf(buf, "%s", TMSG_INFO(pHead->msgType)); int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); @@ -2364,7 +2365,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } - if ((pResp->msgType - 1) > 0) { + if ((pResp->msgType - 1 > TDMT_VND_TMQ_MSG && pResp->msgType - 1 < TDMT_VND_TMQ_MAX_MSG) || + (pResp->msgType - 1 > TDMT_MND_MSG && pResp->msgType - 1 < TDMT_MND_MAX_MSG)) { char buf[128] = {0}; sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));