diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index b22650d249..5e62f7f116 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -274,6 +274,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tagArray = taosArrayDestroy(tagArray); taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); crTblArray = NULL; + } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { + continue; } else { SSubmitTbData tbData = {0}; tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 400f7e5320..67c05d8b99 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -327,24 +327,24 @@ typedef struct SStreamScanInfo { SExprSupp tagCalSup; int32_t primaryTsIndex; // primary time stamp slot id SReadHandle readHandle; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SColMatchInfo matchInfo; - SArray* pBlockLists; // multiple SSDatablock. - SSDataBlock* pRes; // result SSDataBlock - SSDataBlock* pUpdateRes; // update SSDataBlock + SArray* pBlockLists; // multiple SSDatablock. + SSDataBlock* pRes; // result SSDataBlock + SSDataBlock* pUpdateRes; // update SSDataBlock int32_t updateResIndex; int32_t blockType; // current block type int32_t validBlockIndex; // Is current data has returned? uint64_t numOfExec; // execution times STqReader* tqReader; - uint64_t groupId; + uint64_t groupId; struct SUpdateInfo* pUpdateInfo; EStreamScanMode scanMode; - struct SOperatorInfo* pStreamScanOp; - struct SOperatorInfo* pTableScanOp; + struct SOperatorInfo* pStreamScanOp; + struct SOperatorInfo* pTableScanOp; SArray* childIds; SWindowSupporter windowSup; SPartitionBySupporter partitionSup; @@ -366,12 +366,13 @@ typedef struct SStreamScanInfo { int32_t blockRecoverTotCnt; SSDataBlock* pRecoverRes; - SSDataBlock* pCreateTbRes; - int8_t igCheckUpdate; - int8_t igExpired; - void* pState; //void + SSDataBlock* pCreateTbRes; + int8_t igCheckUpdate; + int8_t igExpired; + void* pState; // void SStoreTqReader readerFn; - SStateStore stateStore; + SStateStore stateStore; + SSDataBlock* pCheckpointRes; } SStreamScanInfo; typedef struct { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 30e9d9e095..f0d5268b59 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1769,7 +1769,7 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) { void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) { void* pBuf = NULL; - int32_t len = streamScanOperatorEncode(pInfo, pBuf); + int32_t len = streamScanOperatorEncode(pInfo, &pBuf); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); } @@ -2188,7 +2188,7 @@ FETCH_NEXT_BLOCK: pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); } printDataBlock(pBlock, "stream scan ck"); - return pBlock; + return pInfo->pCheckpointRes; } return NULL; @@ -2356,6 +2356,8 @@ static void destroyStreamScanOperatorInfo(void* param) { blockDataDestroy(pStreamScan->pUpdateDataRes); blockDataDestroy(pStreamScan->pCreateTbRes); taosArrayDestroy(pStreamScan->pBlockLists); + blockDataDestroy(pStreamScan->pCheckpointRes); + taosMemoryFree(pStreamScan); } @@ -2573,6 +2575,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pState = NULL; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); // for stream if (pTaskInfo->streamInfo.pState) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index cc3bcf514f..55f0920d2b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2828,7 +2828,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack); pInfo->numOfDatapack = 0; - break; + continue; } else { ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -3953,7 +3953,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { pAggSup->stateStore.streamStateCommit(pAggSup->pState); setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId); copyDataBlock(pInfo->pCheckpointRes, pBlock); - break; + continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -4255,7 +4255,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pAggSup->stateStore.streamStateCommit(pAggSup->pState); setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId); pOperator->status = OP_RES_TO_RETURN; - break; + continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -5543,7 +5543,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { copyDataBlock(pInfo->pCheckpointRes, pBlock); qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack); pInfo->numOfDatapack = 0; - break; + continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); }