From 9a9a1828c123ab1d7c63ef36095f04f528dc4bd4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Nov 2024 13:47:20 +0800 Subject: [PATCH 1/6] refactor: display the time window for force_window_close. --- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 5 ++-- source/dnode/mnode/impl/src/mndStream.c | 10 +++++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 22 +++++++-------- source/libs/stream/src/streamHb.c | 30 ++++++++++++++------- 5 files changed, 43 insertions(+), 26 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index c9155f536c..b129cde72c 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -178,7 +178,7 @@ int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId); int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); -int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows); +int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t nRows, int32_t p); int32_t mndProcessResetStatusReq(SRpcMsg *pReq); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index e67e4a963b..f48e4531de 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -404,8 +404,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre return code; } - mDebug("doAddSourceTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId, - isHistoryTask); + mDebug("doAddSourceTask taskId:%s, %p vgId:%d, historyTask:%d", pTask->id.idStr, pTask, pVgroup->vgId, isHistoryTask); if (pStream->conf.fillHistory) { haltInitialTaskStatus(pTask, plan, isHistoryTask); @@ -461,7 +460,7 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream addNewTaskList(pStream); while (1) { - SVgObj* pVgroup; + SVgObj* pVgroup = NULL; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) { break; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6336cd6e49..5d41e1506c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -21,7 +21,6 @@ #include "mndShow.h" #include "mndStb.h" #include "mndTrans.h" -#include "mndVgroup.h" #include "osMemory.h" #include "parser.h" #include "taoserror.h" @@ -1610,6 +1609,13 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } } + int32_t precision = TSDB_TIME_PRECISION_MILLI; + SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb); + if (pSourceDb != NULL) { + precision = pSourceDb->cfg.precision; + mndReleaseDb(pMnode, pSourceDb); + } + // add row for each task SStreamTaskIter *pIter = NULL; code = createStreamTaskIter(pStream, &pIter); @@ -1628,7 +1634,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock break; } - code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); + code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision); if (code == TSDB_CODE_SUCCESS) { numOfRows++; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index f9b7644af4..ad2f71a064 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1043,7 +1043,7 @@ _end: return code; } -int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) { +int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows, int32_t precision) { SColumnInfoData *pColInfo = NULL; int32_t cols = 0; int32_t code = 0; @@ -1103,14 +1103,11 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo // level char level[20 + VARSTR_HEADER_SIZE] = {0}; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - memcpy(varDataVal(level), "source", 6); - varDataSetLen(level, 6); + STR_WITH_SIZE_TO_VARSTR(level, "source", 6); } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - memcpy(varDataVal(level), "agg", 3); - varDataSetLen(level, 3); + STR_WITH_SIZE_TO_VARSTR(level, "agg", 3); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - memcpy(varDataVal(level), "sink", 4); - varDataSetLen(level, 4); + STR_WITH_SIZE_TO_VARSTR(level, "sink", 4); } pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1234,10 +1231,13 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo if (pTask->info.taskLevel == TASK_LEVEL__SINK) { const char *sinkStr = "%.2f MiB"; snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize); - } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - // offset info - const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); + } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info + if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision); + } else { + const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; + snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); + } } else { memset(buf, 0, tListLen(buf)); } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 25cb28f77c..3a4e3eef89 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -75,6 +75,25 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { streamMutexUnlock(&pTask->lock); } +static void setProcessProgress(SStreamTask* pTask, STaskStatusEntry* pEntry) { + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + return; + } + + if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + pEntry->processedVer = pTask->status.latestForceWindow.skey; + } else { + if (pTask->exec.pWalReader != NULL) { + pEntry->processedVer = walReaderGetCurrentVer(pTask->exec.pWalReader) - 1; + if (pEntry->processedVer < 0) { + pEntry->processedVer = pTask->chkInfo.processedVer; + } + + walReaderValidVersionRange(pTask->exec.pWalReader, &pEntry->verRange.minVer, &pEntry->verRange.maxVer); + } + } +} + static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) { int32_t code = 0; int32_t tlen = 0; @@ -209,16 +228,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } streamMutexUnlock(&pTask->lock); - if (pTask->exec.pWalReader != NULL) { - entry.processedVer = walReaderGetCurrentVer(pTask->exec.pWalReader) - 1; - if (entry.processedVer < 0) { - entry.processedVer = pTask->chkInfo.processedVer; - } - - walReaderValidVersionRange(pTask->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); - } - + setProcessProgress(pTask, &entry); addUpdateNodeIntoHbMsg(pTask, pMsg); + p = taosArrayPush(pMsg->pTaskStatus, &entry); if (p == NULL) { stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", pTask->id.taskId, pMeta->vgId); From dee5017c5368de2c3212bd5ef7bc4192d33d9589 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Nov 2024 14:03:54 +0800 Subject: [PATCH 2/6] fix(stream): check return value. --- source/dnode/mnode/impl/src/mndScheduler.c | 51 +++++++++++++++++----- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index f48e4531de..e318529cf8 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -362,18 +362,32 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh return code; } -static void addNewTaskList(SStreamObj* pStream) { +static int32_t addNewTaskList(SStreamObj* pStream) { SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); + if (pTaskList == NULL) { + mError("failed init task list, code:%s", tstrerror(terrno)); + return terrno; + } + if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) { - mError("failed to put into array"); + mError("failed to put into array, code:%s", tstrerror(terrno)); + return terrno; } if (pStream->conf.fillHistory) { pTaskList = taosArrayInit(0, POINTER_BYTES); + if (pTaskList == NULL) { + mError("failed init task list, code:%s", tstrerror(terrno)); + return terrno; + } + if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) { - mError("failed to put into array"); + mError("failed to put into array, code:%s", tstrerror(terrno)); + return terrno; } } + + return TSDB_CODE_SUCCESS; } // set the history task id @@ -454,10 +468,11 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) { static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) { void* pIter = NULL; - int32_t code = 0; SSdb* pSdb = pMnode->pSdb; - - addNewTaskList(pStream); + int32_t code = addNewTaskList(pStream); + if (code) { + return code; + } while (1) { SVgObj* pVgroup = NULL; @@ -570,8 +585,10 @@ END: } static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) { - int32_t code = 0; - addNewTaskList(pStream); + int32_t code = addNewTaskList(pStream); + if (code) { + return code; + } if (pStream->fixedSinkVgId == 0) { code = doAddShuffleSinkTask(pMnode, pStream, pEpset); @@ -676,8 +693,13 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel, externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan); + pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); + if (pStream->tasks == NULL || pStream->pHTasksList == NULL) { + mError("failed to create stream obj, code:%s", tstrerror(terrno)); + return terrno; + } if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { // add extra sink @@ -717,6 +739,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (terrno != 0) code = terrno; TAOS_RETURN(code); } + do { SArray** list = taosArrayGetLast(pStream->tasks); float size = (float)taosArrayGetSize(*list); @@ -724,7 +747,10 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (cnt <= 1) break; mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt); - addNewTaskList(pStream); + code = addNewTaskList(pStream); + if (code) { + return code; + } for (int j = 0; j < cnt; j++) { code = addAggTask(pStream, pMnode, plan, pEpset, false); @@ -750,7 +776,12 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* mDebug("doScheduleStream add final agg"); SArray** list = taosArrayGetLast(pStream->tasks); size_t size = taosArrayGetSize(*list); - addNewTaskList(pStream); + + code = addNewTaskList(pStream); + if (code) { + return code; + } + code = addAggTask(pStream, pMnode, plan, pEpset, true); if (code != TSDB_CODE_SUCCESS) { TAOS_RETURN(code); From 9220ec5a92ccb257ab5dac1557243dac72c0e4eb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Nov 2024 14:18:40 +0800 Subject: [PATCH 3/6] refactor: update the default checkpoint interval. --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 95788a7ff0..47d754fd04 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -280,7 +280,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch int32_t tsTransPullupInterval = 2; int32_t tsCompactPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointInterval = 60; +int32_t tsStreamCheckpointInterval = 300; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 20; int32_t tsMaxConcurrentCheckpoint = 1; From cfdba88c53fd42dce50000348f9ea55f9ada729d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Nov 2024 14:54:12 +0800 Subject: [PATCH 4/6] fix(stream): fix memory leak. --- source/libs/stream/src/streamCheckpoint.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9eab33cee8..d619351a93 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -972,7 +972,7 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** return 0; } -static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray* pNotSendList) { +static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) { const char* id = pTask->id.idStr; SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; @@ -984,7 +984,7 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SA return code; } - code = doFindNotSendUpstream(pTask, pList, &pNotSendList); + code = doFindNotSendUpstream(pTask, pList, ppNotSendList); if (code) { streamCleanBeforeQuitTmr(pTmrInfo, param); stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code)); @@ -992,7 +992,7 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SA } // do send retrieve checkpoint trigger msg to upstream - code = doSendRetrieveTriggerMsg(pTask, pNotSendList); + code = doSendRetrieveTriggerMsg(pTask, *ppNotSendList); if (code) { stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); code = 0; @@ -1064,7 +1064,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); - code = chkptTriggerRecvMonitorHelper(pTask, param, pNotSendList); + code = chkptTriggerRecvMonitorHelper(pTask, param, &pNotSendList); streamMutexUnlock(&pActiveInfo->lock); if (code != TSDB_CODE_SUCCESS) { From 67d6458254ac4cf490e394808484c8d2eade31b6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 30 Nov 2024 00:34:13 +0800 Subject: [PATCH 5/6] fix: check return value. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index ad2f71a064..bb666eb6dd 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1233,7 +1233,11 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision); + int32_t ret = taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision); + if (ret != 0) { + mError("failed to format processed timewindow, skey:%" PRId64, pe->processedVer); + memset(buf, 0, tListLen(buf)); + } } else { const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); From 8e319d777b3f9133824a848fed8f4982c5be9255 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 2 Dec 2024 20:02:55 +0800 Subject: [PATCH 6/6] test: update test cases. --- tests/pytest/util/dnodes.py | 4 ++-- tests/system-test/8-stream/checkpoint_info.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 29fb52e124..8d048825d0 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -39,7 +39,7 @@ class TDSimClient: "rpcDebugFlag": "135", "tmrDebugFlag": "131", "dDebugFlag":"131", - "cDebugFlag": "131", + "cDebugFlag": "135", "uDebugFlag": "131", "jniDebugFlag": "131", "qDebugFlag": "135", @@ -136,7 +136,7 @@ class TDDnode: "dDebugFlag": "131", "vDebugFlag": "131", "tqDebugFlag": "135", - "cDebugFlag": "131", + "cDebugFlag": "135", "stDebugFlag": "135", "smaDebugFlag": "131", "jniDebugFlag": "131", diff --git a/tests/system-test/8-stream/checkpoint_info.py b/tests/system-test/8-stream/checkpoint_info.py index 522017a702..6f39fa6fb1 100644 --- a/tests/system-test/8-stream/checkpoint_info.py +++ b/tests/system-test/8-stream/checkpoint_info.py @@ -21,6 +21,7 @@ from util.cluster import * import threading # should be used by -N option class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'checkpointinterval':60} #updatecfgDict = {'checkpointInterval': 60 ,} def init(self, conn, logSql, replicaVar=1): @@ -70,7 +71,7 @@ class TDTestCase: while(True): if(self.check_vnodestate()): break - sql = 'select task_id, node_id, checkpoint_id, checkpoint_ver from information_schema.ins_stream_tasks where `level` = "source" or `level` = "agg" and node_type == "vnode"' + sql = 'select task_id, node_id, checkpoint_id, checkpoint_ver from information_schema.ins_stream_tasks where `level` = "source" or `level` = "agg" and node_type = "vnode"' for task_id, vnode, checkpoint_id, checkpoint_ver in tdSql.getResult(sql): dirpath = f"{cluster.dnodes[self.vnode_dict[vnode]-1].dataDir}/vnode/vnode{vnode}/" info_path = self.find_checkpoint_info_file(dirpath, checkpoint_id, task_id)