From cdfbbc298535858df1a8a65de45803cee3855b6e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 5 Mar 2024 13:28:34 +0800 Subject: [PATCH 1/3] fix(stream): set the dest epset from mnode, and set the retry error code for trans. --- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 16 ++++++++++++---- source/libs/stream/src/streamTask.c | 2 +- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 57fd187da3..5307ff4b05 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -111,7 +111,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); -int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); +int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3ef2f64df7..fca757006d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1811,7 +1811,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, pStream->name, pTrans->id); - int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans); + int32_t code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans); // todo: not continue, drop all and retry again if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 1ae85a2cc6..2b8fcee9fd 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -462,14 +462,22 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha return TSDB_CODE_SUCCESS; } -static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { +static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { void *pBuf = NULL; int32_t len = 0; streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); - int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + terrno = code; + return code; + } + + code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pBuf); } @@ -478,14 +486,14 @@ static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroup } // build trans to update the epset -int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { +int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); taosWLockLatch(&pStream->lock); SStreamTaskIter *pIter = createStreamTaskIter(pStream); while (streamTaskIterNextTask(pIter)) { SStreamTask *pTask = streamTaskIterGetCurrent(pIter); - int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo); + int32_t code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo); if (code != TSDB_CODE_SUCCESS) { destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8be5b94096..6fd96f651e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -39,7 +39,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); } - // check for the dispath info and the upstream task info + // check for the dispatch info and the upstream task info int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SOURCE) { streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); From 254afc070c8050cfd3fb5f8fddff4eff7841ffec Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 5 Mar 2024 15:20:11 +0800 Subject: [PATCH 2/3] fix: dynmaic config supportvnodes in ent --- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndDnode.c | 28 +++++--------------------- 2 files changed, 6 insertions(+), 24 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 55df80ca44..fde8313228 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -586,7 +586,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfSupportVnodes = tsNumOfCores * 2; tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2); - if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 79a5f5fd83..befb6d3521 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -610,7 +610,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { for(int32_t j = 0; j < tagSize; j++){ SJson* item = tjsonGetArrayItem(arrayTag, j); - *(labels + j) = taosMemoryMalloc(MONITOR_TAG_NAME_LEN); + *(labels + j) = taosMemoryMalloc(MONITOR_TAG_NAME_LEN); tjsonGetStringValue(item, "name", *(labels + j)); *(sample_labels + j) = taosMemoryMalloc(MONITOR_TAG_VALUE_LEN); @@ -626,7 +626,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { for(int32_t j = 0; j < metricLen; j++){ SJson *item = tjsonGetArrayItem(metrics, j); - char name[MONITOR_METRIC_NAME_LEN] = {0}; + char name[MONITOR_METRIC_NAME_LEN] = {0}; tjsonGetStringValue(item, "name", name); double value = 0; @@ -636,7 +636,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { tjsonGetDoubleValue(item, "type", &type); int32_t metricNameLen = strlen(name) + strlen(tableName) + 2; - char* metricName = taosMemoryMalloc(metricNameLen); + char* metricName = taosMemoryMalloc(metricNameLen); memset(metricName, 0, metricNameLen); sprintf(metricName, "%s:%s", tableName, name); @@ -669,7 +669,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { else{ mTrace("get metric from registry:%p", metric); } - + if(type == 0){ taos_counter_add(metric, value, (const char**)sample_labels); } @@ -689,7 +689,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { taosMemoryFreeClear(labels); } } - + } code = 0; @@ -1409,24 +1409,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { if (strcasecmp(cfgReq.config, "resetlog") == 0) { strcpy(dcfgReq.config, "resetlog"); #ifdef TD_ENTERPRISE - } else if (strncasecmp(cfgReq.config, "supportvnodes", 13) == 0) { - int32_t optLen = strlen("supportvnodes"); - int32_t flag = -1; - int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); - if (code < 0) return code; - - if (flag < 0 || flag > 4096) { - mError("dnode:%d, failed to config supportVnodes since value:%d. Valid range: [0, 4096]", cfgReq.dnodeId, flag); - terrno = TSDB_CODE_OUT_OF_RANGE; - goto _err_out; - } - if (flag == 0) { - flag = tsNumOfCores * 2; - } - flag = TMAX(flag, 2); - - strcpy(dcfgReq.config, "supportvnodes"); - snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) { int32_t optLen = strlen("s3blocksize"); int32_t flag = -1; From 45e779d464cf41fb60aa7e51bf940d6255c0171f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 5 Mar 2024 15:48:58 +0800 Subject: [PATCH 3/3] refactor: do some internal refactor. --- source/libs/stream/src/streamTask.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6fd96f651e..4dc01ee9b4 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -622,6 +622,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { char buf[512] = {0}; EPSET_TO_STR(pEpSet, buf); + int32_t id = pTask->id.taskId; int8_t type = pTask->outputInfo.type; if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -633,8 +634,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE if (pVgInfo->vgId == nodeId) { epsetAssign(&pVgInfo->epSet, pEpSet); - stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, - pVgInfo->taskId, nodeId, buf); + stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId, + buf); break; } } @@ -642,8 +643,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher; if (pDispatcher->nodeId == nodeId) { epsetAssign(&pDispatcher->epSet, pEpSet); - stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId, - pDispatcher->taskId, nodeId, buf); + stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId, + buf); } } else { // do nothing