From 5161eca70fd77425f316c302cebfe9fff42670fb Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 17 Nov 2022 09:48:34 +0800 Subject: [PATCH] fix(stream): stop scan when no output --- include/libs/stream/tstreamUpdate.h | 2 +- source/libs/executor/src/scanoperator.c | 4 +++- source/libs/stream/src/streamExec.c | 7 ++++++- source/libs/stream/src/streamUpdate.c | 7 ++++--- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index 1c490852f9..ab328c6ad5 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -47,7 +47,7 @@ typedef struct SUpdateInfo { SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); -void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); +TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 229effc96b..05ccda26c6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1981,9 +1981,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pBlock != NULL) { calBlockTbName(pInfo, pBlock); if (pInfo->pUpdateInfo) { - updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); + TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); } qDebug("stream recover scan get block, rows %d", pBlock->info.rows); + printDataBlock(pBlock, "scan recover"); return pBlock; } pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e7f2b60704..009f7eec9a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -91,6 +91,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { void* exec = pTask->exec.executor; qSetStreamOpOpen(exec); + bool finished = false; while (1) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); @@ -106,7 +107,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (qExecTask(exec, &output, &ts) < 0) { ASSERT(0); } - if (output == NULL) break; + if (output == NULL) { + finished = true; + break; + } SSDataBlock block = {0}; assignOneDataBlock(&block, output); @@ -133,6 +137,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); } + if (finished) break; } return 0; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 199892c241..15526cd8bb 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -163,9 +163,9 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { return false; } -void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) { - if (pBlock == NULL || pBlock->info.rows == 0) return; - TSKEY maxTs = -1; +TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) { + if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN; + TSKEY maxTs = INT64_MIN; int64_t tbUid = pBlock->info.uid; SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol); @@ -186,6 +186,7 @@ void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t pr if (pMaxTs == NULL || *pMaxTs > maxTs) { taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY)); } + return maxTs; } bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {