From 142bc9046d66d03f8ae6976ba33cf0548e7c28b0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 May 2024 15:13:00 +0800 Subject: [PATCH 1/4] fix(stream): fix memory leak. --- source/libs/stream/src/streamExec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 047b169ec9..3c8d6838a3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -193,7 +193,7 @@ static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code)); } } else { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } return code; } From 0c61fb20d603fd81d3ff1d2c3e3594eeb7455f1b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 May 2024 17:22:29 +0800 Subject: [PATCH 2/4] fix(stream): check if outputQ is full before execute the scan history task. if it is full, idle for 5 sec. --- source/dnode/mnode/impl/src/mndStream.c | 4 ++-- source/libs/stream/src/streamExec.c | 24 +++++++++++++++++------- source/libs/stream/src/streamQueue.c | 2 +- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 44fe81ac09..48549fce42 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1639,11 +1639,11 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // info if (pTask->info.taskLevel == TASK_LEVEL__SINK) { const char *sinkStr = "%.2fMiB"; - sprintf(buf, sinkStr, pe->sinkDataSize); + snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); + snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); } STR_TO_VARSTR(vbuf, buf); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3c8d6838a3..8ac187a2bb 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -184,13 +184,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) { +static int32_t handleSanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) { int32_t code = TSDB_CODE_SUCCESS; if (taosArrayGetSize(pRes) > 0) { SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); code = doOutputResultBlockImpl(pTask, pStreamBlocks); - if (code != TSDB_CODE_SUCCESS) { - stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code)); + if (code != TSDB_CODE_SUCCESS) { // should not have error code + stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code)); } } else { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); @@ -268,6 +268,17 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } + // output queue is full, idle for 5 sec. + if (streamQueueIsFull(pTask->outputq.queue)) { + stWarn("s-task:%s outputQ is full, idle for 5sec and retry", id); + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); + } + + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id); + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); + } + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -284,9 +295,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { } // dispatch the generated results - /*int32_t code = */handleResultBlocks(pTask, pRes, size); - - int64_t el = taosGetTimestampMs() - st; + /*int32_t code = */handleSanhistoryResultBlocks(pTask, pRes, size); // downstream task input queue is full, try in 5sec if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { @@ -297,6 +306,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0); } + int64_t el = taosGetTimestampMs() - st; if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, pTask->info.fillHistory, el / 1000.0); @@ -558,7 +568,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (streamQueueIsFull(pTask->outputq.queue)) { stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id); - streamTaskSetIdleInfo(pTask, 500); + streamTaskSetIdleInfo(pTask, 1000); return 0; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 5596eb3dee..247baea16f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -381,7 +381,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc } } - return TSDB_CODE_SUCCESS; + return code; } int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, From eb1f09f47fd7e2d1d28711f70e1a53e22370709e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 May 2024 17:25:22 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8ac187a2bb..8da737783c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -297,11 +297,6 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { // dispatch the generated results /*int32_t code = */handleSanhistoryResultBlocks(pTask, pRes, size); - // downstream task input queue is full, try in 5sec - if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { - return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); - } - if (finished) { return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0); } From 8c0a74e7b2890b010f4525954f579d788786c912 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 May 2024 09:19:09 +0800 Subject: [PATCH 4/4] fix(stream):adjust the idle time. --- source/libs/stream/src/streamExec.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8da737783c..934ff898a9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -270,8 +270,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { // output queue is full, idle for 5 sec. if (streamQueueIsFull(pTask->outputq.queue)) { - stWarn("s-task:%s outputQ is full, idle for 5sec and retry", id); - return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); + stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id); + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE); } if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {