diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 34aa74454a..814c7c060c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -176,6 +176,7 @@ typedef struct SExecTaskInfo { int64_t owner; // if it is in execution int32_t code; + int64_t version; // used for stream to record wal version SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; STableListInfo tableqinfoList; // this is a table list diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 81eed0cc3a..56c438e4e6 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1602,6 +1602,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG SSDataBlock* pBlock = pbInfo->pRes; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; + // set output datablock version + pBlock->info.version = pTaskInfo->version; + blockDataCleanup(pBlock); if (!hasDataInGroupInfo(pGroupResInfo)) { return; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ce423a141a..f3917dfeb1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1084,6 +1084,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.rows = pBlock->info.rows; pInfo->pRes->info.uid = pBlock->info.uid; pInfo->pRes->info.type = STREAM_NORMAL; + pInfo->pRes->info.version = pBlock->info.version; uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); if (groupIdPre) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index bc2c812e62..19abb88df7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1560,6 +1560,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { continue; } + if (pBlock->info.type == STREAM_NORMAL) { + //set input version + pTaskInfo->version = pBlock->info.version; + } + if (pInfo->scalarSupp.pExprInfo != NULL) { SExprSupp* pExprSup = &pInfo->scalarSupp; projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);