diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index af82baa6d9..531be3ea62 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -166,7 +166,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else if (type == STREAM_INPUT__CHECKPOINT) { - taosArrayPush(pInfo->pBlockLists, input); + SPackedData tmp = {.pDataBlock = input}; + taosArrayPush(pInfo->pBlockLists, &tmp); pInfo->blockType = STREAM_INPUT__CHECKPOINT; } else { ASSERT(0); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f8133882c9..30e9d9e095 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2179,7 +2179,9 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id); - SSDataBlock* pBlock = taosArrayGet(pInfo->pBlockLists, current); + SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current); + SSDataBlock* pBlock = pData->pDataBlock; + if (pBlock->info.type == STREAM_CHECKPOINT) { streamScanOperatorSaveCheckpoint(pInfo); pAPI->stateStore.streamStateCommit(pInfo->pState); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d0694f72e7..1684227a81 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -235,14 +235,15 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); - taosArrayDestroy(pTask->checkReqIds); - pTask->checkReqIds = NULL; + pTask->checkReqIds =taosArrayDestroy(pTask->checkReqIds); } if (pTask->pState) { streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); } + pTask->pRpcMsgList = taosArrayDestroy(pTask->pRpcMsgList); + if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); }