From 68854bdea54953b0def0e4b7db550c001dee64fa Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 22 Jul 2022 17:39:59 +0800 Subject: [PATCH] enh(query): add wal version to agg result datablock TD-17552 --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 3 +++ source/libs/executor/src/scanoperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 5 +++++ 4 files changed, 10 insertions(+) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 21068c68a4..9e85d0fef5 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -175,6 +175,7 @@ typedef struct SExecTaskInfo { int64_t owner; // if it is in execution int32_t code; + int64_t version; // used for stream to record wal version SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; STableListInfo tableqinfoList; // this is a table list diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 44a78f2a0b..7cdfecd6b7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1602,6 +1602,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG SSDataBlock* pBlock = pbInfo->pRes; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; + // set output datablock version + pBlock->info.version = pTaskInfo->version; + blockDataCleanup(pBlock); if (!hasDataInGroupInfo(pGroupResInfo)) { return; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fc29eed455..503826afd3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1176,6 +1176,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.rows = pBlock->info.rows; pInfo->pRes->info.uid = pBlock->info.uid; pInfo->pRes->info.type = STREAM_NORMAL; + pInfo->pRes->info.version = pBlock->info.version; uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); if (groupIdPre) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b5966fc463..66ff739c5a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1562,6 +1562,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { continue; } + if (pBlock->info.type == STREAM_NORMAL) { + //set input version + pTaskInfo->version = pBlock->info.version; + } + if (pInfo->scalarSupp.pExprInfo != NULL) { SExprSupp* pExprSup = &pInfo->scalarSupp; projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);