From fabb986aa8cff6a5cba7e3685c8269c02eeaf70a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 19:54:33 +0800 Subject: [PATCH] fix(stream): fix stack overflow, caused by print epset. --- include/common/tmisce.h | 16 +---------- source/common/src/tmisce.c | 31 +++++++++++++++++++++ source/dnode/mnode/impl/src/mndStream.c | 5 ++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 6 ++-- source/libs/stream/src/streamTask.c | 6 ++-- source/libs/transport/src/transCli.c | 4 +-- 6 files changed, 43 insertions(+), 25 deletions(-) diff --git a/include/common/tmisce.h b/include/common/tmisce.h index afb33c733a..267ca814d4 100644 --- a/include/common/tmisce.h +++ b/include/common/tmisce.h @@ -29,21 +29,7 @@ typedef struct SCorEpSet { #define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse])) -#define EPSET_TO_STR(_eps, tbuf) \ - do { \ - int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \ - for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \ - if (_i == (_eps)->numOfEps - 1) { \ - len += \ - snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \ - } else { \ - len += \ - snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \ - } \ - } \ - len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \ - } while (0); - +int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t len); int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp); void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index 1606b45eed..8558ccb447 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -70,6 +70,7 @@ void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) { tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn)); } } + void epAssign(SEp* pDst, SEp* pSrc) { if (pSrc == NULL || pDst == NULL) { return; @@ -78,6 +79,7 @@ void epAssign(SEp* pDst, SEp* pSrc) { tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn)); pDst->port = pSrc->port; } + void epsetSort(SEpSet* pDst) { if (pDst->numOfEps <= 1) { return; @@ -127,6 +129,35 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) { return ep; } +int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) { + int len = snprintf(pBuf, bufLen, "epset:{"); + if (len < 0) { + return -1; + } + + for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) { + int32_t ret = 0; + + if (_i == pEpSet->numOfEps - 1) { + ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port); + } else { + ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d, ", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port); + } + + if (ret < 0) { + return -1; + } + + len += ret; + } + + if (len < bufLen) { + /*len += */snprintf(pBuf + len, bufLen - len, "}, inUse:%d", pEpSet->inUse); + } + + return TSDB_CODE_SUCCESS; +} + int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) { SJson* pJson = tjsonCreateObject(); if (pJson == NULL) return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ff05db417e..8f9afb2adc 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1747,7 +1747,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset); char buf[256] = {0}; - EPSET_TO_STR(&pCurrent->epset, buf); + epsetToStr(&pCurrent->epset, buf, tListLen(buf)); + mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId, pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated); @@ -1898,7 +1899,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { taosArrayPush(plist, pEntry); char buf[256] = {0}; - EPSET_TO_STR(&pEntry->epset, buf); + epsetToStr(&pEntry->epset, buf, tListLen(buf)); mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf); } taosHashCleanup(pHash); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index a124b4052c..d5bc12f9df 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -114,7 +114,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { } char buf[256] = {0}; - EPSET_TO_STR(&entry.epset, buf); + epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); taosArrayPush(pVgroupListSnapshot, &entry); @@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { entry.nodeId = SNODE_HANDLE; char buf[256] = {0}; - EPSET_TO_STR(&entry.epset, buf); + epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); taosArrayPush(pVgroupListSnapshot, &entry); sdbRelease(pSdb, pObj); @@ -302,7 +302,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa } char buf[256] = {0}; - EPSET_TO_STR(&epset, buf); + epsetToStr(&epset, buf, tListLen(buf)); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 88c8c85dec..c34e162326 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -35,7 +35,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp if (pTask->info.nodeId == nodeId) { // execution task should be moved away epsetAssign(&pTask->info.epSet, pEpSet); - EPSET_TO_STR(pEpSet, buf) + epsetToStr(pEpSet, buf, tListLen(buf)); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); } @@ -592,7 +592,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { char buf[512] = {0}; - EPSET_TO_STR(pEpSet, buf); + epsetToStr(pEpSet, buf, tListLen(buf)); int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < numOfUpstream; ++i) { @@ -626,7 +626,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { char buf[512] = {0}; - EPSET_TO_STR(pEpSet, buf); + epsetToStr(pEpSet, buf, tListLen(buf)); int32_t id = pTask->id.taskId; int8_t type = pTask->outputInfo.type; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 062609baac..79699a755a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2188,7 +2188,7 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { STransConnCtx* pCtx = pMsg->ctx; STraceId* trace = &pMsg->msg.info.traceId; char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); + epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, pCtx->retryNextInterval); return; @@ -2421,7 +2421,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (hasEpSet) { if (rpcDebugFlag & DEBUG_TRACE) { char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); + epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } }