fix(stream): fix stack overflow, caused by print epset.

This commit is contained in:
Haojun Liao 2024-04-10 19:54:33 +08:00
parent e261023ee6
commit fabb986aa8
6 changed files with 43 additions and 25 deletions

View File

@ -29,21 +29,7 @@ typedef struct SCorEpSet {
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
#define EPSET_TO_STR(_eps, tbuf) \
do { \
int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \
for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \
if (_i == (_eps)->numOfEps - 1) { \
len += \
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
} else { \
len += \
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
} \
} \
len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \
} while (0);
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t len);
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);

View File

@ -70,6 +70,7 @@ void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
}
}
void epAssign(SEp* pDst, SEp* pSrc) {
if (pSrc == NULL || pDst == NULL) {
return;
@ -78,6 +79,7 @@ void epAssign(SEp* pDst, SEp* pSrc) {
tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn));
pDst->port = pSrc->port;
}
void epsetSort(SEpSet* pDst) {
if (pDst->numOfEps <= 1) {
return;
@ -127,6 +129,35 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) {
return ep;
}
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) {
int len = snprintf(pBuf, bufLen, "epset:{");
if (len < 0) {
return -1;
}
for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) {
int32_t ret = 0;
if (_i == pEpSet->numOfEps - 1) {
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
} else {
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d, ", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
}
if (ret < 0) {
return -1;
}
len += ret;
}
if (len < bufLen) {
/*len += */snprintf(pBuf + len, bufLen - len, "}, inUse:%d", pEpSet->inUse);
}
return TSDB_CODE_SUCCESS;
}
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
SJson* pJson = tjsonCreateObject();
if (pJson == NULL) return -1;

View File

@ -1747,7 +1747,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
char buf[256] = {0};
EPSET_TO_STR(&pCurrent->epset, buf);
epsetToStr(&pCurrent->epset, buf, tListLen(buf));
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
@ -1898,7 +1899,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
taosArrayPush(plist, pEntry);
char buf[256] = {0};
EPSET_TO_STR(&pEntry->epset, buf);
epsetToStr(&pEntry->epset, buf, tListLen(buf));
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
}
taosHashCleanup(pHash);

View File

@ -114,7 +114,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
}
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
entry.nodeId = SNODE_HANDLE;
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pObj);
@ -302,7 +302,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
}
char buf[256] = {0};
EPSET_TO_STR(&epset, buf);
epsetToStr(&epset, buf, tListLen(buf));
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);

View File

@ -35,7 +35,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
epsetAssign(&pTask->info.epSet, pEpSet);
EPSET_TO_STR(pEpSet, buf)
epsetToStr(pEpSet, buf, tListLen(buf));
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
}
@ -592,7 +592,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf);
epsetToStr(pEpSet, buf, tListLen(buf));
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < numOfUpstream; ++i) {
@ -626,7 +626,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf);
epsetToStr(pEpSet, buf, tListLen(buf));
int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type;

View File

@ -2188,7 +2188,7 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) {
STransConnCtx* pCtx = pMsg->ctx;
STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
pCtx->retryNextInterval);
return;
@ -2421,7 +2421,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (hasEpSet) {
if (rpcDebugFlag & DEBUG_TRACE) {
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
}