fix(stream): set correct datablock.

This commit is contained in:
Haojun Liao 2023-07-10 10:42:38 +08:00
parent e1134423de
commit 5940bbfb33
3 changed files with 8 additions and 4 deletions

View File

@ -166,7 +166,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else if (type == STREAM_INPUT__CHECKPOINT) {
taosArrayPush(pInfo->pBlockLists, input);
SPackedData tmp = {.pDataBlock = input};
taosArrayPush(pInfo->pBlockLists, &tmp);
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
} else {
ASSERT(0);

View File

@ -2179,7 +2179,9 @@ FETCH_NEXT_BLOCK:
int32_t current = pInfo->validBlockIndex++;
qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);
SSDataBlock* pBlock = taosArrayGet(pInfo->pBlockLists, current);
SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
SSDataBlock* pBlock = pData->pDataBlock;
if (pBlock->info.type == STREAM_CHECKPOINT) {
streamScanOperatorSaveCheckpoint(pInfo);
pAPI->stateStore.streamStateCommit(pInfo->pState);

View File

@ -235,14 +235,15 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
pTask->checkReqIds =taosArrayDestroy(pTask->checkReqIds);
}
if (pTask->pState) {
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
}
pTask->pRpcMsgList = taosArrayDestroy(pTask->pRpcMsgList);
if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr);
}