fix(stream): fix memory leaks.

This commit is contained in:
Haojun Liao 2024-09-27 16:00:52 +08:00
parent b1942889d8
commit f6f979e6ea
1 changed file with 51 additions and 42 deletions

View File

@ -24,7 +24,8 @@
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
int32_t* totalBlocks);
bool streamTaskShouldStop(const SStreamTask* pTask) {
SStreamTaskState pState = streamTaskGetStatus(pTask);
@ -95,17 +96,50 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return code;
}
/**
 * Build a STREAM_PULL_OVER block from the single block carried by a retrieve
 * request and append it to the result array.
 *
 * @param pTask          task on whose behalf the block is produced; supplies the
 *                       log id and the child id stamped on the new block
 * @param pNumOfBlocks   in/out counter, incremented by one only when the block
 *                       has actually been appended to pRes
 * @param pRetrieveBlock retrieve request; must carry exactly one block
 * @param pRes           result array (elements are SSDataBlock, stored by value)
 *
 * @return TSDB_CODE_INVALID_PARA if the request does not carry exactly one
 *         block, the error from assignOneDataBlock() if the copy fails, terrno
 *         if the append fails, otherwise the (zero) code of the successful copy.
 */
static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock,
                                     SArray* pRes) {
  SSDataBlock block = {0};

  int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
  if (num != 1) {
    stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
    return TSDB_CODE_INVALID_PARA;
  }

  void*   p = taosArrayGet(pRetrieveBlock->blocks, 0);
  int32_t code = assignOneDataBlock(&block, p);
  if (code) {
    stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  // Stamp the marker fields only AFTER the copy.
  // NOTE(review): assignOneDataBlock() presumably copies the source block's info
  // into the destination; setting these fields beforehand would then be silently
  // overwritten and the block would no longer be recognised as STREAM_PULL_OVER.
  block.info.type = STREAM_PULL_OVER;
  block.info.childId = pTask->info.selfChildId;

  p = taosArrayPush(pRes, &block);
  if (p == NULL) {
    code = terrno;
    stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64 " code:%s",
            pTask->id.idStr, pRetrieveBlock->reqId, tstrerror(code));

    // The array did not take the block, so the buffers copied into the local
    // block are still owned here and must be released to avoid a leak.
    blockDataFreeRes(&block);
    return code;
  }

  (*pNumOfBlocks) += 1;
  stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
          pTask->info.selfChildId, pRetrieveBlock->reqId);

  return code;
}
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
int32_t size = 0;
int32_t numOfBlocks = 0;
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
SArray* pRes = NULL;
*totalBlocks = 0;
*totalSize = 0;
while (1) {
SSDataBlock* output = NULL;
uint64_t ts = 0;
if (pRes == NULL) {
pRes = taosArrayInit(4, sizeof(SSDataBlock));
}
@ -115,8 +149,6 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
return code;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
resetTaskInfo(pExecutor);
@ -124,6 +156,7 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return code;
} else {
qResetTaskCode(pExecutor);
@ -133,33 +166,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
if (num != 1) {
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
continue;
}
code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*) pItem, pRes);
if (code) {
stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code));
continue;
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return code;
}
block.info.type = STREAM_PULL_OVER;
block.info.childId = pTask->info.selfChildId;
void* p = taosArrayPush(pRes, &block);
if (p != NULL) {
numOfBlocks += 1;
} else {
stError("s-task:%s failed to add retrieve block", pTask->id.idStr);
}
stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr,
pTask->info.selfChildId, pRetrieveBlock->reqId);
}
break;
@ -174,25 +185,23 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
continue; // checkpoint block not dispatch to downstream tasks
}
SSDataBlock block = {0};
SSDataBlock block = {.info.childId = pTask->info.selfChildId};
code = assignOneDataBlock(&block, output);
if (code) {
stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
continue;
}
block.info.childId = pTask->info.selfChildId;
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1;
void* p = taosArrayPush(pRes, &block);
if (p == NULL) {
stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
}
} else {
stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
}
// current output should be dispatched to down stream nodes
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
@ -303,7 +312,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
bool finished = false;
const char* id = pTask->id.idStr;
if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
}
@ -718,7 +727,7 @@ int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpoi
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
if(code) {
if (code) {
stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
}