From f6f979e6ea31c1308d4202dcbf87d683b3ceeac6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Sep 2024 16:00:52 +0800 Subject: [PATCH 1/3] fix(stream): fix memory leaks. --- source/libs/stream/src/streamExec.c | 93 ++++++++++++++++------------- 1 file changed, 51 insertions(+), 42 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 88e40b247b..e4dc0b5854 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,8 @@ #define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); -static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, + int32_t* totalBlocks); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState pState = streamTaskGetStatus(pTask); @@ -95,17 +96,50 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return code; } +static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock, + SArray* pRes) { + SSDataBlock block = {.info.type = STREAM_PULL_OVER, .info.childId = pTask->info.selfChildId}; + int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); + if (num != 1) { + stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); + return TSDB_CODE_INVALID_PARA; + } + + void* p = taosArrayGet(pRetrieveBlock->blocks, 0); + int32_t code = assignOneDataBlock(&block, p); + if (code) { + stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } + + p = taosArrayPush(pRes, &block); + if (p != NULL) { + (*pNumOfBlocks) += 1; + stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr, + pTask->info.selfChildId, pRetrieveBlock->reqId); + } else { + code = terrno; + stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64" code:%s", pTask->id.idStr, + pRetrieveBlock->reqId, tstrerror(code)); + } + + return code; +} + int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { - int32_t code = TSDB_CODE_SUCCESS; - void* pExecutor = pTask->exec.pExecutor; int32_t size = 0; int32_t numOfBlocks = 0; + int32_t code = TSDB_CODE_SUCCESS; + void* pExecutor = pTask->exec.pExecutor; SArray* pRes = NULL; *totalBlocks = 0; *totalSize = 0; while (1) { + SSDataBlock* output = NULL; + uint64_t ts = 0; + if (pRes == NULL) { pRes = taosArrayInit(4, sizeof(SSDataBlock)); } @@ -115,8 +149,6 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* return code; } - SSDataBlock* output = NULL; - uint64_t ts = 0; if ((code = qExecTask(pExecutor, &output, &ts)) < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { resetTaskInfo(pExecutor); @@ -124,6 +156,7 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) { stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code)); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } else { qResetTaskCode(pExecutor); @@ -133,33 +166,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - SSDataBlock block = {0}; - const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; - - int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); - if (num != 1) { - stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); - continue; - } - - code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); - if (code) { - stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code)); - continue; - } - - block.info.type = STREAM_PULL_OVER; - block.info.childId = pTask->info.selfChildId; - - void* p = taosArrayPush(pRes, &block); - if (p != NULL) { - numOfBlocks += 1; - } else { - stError("s-task:%s failed to add retrieve block", pTask->id.idStr); - } - - stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr, - pTask->info.selfChildId, pRetrieveBlock->reqId); + code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*) pItem, pRes); + if (code) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return code; + } } break; @@ -174,26 +185,24 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* continue; // checkpoint block not dispatch to downstream tasks } - SSDataBlock block = {0}; + SSDataBlock block = {.info.childId = pTask->info.selfChildId}; code = assignOneDataBlock(&block, output); if (code) { stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr); continue; } - block.info.childId = pTask->info.selfChildId; - size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); numOfBlocks += 1; void* p = taosArrayPush(pRes, &block); if (p == NULL) { stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr); + } else { + stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, + pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size)); } - stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, - pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size)); - // current output should be dispatched to down stream nodes if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); @@ -303,7 +312,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { bool finished = false; const char* id = pTask->id.idStr; - if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr); return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } @@ -408,7 +417,7 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { } } else { if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING || - status == TASK_STATUS__STOP)) { + status == TASK_STATUS__STOP)) { stError("s-task:%s invalid task status:%d", id, status); return TSDB_CODE_STREAM_INTERNAL_ERROR; } @@ -718,7 +727,7 @@ int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpoi // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1); - if(code) { + if (code) { stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code)); } @@ -833,7 +842,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (pState.state == TASK_STATUS__CK) { stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue - } else { // todo refactor + } else { // todo refactor if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); } else { From 75a66459252287a3f00a45502c2a00cd76252fff Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Sep 2024 19:22:53 +0800 Subject: [PATCH 2/3] fix(stream): set the correct res block info. --- source/libs/stream/src/streamExec.c | 36 +++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e4dc0b5854..5300792338 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -98,7 +98,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock, SArray* pRes) { - SSDataBlock block = {.info.type = STREAM_PULL_OVER, .info.childId = pTask->info.selfChildId}; + SSDataBlock block = {0}; int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); if (num != 1) { stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); @@ -112,6 +112,9 @@ static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, return code; } + block.info.type = STREAM_PULL_OVER; + block.info.childId = pTask->info.selfChildId; + p = taosArrayPush(pRes, &block); if (p != NULL) { (*pNumOfBlocks) += 1; @@ -171,6 +174,33 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } +// SSDataBlock block = {0}; +// const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; +// +// int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); +// if (num != 1) { +// stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); +// continue; +// } +// +// code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); +// if (code) { +// stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code)); +// continue; +// } +// +// block.info.type = STREAM_PULL_OVER; +// block.info.childId = pTask->info.selfChildId; +// +// void* p = taosArrayPush(pRes, &block); +// if (p != NULL) { +// numOfBlocks += 1; +// } else { +// stError("s-task:%s failed to add retrieve block", pTask->id.idStr); +// } +// +// stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr, +// pTask->info.selfChildId, pRetrieveBlock->reqId); } break; @@ -185,13 +215,15 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* continue; // checkpoint block not dispatch to downstream tasks } - SSDataBlock block = {.info.childId = pTask->info.selfChildId}; + SSDataBlock block = {0}; code = assignOneDataBlock(&block, output); if (code) { stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr); continue; } + block.info.childId = pTask->info.selfChildId; + size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); numOfBlocks += 1; From 70c96783c01e13db7b1495357e5a6f73492cc1f0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Sep 2024 19:23:24 +0800 Subject: [PATCH 3/3] refactor: remove unused code. --- source/libs/stream/src/streamExec.c | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5300792338..0eb87df9b0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -174,33 +174,6 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } -// SSDataBlock block = {0}; -// const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; -// -// int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); -// if (num != 1) { -// stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); -// continue; -// } -// -// code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); -// if (code) { -// stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code)); -// continue; -// } -// -// block.info.type = STREAM_PULL_OVER; -// block.info.childId = pTask->info.selfChildId; -// -// void* p = taosArrayPush(pRes, &block); -// if (p != NULL) { -// numOfBlocks += 1; -// } else { -// stError("s-task:%s failed to add retrieve block", pTask->id.idStr); -// } -// -// stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr, -// pTask->info.selfChildId, pRetrieveBlock->reqId); } break;